Skip to main content
Glama

Gemini MCP Server

by lbds137
bundler.py22.8 kB
#!/usr/bin/env python3 """ Bundler that creates a working single-file server from modular components. Works with the modular architecture to combine all components into a single deployable file. """ import ast import logging import re import sys import textwrap from pathlib import Path from typing import Dict, Optional # Configure logging only if running as main script if __name__ == "__main__": logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") logger = logging.getLogger(__name__) PROJECT_ROOT = Path(__file__).parent.parent SRC_DIR = PROJECT_ROOT / "src" / "gemini_mcp" OUTPUT_FILE = PROJECT_ROOT / "server.py" class Bundler: """Creates a bundled server.py using dynamic discovery for the modular architecture.""" def __init__(self): self.output_lines = [] self.discovered_components = [] # List of (rel_path, docstring) tuples self.discovered_tools = [] # List of tool info dicts def discover_all(self) -> None: """Discovers all components and tools in a single pass.""" # Define the order for core components. This order is critical to ensure # base classes and core modules are defined before they are used by other components. ordered_paths = [ "json_rpc.py", # Core JSON-RPC implementation "models/base.py", # Base models must come before implementations "models/manager.py", # Model manager "models/memory.py", # Memory models "services/cache.py", # Cache service "services/memory.py", # Memory service "tools/base.py", # Tool base class must come before tool implementations "core/registry.py", # Registry needs tool base "core/orchestrator.py", # Orchestrator uses registry "protocols/debate.py", # Protocols come last "main.py", # Main server class (not server.py!) ] # Process ordered components first for rel_path in ordered_paths: file_path = SRC_DIR / rel_path if file_path.exists(): self._process_file(file_path, is_tool=False) else: # Skip warning for missing files we don't need if rel_path not in ["server.py"]: # We use main.py instead of server.py logger.warning(f"Core component not found: {file_path}") # Then discover and process tools tools_dir = SRC_DIR / "tools" if tools_dir.exists(): for tool_file in sorted(tools_dir.glob("*.py")): if tool_file.name not in ["__init__.py", "base.py"]: self._process_file(tool_file, is_tool=True) else: logger.error(f"Tools directory not found: {tools_dir}") def _process_file(self, file_path: Path, is_tool: bool = False) -> None: """Parses a single file to extract components and tool information.""" rel_path = file_path.relative_to(SRC_DIR).as_posix() try: with open(file_path, "r", encoding="utf-8") as f: source = f.read() tree = ast.parse(source, filename=str(file_path)) # Get component info (docstring) docstring = ast.get_docstring(tree) or "No description available" self.discovered_components.append((rel_path, docstring.strip())) # If it's a tool file, look for tool classes if is_tool: self._extract_tools_from_ast(tree, file_path) except SyntaxError as e: logger.error(f"Syntax error in {file_path}: {e}") self.discovered_components.append((rel_path, "Component file (syntax error)")) except OSError as e: logger.error(f"Failed to read {file_path}: {e}") except Exception as e: logger.error(f"Unexpected error processing {file_path}: {e}") def _extract_tools_from_ast(self, tree: ast.AST, file_path: Path) -> None: """Extract tool classes from an AST.""" for node in ast.walk(tree): if isinstance(node, ast.ClassDef): if self._is_tool_class(node): tool_info = self._extract_tool_info(node, file_path) if tool_info: self.discovered_tools.append(tool_info) logger.info( f"Discovered tool: {tool_info['class_name']} ({tool_info['tool_name']})" ) def _is_tool_class(self, node: ast.ClassDef) -> bool: """Check if a class definition is a tool class.""" # Check direct base classes base_names = [] for base in node.bases: if isinstance(base, ast.Name): base_names.append(base.id) elif isinstance(base, ast.Attribute): # Handle cases like module.BaseTool base_names.append(base.attr) return any(base in ["BaseTool", "MCPTool"] for base in base_names) def _extract_tool_info(self, node: ast.ClassDef, file_path: Path) -> Optional[Dict[str, str]]: """Extract tool information from a class definition.""" tool_name = None # Look for the name property for item in node.body: if isinstance(item, ast.FunctionDef) and item.name == "name": # Check if it's a property has_property_decorator = any( isinstance(d, ast.Name) and d.id == "property" for d in item.decorator_list ) if has_property_decorator: # Find the return statement for stmt in item.body: if isinstance(stmt, ast.Return) and isinstance(stmt.value, ast.Constant): tool_name = stmt.value.value break if tool_name: return {"class_name": node.name, "tool_name": tool_name, "file_name": file_path.stem} else: logger.warning(f"Tool class {node.name} in {file_path} has no name property") return None def generate_imports(self) -> str: """Generate the import section for the bundled file.""" imports = textwrap.dedent( """ #!/usr/bin/env python3 # flake8: noqa \"\"\" Gemini MCP Server - Single File Bundle MCP server that enables Claude to collaborate with Gemini AI models. This version combines all modular components into a single deployable file. Generated by bundler. \"\"\" import asyncio import collections import hashlib import importlib import inspect import json import logging import os import sys import time from abc import ABC, abstractmethod from collections import OrderedDict, deque from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Type import google.generativeai as genai from google.api_core import exceptions as google_exceptions try: from dotenv import load_dotenv except ImportError: load_dotenv = None # Create logger without configuring (main() will configure) logger = logging.getLogger("gemini-mcp") __version__ = "3.0.0" # Global model manager instance (will be set by server) model_manager = None # Create gemini_mcp namespace for bundled mode class _GeminiMCP: _server_instance = None gemini_mcp = _GeminiMCP() """ ).strip() return imports def generate_tool_registry_override(self) -> str: """Generate code to override ToolRegistry for bundled operation.""" tool_class_names = [tool["class_name"] for tool in self.discovered_tools] tool_list = ",\n ".join(tool_class_names) override_code = textwrap.dedent( f""" # ========== Tool Registry Override for Bundled Operation ========== # Store the bundled tool classes globally BUNDLED_TOOL_CLASSES = [ {tool_list} ] # Override the ToolRegistry's discover_tools method for bundled operation def _bundled_discover_tools(self, tools_path: Optional[Path] = None) -> None: \"\"\"Discover and register all tools - bundled version.\"\"\" # Clear any existing tools to ensure clean state self._tools.clear() self._tool_classes.clear() logger.info("Registering bundled tools") for tool_class in BUNDLED_TOOL_CLASSES: try: tool_instance = tool_class() tool_name = tool_instance.name # Use property access self._tools[tool_name] = tool_instance self._tool_classes[tool_name] = tool_class logger.info(f"Registered tool: {{tool_name}}") except Exception as e: logger.error(f"Failed to register tool {{tool_class.__name__}}: {{e}}") logger.info(f"Registered {{len(self._tools)}} tools in bundled mode") # Function to apply the override - will be called from main() def _apply_tool_registry_override(): \"\"\"Apply the bundled tool registry override.\"\"\" ToolRegistry.discover_tools = _bundled_discover_tools """ ).strip() return override_code def clean_content(self, content: str, rel_path: str) -> str: """Clean module content for bundling using AST parsing.""" try: # Parse the content to ensure it's valid Python tree = ast.parse(content) # For main.py, we need special handling if rel_path == "main.py": # Remove the if __name__ == "__main__" block since we'll add our own class MainBlockRemover(ast.NodeTransformer): def visit_If(self, node): # Check if this is a __name__ == "__main__" check if ( isinstance(node.test, ast.Compare) and isinstance(node.test.left, ast.Name) and node.test.left.id == "__name__" and len(node.test.comparators) == 1 and isinstance(node.test.comparators[0], ast.Constant) and node.test.comparators[0].value == "__main__" ): return None # Remove this node return self.generic_visit(node) tree = MainBlockRemover().visit(tree) # Convert back to source import astor cleaned = astor.to_source(tree) # Remove imports that won't work in bundled version lines = cleaned.split("\n") filtered_lines = [] for line in lines: # Skip relative imports if line.strip().startswith("from .") or line.strip().startswith("from .."): continue # Skip imports from our own package if "from gemini_mcp" in line or "from src.gemini_mcp" in line: continue if "import gemini_mcp" in line or "import src.gemini_mcp" in line: continue filtered_lines.append(line) return "\n".join(filtered_lines) except Exception as e: logger.warning( f"Failed to use AST cleaning for {rel_path}, falling back to text processing: {e}" ) # Fallback to simple text processing return self._simple_clean_content(content) def _fix_tool_imports(self, content: str, is_tool: bool) -> str: """Fix tool imports for bundled operation.""" if not is_tool: return content # Look for the model manager access block and replace it lines = content.split("\n") new_lines = [] i = 0 while i < len(lines): line = lines[i] # Check if this is the start of the model manager access block if "# Get model manager from server instance" in line: # Skip lines until we find the generate_content call new_lines.append(" # Access global model manager in bundled version") new_lines.append(" global model_manager") new_lines.append("") # Skip ahead until we find the response_text line while ( i < len(lines) and "response_text, model_used = model_manager.generate_content" not in lines[i] ): i += 1 # Now include the generate_content line if i < len(lines): new_lines.append(lines[i]) else: new_lines.append(line) i += 1 content = "\n".join(new_lines) # Also handle any remaining import attempts content = content.replace( "from .. import model_manager", "# Model manager will be accessed as global in bundled version", ) content = content.replace( "from .. import _server_instance", "# Server instance access not needed in bundled version", ) return content def _fix_orchestrator_for_bundled(self, content: str) -> str: """Fix orchestrator to work with bundled tools.""" if "class ConversationOrchestrator" not in content: return content # Replace the execute_tool method to handle bundled tools new_execute_tool = ''' async def execute_tool( self, tool_name: str, parameters: Dict[str, Any], request_id: Optional[str] = None ) -> ToolOutput: """Execute a single tool with proper context injection.""" # Check cache first cache_key = self.cache.create_key(tool_name, parameters) cached_result = self.cache.get(cache_key) if cached_result: logger.info(f"Cache hit for {tool_name}") return cached_result # Get the tool tool = self.tool_registry.get_tool(tool_name) if not tool: return ToolOutput( success=False, error=f"Unknown tool: {tool_name}" ) # For bundled operation, set global model manager global model_manager model_manager = self.model_manager # Execute the tool try: output = await tool.execute(parameters) except Exception as e: logger.error(f"Error executing tool {tool_name}: {e}") output = ToolOutput(success=False, error=str(e)) # Cache successful results if output.success: self.cache.set(cache_key, output) # Store in execution history self.execution_history.append(output) return output''' # Find and replace the execute_tool method pattern = r"(async def execute_tool\(.*?\) -> ToolOutput:.*?)(return output)" replacement = new_execute_tool.strip() content = re.sub(pattern, replacement, content, flags=re.DOTALL) return content def _simple_clean_content(self, content: str): """Simple text-based content cleaning as fallback.""" lines = content.split("\n") cleaned_lines = [] # Track if we're at the beginning of the file seen_code = False in_module_docstring = False docstring_delimiter = None for i, line in enumerate(lines): stripped = line.strip() # Skip shebang if i == 0 and line.startswith("#!"): continue # Handle module-level docstrings if not seen_code and not in_module_docstring: # Check if this starts a docstring if stripped.startswith('"""') or stripped.startswith("'''"): docstring_delimiter = '"""' if stripped.startswith('"""') else "'''" # Check if it's a single-line docstring if stripped.count(docstring_delimiter) >= 2: # Single line module docstring - skip it continue else: # Multi-line docstring starts in_module_docstring = True continue elif stripped and not stripped.startswith("#"): # This is the first real code seen_code = True # If we're in a module docstring, skip until the end if in_module_docstring: if docstring_delimiter in line: in_module_docstring = False docstring_delimiter = None continue # Skip imports if stripped.startswith("from .") or stripped.startswith("from .."): continue if "from gemini_mcp" in line or "from src.gemini_mcp" in line: continue if stripped.startswith("import gemini_mcp") or stripped.startswith( "import src.gemini_mcp" ): continue # Skip duplicate logger and module-level imports that would conflict if "logger = logging.getLogger" in line: continue if line == "import json" or line == "import logging" or line == "import sys": if seen_code: # Only skip if we've already seen these imports continue # Skip if __name__ == "__main__" blocks to avoid duplicate execution if 'if __name__ == "__main__"' in line: # Skip this block entirely continue if line.strip() == "main()" and i > 0 and "if __name__" in lines[i - 1]: continue cleaned_lines.append(line) return "\n".join(cleaned_lines) def create_bundle(self) -> str: """Create the complete bundled server.""" logger.info("Starting bundle creation...") # Try to import astor for better AST handling try: import astor logger.info("Using AST-based cleaning (astor available)") except ImportError: logger.warning("astor not available, using text-based cleaning") # Discover all components and tools self.discover_all() logger.info( f"Discovered {len(self.discovered_components)} components and {len(self.discovered_tools)} tools" ) # Start building the output output_parts = [] # Add imports output_parts.append(self.generate_imports()) output_parts.append("") # Process each component for rel_path, description in self.discovered_components: file_path = SRC_DIR / rel_path if not file_path.exists(): logger.warning(f"Component file not found: {file_path}") continue logger.info(f"Processing {rel_path}...") try: with open(file_path, "r", encoding="utf-8") as f: content = f.read() # Clean the content cleaned_content = self.clean_content(content, rel_path) # Fix tool imports if needed is_tool = rel_path.startswith("tools/") and rel_path != "tools/base.py" cleaned_content = self._fix_tool_imports(cleaned_content, is_tool) # Fix orchestrator if needed if "orchestrator.py" in rel_path: cleaned_content = self._fix_orchestrator_for_bundled(cleaned_content) if not cleaned_content.strip(): continue # Add section header (but make it safe) # Escape any special characters in description safe_description = description.replace("\n", " ").replace("\r", "") if len(safe_description) > 80: safe_description = safe_description[:77] + "..." output_parts.append("") output_parts.append(f"# {'='*10} {safe_description} {'='*10}") output_parts.append("") output_parts.append(cleaned_content) except Exception as e: logger.error(f"Failed to process {rel_path}: {e}") # Add the tool registry override output_parts.append("") output_parts.append(self.generate_tool_registry_override()) # Add main execution output_parts.append("") output_parts.append(self._generate_main_section()) return "\n".join(output_parts) def _generate_main_section(self) -> str: """Generate the main execution section.""" return textwrap.dedent( """ # ========== Main Execution ========== if __name__ == "__main__": # Apply the tool registry override before running _apply_tool_registry_override() # Call the main function from the bundled code main() """ ).strip() def main(): """Main bundling function.""" logger.info("Bundler for Gemini MCP Server") logger.info(f"Source: {SRC_DIR}") logger.info(f"Output: {OUTPUT_FILE}") bundler = Bundler() try: # Create bundle bundle_content = bundler.create_bundle() # Write output OUTPUT_FILE.write_text(bundle_content, encoding="utf-8") OUTPUT_FILE.chmod(0o755) logger.info(f"✓ Bundle created: {OUTPUT_FILE}") logger.info(f" Size: {len(bundle_content):,} bytes") logger.info(f" Lines: {bundle_content.count(chr(10)):,}") # Test compilation try: compile(bundle_content, str(OUTPUT_FILE), "exec") logger.info("✓ Bundle compiles successfully") except SyntaxError as e: logger.error(f"✗ Syntax error in generated bundle: {e}") logger.error(f" Line {e.lineno}: {e.text}") return 1 except Exception as e: logger.error(f"Failed to create bundle: {e}") import traceback traceback.print_exc() return 1 return 0 if __name__ == "__main__": sys.exit(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/lbds137/gemini-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server