Skip to main content
Glama
example_usage.py•17.8 kB
""" Example usage of Tiger Process Pool Manager. Demonstrates how to integrate the process pool with MCP server endpoints and handle Tiger API calls with account isolation. """ import asyncio import sys import uuid from datetime import datetime from typing import Any, Dict, Optional # Add paths for imports sys.path.insert( 0, "/Volumes/extdisk/MyRepos/cctrading-ws/tiger-mcp/packages/shared/src" ) sys.path.insert( 0, "/Volumes/extdisk/MyRepos/cctrading-ws/tiger-mcp/packages/database/src" ) from loguru import logger from .process_manager import get_process_manager from shared.account_manager import AccountStatus, get_account_manager from shared.account_router import get_account_router class TigerAPIService: """ High-level Tiger API service using process pool manager. Provides a clean interface for MCP server endpoints to execute Tiger API calls with automatic account routing and process management. """ def __init__(self): """Initialize the Tiger API service.""" self.process_manager = get_process_manager() self.account_manager = get_account_manager() self.account_router = get_account_router() self._started = False async def start(self) -> None: """Start the Tiger API service.""" if not self._started: await self.process_manager.start() self._started = True logger.info("TigerAPIService started") async def stop(self) -> None: """Stop the Tiger API service.""" if self._started: await self.process_manager.stop() self._started = False logger.info("TigerAPIService stopped") async def get_account_info( self, account_id: Optional[str] = None, use_default: bool = True ) -> Dict[str, Any]: """ Get account information. Args: account_id: Specific account ID, or None to use default/routing use_default: Use default trading account if no account_id specified Returns: Account information """ try: # Route to appropriate account target_account_id = await self._route_account( account_id, use_default, "trading" ) # Execute API call result = await self.process_manager.execute_api_call( account_id=target_account_id, method="trade.get_account", timeout=10.0 ) return { "success": True, "account_id": target_account_id, "data": result, "timestamp": datetime.utcnow().isoformat(), } except Exception as e: logger.error(f"Failed to get account info: {e}") return { "success": False, "error": str(e), "timestamp": datetime.utcnow().isoformat(), } async def get_positions( self, account_id: Optional[str] = None, use_default: bool = True ) -> Dict[str, Any]: """ Get account positions. Args: account_id: Specific account ID, or None to use default/routing use_default: Use default trading account if no account_id specified Returns: Position information """ try: # Route to appropriate account target_account_id = await self._route_account( account_id, use_default, "trading" ) # Execute API call result = await self.process_manager.execute_api_call( account_id=target_account_id, method="trade.get_positions", timeout=10.0 ) return { "success": True, "account_id": target_account_id, "data": result, "timestamp": datetime.utcnow().isoformat(), } except Exception as e: logger.error(f"Failed to get positions: {e}") return { "success": False, "error": str(e), "timestamp": datetime.utcnow().isoformat(), } async def get_quote( self, symbol: str, account_id: Optional[str] = None, use_default: bool = True ) -> Dict[str, Any]: """ Get stock quote. Args: symbol: Stock symbol account_id: Specific account ID, or None to use default/routing use_default: Use default data account if no account_id specified Returns: Quote information """ try: # Route to appropriate account (prefer data account for quotes) target_account_id = await self._route_account( account_id, use_default, "data" ) # Execute API call result = await self.process_manager.execute_api_call( account_id=target_account_id, method="quote.get_quote_brief", args=[symbol], timeout=10.0, ) return { "success": True, "account_id": target_account_id, "symbol": symbol, "data": result, "timestamp": datetime.utcnow().isoformat(), } except Exception as e: logger.error(f"Failed to get quote for {symbol}: {e}") return { "success": False, "error": str(e), "symbol": symbol, "timestamp": datetime.utcnow().isoformat(), } async def place_order( self, symbol: str, action: str, quantity: int, order_type: str = "MKT", price: Optional[float] = None, account_id: Optional[str] = None, use_default: bool = True, ) -> Dict[str, Any]: """ Place a trading order. Args: symbol: Stock symbol action: Order action (BUY/SELL) quantity: Order quantity order_type: Order type (MKT, LMT, etc.) price: Limit price (required for limit orders) account_id: Specific account ID, or None to use default/routing use_default: Use default trading account if no account_id specified Returns: Order placement result """ try: # Route to appropriate account (always use trading account for orders) target_account_id = await self._route_account( account_id, use_default, "trading" ) # Prepare order parameters order_params = { "symbol": symbol, "action": action, "quantity": quantity, "order_type": order_type, } if price is not None: order_params["price"] = price # Execute API call result = await self.process_manager.execute_api_call( account_id=target_account_id, method="trade.place_order", kwargs=order_params, timeout=15.0, ) return { "success": True, "account_id": target_account_id, "order_params": order_params, "data": result, "timestamp": datetime.utcnow().isoformat(), } except Exception as e: logger.error(f"Failed to place order: {e}") return { "success": False, "error": str(e), "order_params": { "symbol": symbol, "action": action, "quantity": quantity, "order_type": order_type, "price": price, }, "timestamp": datetime.utcnow().isoformat(), } async def get_orders( self, account_id: Optional[str] = None, use_default: bool = True, status: Optional[str] = None, ) -> Dict[str, Any]: """ Get order history. Args: account_id: Specific account ID, or None to use default/routing use_default: Use default trading account if no account_id specified status: Filter by order status Returns: Order history """ try: # Route to appropriate account target_account_id = await self._route_account( account_id, use_default, "trading" ) # Prepare parameters params = {} if status: params["status"] = status # Execute API call result = await self.process_manager.execute_api_call( account_id=target_account_id, method="trade.get_orders", kwargs=params, timeout=10.0, ) return { "success": True, "account_id": target_account_id, "data": result, "timestamp": datetime.utcnow().isoformat(), } except Exception as e: logger.error(f"Failed to get orders: {e}") return { "success": False, "error": str(e), "timestamp": datetime.utcnow().isoformat(), } async def get_market_status( self, market: str = "US", account_id: Optional[str] = None, use_default: bool = True, ) -> Dict[str, Any]: """ Get market status. Args: market: Market code (US, HK, etc.) account_id: Specific account ID, or None to use default/routing use_default: Use default data account if no account_id specified Returns: Market status information """ try: # Route to appropriate account (prefer data account for market info) target_account_id = await self._route_account( account_id, use_default, "data" ) # Execute API call result = await self.process_manager.execute_api_call( account_id=target_account_id, method="quote.get_market_status", args=[market], timeout=10.0, ) return { "success": True, "account_id": target_account_id, "market": market, "data": result, "timestamp": datetime.utcnow().isoformat(), } except Exception as e: logger.error(f"Failed to get market status: {e}") return { "success": False, "error": str(e), "market": market, "timestamp": datetime.utcnow().isoformat(), } async def health_check_all(self) -> Dict[str, Any]: """ Perform health check on all accounts and processes. Returns: Health check results """ try: # Get system metrics system_metrics = await self.process_manager.get_system_metrics() # Run health checks on all accounts health_results = await self.process_manager.health_check_all_accounts() # Get process metrics process_metrics = await self.process_manager.get_process_metrics() # Summarize health status healthy_accounts = sum(1 for r in health_results if r["healthy"]) total_accounts = len(health_results) return { "success": True, "system_health": { "total_processes": system_metrics.get("total_processes", 0), "healthy_accounts": healthy_accounts, "total_accounts": total_accounts, "system_success_rate": system_metrics.get("success_rate", 0), "average_response_time": system_metrics.get( "average_response_time", 0 ), }, "account_health": health_results, "process_metrics": { pid: { "total_tasks": metrics.total_tasks, "success_rate": metrics.success_rate, "error_rate": metrics.error_rate, "average_response_time": metrics.average_response_time, "uptime_seconds": metrics.uptime_seconds, } for pid, metrics in process_metrics.items() }, "timestamp": datetime.utcnow().isoformat(), } except Exception as e: logger.error(f"Health check failed: {e}") return { "success": False, "error": str(e), "timestamp": datetime.utcnow().isoformat(), } # Private methods async def _route_account( self, account_id: Optional[str], use_default: bool, operation_type: str ) -> str: """ Route request to appropriate account. Args: account_id: Specific account ID use_default: Use default account if no account_id operation_type: Type of operation (trading, data) Returns: Target account ID Raises: ValueError: If no suitable account found """ if account_id: # Validate specific account account = await self.account_manager.get_account_by_id( uuid.UUID(account_id) ) if not account: raise ValueError(f"Account {account_id} not found") if account.status != AccountStatus.ACTIVE: raise ValueError(f"Account {account.account_number} is not active") return account_id elif use_default: # Use default account if operation_type == "trading": account = await self.account_manager.get_default_trading_account() if not account: raise ValueError("No default trading account configured") else: # data operations account = await self.account_manager.get_default_data_account() if not account: # Fall back to trading account for data operations account = await self.account_manager.get_default_trading_account() if not account: raise ValueError( "No default data or trading account configured" ) return str(account.id) else: # Use routing logic target_account = await self.account_router.route_request(operation_type) if not target_account: raise ValueError( f"No suitable account found for {operation_type} operations" ) return str(target_account.id) async def demo_usage(): """Demonstrate usage of the Tiger API service.""" logger.info("=== Tiger API Service Demo ===") # Initialize service api_service = TigerAPIService() try: # Start the service await api_service.start() logger.info("Tiger API service started") # Example 1: Health check logger.info("\n1. Performing system health check...") health = await api_service.health_check_all() logger.info(f"System health: {health['success']}") if health["success"]: logger.info( f"Healthy accounts: {health['system_health']['healthy_accounts']}" ) logger.info( f"Total processes: {health['system_health']['total_processes']}" ) # Example 2: Get market status logger.info("\n2. Getting market status...") market_status = await api_service.get_market_status("US") logger.info(f"Market status request: {market_status['success']}") # Example 3: Get quote logger.info("\n3. Getting stock quote...") quote = await api_service.get_quote("AAPL") logger.info(f"Quote request: {quote['success']}") # Example 4: Get account info logger.info("\n4. Getting account information...") account_info = await api_service.get_account_info() logger.info(f"Account info request: {account_info['success']}") # Example 5: Get positions logger.info("\n5. Getting positions...") positions = await api_service.get_positions() logger.info(f"Positions request: {positions['success']}") # Note: Order placement would require valid credentials and permissions logger.info("\n6. Order operations (would require valid credentials)...") logger.info(" - place_order() for actual trading") logger.info(" - get_orders() for order history") logger.info("\n=== Demo completed successfully ===") except Exception as e: logger.error(f"Demo failed: {e}") import traceback logger.error(traceback.format_exc()) finally: # Clean up await api_service.stop() logger.info("Tiger API service stopped") if __name__ == "__main__": # Configure logging logger.remove() logger.add( sys.stderr, format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | " "<cyan>demo</cyan> | <level>{message}</level>", level="INFO", ) # Run demo asyncio.run(demo_usage())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/luxiaolei/tiger-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server