Skip to main content
Glama

MCP Platform

by jck411
main.py•8.84 kB
""" Main application entry point - WebSocket interface with graceful shutdown handling. """ from __future__ import annotations import asyncio import contextlib import logging import os import signal import sys from typing import Any import src.chat.chat_orchestrator from src.chat import ChatOrchestrator from src.clients import LLMClient, MCPClient from src.clients.model_capabilities import ModelCapabilities from src.config import Configuration from src.history import create_repository from src.websocket_server import run_websocket_server def _configure_advanced_logging(logging_config: dict[str, Any]) -> None: """ Advanced logging configuration with hierarchical loggers and feature control. This approach is more efficient than setting individual loggers because: 1. Uses parent logger inheritance for automatic propagation 2. Supports different log levels per module 3. Enables/disables features at the configuration level 4. Pre-configures known logger hierarchies """ # Level mapping for efficient lookup level_map = { "DEBUG": logging.DEBUG, "INFO": logging.INFO, "WARNING": logging.WARNING, "ERROR": logging.ERROR, "CRITICAL": logging.CRITICAL, } # Set global level global_level = logging_config.get("level", "WARNING") logging.getLogger().setLevel(level_map.get(global_level, logging.WARNING)) # Module-to-logger mapping for efficient configuration module_logger_map = { "chat": { "loggers": ["src.chat"], "default_level": "INFO", "features": [ "llm_replies", "system_prompt", "tool_execution", "tool_results", ], }, "connection_pool": { "loggers": ["src.clients"], "default_level": "INFO", "features": ["connection_events", "pool_stats", "http_requests"], }, "mcp": { "loggers": ["mcp", "src.clients.mcp_client"], "default_level": "INFO", "features": [ "connection_attempts", "health_checks", "tool_calls", "tool_arguments", "tool_results", ], }, } modules_config = logging_config.get("modules", {}) # Configure each module's loggers for module_name, module_config in 
modules_config.items(): if not isinstance(module_config, dict): continue # Get the module's specific level, or use global default module_level = module_config.get( "level", module_logger_map.get(module_name, {}).get("default_level", global_level), ) level_value = level_map.get(module_level, logging.WARNING) # Set level on parent loggers (efficient - children inherit) for logger_name in module_logger_map.get(module_name, {}).get("loggers", []): logging.getLogger(logger_name).setLevel(level_value) # Store feature flags for runtime checking (more efficient than repeated config lookups) if not hasattr(logging, "_module_features"): logging._module_features = {} logging._module_features[module_name] = module_config.get("enable_features", {}) # Advanced configuration (for future enhancement) advanced_config = logging_config.get("advanced", {}) if advanced_config.get("structured_logging"): # Could implement JSON structured logging here pass if advanced_config.get("async_logging"): # Could implement async log writing here pass def _on_logging_config_change(new_config: dict[str, Any]) -> None: """ Handle real-time logging configuration changes. This function is called whenever the configuration file is modified, allowing for dynamic logging reconfiguration without restart. 
""" try: logging_config = new_config.get("logging", {}) if logging_config: # Reconfigure logging with new settings _configure_advanced_logging(logging_config) logging.info("šŸ”„ Logging configuration updated in real-time") except Exception as e: # Log errors but don't crash the application logging.error(f"āŒ Failed to update logging configuration: {e}") # Configure logging for the application logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") async def main() -> None: """Main entry point - WebSocket interface with graceful shutdown handling.""" config = Configuration() # Apply consolidated logging configuration from YAML logging_config = config.get_logging_config() if "format" in logging_config: for handler in logging.getLogger().handlers: if isinstance(handler, logging.StreamHandler): handler.setFormatter(logging.Formatter(logging_config["format"])) # Initialize advanced logging configuration _configure_advanced_logging(logging_config) # Subscribe to configuration changes for real-time logging updates config.subscribe_to_changes(_on_logging_config_change) config_path = os.path.join(os.path.dirname(__file__), "servers_config.json") servers_config = config.load_config(config_path) # Get MCP connection configuration mcp_connection_config = config.get_mcp_connection_config() # Get MCP logging configuration mcp_logging_config = config.get_mcp_logging_config() # Combine connection and logging configuration for MCP clients mcp_client_config = {**mcp_connection_config, **mcp_logging_config} clients: list[MCPClient] = [] for name, server_config in servers_config["mcpServers"].items(): # Only create clients for enabled servers if server_config.get("enabled", False): clients.append(MCPClient(name, server_config, mcp_client_config)) else: logging.info(f"Skipping disabled server: {name}") # Create repository for chat history using configured storage mode repo = create_repository(config.get_config_dict()) # Now that MCPClient and 
LLMClient are defined, make them available # in the chat orchestrator module's namespace and rebuild config src.chat.chat_orchestrator.MCPClient = MCPClient # type: ignore[attr-defined] src.chat.chat_orchestrator.LLMClient = LLMClient # type: ignore[attr-defined] ChatOrchestrator.ChatOrchestratorConfig.model_rebuild() # Setup graceful shutdown handler shutdown_event = asyncio.Event() def signal_handler() -> None: """Handle shutdown signals gracefully.""" logging.info("Received shutdown signal, initiating graceful shutdown...") shutdown_event.set() # Register signal handlers for graceful shutdown if sys.platform != "win32": loop = asyncio.get_running_loop() for sig in (signal.SIGTERM, signal.SIGINT): loop.add_signal_handler(sig, signal_handler) # Create model capabilities manager for robust model parameter filtering capabilities = ModelCapabilities() async with LLMClient(config, capabilities) as llm_client: try: # Start configuration file watching for event-driven updates await config.start_watching() # Run server with shutdown handling server_task = asyncio.create_task( run_websocket_server(clients, llm_client, config.get_config_dict(), repo, config) ) # Wait for either server completion or shutdown signal done, pending = await asyncio.wait( [server_task, asyncio.create_task(shutdown_event.wait())], return_when=asyncio.FIRST_COMPLETED, ) # Cancel pending tasks if shutdown was requested for task in pending: task.cancel() with contextlib.suppress(asyncio.CancelledError): await task # Check if server task completed with an exception for task in done: if task == server_task: exception = task.exception() if exception is not None: raise exception except KeyboardInterrupt: logging.info("Keyboard interrupt received, shutting down...") except Exception as e: logging.error(f"Application error: {e}") raise finally: # Stop configuration watching await config.stop_watching() logging.info("Application shutdown complete") if __name__ == "__main__": asyncio.run(main()) def 
cli_main() -> None: """Synchronous CLI entrypoint that runs the async main.""" asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jck411/MCP_BACKEND_OPENROUTER'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.