Skip to main content
Glama

Portfolio Manager

by ikhyunAn
main.py8.3 kB
import sys import signal import logging import time import socket from portfolio_server.server import create_mcp_server # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger("portfolio_mcp") # Valid transport types VALID_TRANSPORTS = ["stdio", "sse", "http"] # Transport-specific configuration TRANSPORT_CONFIG = { "stdio": { "startup_delay": 0.1, # Short delay for stdio "max_retries": 1, # No retries needed for stdio "retry_delay": 0.5, # Delay between retries }, "sse": { "startup_delay": 3.0, # Increased delay for SSE to establish connection "max_retries": 5, # Increased retries for network transports "retry_delay": 3.0, # Increased delay between retries "port": 8080, # Default SSE port "connection_timeout": 10.0 # Added timeout for connection attempts }, "http": { "startup_delay": 0.5, "max_retries": 3, "retry_delay": 1.0, "port": 8000, # Default HTTP port } } # Global flag for shutdown is_shutting_down = False # Signal handlers for graceful shutdown def handle_shutdown_signal(signum, frame): global is_shutting_down if is_shutting_down: logger.warning("Forced shutdown triggered") sys.exit(1) logger.info(f"Shutdown signal received ({signum}), stopping server gracefully...") is_shutting_down = True # Note: The actual shutdown happens in the main loop # Register signal handlers signal.signal(signal.SIGINT, handle_shutdown_signal) signal.signal(signal.SIGTERM, handle_shutdown_signal) # Create the MCP server at module level try: mcp = create_mcp_server() logger.info("MCP server created successfully") except Exception as e: logger.error(f"Failed to create MCP server: {e}") sys.exit(1) def validate_transport(transport_type): """Validate that the transport type is supported.""" if transport_type not in VALID_TRANSPORTS: logger.error(f"Invalid transport type: {transport_type}. Valid options are: {', '.join(VALID_TRANSPORTS)}") return False return True def check_connection_status(transport_type, config=None): """Check the connection status for the specified transport.""" if config is None: config = {} if transport_type == "stdio": # stdio should always be available return True elif transport_type == "sse" or transport_type == "http": # Check if the port is available for sse/http port = config.get("port", TRANSPORT_CONFIG[transport_type]["port"]) try: # Try to bind to the port to see if it's available sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(1) result = sock.connect_ex(('127.0.0.1', port)) sock.close() # If the connection was refused, the port is available (good) # If we could connect, someone else is using it (bad for our server) return result != 0 except Exception as e: logger.warning(f"Error checking connection status for {transport_type}: {e}") return False else: logger.warning(f"Unknown transport type for status check: {transport_type}") return False def run_server_with_retry(transport_type, config=None): """Run the server with retry logic if the connection fails.""" if config is None: config = {} # Get transport configuration transport_config = TRANSPORT_CONFIG.get(transport_type, {}) max_retries = config.get("max_retries", transport_config.get("max_retries", 1)) retry_delay = config.get("retry_delay", transport_config.get("retry_delay", 1.0)) startup_delay = config.get("startup_delay", transport_config.get("startup_delay", 0.5)) # Wait before starting to allow connections to establish logger.info(f"Waiting {startup_delay}s before starting server...") time.sleep(startup_delay) # Check connection status if not check_connection_status(transport_type, config): logger.warning(f"Transport {transport_type} may not be available. Proceeding with caution.") # Initialize retry counter retries = 0 last_error = None while retries <= max_retries: try: # Create transport options transport_options = {} # Add transport-specific options if transport_type == "sse" or transport_type == "http": port = config.get("port", transport_config.get("port")) transport_options["port"] = port # Run the server with the configured transport and options logger.info(f"Starting server with {transport_type} transport (attempt {retries + 1}/{max_retries + 1})...") mcp.run(transport=transport_type, **transport_options) # If we get here, the server started successfully return True except ConnectionError as e: # Specific handling for connection errors last_error = e logger.error(f"Connection error starting server: {e}") except OSError as e: # Handle OS errors like port already in use last_error = e logger.error(f"OS error starting server: {e}") except Exception as e: # Catch any other exceptions last_error = e logger.error(f"Error starting server: {e}") # Increment retry counter retries += 1 if retries <= max_retries: logger.info(f"Retrying in {retry_delay} seconds...") time.sleep(retry_delay) # If we get here, all retries failed logger.error(f"Failed to start server after {max_retries + 1} attempts. Last error: {last_error}") return False if __name__ == "__main__": logger.info("Portfolio Manager MCP Server starting") # Default transport = "stdio" config = {} # Check command line arguments for transport type if len(sys.argv) > 1: for arg in sys.argv[1:]: if arg == "--sse": transport = "sse" elif arg.startswith("--transport="): transport = arg.split("=")[1] elif arg.startswith("--port="): try: config["port"] = int(arg.split("=")[1]) except ValueError: logger.error(f"Invalid port number: {arg.split('=')[1]}") sys.exit(1) elif arg.startswith("--retry="): try: config["max_retries"] = int(arg.split("=")[1]) except ValueError: logger.error(f"Invalid retry count: {arg.split('=')[1]}") sys.exit(1) # Validate transport type if not validate_transport(transport): sys.exit(1) logger.info(f"Starting MCP server with transport: {transport}") if config: logger.info(f"Transport configuration: {config}") try: # For SSE transport, use the dedicated SSE module if transport == "sse": from portfolio_server.sse import run_sse_server port = config.get("port", TRANSPORT_CONFIG["sse"]["port"]) host = "0.0.0.0" # Default host logger.info(f"Starting SSE server on {host}:{port}") run_sse_server(port=port, host=host) else: # For other transports, use the standard retry logic success = run_server_with_retry(transport, config) if not success: logger.error("Failed to start the server after all attempts.") sys.exit(1) except KeyboardInterrupt: # Handle keyboard interrupt (Ctrl+C) logger.info("Server interrupted via keyboard") except Exception as e: # Handle any other exceptions logger.error(f"Error running MCP server: {e}") sys.exit(1) finally: # Perform cleanup logger.info("Shutting down MCP server...") # Add any necessary cleanup code here logger.info("MCP server shutdown complete")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ikhyunAn/MCP_InvestmentPortfolio'

If you have feedback or need assistance with the MCP directory API, please join our Discord server