Skip to main content
Glama

Gemini MCP Server

by lbds137
main.py (12.5 kB)
""" Main MCP server implementation that orchestrates all modular components. """ import logging import os import sys from logging.handlers import RotatingFileHandler from os import PathLike from typing import IO, Any, Dict, Optional, Union from .core.orchestrator import ConversationOrchestrator from .core.registry import ToolRegistry from .json_rpc import JsonRpcServer, create_result_response from .models.manager import DualModelManager from .services.cache import ResponseCache from .services.memory import ConversationMemory # Try to import dotenv if available try: from dotenv import load_dotenv HAS_DOTENV = True except ImportError: HAS_DOTENV = False def load_dotenv( dotenv_path: Optional[Union[str, PathLike[str]]] = None, stream: Optional[IO[str]] = None, verbose: bool = False, override: bool = False, interpolate: bool = True, encoding: Optional[str] = None, ) -> bool: """Dummy function when dotenv is not available.""" return False logger = logging.getLogger(__name__) __version__ = "3.0.0" class GeminiMCPServer: """Main MCP Server that integrates all modular components.""" def __init__(self): """Initialize the server with modular components.""" # Load environment variables at startup self._load_env_file() self.model_manager: Optional[DualModelManager] = None self.tool_registry = ToolRegistry() self.cache = ResponseCache(max_size=100, ttl_seconds=3600) self.memory = ConversationMemory(max_turns=50, max_entries=100) self.orchestrator: Optional[ConversationOrchestrator] = None # Create JSON-RPC server self.server = JsonRpcServer("gemini-mcp-server") self._setup_handlers() # Make server instance available globally for tools import gemini_mcp setattr(gemini_mcp, "_server_instance", self) # Also set as global for bundled mode globals()["_server_instance"] = self def _load_env_file(self) -> None: """Load .env file from multiple possible locations.""" # Try multiple locations for .env file # 1. 
Directory of the main entry point (works with launcher.py) main_dir = os.path.dirname(os.path.abspath(sys.argv[0])) # 2. Parent directory of main (in case we're in a subdirectory) parent_dir = os.path.dirname(main_dir) # 3. Current working directory cwd = os.getcwd() # 4. Script directory (where this file is) script_dir = os.path.dirname(os.path.abspath(__file__)) env_locations = [ os.path.join(main_dir, ".env"), os.path.join(parent_dir, ".env"), os.path.join(cwd, ".env"), os.path.join(script_dir, ".env"), ] # If python-dotenv is available, try to use it first if HAS_DOTENV: env_loaded = False for env_path in env_locations: if os.path.exists(env_path): logger.info(f"Loading .env from {env_path}") load_dotenv(env_path) env_loaded = True break if not env_loaded: # Try current directory as last fallback logger.info("No .env file found in expected locations, trying current directory") load_dotenv() else: # Manual .env loading if python-dotenv is not available for env_path in env_locations: if os.path.exists(env_path): logger.info(f"Loading .env from {env_path} (manual mode)") try: with open(env_path, "r") as f: for line in f: line = line.strip() if line and not line.startswith("#") and "=" in line: key, value = line.split("=", 1) # Strip whitespace from key and value key = key.strip() value = value.strip() # Remove quotes if present if (value.startswith('"') and value.endswith('"')) or ( value.startswith("'") and value.endswith("'") ): value = value[1:-1] os.environ[key] = value if key == "GEMINI_API_KEY": logger.info( f"Set GEMINI_API_KEY from .env file " f"(length: {len(value)})" ) break except Exception as e: logger.error(f"Failed to load .env file: {e}") def _initialize_model_manager(self) -> bool: """Initialize the model manager with API key.""" api_key = os.getenv("GEMINI_API_KEY") if not api_key: logger.error("No GEMINI_API_KEY found in environment. 
Please check your .env file.") # Log all env vars starting with GEMINI for debugging gemini_vars = {k: v for k, v in os.environ.items() if k.startswith("GEMINI")} if gemini_vars: logger.info(f"Found GEMINI env vars: {list(gemini_vars.keys())}") else: logger.warning("No GEMINI environment variables found at all") return False try: logger.info(f"Initializing DualModelManager with API key (length: {len(api_key)})") self.model_manager = DualModelManager(api_key) # Create orchestrator with all components logger.info("Creating conversation orchestrator...") self.orchestrator = ConversationOrchestrator( tool_registry=self.tool_registry, model_manager=self.model_manager, memory=self.memory, cache=self.cache, ) logger.info("Model manager initialization complete") return True except Exception as e: logger.error(f"Failed to initialize model manager: {e}", exc_info=True) return False def _setup_handlers(self): """Set up JSON-RPC handlers.""" # Register handlers self.server.register_handler("initialize", self.handle_initialize) self.server.register_handler("tools/list", self.handle_tools_list) self.server.register_handler("tools/call", self.handle_tool_call) def handle_initialize(self, request_id: Any, params: Dict[str, Any]) -> Dict[str, Any]: """Handle initialization request.""" # Reload environment variables in case they changed self._load_env_file() # Log the API key status api_key = os.environ.get("GEMINI_API_KEY") if api_key: logger.info(f"GEMINI_API_KEY found (length: {len(api_key)})") else: logger.warning("GEMINI_API_KEY not found in environment") # Discover and register all tools FIRST self.tool_registry.discover_tools() logger.info(f"Registered {len(self.tool_registry.list_tools())} tools") # Initialize model manager AFTER tools are registered model_initialized = self._initialize_model_manager() return create_result_response( request_id, { "protocolVersion": "2024-11-05", "serverInfo": { "name": "gemini-mcp-server", "version": __version__, "modelsAvailable": 
model_initialized, }, "capabilities": {"tools": {}}, }, ) def handle_tools_list(self, request_id: Any, params: Dict[str, Any]) -> Dict[str, Any]: """Handle tool list request.""" # Get tool definitions from registry tool_defs = self.tool_registry.get_mcp_tool_definitions() # Convert to tool list format tools = [] for tool_def in tool_defs: tools.append( { "name": tool_def["name"], "description": tool_def["description"], "inputSchema": tool_def["inputSchema"], } ) return create_result_response(request_id, {"tools": tools}) def handle_tool_call(self, request_id: Any, params: Dict[str, Any]) -> Dict[str, Any]: """Handle tool execution request.""" tool_name = params.get("name", "") arguments = params.get("arguments", {}) logger.info(f"Executing tool: {tool_name}") # Validate tool name if not tool_name: return create_result_response( request_id, {"content": [{"type": "text", "text": "❌ Error: Tool name is required"}]}, ) # Check if models are initialized if not self.orchestrator: result = ( "❌ Gemini models not initialized. " "Please set GEMINI_API_KEY environment variable." 
) return create_result_response( request_id, {"content": [{"type": "text", "text": result}]} ) # Execute tool through orchestrator try: # Use orchestrator to execute tool (async converted to sync) import asyncio # Create event loop if needed try: loop = asyncio.get_running_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: output = loop.run_until_complete( self.orchestrator.execute_tool( tool_name=tool_name, parameters=arguments, request_id=request_id ) ) if output.success: result = output.result or "" else: result = f"❌ Error: {output.error or 'Unknown error'}" finally: # Clean up loop if we created it if asyncio.get_event_loop() is loop: loop.close() except Exception as e: logger.error(f"Error executing tool {tool_name}: {e}") result = f"❌ Error executing tool: {str(e)}" return create_result_response(request_id, {"content": [{"type": "text", "text": result}]}) def run(self): """Run the MCP server.""" logger.info(f"Starting Gemini MCP Server v{__version__} (Modular)") # Configure unbuffered output for proper MCP communication sys.stdout = os.fdopen(sys.stdout.fileno(), "w", 1) sys.stderr = os.fdopen(sys.stderr.fileno(), "w", 1) # Run the JSON-RPC server self.server.run() def main(): """Main entry point.""" # Create logs directory if it doesn't exist log_dir = os.path.expanduser("~/.claude-mcp-servers/gemini-collab/logs") os.makedirs(log_dir, exist_ok=True) # Configure logging with both stderr and file output log_file = os.path.join(log_dir, "gemini-mcp-server.log") # Create handlers handlers = [ logging.StreamHandler(sys.stderr), RotatingFileHandler( log_file, mode="a", encoding="utf-8", maxBytes=10 * 1024 * 1024, # 10MB backupCount=5, # Keep 5 backup files ), ] # Configure logging # Use DEBUG level if GEMINI_DEBUG env var is set, otherwise INFO log_level = logging.DEBUG if os.getenv("GEMINI_DEBUG") else logging.INFO logging.basicConfig( level=log_level, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", 
handlers=handlers, force=True, # Ensure logging is configured even if already configured elsewhere ) logger.info(f"Logging to file: {log_file}") try: server = GeminiMCPServer() server.run() except KeyboardInterrupt: logger.info("Server stopped by user") except Exception as e: logger.error(f"Server error: {e}") sys.exit(1) if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/lbds137/gemini-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.