Skip to main content
Glama
tiger_worker.py•18.4 kB
""" Tiger Worker Process. Isolated worker process that loads Tiger SDK for one specific account. Handles API calls from the main process via queue communication. """ import json import multiprocessing as mp import sys import time import traceback import uuid from datetime import datetime from typing import Any, Dict, List # Set up logging for worker process from loguru import logger # Add the Tiger SDK path to sys.path TIGER_SDK_PATH = ( "/Volumes/extdisk/MyRepos/cctrading-ws/tiger-mcp/references/openapi-python-sdk" ) if TIGER_SDK_PATH not in sys.path: sys.path.insert(0, TIGER_SDK_PATH) try: # Import Tiger SDK components from tigeropen.push.push_client import PushClient from tigeropen.quote.quote_client import QuoteClient from tigeropen.tiger_open_client import TigerOpenClient from tigeropen.tiger_open_config import TigerOpenClientConfig from tigeropen.trade.trade_client import TradeClient except ImportError as e: logger.error(f"Failed to import Tiger SDK: {e}") raise class TigerWorker: """ Tiger worker process that handles one specific account. Loads Tiger SDK credentials at startup and processes API calls from the main process via queue communication. """ def __init__(self, process_id: str, account_id: str): """ Initialize Tiger worker. Args: process_id: Unique process identifier account_id: Tiger account ID """ self.process_id = process_id self.account_id = account_id self.account = None self.credentials = None # Tiger SDK clients self.config = None self.client = None self.trade_client = None self.quote_client = None self.push_client = None # State management self.is_initialized = False self.last_heartbeat = datetime.utcnow() self.task_count = 0 logger.info(f"TigerWorker {process_id} initialized for account {account_id}") async def initialize(self) -> bool: """ Initialize Tiger SDK with account credentials. Returns: True if initialization successful """ try: logger.info(f"Initializing Tiger SDK for account {self.account_id}") # Load account and credentials await self._load_account_credentials() # Configure Tiger SDK await self._configure_tiger_sdk() # Initialize clients await self._initialize_clients() self.is_initialized = True logger.info( f"Tiger SDK initialized successfully for account {self.account.account_number}" ) return True except Exception as e: logger.error(f"Failed to initialize Tiger SDK: {e}") logger.error(traceback.format_exc()) return False async def process_task(self, task_data: Dict[str, Any]) -> Dict[str, Any]: """ Process a task request. Args: task_data: Task request data Returns: Task response data """ task_id = task_data.get("task_id") method = task_data.get("method") args = task_data.get("args", []) kwargs = task_data.get("kwargs", {}) start_time = time.time() try: logger.debug(f"Processing task {task_id}: {method}") # Check initialization if not self.is_initialized: raise RuntimeError("Worker not initialized") # Route method call if method.startswith("trade."): result = await self._execute_trade_method(method[6:], args, kwargs) elif method.startswith("quote."): result = await self._execute_quote_method(method[6:], args, kwargs) elif method.startswith("push."): result = await self._execute_push_method(method[5:], args, kwargs) elif method == "health_check": result = await self._health_check() else: raise ValueError(f"Unknown method: {method}") execution_time = time.time() - start_time self.task_count += 1 logger.debug(f"Task {task_id} completed in {execution_time:.2f}s") return { "task_id": task_id, "success": True, "result": result, "execution_time": execution_time, "timestamp": datetime.utcnow().isoformat(), } except Exception as e: execution_time = time.time() - start_time error_msg = str(e) logger.error(f"Task {task_id} failed: {error_msg}") logger.error(traceback.format_exc()) return { "task_id": task_id, "success": False, "error": error_msg, "execution_time": execution_time, "timestamp": datetime.utcnow().isoformat(), } async def heartbeat(self) -> Dict[str, Any]: """ Respond to heartbeat check. Returns: Heartbeat response """ self.last_heartbeat = datetime.utcnow() return { "type": "heartbeat_response", "process_id": self.process_id, "account_id": self.account_id, "timestamp": self.last_heartbeat.isoformat(), "task_count": self.task_count, "is_initialized": self.is_initialized, } async def shutdown(self) -> None: """Graceful shutdown of the worker.""" try: logger.info(f"Shutting down TigerWorker {self.process_id}") # Close Tiger SDK clients if self.push_client: try: self.push_client.disconnect() except: pass # Clean up resources self.is_initialized = False logger.info(f"TigerWorker {self.process_id} shutdown complete") except Exception as e: logger.error(f"Error during shutdown: {e}") # Private methods async def _load_account_credentials(self) -> None: """Load account and decrypt credentials.""" # Import account manager in worker process sys.path.insert( 0, "/Volumes/extdisk/MyRepos/cctrading-ws/tiger-mcp/packages/shared/src" ) sys.path.insert( 0, "/Volumes/extdisk/MyRepos/cctrading-ws/tiger-mcp/packages/database/src" ) from shared.account_manager import get_account_manager account_manager = get_account_manager() # Get account by ID self.account = await account_manager.get_account_by_id( uuid.UUID(self.account_id) ) if not self.account: raise RuntimeError(f"Account {self.account_id} not found") # Decrypt credentials self.credentials = await account_manager.decrypt_credentials(self.account) if not self.credentials.get("tiger_id") or not self.credentials.get( "private_key" ): raise RuntimeError("Missing or invalid credentials") logger.info(f"Loaded credentials for account {self.account.account_number}") async def _configure_tiger_sdk(self) -> None: """Configure Tiger SDK with account credentials.""" try: # Determine server URL if self.account.environment == "production": server_url = ( self.account.server_url or "https://openapi.tigerfintech.com" ) else: server_url = ( self.account.server_url or "https://openapi-sandbox.tigerfintech.com" ) # Create Tiger configuration self.config = TigerOpenClientConfig( tiger_id=self.credentials["tiger_id"], private_key=self.credentials["private_key"], account=self.account.account_number, server_url=server_url, env=self.account.environment, ) logger.info( f"Tiger SDK configured for {self.account.environment} environment" ) except Exception as e: logger.error(f"Failed to configure Tiger SDK: {e}") raise async def _initialize_clients(self) -> None: """Initialize Tiger SDK clients.""" try: # Create main client self.client = TigerOpenClient(self.config) # Create specialized clients self.trade_client = TradeClient(self.config) self.quote_client = QuoteClient(self.config) self.push_client = PushClient(self.config) logger.info("Tiger SDK clients initialized successfully") except Exception as e: logger.error(f"Failed to initialize Tiger clients: {e}") raise async def _execute_trade_method( self, method: str, args: List[Any], kwargs: Dict[str, Any] ) -> Any: """Execute trade client method.""" if not self.trade_client: raise RuntimeError("Trade client not initialized") if not hasattr(self.trade_client, method): raise AttributeError(f"Trade client has no method '{method}'") # Get method and execute trade_method = getattr(self.trade_client, method) result = trade_method(*args, **kwargs) # Convert response to serializable format return self._serialize_response(result) async def _execute_quote_method( self, method: str, args: List[Any], kwargs: Dict[str, Any] ) -> Any: """Execute quote client method.""" if not self.quote_client: raise RuntimeError("Quote client not initialized") if not hasattr(self.quote_client, method): raise AttributeError(f"Quote client has no method '{method}'") # Get method and execute quote_method = getattr(self.quote_client, method) result = quote_method(*args, **kwargs) # Convert response to serializable format return self._serialize_response(result) async def _execute_push_method( self, method: str, args: List[Any], kwargs: Dict[str, Any] ) -> Any: """Execute push client method.""" if not self.push_client: raise RuntimeError("Push client not initialized") if not hasattr(self.push_client, method): raise AttributeError(f"Push client has no method '{method}'") # Get method and execute push_method = getattr(self.push_client, method) result = push_method(*args, **kwargs) # Convert response to serializable format return self._serialize_response(result) async def _health_check(self) -> Dict[str, Any]: """Perform health check.""" try: # Test trade client connection trade_health = False if self.trade_client: try: # Try to get account info account_info = self.trade_client.get_account() trade_health = bool(account_info) except: pass # Test quote client connection quote_health = False if self.quote_client: try: # Try to get market status market_status = self.quote_client.get_market_status() quote_health = bool(market_status) except: pass return { "process_id": self.process_id, "account_id": self.account_id, "account_number": self.account.account_number, "environment": self.account.environment, "is_initialized": self.is_initialized, "trade_client_healthy": trade_health, "quote_client_healthy": quote_health, "task_count": self.task_count, "last_heartbeat": self.last_heartbeat.isoformat(), "timestamp": datetime.utcnow().isoformat(), } except Exception as e: logger.error(f"Health check failed: {e}") return { "process_id": self.process_id, "account_id": self.account_id, "is_initialized": self.is_initialized, "health_error": str(e), "timestamp": datetime.utcnow().isoformat(), } def _serialize_response(self, response) -> Any: """ Serialize Tiger API response to JSON-compatible format. Args: response: Tiger API response object Returns: Serializable representation """ try: # Handle different response types if hasattr(response, "to_dict"): return response.to_dict() elif hasattr(response, "__dict__"): # Convert object attributes to dict result = {} for key, value in response.__dict__.items(): if not key.startswith("_"): try: # Try to serialize the value json.dumps(value) result[key] = value except (TypeError, ValueError): # Convert non-serializable values to string result[key] = str(value) return result elif isinstance(response, (list, tuple)): return [self._serialize_response(item) for item in response] elif isinstance(response, dict): return {k: self._serialize_response(v) for k, v in response.items()} else: # Try direct serialization try: json.dumps(response) return response except (TypeError, ValueError): return str(response) except Exception as e: logger.warning(f"Failed to serialize response: {e}") return {"serialization_error": str(e), "response_type": str(type(response))} def tiger_worker_main( process_id: str, account_id: str, task_queue: mp.Queue, result_queue: mp.Queue ) -> None: """ Main entry point for Tiger worker process. Args: process_id: Unique process identifier account_id: Tiger account ID task_queue: Queue for receiving tasks result_queue: Queue for sending results """ # Set up process-specific logging logger.remove() # Remove default handler logger.add( sys.stderr, format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | " f"<cyan>worker-{process_id[:8]}</cyan> | <level>{{message}}</level>", level="DEBUG", ) logger.info(f"Starting Tiger worker process {process_id} for account {account_id}") worker = None try: # Create worker instance worker = TigerWorker(process_id, account_id) # Initialize worker (this is synchronous in the worker process) import asyncio async def async_main(): # Initialize worker if not await worker.initialize(): logger.error("Failed to initialize worker") return # Send ready signal ready_msg = { "type": "ready", "process_id": process_id, "account_id": account_id, "timestamp": datetime.utcnow().isoformat(), } result_queue.put(ready_msg) logger.info(f"Worker {process_id} ready and waiting for tasks") # Main processing loop while True: try: # Get task from queue (with timeout) try: task_data = task_queue.get(timeout=1.0) except: # No task, continue loop continue # Handle special messages if isinstance(task_data, dict): msg_type = task_data.get("type") if msg_type == "shutdown": logger.info("Received shutdown signal") await worker.shutdown() break elif msg_type == "heartbeat": heartbeat_response = await worker.heartbeat() result_queue.put(heartbeat_response) continue # Process regular task if isinstance(task_data, dict) and "task_id" in task_data: response = await worker.process_task(task_data) result_queue.put(response) else: logger.warning(f"Invalid task data: {task_data}") except KeyboardInterrupt: logger.info("Received interrupt signal") break except Exception as e: logger.error(f"Error in main loop: {e}") logger.error(traceback.format_exc()) # Run async main asyncio.run(async_main()) except Exception as e: logger.error(f"Fatal error in worker process: {e}") logger.error(traceback.format_exc()) finally: if worker: try: asyncio.run(worker.shutdown()) except: pass logger.info(f"Tiger worker process {process_id} exiting") if __name__ == "__main__": # This allows the worker to be run standalone for testing if len(sys.argv) >= 3: process_id = sys.argv[1] account_id = sys.argv[2] # Create dummy queues for testing task_queue = mp.Queue() result_queue = mp.Queue() tiger_worker_main(process_id, account_id, task_queue, result_queue) else: print("Usage: python tiger_worker.py <process_id> <account_id>") sys.exit(1)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/luxiaolei/tiger-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server