Enhanced Gemini MCP Server

by ECamposSoria
enhanced_server.py (33.4 kB)
#!/usr/bin/env python3
"""
Enhanced Claude-Gemini MCP Server
Leverages Gemini's 1M token context for intelligent codebase analysis
Based on Gemini's architectural recommendations
"""
import fnmatch
import hashlib
import json
import os
import sys
from typing import Dict, Any, Optional
from pathlib import Path
from dataclasses import dataclass
from datetime import datetime, timedelta

# Ensure line-buffered output so each JSON-RPC message is flushed promptly
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 1)
sys.stderr = os.fdopen(sys.stderr.fileno(), 'w', 1)

__version__ = "2.0.0"

# Token counting - try tiktoken first, fall back to estimation
try:
    import tiktoken
    TIKTOKEN_AVAILABLE = True
    encoding = tiktoken.get_encoding("cl100k_base")
except ImportError:
    TIKTOKEN_AVAILABLE = False

# Initialize Gemini
try:
    from google import genai
    from google.genai import types

    API_KEY = os.environ.get("GEMINI_API_KEY", "YOUR_API_KEY_HERE")
    if API_KEY == "YOUR_API_KEY_HERE":
        print(json.dumps({
            "jsonrpc": "2.0",
            "error": {
                "code": -32603,
                "message": "Please set your Gemini API key in the GEMINI_API_KEY environment variable"
            }
        }), file=sys.stdout, flush=True)
        sys.exit(1)

    client = genai.Client(api_key=API_KEY)
    GEMINI_AVAILABLE = True
except Exception as e:
    GEMINI_AVAILABLE = False
    GEMINI_ERROR = str(e)


@dataclass
class FileInfo:
    """Metadata for each file in the codebase (informational; scan_codebase
    currently returns plain dicts with the same fields)"""
    path: str
    content: str
    tokens: int
    size: int
    modified: datetime
    language: str
    relevance_score: float


class CodebaseIngestion:
    """Handles codebase scanning, parsing, and relevance scoring"""

    # File extensions to include
    CODE_EXTENSIONS = {
        '.py', '.js', '.ts', '.jsx', '.tsx', '.java', '.cpp', '.c', '.h',
        '.cs', '.php', '.rb', '.go', '.rs', '.swift', '.kt', '.scala',
        '.html', '.css', '.scss', '.vue', '.svelte', '.md', '.yml', '.yaml',
        '.json', '.xml', '.sql', '.sh', '.dockerfile', '.tf'
    }

    # Directories and patterns to skip (entries containing '*' are treated
    # as glob patterns against the filename)
    SKIP_PATTERNS = {
        'node_modules', '.git', '__pycache__', '.venv', 'venv', 'dist',
        'build', '.next', '.nuxt', 'target', 'bin', 'obj', '.idea',
        '.vscode', 'coverage', '.pytest_cache', '.mypy_cache',
        '*.lock', 'package-lock.json', 'yarn.lock', '*.log'
    }

    @staticmethod
    def count_tokens(text: str) -> int:
        """Count tokens using tiktoken or fallback estimation"""
        if TIKTOKEN_AVAILABLE:
            return len(encoding.encode(text))
        else:
            # Fallback: rough estimate (4 chars per token)
            return len(text) // 4

    @staticmethod
    def get_language(file_path: str) -> str:
        """Detect programming language from file extension"""
        ext = Path(file_path).suffix.lower()
        language_map = {
            '.py': 'python', '.js': 'javascript', '.ts': 'typescript',
            '.jsx': 'react', '.tsx': 'react-ts', '.java': 'java',
            '.cpp': 'cpp', '.c': 'c', '.h': 'c-header', '.cs': 'csharp',
            '.php': 'php', '.rb': 'ruby', '.go': 'go', '.rs': 'rust',
            '.swift': 'swift', '.kt': 'kotlin', '.scala': 'scala',
            '.html': 'html', '.css': 'css', '.scss': 'scss',
            '.vue': 'vue', '.svelte': 'svelte', '.md': 'markdown',
            '.yml': 'yaml', '.yaml': 'yaml', '.json': 'json',
            '.xml': 'xml', '.sql': 'sql', '.sh': 'bash',
            '.dockerfile': 'docker', '.tf': 'terraform'
        }
        return language_map.get(ext, 'unknown')

    @staticmethod
    def calculate_relevance_score(file_info: Dict[str, Any], project_root: str) -> float:
        """Calculate relevance score for file prioritization"""
        score = 1.0
        path = file_info['relative_path']

        # Language importance (core languages get higher scores)
        language_weights = {
            'python': 1.2, 'javascript': 1.2, 'typescript': 1.3,
            'java': 1.2, 'cpp': 1.1, 'go': 1.2, 'rust': 1.2,
            'css': 0.8, 'html': 0.7, 'json': 0.6, 'yaml': 0.5
        }
        score *= language_weights.get(file_info['language'], 1.0)

        # Directory importance (src, lib, core dirs are more important)
        if any(core_dir in path for core_dir in ['src/', 'lib/', 'core/', 'app/']):
            score *= 1.3
        elif any(low_dir in path for low_dir in ['test/', 'tests/', 'docs/', 'doc/']):
            score *= 0.7
        elif any(config_dir in path for config_dir in ['config/', 'configs/', 'settings/']):
            score *= 0.6

        # File size factor (prefer smaller files for better fit)
        # Note: the larger threshold is checked first so both branches are reachable
        if file_info['tokens'] < 100:
            score *= 1.1
        elif file_info['tokens'] > 10000:
            score *= 0.6
        elif file_info['tokens'] > 5000:
            score *= 0.8

        # Main files get higher priority
        filename = Path(path).name.lower()
        if filename in ['main.py', 'index.js', 'app.py', 'server.py', 'main.go']:
            score *= 1.5
        elif filename.startswith('test_') or filename.endswith('_test.py'):
            score *= 0.7

        return score

    @classmethod
    def scan_codebase(cls, project_path: str, max_tokens: int = 900000) -> Dict[str, Any]:
        """Scan and prepare codebase for Gemini context"""
        project_path = Path(project_path).resolve()
        files_info = []
        total_tokens = 0

        for file_path in project_path.rglob('*'):
            # Skip directories and hidden files
            if file_path.is_dir() or file_path.name.startswith('.'):
                continue

            # Skip unwanted patterns: glob patterns match the filename,
            # plain patterns match anywhere in the relative path
            relative_path = file_path.relative_to(project_path)
            if any(
                fnmatch.fnmatch(file_path.name, pattern) if '*' in pattern
                else pattern in str(relative_path)
                for pattern in cls.SKIP_PATTERNS
            ):
                continue

            # Only include code files
            if file_path.suffix.lower() not in cls.CODE_EXTENSIONS:
                continue

            try:
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    content = f.read()

                # Skip empty files
                if not content.strip():
                    continue

                file_tokens = cls.count_tokens(content)

                # Skip extremely large individual files
                if file_tokens > 50000:
                    continue

                file_info = {
                    'path': str(file_path),
                    'relative_path': str(relative_path),
                    'content': content,
                    'tokens': file_tokens,
                    'size': len(content),
                    'modified': datetime.fromtimestamp(file_path.stat().st_mtime),
                    'language': cls.get_language(str(file_path))
                }

                # Calculate relevance score
                file_info['relevance_score'] = cls.calculate_relevance_score(file_info, str(project_path))
                files_info.append(file_info)
                total_tokens += file_tokens
            except Exception:
                # Unreadable file; skip it
                continue

        # Sort by relevance score (highest first)
        files_info.sort(key=lambda x: x['relevance_score'], reverse=True)

        # Trim to fit within token limit
        selected_files = []
        used_tokens = 0
        for file_info in files_info:
            if used_tokens + file_info['tokens'] <= max_tokens:
                selected_files.append(file_info)
                used_tokens += file_info['tokens']
            else:
                break

        return {
            'files': selected_files,
            'total_tokens': used_tokens,
            'total_files': len(selected_files),
            'scanned_files': len(files_info),
            'project_path': str(project_path)
        }


class ContextManager:
    """Manages the 1M token context window for Gemini"""

    @staticmethod
    def create_codebase_context(codebase_data: Dict[str, Any]) -> str:
        """Create formatted context for Gemini with structured representation"""
        context = f"""# INTELLIGENT CODEBASE ANALYSIS CONTEXT

## PROJECT OVERVIEW
- **Path:** {codebase_data['project_path']}
- **Files Loaded:** {codebase_data['total_files']} (out of {codebase_data['scanned_files']} scanned)
- **Total Tokens:** {codebase_data['total_tokens']:,}
- **Analysis Capabilities:** Architecture, Semantic Search, Improvements, Code Flow

## CODEBASE STRUCTURE
"""

        # Add file tree structure
        context += "### File Tree:\n"
        for file_info in codebase_data['files'][:20]:  # Show top 20 files in tree
            context += f"- {file_info['relative_path']} ({file_info['language']}, {file_info['tokens']} tokens, score: {file_info['relevance_score']:.2f})\n"

        if len(codebase_data['files']) > 20:
            context += f"... and {len(codebase_data['files']) - 20} more files\n"

        context += "\n## COMPLETE FILE CONTENTS:\n\n"

        # Add all file contents with clear delimiters
        for file_info in codebase_data['files']:
            context += f"### 📁 {file_info['relative_path']} ({file_info['language']})\n"
            context += f"```{file_info['language']}\n{file_info['content']}\n```\n\n"

        context += """
## ANALYSIS INSTRUCTIONS:
You now have the complete codebase loaded in your context with intelligent file prioritization. When answering questions:
1. Reference specific files and line numbers when possible
2. Consider the overall architecture and relationships between files
3. Provide concrete, actionable insights
4. Focus on the most relevant files based on the user's query
5. Use your understanding of the complete codebase context
"""
        return context


class AnalysisSession:
    """Lightweight session cache for analysis results"""

    def __init__(self, ttl_minutes: int = 30):
        self.cache: Dict[str, Dict[str, Any]] = {}
        self.ttl = timedelta(minutes=ttl_minutes)
        self.project_path: Optional[str] = None
        self.codebase_loaded_at: Optional[datetime] = None

    def _make_cache_key(self, tool_name: str, arguments: Dict[str, Any]) -> str:
        """Create a unique cache key from tool name and arguments"""
        # Sort arguments for consistent hashing
        args_str = json.dumps(arguments, sort_keys=True)
        return f"{tool_name}:{hashlib.md5(args_str.encode()).hexdigest()}"

    def _is_expired(self, cache_entry: Dict[str, Any]) -> bool:
        """Check if a cache entry has expired"""
        return datetime.now() - cache_entry['timestamp'] > self.ttl

    def get(self, tool_name: str, arguments: Dict[str, Any]) -> Optional[str]:
        """Get cached result if available and not expired"""
        key = self._make_cache_key(tool_name, arguments)
        if key in self.cache:
            entry = self.cache[key]
            if not self._is_expired(entry):
                age_minutes = int((datetime.now() - entry['timestamp']).total_seconds() / 60)
                return entry['result'] + f"\n\n📌 *(Cached result - generated {age_minutes} minutes ago)*"
            else:
                # Remove expired entry
                del self.cache[key]
        return None

    def set(self, tool_name: str, arguments: Dict[str, Any], result: str):
        """Cache an analysis result"""
        key = self._make_cache_key(tool_name, arguments)
        self.cache[key] = {
            'result': result,
            'timestamp': datetime.now(),
            'tool_name': tool_name,
            'arguments': arguments
        }

    def clear(self):
        """Clear all cached results"""
        self.cache.clear()
        self.project_path = None
        self.codebase_loaded_at = None

    def set_project(self, project_path: str):
        """Set current project and clear cache if project changed"""
        if self.project_path and self.project_path != project_path:
            # Different project, clear cache
            self.clear()
        self.project_path = project_path
        self.codebase_loaded_at = datetime.now()

    def export_session(self, export_path: Optional[str] = None) -> str:
        """Export important findings to a markdown file"""
        if not self.project_path:
            return "❌ No active session to export"

        if not export_path:
            export_path = self.project_path

        # Create export directory
        export_dir = os.path.join(export_path, ".gemini-analysis")
        os.makedirs(export_dir, exist_ok=True)

        # Generate filename with timestamp
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"session_export_{timestamp}.md"
        filepath = os.path.join(export_dir, filename)

        # Build export content
        content = f"""# Gemini Analysis Session Export

**Project:** {self.project_path}
**Session Started:** {self.codebase_loaded_at}
**Export Time:** {datetime.now()}
**Cached Analyses:** {len(self.cache)}

---

""" # Add each cached analysis for key, entry in sorted(self.cache.items(), key=lambda x: x[1]['timestamp']): tool_name = entry['tool_name'] args = entry['arguments'] timestamp = entry['timestamp'].strftime("%Y-%m-%d %H:%M:%S") content += f"## {tool_name}" if args: content += f" - {json.dumps(args)}" content += f"\n\n**Generated:** {timestamp}\n\n" content += entry['result'] content += "\n\n---\n\n" # Write to file with open(filepath, 'w', encoding='utf-8') as f: f.write(content) # Also update .gitignore if it exists gitignore_path = os.path.join(self.project_path, ".gitignore") if os.path.exists(gitignore_path): with open(gitignore_path, 'r', encoding='utf-8') as f: gitignore_content = f.read() if ".gemini-analysis/" not in gitignore_content: with open(gitignore_path, 'a', encoding='utf-8') as f: f.write("\n# Gemini analysis exports\n.gemini-analysis/\n") return f"✅ Session exported to: {filepath}\n\nNote: Added .gemini-analysis/ to .gitignore to prevent committing analysis files." def get_stats(self) -> Dict[str, Any]: """Get session statistics""" if not self.cache: return {"cached_analyses": 0, "cache_size_kb": 0} # Calculate approximate cache size (just count result text) cache_size = sum(len(entry['result']) + len(entry['tool_name']) + len(json.dumps(entry['arguments'])) for entry in self.cache.values()) return { "cached_analyses": len(self.cache), "cache_size_kb": round(cache_size / 1024, 2), "oldest_entry": min((e['timestamp'] for e in self.cache.values()), default=None), "newest_entry": max((e['timestamp'] for e in self.cache.values()), default=None) } # Global state current_codebase_context = None current_project_path = None current_codebase_data = None analysis_session = AnalysisSession(ttl_minutes=30) def send_response(response: Dict[str, Any]): """Send a JSON-RPC response""" print(json.dumps(response), flush=True) def handle_initialize(request_id: Any) -> Dict[str, Any]: """Handle initialization""" return { "jsonrpc": "2.0", "id": request_id, "result": { "protocolVersion": "2024-11-05", "capabilities": {"tools": {}}, "serverInfo": { "name": "enhanced-claude-gemini-mcp", "version": __version__ } } } def handle_tools_list(request_id: Any) -> Dict[str, Any]: """List available enhanced analysis tools""" if not GEMINI_AVAILABLE: return { "jsonrpc": "2.0", "id": request_id, "result": {"tools": []} } tools = [ { "name": "load_codebase", "description": "Load entire codebase into Gemini's 1M token context with intelligent prioritization", "inputSchema": { "type": "object", "properties": { "project_path": { "type": "string", "description": "Path to the project directory to analyze" }, "max_tokens": { "type": "number", "description": "Maximum tokens to use (default: 900000)", "default": 900000 } }, "required": ["project_path"] } }, { "name": "analyze_architecture", "description": "Get comprehensive architecture analysis of loaded codebase (results cached for 30 minutes)", "inputSchema": { "type": "object", "properties": { "focus": { "type": "string", "description": "Focus area: architecture, patterns, dependencies, structure, or custom query", "default": "architecture" } } } }, { "name": "semantic_search", "description": "Search codebase semantically using natural language queries (results cached for 30 minutes)", "inputSchema": { "type": "object", "properties": { "query": { "type": "string", "description": "Natural language description of what to find in the codebase" } }, "required": ["query"] } }, { "name": "suggest_improvements", "description": "Get specific improvement suggestions 
            "description": "Get specific improvement suggestions for the loaded codebase (results cached for 30 minutes)",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "area": {
                        "type": "string",
                        "description": "Focus area: performance, security, maintainability, testing, architecture, or general",
                        "default": "general"
                    }
                }
            }
        },
        {
            "name": "explain_codeflow",
            "description": "Trace and explain how specific functionality works across the codebase (results cached for 30 minutes)",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "functionality": {
                        "type": "string",
                        "description": "Describe the functionality to trace through the codebase"
                    }
                },
                "required": ["functionality"]
            }
        },
        {
            "name": "codebase_summary",
            "description": "Get a comprehensive summary of the loaded codebase (results cached for 30 minutes)",
            "inputSchema": {"type": "object", "properties": {}}
        },
        {
            "name": "ask_with_context",
            "description": "Ask any question with full codebase context and intelligent analysis (results cached for 30 minutes)",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "question": {
                        "type": "string",
                        "description": "Your question about the codebase"
                    }
                },
                "required": ["question"]
            }
        },
        {
            "name": "export_session",
            "description": "Export cached analysis results to a markdown file for future reference",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "export_path": {
                        "type": "string",
                        "description": "Optional path to export the session (defaults to project path)"
                    }
                }
            }
        },
        {
            "name": "session_stats",
            "description": "Get current session cache statistics",
            "inputSchema": {"type": "object", "properties": {}}
        }
    ]

    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "result": {"tools": tools}
    }


def call_gemini_with_context(prompt: str, temperature: float = 0.3) -> str:
    """Call Gemini with current codebase context"""
    try:
        if current_codebase_context:
            full_prompt = current_codebase_context + "\n\n## USER QUERY:\n" + prompt
        else:
            full_prompt = "❌ No codebase loaded. Please use 'load_codebase' first.\n\n" + prompt

        response = client.models.generate_content(
            model='gemini-2.5-flash',
            contents=full_prompt,
            config=types.GenerateContentConfig(
                temperature=temperature,
                max_output_tokens=8192,
            )
        )
        return response.text
    except Exception as e:
        return f"Error calling Gemini: {str(e)}"


def handle_tool_call(request_id: Any, params: Dict[str, Any]) -> Dict[str, Any]:
    """Handle enhanced tool execution"""
    global current_codebase_context, current_project_path, current_codebase_data

    tool_name = params.get("name")
    arguments = params.get("arguments", {})

    try:
        if tool_name == "load_codebase":
            project_path = arguments.get("project_path")
            max_tokens = arguments.get("max_tokens", 900000)

            if not os.path.exists(project_path):
                result = f"❌ Error: Project path '{project_path}' does not exist"
            else:
                # Scan and ingest codebase
                codebase_data = CodebaseIngestion.scan_codebase(project_path, max_tokens)
                current_codebase_context = ContextManager.create_codebase_context(codebase_data)
                current_project_path = project_path
                current_codebase_data = codebase_data

                # Update session with new project
                analysis_session.set_project(project_path)

                # Create summary with statistics
                languages = {}
                for file_info in codebase_data['files']:
                    lang = file_info['language']
                    languages[lang] = languages.get(lang, 0) + 1

                lang_summary = ", ".join([f"{lang}: {count}" for lang, count in sorted(languages.items())])

                result = f"""✅ INTELLIGENT CODEBASE LOADED SUCCESSFULLY

📁 **Project:** {codebase_data['project_path']}
📄 **Files Loaded:** {codebase_data['total_files']} (from {codebase_data['scanned_files']} scanned)
🧮 **Tokens Used:** {codebase_data['total_tokens']:,} / 1,000,000
📊 **Languages:** {lang_summary}
🎯 **Prioritization:** Files ranked by relevance score

Gemini now has your entire codebase intelligently loaded in context!

**Available Analysis Tools:**
• `analyze_architecture` - Comprehensive architecture analysis
• `semantic_search` - Natural language code search
• `suggest_improvements` - Targeted improvement suggestions
• `explain_codeflow` - Trace functionality across files
• `codebase_summary` - Complete project overview
• `ask_with_context` - Ask anything about your code

Ready for intelligent analysis! 🧠

💡 **Session Caching:** Analysis results are cached for 30 minutes to improve performance."""

        elif tool_name == "analyze_architecture":
            if not current_codebase_context:
                result = "❌ No codebase loaded. Use 'load_codebase' first."
            else:
                # Check cache first
                cached_result = analysis_session.get(tool_name, arguments)
                if cached_result:
                    result = cached_result
                else:
                    focus = arguments.get("focus", "architecture")
                    prompt = f"""Provide a comprehensive {focus} analysis of this codebase. Include:

1. **Overall Architecture & Design Patterns**
2. **Key Components & Their Relationships**
3. **Data Flow & Dependencies**
4. **Technology Stack & Framework Usage**
5. **Code Organization & Structure Quality**
6. **Notable Design Decisions**

Be specific and reference actual files, functions, and code patterns you observe."""
                    result = call_gemini_with_context(prompt, 0.2)
                    # Cache the result
                    analysis_session.set(tool_name, arguments, result)

        elif tool_name == "semantic_search":
            if not current_codebase_context:
                result = "❌ No codebase loaded. Use 'load_codebase' first."
            else:
                # Check cache first
                cached_result = analysis_session.get(tool_name, arguments)
                if cached_result:
                    result = cached_result
                else:
                    query = arguments.get("query")
                    prompt = f"""Perform a semantic search for: "{query}"

Provide:
1. **Exact file locations** where this functionality exists
2. **Relevant code snippets** with line context
3. **Related functions/classes** that work together
4. **Usage patterns** across the codebase
5. **Dependencies** and connections to other parts

Focus on semantic meaning, not just keyword matching."""
                    result = call_gemini_with_context(prompt, 0.2)
                    # Cache the result
                    analysis_session.set(tool_name, arguments, result)

        elif tool_name == "suggest_improvements":
            if not current_codebase_context:
                result = "❌ No codebase loaded. Use 'load_codebase' first."
            else:
                # Check cache first
                cached_result = analysis_session.get(tool_name, arguments)
                if cached_result:
                    result = cached_result
                else:
                    area = arguments.get("area", "general")
                    prompt = f"""Analyze the codebase and suggest specific improvements for {area}:

Provide:
1. **Specific Issues** with file/line references
2. **Concrete Solutions** with code examples
3. **Priority Ranking** (High/Medium/Low)
4. **Implementation Steps** for each suggestion
5. **Potential Risks** of each change

Focus on actionable improvements with clear benefits."""
                    result = call_gemini_with_context(prompt, 0.3)
                    # Cache the result
                    analysis_session.set(tool_name, arguments, result)

        elif tool_name == "explain_codeflow":
            if not current_codebase_context:
                result = "❌ No codebase loaded. Use 'load_codebase' first."
            else:
                # Check cache first
                cached_result = analysis_session.get(tool_name, arguments)
                if cached_result:
                    result = cached_result
                else:
                    functionality = arguments.get("functionality")
                    prompt = f"""Trace and explain how this functionality works: "{functionality}"

Provide:
1. **Entry Points** - Where this functionality starts
2. **Code Flow** - Step-by-step execution path across files
3. **Key Functions/Classes** - Main components involved
4. **Data Transformations** - How data flows and changes
5. **External Dependencies** - APIs, databases, etc.
6. **Visual Flow** - ASCII diagram if helpful

Reference specific files and line numbers."""
                    result = call_gemini_with_context(prompt, 0.2)
                    # Cache the result
                    analysis_session.set(tool_name, arguments, result)

        elif tool_name == "codebase_summary":
            if not current_codebase_context:
                result = "❌ No codebase loaded. Use 'load_codebase' first."
            else:
                # Check cache first
                cached_result = analysis_session.get(tool_name, arguments)
                if cached_result:
                    result = cached_result
                else:
                    prompt = """Provide a comprehensive summary of this entire codebase:

1. **Project Purpose** - What does this software do?
2. **Architecture Overview** - How is it structured?
3. **Key Features** - Main functionality areas
4. **Technology Stack** - Languages, frameworks, tools
5. **Code Quality Assessment** - Strengths and areas for improvement
6. **Development Insights** - Patterns, conventions, notable aspects

Make it accessible for both technical and non-technical stakeholders."""
                    result = call_gemini_with_context(prompt, 0.3)
                    # Cache the result
                    analysis_session.set(tool_name, arguments, result)

        elif tool_name == "ask_with_context":
            if not current_codebase_context:
                result = "❌ No codebase loaded. Use 'load_codebase' first."
            else:
                # Check cache first
                cached_result = analysis_session.get(tool_name, arguments)
                if cached_result:
                    result = cached_result
                else:
                    question = arguments.get("question")
                    result = call_gemini_with_context(question, 0.3)
                    # Cache the result
                    analysis_session.set(tool_name, arguments, result)

        elif tool_name == "export_session":
            export_path = arguments.get("export_path")
            result = analysis_session.export_session(export_path)

        elif tool_name == "session_stats":
            stats = analysis_session.get_stats()
            if stats["cached_analyses"] == 0:
                result = "📦 No cached analyses in current session."
            else:
                result = f"""📊 **Session Cache Statistics:**

📝 **Cached Analyses:** {stats['cached_analyses']}
💾 **Cache Size:** {stats['cache_size_kb']} KB
⏱️ **Session Started:** {analysis_session.codebase_loaded_at or 'N/A'}
📅 **Oldest Entry:** {stats.get('oldest_entry', 'N/A')}
🆕 **Newest Entry:** {stats.get('newest_entry', 'N/A')}

💡 Tip: Use `export_session` to save important findings before they expire!"""

        else:
            raise ValueError(f"Unknown tool: {tool_name}")

        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "result": {
                "content": [
                    {
                        "type": "text",
                        "text": f"🧠 GEMINI INTELLIGENT ANALYSIS:\n\n{result}"
                    }
                ]
            }
        }
    except Exception as e:
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {
                "code": -32603,
                "message": str(e)
            }
        }


def main():
    """Main server loop"""
    while True:
        try:
            line = sys.stdin.readline()
            if not line:
                break

            request = json.loads(line.strip())
            method = request.get("method")
            request_id = request.get("id")
            params = request.get("params", {})

            if method == "initialize":
                response = handle_initialize(request_id)
            elif method == "tools/list":
                response = handle_tools_list(request_id)
            elif method == "tools/call":
                response = handle_tool_call(request_id, params)
            else:
                response = {
                    "jsonrpc": "2.0",
                    "id": request_id,
                    "error": {
                        "code": -32601,
                        "message": f"Method not found: {method}"
                    }
                }

            send_response(response)
        except json.JSONDecodeError:
            continue
        except EOFError:
            break
        except Exception as e:
            if 'request_id' in locals():
                send_response({
                    "jsonrpc": "2.0",
                    "id": request_id,
                    "error": {
                        "code": -32603,
                        "message": f"Internal error: {str(e)}"
                    }
                })


if __name__ == "__main__":
    main()
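Because the server speaks newline-delimited JSON-RPC 2.0 over stdin/stdout, it can be exercised without a full MCP client. The following is a minimal client sketch, not part of the repository; it assumes enhanced_server.py sits in the current directory, GEMINI_API_KEY is set in the environment, and "/path/to/project" is a placeholder.

# demo_client.py - drive enhanced_server.py over stdio with raw JSON-RPC
import json
import subprocess

proc = subprocess.Popen(
    ["python3", "enhanced_server.py"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    text=True,
)

def rpc(request_id, method, params=None):
    """Send one request as a single JSON line and read one response line."""
    request = {"jsonrpc": "2.0", "id": request_id, "method": method}
    if params is not None:
        request["params"] = params
    proc.stdin.write(json.dumps(request) + "\n")
    proc.stdin.flush()
    return json.loads(proc.stdout.readline())

print(rpc(1, "initialize")["result"]["serverInfo"])
print([tool["name"] for tool in rpc(2, "tools/list")["result"]["tools"]])
print(rpc(3, "tools/call", {
    "name": "load_codebase",
    "arguments": {"project_path": "/path/to/project"},  # placeholder path
}))

proc.stdin.close()
proc.wait()

The one-request-per-line framing matters: main() reads exactly one line per request with sys.stdin.readline(), so a client must not pretty-print the JSON across multiple lines.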

MCP directory API

We provide all the information about MCP servers via our MCP directory API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ECamposSoria/gemini-mcp-enhanced'
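The same lookup can be scripted. Below is a minimal sketch using only the Python standard library; it assumes the endpoint returns a JSON document, whose exact schema is not documented here.

# Fetch this server's directory entry from the Glama MCP API
import json
import urllib.request

URL = "https://glama.ai/api/mcp/v1/servers/ECamposSoria/gemini-mcp-enhanced"

with urllib.request.urlopen(URL) as response:
    server_info = json.load(response)  # assumed JSON body

print(json.dumps(server_info, indent=2))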

If you have feedback or need assistance with the MCP directory API, please join our Discord server.