|
| 1 | +import asyncio |
| 2 | +import logging |
| 3 | +import os |
| 4 | +import re |
| 5 | +from concurrent.futures import ThreadPoolExecutor |
| 6 | +from typing import Optional, Union |
| 7 | +from polygon import RESTClient, WebSocketClient |
| 8 | +from polygon.websocket.models import Market, Feed |
| 9 | + |
| 10 | + |
| 11 | +class ApiCallHandler: |
| 12 | + def __init__(self): |
| 13 | + self.api_call_queue = asyncio.Queue() |
| 14 | + self.executor = ThreadPoolExecutor() # Thread pool for running synchronous code |
| 15 | + |
| 16 | + async def enqueue_api_call(self, options_ticker): |
| 17 | + await self.api_call_queue.put(options_ticker) |
| 18 | + |
| 19 | + async def start_processing_api_calls(self): |
| 20 | + while True: |
| 21 | + options_ticker = await self.api_call_queue.get() |
| 22 | + try: |
| 23 | + # TODO: |
| 24 | + # Here, you can process the rest api requets as needed |
| 25 | + # Example: Get the options contract for this |
| 26 | + contract = await asyncio.get_running_loop().run_in_executor( |
| 27 | + self.executor, self.get_options_contract, options_ticker |
| 28 | + ) |
| 29 | + print(contract) # Or process the contract data as needed |
| 30 | + except Exception as e: |
| 31 | + logging.error(f"Error processing API call for {options_ticker}: {e}") |
| 32 | + finally: |
| 33 | + self.api_call_queue.task_done() |
| 34 | + |
| 35 | + def get_options_contract(self, options_ticker): |
| 36 | + client = RESTClient() # Assumes POLYGON_API_KEY is set in the environment |
| 37 | + return client.get_options_contract(options_ticker) |
| 38 | + |
| 39 | + |
| 40 | +class MessageHandler: |
| 41 | + def __init__(self, api_call_handler): |
| 42 | + self.handler_queue = asyncio.Queue() |
| 43 | + self.api_call_handler = api_call_handler |
| 44 | + |
| 45 | + async def add(self, message_response: Optional[Union[str, bytes]]) -> None: |
| 46 | + await self.handler_queue.put(message_response) |
| 47 | + |
| 48 | + async def start_handling(self) -> None: |
| 49 | + while True: |
| 50 | + message_response = await self.handler_queue.get() |
| 51 | + logging.info(f"Received message: {message_response}") |
| 52 | + try: |
| 53 | + # TODO: |
| 54 | + # Here, you can process the websocket messages as needed |
| 55 | + # Example: Extract ticker symbol and enqueue REST API call |
| 56 | + # to get the options contract for this trade (non-blocking) |
| 57 | + for trade in message_response: |
| 58 | + ticker = self.extract_symbol(trade.symbol) |
| 59 | + if ticker == "NVDA": |
| 60 | + asyncio.create_task( |
| 61 | + self.api_call_handler.enqueue_api_call(trade.symbol) |
| 62 | + ) |
| 63 | + except Exception as e: |
| 64 | + logging.error(f"Error handling message: {e}") |
| 65 | + finally: |
| 66 | + self.handler_queue.task_done() |
| 67 | + |
| 68 | + def extract_symbol(self, input_string): |
| 69 | + match = re.search(r"O:([A-Z]+)", input_string) |
| 70 | + if match: |
| 71 | + return match.group(1) |
| 72 | + else: |
| 73 | + return None |
| 74 | + |
| 75 | + |
| 76 | +class MyClient: |
| 77 | + def __init__(self, feed, market, subscriptions): |
| 78 | + api_key = os.getenv("POLYGON_API_KEY") |
| 79 | + self.polygon_websocket_client = WebSocketClient( |
| 80 | + api_key=api_key, |
| 81 | + feed=feed, |
| 82 | + market=market, |
| 83 | + verbose=True, |
| 84 | + subscriptions=subscriptions, |
| 85 | + ) |
| 86 | + self.api_call_handler = ApiCallHandler() |
| 87 | + self.message_handler = MessageHandler(self.api_call_handler) |
| 88 | + |
| 89 | + async def start_event_stream(self): |
| 90 | + try: |
| 91 | + await asyncio.gather( |
| 92 | + self.polygon_websocket_client.connect(self.message_handler.add), |
| 93 | + self.message_handler.start_handling(), |
| 94 | + self.api_call_handler.start_processing_api_calls(), |
| 95 | + ) |
| 96 | + except Exception as e: |
| 97 | + logging.error(f"Error in event stream: {e}") |
| 98 | + |
| 99 | + |
| 100 | +async def main(): |
| 101 | + logging.basicConfig(level=logging.INFO) |
| 102 | + my_client = MyClient( |
| 103 | + feed=Feed.RealTime, market=Market.Options, subscriptions=["T.*"] |
| 104 | + ) |
| 105 | + await my_client.start_event_stream() |
| 106 | + |
| 107 | + |
| 108 | +# Entry point for the asyncio program |
| 109 | +asyncio.run(main()) |
0 commit comments