Skip to main content
Glama

Smart Code Search MCP Server

server.pyโ€ข76.1 kB
#!/usr/bin/env python3 """ SCS MCP Server - Smart Code Search for Claude Desktop """ import json import sys import asyncio import logging from pathlib import Path from typing import Dict, Any, List, Optional from mcp.server import Server, InitializationOptions from mcp.server.stdio import stdio_server from mcp.types import Tool, TextContent, ServerCapabilities, Resource, Prompt, PromptArgument # Configure logging to only log errors to stderr logging.basicConfig( level=logging.ERROR, format='%(message)s', handlers=[logging.StreamHandler(sys.stderr)] ) logger = logging.getLogger(__name__) sys.path.insert(0, str(Path(__file__).parent.parent)) from src.core.clean_search import SmartCodeSearch from src.core.db_wrapper import ThreadSafeDB from src.core.orchestrator import OrchestrationEngine from src.core.context_aware_orchestrator import ContextAwareOrchestrator class SCSMCPServer: """Main MCP server implementation""" def __init__(self): self.server = Server("scs-mcp") self.project_instances: Dict[str, SmartCodeSearch] = {} self.current_project = None self.orchestrator = None # Will be initialized on first use self.model_tools = None # Lazy-loaded model info tools self.setup_tools() self.setup_handlers() self.setup_resources() self.setup_prompts() def setup_tools(self): """Register available tools with the MCP server""" @self.server.list_tools() async def list_tools() -> list[Tool]: return [ Tool( name="search", description="Search code by meaning using AI embeddings", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "Search query" }, "limit": { "type": "integer", "description": "Maximum results", "default": 10 }, "project_path": { "type": "string", "description": "Project path to search", "default": "." }, "search_type": { "type": "string", "enum": ["semantic", "text", "hybrid"], "default": "hybrid" }, "file_type": { "type": "string", "enum": ["code", "document", "config", "all"], "default": "all" }, "language": { "type": "string", "description": "Filter by language" } }, "required": ["query"] } ), Tool( name="index", description="Index a project for semantic search", inputSchema={ "type": "object", "properties": { "project_path": { "type": "string", "description": "Path to project to index" }, "force": { "type": "boolean", "description": "Force re-indexing", "default": False }, "patterns": { "type": "array", "items": {"type": "string"}, "description": "File patterns to include" } }, "required": ["project_path"] } ), Tool( name="analyze_symbol", description="Deep analysis of a specific symbol", inputSchema={ "type": "object", "properties": { "symbol_name": { "type": "string", "description": "Name of symbol to analyze" }, "project_path": { "type": "string", "description": "Project path", "default": "." }, "include_references": { "type": "boolean", "default": True }, "include_tests": { "type": "boolean", "default": True } }, "required": ["symbol_name"] } ), Tool( name="find_similar", description="Find code similar to a given snippet", inputSchema={ "type": "object", "properties": { "code_snippet": { "type": "string", "description": "Code snippet to find similar to" }, "project_path": { "type": "string", "description": "Project path", "default": "." }, "limit": { "type": "integer", "default": 5 } }, "required": ["code_snippet"] } ), Tool( name="get_context", description="Get relevant context for current work", inputSchema={ "type": "object", "properties": { "project_path": { "type": "string", "description": "Project path", "default": "." }, "recent_files": { "type": "array", "items": {"type": "string"}, "description": "Recent files being worked on" }, "recent_symbols": { "type": "array", "items": {"type": "string"}, "description": "Recent symbols referenced" } } } ), # High-value orchestrated tools Tool( name="instant_review", description="Multi-agent code review with instant feedback (920/1000 value)", inputSchema={ "type": "object", "properties": { "file_path": { "type": "string", "description": "Path to file to review" }, "code": { "type": "string", "description": "Code snippet to review (if file_path not provided)" }, "review_depth": { "type": "string", "enum": ["quick", "standard", "deep"], "default": "standard", "description": "Depth of review" } } } ), Tool( name="debt_orchestrator", description="Analyze technical debt and calculate ROI for fixes (900/1000 value)", inputSchema={ "type": "object", "properties": { "scope": { "type": "string", "enum": ["file", "directory", "project"], "default": "file", "description": "Scope of analysis" }, "path": { "type": "string", "description": "Path to analyze", "default": "." }, "include_roi": { "type": "boolean", "default": True, "description": "Calculate ROI for debt items" } } } ), Tool( name="test_gap_analyzer", description="Find untested code and generate test suggestions (880/1000 value)", inputSchema={ "type": "object", "properties": { "path": { "type": "string", "description": "Path to analyze", "default": "." }, "include_private": { "type": "boolean", "default": False, "description": "Include private methods" }, "min_complexity": { "type": "integer", "default": 5, "description": "Minimum complexity to flag" }, "generate_tests": { "type": "boolean", "default": True, "description": "Generate test suggestions" } } } ), Tool( name="import_optimizer", description="Analyze and optimize imports (650/1000 value)", inputSchema={ "type": "object", "properties": { "file_path": { "type": "string", "description": "File to analyze" }, "code": { "type": "string", "description": "Code to analyze (if file_path not provided)" }, "auto_fix": { "type": "boolean", "default": False, "description": "Return fixed code" } } } ), # Context-aware versions for large files Tool( name="review_large_file", description="Review large files with intelligent context management", inputSchema={ "type": "object", "properties": { "file_path": { "type": "string", "description": "Path to large file" }, "max_tokens": { "type": "integer", "default": 50000, "description": "Maximum tokens per chunk" }, "focus_areas": { "type": "array", "items": {"type": "string"}, "description": "Areas to focus on (errors, complexity, security)" }, "model": { "type": "string", "default": "claude-3-opus", "description": "Model to use for context limits" } }, "required": ["file_path"] } ), Tool( name="analyze_with_context", description="Run any analysis with automatic context management", inputSchema={ "type": "object", "properties": { "flow": { "type": "string", "enum": ["instant_review", "debt_orchestrator", "test_gap_analyzer"], "description": "Analysis flow to run" }, "target": { "type": "string", "description": "File or directory to analyze" }, "model": { "type": "string", "default": "claude-3-opus", "description": "Model for context limits" }, "fallback_models": { "type": "array", "items": {"type": "string"}, "description": "Models to try if primary fails" } }, "required": ["flow", "target"] } ), # Model information tools Tool( name="get_available_models", description="Get list of all available AI models with capabilities and pricing", inputSchema={ "type": "object", "properties": {} } ), Tool( name="compare_models", description="Compare two AI models side by side", inputSchema={ "type": "object", "properties": { "model1": { "type": "string", "description": "First model name (e.g., claude-3-opus)" }, "model2": { "type": "string", "description": "Second model name (e.g., gpt-4-turbo)" } }, "required": ["model1", "model2"] } ), Tool( name="suggest_model_for_task", description="Get AI model recommendation for specific task and budget", inputSchema={ "type": "object", "properties": { "task_type": { "type": "string", "description": "Type of task (code_review, test_generation, documentation, debugging, quick_task)", "enum": ["code_review", "test_generation", "documentation", "debugging", "quick_task"] }, "context_size": { "type": "integer", "description": "Estimated tokens needed" }, "budget": { "type": "number", "description": "Maximum cost in USD (optional)" } }, "required": ["task_type", "context_size"] } ), Tool( name="get_current_model_status", description="Get current AI model being used and its status", inputSchema={ "type": "object", "properties": {} } ), Tool( name="estimate_operation_cost", description="Estimate cost for a specific operation", inputSchema={ "type": "object", "properties": { "operation": { "type": "string", "description": "Operation name (e.g., code_review, test_generation)" }, "input_size": { "type": "integer", "description": "Input tokens" }, "output_size": { "type": "integer", "description": "Expected output tokens", "default": 2000 }, "model": { "type": "string", "description": "Specific model to use (optional, uses current if not specified)" } }, "required": ["operation", "input_size"] } ), Tool( name="generate_dependency_graph", description="Generate dependency graph in various text formats (DOT, Mermaid, JSON)", inputSchema={ "type": "object", "properties": { "output_format": { "type": "string", "description": "Output format: 'dot' (Graphviz), 'mermaid' (GitHub/GitLab), or 'json'", "enum": ["dot", "mermaid", "json"], "default": "mermaid" }, "graph_type": { "type": "string", "description": "Type of graph to generate", "enum": ["imports", "calls", "inheritance"], "default": "imports" }, "file_pattern": { "type": "string", "description": "File pattern to analyze (e.g., '*.py', 'src/**/*.js')" }, "detect_cycles": { "type": "boolean", "description": "Detect and highlight circular dependencies", "default": false } }, "required": [] } ) ] def setup_handlers(self): """Set up handlers for tool calls""" @self.server.call_tool() async def call_tool(name: str, arguments: Dict[str, Any]) -> list[TextContent]: try: if name == "search": return await self.handle_search(arguments) elif name == "index": return await self.handle_index(arguments) elif name == "analyze_symbol": return await self.handle_analyze_symbol(arguments) elif name == "find_similar": return await self.handle_find_similar(arguments) elif name == "get_context": return await self.handle_get_context(arguments) # Orchestrated tools elif name == "instant_review": return await self.handle_instant_review(arguments) elif name == "debt_orchestrator": return await self.handle_debt_orchestrator(arguments) elif name == "test_gap_analyzer": return await self.handle_test_gap_analyzer(arguments) elif name == "import_optimizer": return await self.handle_import_optimizer(arguments) # Context-aware tools elif name == "review_large_file": return await self.handle_review_large_file(arguments) elif name == "analyze_with_context": return await self.handle_analyze_with_context(arguments) # Model info tools elif name == "get_available_models": return await self.handle_get_available_models(arguments) elif name == "compare_models": return await self.handle_compare_models(arguments) elif name == "suggest_model_for_task": return await self.handle_suggest_model_for_task(arguments) elif name == "get_current_model_status": return await self.handle_get_current_model_status(arguments) elif name == "estimate_operation_cost": return await self.handle_estimate_operation_cost(arguments) elif name == "generate_dependency_graph": return await self.handle_generate_dependency_graph(arguments) else: return [TextContent(type="text", text=f"Unknown tool: {name}")] except Exception as e: logger.error(f"Error in tool {name}: {e}") return [TextContent(type="text", text=f"Error: {str(e)}")] def setup_resources(self): """Set up resources to provide context about available projects""" @self.server.list_resources() async def list_resources() -> list[Resource]: resources = [] # Add resource for current working directory cwd = Path.cwd() resources.append(Resource( uri=f"project://{cwd.name}", name=f"Current Project: {cwd.name}", mimeType="text/plain", description=f"Working directory: {cwd}" )) # Add resources for indexed projects for project_path in self.project_instances.keys(): project_name = Path(project_path).name resources.append(Resource( uri=f"project://indexed/{project_name}", name=f"Indexed: {project_name}", mimeType="text/plain", description=f"Indexed project at {project_path}" )) # Add resource for common directories common_dirs = [ Path.home() / "Documents", Path.home() / "Projects", Path.home() / "Development", Path("/mnt/c/Users") if Path("/mnt/c").exists() else None ] for dir_path in common_dirs: if dir_path and dir_path.exists(): resources.append(Resource( uri=f"directory://{dir_path.name}", name=f"Directory: {dir_path.name}", mimeType="text/plain", description=f"Available at {dir_path}" )) return resources @self.server.read_resource() async def read_resource(uri: str) -> str: if uri.startswith("project://"): parts = uri.replace("project://", "").split("/") if parts[0] == "indexed" and len(parts) > 1: # Return info about indexed project for project_path, instance in self.project_instances.items(): if Path(project_path).name == parts[1]: return f"Project: {parts[1]}\nPath: {project_path}\nStatus: Indexed and ready for search" else: # Return info about current project cwd = Path.cwd() return f"Current working directory: {cwd}\nProject name: {cwd.name}" elif uri.startswith("directory://"): dir_name = uri.replace("directory://", "") return f"Directory {dir_name} is available for indexing and search" return "Resource not found" def setup_prompts(self): """Set up prompts to suggest common actions""" @self.server.list_prompts() async def list_prompts() -> list[Prompt]: return [ Prompt( name="index_project", description="Index a project for semantic search", arguments=[ PromptArgument( name="project_path", description="Path to the project to index", required=True ) ] ), Prompt( name="search_code", description="Search for code by meaning", arguments=[ PromptArgument( name="query", description="What to search for", required=True ), PromptArgument( name="project_path", description="Project to search in (optional)", required=False ) ] ), Prompt( name="analyze_current_project", description="Analyze and index the current working directory", arguments=[] ), Prompt( name="find_implementation", description="Find where something is implemented", arguments=[ PromptArgument( name="symbol", description="Function, class, or variable name", required=True ) ] ) ] @self.server.get_prompt() async def get_prompt(name: str, arguments: Dict[str, Any]) -> list[TextContent]: if name == "index_project": project_path = arguments.get("project_path", ".") return [TextContent( type="text", text=f"Please index the project at {project_path} using the 'index' tool, then search for relevant code." )] elif name == "search_code": query = arguments.get("query", "") project_path = arguments.get("project_path", ".") return [TextContent( type="text", text=f"Search for '{query}' in project {project_path} using the 'search' tool." )] elif name == "analyze_current_project": cwd = Path.cwd() return [TextContent( type="text", text=f"Index the current project at {cwd} and provide an overview of its structure." )] elif name == "find_implementation": symbol = arguments.get("symbol", "") return [TextContent( type="text", text=f"Use 'analyze_symbol' tool to find where '{symbol}' is implemented." )] return [TextContent(type="text", text="Unknown prompt")] def get_or_create_orchestrator(self, project_path: str = ".") -> OrchestrationEngine: """Get or create the orchestration engine""" if self.orchestrator is None: # Initialize orchestrator with database db_path = Path(project_path) / ".claude-symbols" / "search.db" if not db_path.exists(): # Try default location db_path = Path(".claude-symbols/search.db") if db_path.exists(): db = ThreadSafeDB(str(db_path)) self.orchestrator = OrchestrationEngine(db, project_path) else: raise ValueError(f"Database not found. Please index the project first using the 'index' tool.") return self.orchestrator async def handle_instant_review(self, args: Dict[str, Any]) -> list[TextContent]: """Handle instant_review orchestrated tool""" try: # Get orchestrator orchestrator = self.get_or_create_orchestrator() # Prepare inputs inputs = {} if "file_path" in args: file_path = Path(args["file_path"]) if file_path.exists(): with open(file_path, 'r', encoding='utf-8') as f: inputs["code"] = f.read() inputs["file_path"] = str(file_path) else: return [TextContent(type="text", text=f"File not found: {file_path}")] elif "code" in args: inputs["code"] = args["code"] else: return [TextContent(type="text", text="Please provide either 'file_path' or 'code' to review")] # Execute orchestrated review result = await orchestrator.execute_flow("instant_review", inputs) # Format results output = ["๐Ÿ” **Instant Code Review**\n"] if "agents" in result: # Get aggregated review results if "review_aggregator" in result["agents"]: review = result["agents"]["review_aggregator"] # Show quality score and grade if "summary" in review: summary = review["summary"] output.append(f"๐Ÿ“Š **Quality Score**: {summary.get('quality_score', 'N/A')}/100 (Grade: {summary.get('grade', 'N/A')})") output.append(f"๐Ÿ”ด High: {summary.get('high_severity', 0)} | ๐ŸŸก Medium: {summary.get('medium_severity', 0)} | ๐ŸŸข Low: {summary.get('low_severity', 0)}\n") # Show issues if "issues" in review and review["issues"]: output.append("**Issues Found:**") for issue in review["issues"][:10]: # Top 10 issues severity_icon = {"high": "๐Ÿ”ด", "medium": "๐ŸŸก", "low": "๐ŸŸข"}.get(issue.get("severity", "low"), "โšช") output.append(f"{severity_icon} {issue.get('type', 'Unknown')}: {issue}") output.append("") # Show suggestions if "suggestions" in review and review["suggestions"]: output.append("**Suggestions:**") for suggestion in review["suggestions"]: output.append(f"โ€ข {suggestion}") # Show priority actions if available if "priority_ranker" in result["agents"]: ranker = result["agents"]["priority_ranker"] if "action_plan" in ranker and ranker["action_plan"]: output.append("\n**Priority Actions:**") for action in ranker["action_plan"][:5]: output.append(f"{action['order']}. {action['action']}") # Show execution time if "execution_ms" in result: output.append(f"\nโฑ๏ธ Execution time: {result['execution_ms']}ms") return [TextContent(type="text", text="\n".join(output))] except Exception as e: logger.error(f"Error in instant_review: {e}") return [TextContent(type="text", text=f"Error performing code review: {str(e)}")] async def handle_debt_orchestrator(self, args: Dict[str, Any]) -> list[TextContent]: """Handle debt_orchestrator tool""" try: orchestrator = self.get_or_create_orchestrator() # Prepare inputs based on scope scope = args.get("scope", "file") path = args.get("path", ".") inputs = {"path": path, "scope": scope} # Read code based on scope if scope == "file": file_path = Path(path) if file_path.exists() and file_path.is_file(): with open(file_path, 'r', encoding='utf-8') as f: inputs["code"] = f.read() else: return [TextContent(type="text", text=f"File not found: {path}")] # Execute debt analysis result = await orchestrator.execute_flow("debt_orchestrator", inputs) # Format results output = ["๐Ÿ’ฐ **Technical Debt Analysis**\n"] if "agents" in result and "debt_calculator" in result["agents"]: debt = result["agents"]["debt_calculator"] # Summary metrics output.append(f"**Total Debt**: {debt.get('total_debt_hours', 0):.1f} hours (${debt.get('total_debt_cost', 0):,.0f})") output.append(f"**High Priority**: {debt.get('high_priority_hours', 0):.1f} hours") output.append(f"**Recommendation**: {debt.get('recommendation', 'N/A')}\n") # Quick wins if "quick_wins" in debt and debt["quick_wins"]: output.append("**Quick Wins** (< 30 min each):") for item in debt["quick_wins"]: output.append(f"โ€ข {item['type']}: {item['hours']:.1f}h (${item['cost']:.0f})") output.append("") # All debt items if "debt_items" in debt and debt["debt_items"]: output.append("**Debt Items by ROI:**") for item in debt["debt_items"][:10]: roi_stars = "โญ" * min(5, int(item.get("roi", 0))) output.append(f"{roi_stars} {item['type']}: {item['hours']:.1f}h (${item['cost']:.0f}) - {item['priority']}") return [TextContent(type="text", text="\n".join(output))] except Exception as e: logger.error(f"Error in debt_orchestrator: {e}") return [TextContent(type="text", text=f"Error analyzing technical debt: {str(e)}")] async def handle_test_gap_analyzer(self, args: Dict[str, Any]) -> list[TextContent]: """Handle test_gap_analyzer tool""" try: orchestrator = self.get_or_create_orchestrator() path = args.get("path", ".") inputs = { "path": path, "include_private": args.get("include_private", False), "min_complexity": args.get("min_complexity", 5) } # Read code if it's a file file_path = Path(path) if file_path.exists() and file_path.is_file(): with open(file_path, 'r', encoding='utf-8') as f: inputs["code"] = f.read() # Execute test gap analysis result = await orchestrator.execute_flow("test_gap_analyzer", inputs) # Format results output = ["๐Ÿงช **Test Gap Analysis**\n"] if "agents" in result: # Show function extraction results if "function_extractor" in result["agents"]: functions = result["agents"]["function_extractor"] output.append(f"**Functions Found**: {functions.get('count', 0)}") # Show test gaps if "test_gap_finder" in result["agents"]: gaps = result["agents"]["test_gap_finder"] coverage = gaps.get("coverage_percent", 0) # Coverage indicator coverage_icon = "๐ŸŸข" if coverage >= 80 else "๐ŸŸก" if coverage >= 60 else "๐Ÿ”ด" output.append(f"{coverage_icon} **Coverage**: {coverage}%") output.append(f"**Tested**: {gaps.get('tested_count', 0)} | **Untested**: {len(gaps.get('untested', []))}\n") # List untested functions if "untested" in gaps and gaps["untested"]: output.append("**Untested Functions:**") for func in gaps["untested"][:10]: output.append(f"โ€ข {func.get('name', 'Unknown')} (line {func.get('line', '?')})") output.append("") # Show generated tests if available if "test_generator" in result["agents"] and args.get("generate_tests", True): test_code = result["agents"]["test_generator"] if isinstance(test_code, str) and test_code: output.append("**Generated Test Template:**") output.append("```python") output.append(test_code[:500]) # First 500 chars output.append("```") return [TextContent(type="text", text="\n".join(output))] except Exception as e: logger.error(f"Error in test_gap_analyzer: {e}") return [TextContent(type="text", text=f"Error analyzing test gaps: {str(e)}")] async def handle_import_optimizer(self, args: Dict[str, Any]) -> list[TextContent]: """Handle import_optimizer tool""" try: orchestrator = self.get_or_create_orchestrator() inputs = {} if "file_path" in args: file_path = Path(args["file_path"]) if file_path.exists(): with open(file_path, 'r', encoding='utf-8') as f: inputs["code"] = f.read() inputs["file_path"] = str(file_path) else: return [TextContent(type="text", text=f"File not found: {file_path}")] elif "code" in args: inputs["code"] = args["code"] else: return [TextContent(type="text", text="Please provide either 'file_path' or 'code' to analyze")] # Execute import optimization result = await orchestrator.execute_flow("import_optimizer", inputs) # Format results output = ["๐Ÿ“ฆ **Import Analysis**\n"] if "agents" in result and "import_analyzer" in result["agents"]: analysis = result["agents"]["import_analyzer"] # Summary output.append(f"**Total Imports**: {analysis.get('count', 0)}") output.append(f"**Unused Imports**: {analysis.get('unused_count', 0)}\n") # Show all imports if "imports" in analysis and analysis["imports"]: output.append("**All Imports:**") for imp in analysis["imports"][:20]: # First 20 module = imp.get("module", "Unknown") line = imp.get("line", "?") alias = f" as {imp['alias']}" if imp.get("alias") else "" output.append(f" Line {line}: {module}{alias}") output.append("") # Show unused imports if "unused" in analysis and analysis["unused"]: output.append("**๐Ÿ—‘๏ธ Unused Imports (can be removed):**") for imp in analysis["unused"]: output.append(f" Line {imp.get('line', '?')}: {imp.get('module', 'Unknown')}") # Provide auto-fix if requested if args.get("auto_fix", False) and inputs.get("code"): output.append("\n**Auto-fix:** Remove the unused imports listed above from your code.") return [TextContent(type="text", text="\n".join(output))] except Exception as e: logger.error(f"Error in import_optimizer: {e}") return [TextContent(type="text", text=f"Error optimizing imports: {str(e)}")] def get_or_create_instance(self, project_path: str) -> SmartCodeSearch: """Get or create a SmartCodeSearch instance for a project""" abs_path = str(Path(project_path).absolute()) if abs_path not in self.project_instances: # Create instance silently self.project_instances[abs_path] = SmartCodeSearch(abs_path, quiet=True) return self.project_instances[abs_path] async def handle_search(self, args: Dict[str, Any]) -> list[TextContent]: """Handle search tool call""" query = args.get("query", "") project_path = args.get("project_path", ".") limit = args.get("limit", 10) search_type = args.get("search_type", "hybrid") search_instance = self.get_or_create_instance(project_path) # Run search in thread pool to avoid blocking loop = asyncio.get_event_loop() results = await loop.run_in_executor( None, search_instance.search, query, limit ) if not results: return [TextContent(type="text", text="No results found")] # Format results formatted_results = [] for result in results: # Handle both dict and tuple formats if isinstance(result, dict): file_path = result.get('file_path', '') symbol = result.get('name', '') content = result.get('content', '') score = result.get('score', 0.0) line_num = result.get('line_num', 0) formatted_results.append( f"๐Ÿ“„ {file_path}:{line_num}\n" f" Symbol: {symbol}\n" f" Score: {score:.3f}\n" f" Content:\n{content[:200]}...\n" ) else: # Tuple format from clean_search file_path, symbol, content, score = result formatted_results.append( f"๐Ÿ“„ {file_path}\n" f" Symbol: {symbol}\n" f" Score: {score:.3f}\n" f" Content:\n{content[:200]}...\n" ) return [TextContent(type="text", text="\n".join(formatted_results))] async def handle_index(self, args: Dict[str, Any]) -> list[TextContent]: """Handle index tool call""" project_path = args.get("project_path", ".") force = args.get("force", False) search_instance = self.get_or_create_instance(project_path) # Run indexing in thread pool loop = asyncio.get_event_loop() await loop.run_in_executor( None, search_instance.index_project, force ) return [TextContent( type="text", text=f"โœ… Project indexed successfully: {project_path}" )] async def handle_analyze_symbol(self, args: Dict[str, Any]) -> list[TextContent]: """Handle analyze_symbol tool call""" symbol_name = args.get("symbol_name", "") project_path = args.get("project_path", ".") search_instance = self.get_or_create_instance(project_path) # Search for the symbol loop = asyncio.get_event_loop() results = await loop.run_in_executor( None, search_instance.search, symbol_name, 20 ) if not results: return [TextContent(type="text", text=f"Symbol '{symbol_name}' not found")] # Analyze results definitions = [] references = [] tests = [] for result in results: # Handle both dict and tuple formats if isinstance(result, dict): file_path = result.get('file_path', '') content = result.get('content', '') else: file_path, _, content, _ = result if "def " + symbol_name in content or "class " + symbol_name in content: definitions.append((file_path, content[:300])) elif "test" in file_path.lower() and args.get("include_tests", True): tests.append((file_path, content[:200])) elif args.get("include_references", True): references.append((file_path, content[:200])) output = [f"๐Ÿ” Analysis of '{symbol_name}':\n"] if definitions: output.append("\n๐Ÿ“ Definitions:") for path, content in definitions[:3]: output.append(f" {path}:\n {content}") if references: output.append(f"\n๐Ÿ“Ž References ({len(references)} found):") for path, content in references[:5]: output.append(f" {path}") if tests: output.append(f"\n๐Ÿงช Tests ({len(tests)} found):") for path, content in tests[:3]: output.append(f" {path}") return [TextContent(type="text", text="\n".join(output))] async def handle_find_similar(self, args: Dict[str, Any]) -> list[TextContent]: """Handle find_similar tool call""" code_snippet = args.get("code_snippet", "") project_path = args.get("project_path", ".") limit = args.get("limit", 5) search_instance = self.get_or_create_instance(project_path) # Use the code snippet as a semantic search query loop = asyncio.get_event_loop() results = await loop.run_in_executor( None, search_instance.search, code_snippet, limit ) if not results: return [TextContent(type="text", text="No similar code found")] formatted_results = ["๐Ÿ”„ Similar code found:\n"] for result in results: # Handle both dict and tuple formats if isinstance(result, dict): file_path = result.get('file_path', '') content = result.get('content', '') score = result.get('score', 0.0) else: file_path, _, content, score = result formatted_results.append( f"๐Ÿ“„ {file_path} (similarity: {score:.3f})\n" f" {content[:200]}...\n" ) return [TextContent(type="text", text="\n".join(formatted_results))] async def handle_get_context(self, args: Dict[str, Any]) -> list[TextContent]: """Handle get_context tool call""" project_path = args.get("project_path", ".") recent_files = args.get("recent_files", []) recent_symbols = args.get("recent_symbols", []) search_instance = self.get_or_create_instance(project_path) context_items = [] # Get context for recent symbols for symbol in recent_symbols[:3]: loop = asyncio.get_event_loop() results = await loop.run_in_executor( None, search_instance.search, symbol, 3 ) for result in results[:2]: # Handle both dict and tuple formats if isinstance(result, dict): file_path = result.get('file_path', '') else: file_path, _, _, _ = result context_items.append(f"Related to '{symbol}': {file_path}") # Get context for recent files (find related files) for file_path in recent_files[:3]: # Extract likely symbols from filename base_name = Path(file_path).stem loop = asyncio.get_event_loop() results = await loop.run_in_executor( None, search_instance.search, base_name, 3 ) for result in results[:2]: # Handle both dict and tuple formats if isinstance(result, dict): found_path = result.get('file_path', '') else: found_path, _, _, _ = result if found_path != file_path: context_items.append(f"Related to {file_path}: {found_path}") if not context_items: return [TextContent(type="text", text="No additional context found")] # Remove duplicates and limit to 10 items unique_context = list(set(context_items))[:10] output = "๐Ÿ“š Relevant context:\n" + "\n".join(unique_context) return [TextContent(type="text", text=output)] async def handle_review_large_file(self, args: Dict[str, Any]) -> list[TextContent]: """Handle review_large_file tool call with context management""" file_path = args.get("file_path", "") focus_areas = args.get("focus_areas", ["errors", "complexity", "best_practices"]) max_file_size = args.get("max_file_size", 1000000) # 1MB default # Check file exists path = Path(file_path) if not path.exists(): return [TextContent(type="text", text=f"Error: File '{file_path}' not found")] # Check file size file_size = path.stat().st_size if file_size > max_file_size * 10: # Absolute max 10MB return [TextContent(type="text", text=f"Error: File too large ({file_size:,} bytes)")] # Initialize context-aware orchestrator if not exists if not hasattr(self, 'context_orchestrator'): db = self.get_or_create_db() from src.core.context_aware_orchestrator import ContextAwareOrchestrator self.context_orchestrator = ContextAwareOrchestrator(db, ".") # Read file content try: content = path.read_text(encoding='utf-8') except Exception as e: return [TextContent(type="text", text=f"Error reading file: {str(e)}")] # Prepare inputs for review inputs = { "code": content, "file_path": str(path), "focus_areas": focus_areas, "_file_size": file_size } # Execute context-aware review try: result = await self.context_orchestrator.execute_flow("instant_review", inputs) # Format results output = [f"๐Ÿ“‹ Large File Review: {path.name}\n"] output.append(f"๐Ÿ“Š File size: {file_size:,} bytes") # Add context metadata if "_context_metadata" in result: meta = result["_context_metadata"] output.append(f"๐Ÿง  Context: {meta['utilization']} used, {meta['items']} items") if "compression_stats" in meta: stats = meta["compression_stats"] if stats.get("items_compressed", 0) > 0: output.append(f"๐Ÿ“ฆ Compressed {stats['items_compressed']} items") # Add chunk info if file was chunked if "total_chunks" in result: output.append(f"๐Ÿ”€ Processed in {result['total_chunks']} chunks") # Add review results if "summary" in result: output.append(f"\n๐Ÿ“ Summary:") output.append(f" Total issues: {result['summary'].get('total_issues', 0)}") output.append(f" Total errors: {result['summary'].get('total_errors', 0)}") # Add key findings if "all_issues" in result and result["all_issues"]: output.append("\nโš ๏ธ Key Issues Found:") for issue in result["all_issues"][:10]: # Top 10 issues chunk_info = f"[chunk {issue.get('chunk', 1)}] " if "chunk" in issue else "" output.append(f" {chunk_info}{issue.get('agent', 'unknown')}: {issue.get('description', str(issue))}") # Add recommendations if "agents" in result: for agent_name, agent_results in result["agents"].items(): if isinstance(agent_results, list): # Multiple chunks for chunk_result in agent_results[:2]: # First 2 chunks if isinstance(chunk_result, dict) and "result" in chunk_result: res = chunk_result["result"] if isinstance(res, dict) and "recommendations" in res: output.append(f"\n๐Ÿ’ก {agent_name} recommendations:") for rec in res["recommendations"][:3]: output.append(f" โ€ข {rec}") return [TextContent(type="text", text="\n".join(output))] except Exception as e: logger.error(f"Error in large file review: {e}") return [TextContent(type="text", text=f"Error during review: {str(e)}")] async def handle_analyze_with_context(self, args: Dict[str, Any]) -> list[TextContent]: """Handle analyze_with_context tool call with automatic context management""" flow = args.get("flow", "instant_review") code = args.get("code", "") file_path = args.get("file_path", None) model = args.get("model", "claude-3-opus") fallback_models = args.get("fallback_models", ["claude-3-opus", "gpt-4-turbo", "gpt-4"]) # Initialize context-aware orchestrator if not exists if not hasattr(self, 'context_orchestrator'): db = self.get_or_create_db() from src.core.context_aware_orchestrator import ContextAwareOrchestrator self.context_orchestrator = ContextAwareOrchestrator(db, ".", model) # Prepare inputs inputs = {"code": code} if file_path: path = Path(file_path) if path.exists(): try: inputs["code"] = path.read_text(encoding='utf-8') inputs["file_path"] = str(path) except Exception as e: return [TextContent(type="text", text=f"Error reading file: {str(e)}")] # Execute with model fallback try: result = await self.context_orchestrator.execute_with_model_fallback( flow, inputs, fallback_models ) # Format results output = [f"๐Ÿ”„ Context-Aware Analysis: {flow}\n"] # Add model info if "_model_used" in result: output.append(f"๐Ÿค– Model: {result['_model_used']}") # Add context metadata if "_context_metadata" in result: meta = result["_context_metadata"] output.append(f"๐Ÿง  Context usage: {meta['utilization']}") output.append(f"๐Ÿ“Š Items in context: {meta['items']}") # Show compression stats if any if "compression_stats" in meta: stats = meta["compression_stats"] if stats.get("items_compressed", 0) > 0: output.append(f"๐Ÿ“ฆ Compressed: {stats['items_compressed']} items") if stats.get("items_dropped", 0) > 0: output.append(f"๐Ÿ—‘๏ธ Dropped: {stats['items_dropped']} low-priority items") # Add chunk info if applicable if "total_chunks" in result: output.append(f"๐Ÿ”€ Processed in {result['total_chunks']} chunks") # Add flow-specific results if "agents" in result: output.append(f"\n๐Ÿ“‹ Analysis Results:") for agent_name, agent_result in result["agents"].items(): if isinstance(agent_result, list): # Multiple chunks output.append(f" {agent_name}: {len(agent_result)} chunks processed") elif isinstance(agent_result, dict): # Single result if "score" in agent_result: output.append(f" {agent_name}: score {agent_result['score']}") elif "issues" in agent_result: output.append(f" {agent_name}: {len(agent_result['issues'])} issues found") elif "error" in agent_result: output.append(f" {agent_name}: โŒ {agent_result['error']}") # Add summary if available if "summary" in result: output.append(f"\n๐Ÿ“Š Summary:") for key, value in result["summary"].items(): output.append(f" {key}: {value}") # Add execution time if "execution_ms" in result: output.append(f"\nโฑ๏ธ Execution time: {result['execution_ms']}ms") return [TextContent(type="text", text="\n".join(output))] except Exception as e: logger.error(f"Error in context-aware analysis: {e}") return [TextContent(type="text", text=f"Error during analysis: {str(e)}")] async def handle_get_available_models(self, args: Dict[str, Any]) -> list[TextContent]: """Handle get_available_models tool call""" try: from .tools.model_info_tools import ModelInfoTools tools = ModelInfoTools() result = await tools.get_available_models() output = ["๐Ÿ“Š Available AI Models\n"] # Summary summary = result.get("summary", {}) output.append(f"Total models: {summary['total_models']}") output.append(f"Providers available: {summary['providers_available']}") if summary['cheapest_model']: cheap = summary['cheapest_model'] output.append(f"๐Ÿ’ฐ Cheapest: {cheap['name']} ({cheap['provider']}) - ${cheap['cost']:.4f}/1k") if summary['largest_context']: large = summary['largest_context'] output.append(f"๐Ÿ“ Largest context: {large['name']} ({large['provider']}) - {large['size']:,} tokens") # Providers and models output.append("\n๐Ÿค– Models by Provider:") for provider, info in result.get("providers", {}).items(): output.append(f"\n{provider.upper()} (API key: {'โœ…' if info['has_api_key'] else 'โŒ'})") for model in info['models']: output.append(f" โ€ข {model['name']}") output.append(f" Context: {model['context_window']:,} tokens") output.append(f" Max output: {model['max_output']:,} tokens") output.append(f" Cost: ${model['cost']['input_per_1k']:.4f} in / ${model['cost']['output_per_1k']:.4f} out") features = [] if model['supports_vision']: features.append("๐Ÿ‘๏ธ Vision") if model['supports_functions']: features.append("๐Ÿ”ง Functions") if model['supports_json_mode']: features.append("๐Ÿ“‹ JSON mode") if features: output.append(f" Features: {', '.join(features)}") return [TextContent(type="text", text="\n".join(output))] except Exception as e: logger.error(f"Error in get_available_models: {e}") return [TextContent(type="text", text=f"Error getting model information: {str(e)}")] async def handle_compare_models(self, args: Dict[str, Any]) -> list[TextContent]: """Handle compare_models tool call""" try: from .tools.model_info_tools import ModelInfoTools model1 = args.get("model1") model2 = args.get("model2") if not model1 or not model2: return [TextContent(type="text", text="Error: Both model1 and model2 are required")] tools = ModelInfoTools() result = await tools.compare_models(model1, model2) output = [f"๐Ÿ“Š Model Comparison: {model1} vs {model2}\n"] # Add comparison details if "comparison" in result: comp = result["comparison"] if "context_ratio" in comp: output.append(f"Context window ratio: {comp['context_ratio']:.2f}x") output.append(f"Larger context: {comp.get('larger_context', 'N/A')}") if "cost_ratio" in comp: cost = comp["cost_ratio"] output.append(f"\nCost comparison:") output.append(f" Input cost ratio: {cost.get('input', 0):.2f}x") output.append(f" Output cost ratio: {cost.get('output', 0):.2f}x") output.append(f" Cheaper model: {comp.get('cheaper', 'N/A')}") if "vision_support" in comp: output.append(f"\nVision support:") for model, has_vision in comp["vision_support"].items(): output.append(f" {model}: {'โœ…' if has_vision else 'โŒ'}") # Add recommendations if "recommendations" in result: output.append(f"\n๐Ÿ’ก Recommendations:") for rec in result["recommendations"]: output.append(f" โ€ข {rec}") return [TextContent(type="text", text="\n".join(output))] except Exception as e: logger.error(f"Error in compare_models: {e}") return [TextContent(type="text", text=f"Error comparing models: {str(e)}")] async def handle_suggest_model_for_task(self, args: Dict[str, Any]) -> list[TextContent]: """Handle suggest_model_for_task tool call""" try: from .tools.model_info_tools import ModelInfoTools task_type = args.get("task_type", "general") context_size = args.get("context_size", 10000) budget = args.get("budget", None) tools = ModelInfoTools() result = await tools.suggest_model_for_task(task_type, context_size, budget) output = [f"๐ŸŽฏ Model Suggestion for {task_type}\n"] if result.get("error"): output.append(f"โŒ {result['error']}") if result.get("reason"): output.append(f"Reason: {result['reason']}") else: output.append(f"โœ… Suggested model: {result['suggestion']}") output.append(f"Provider: {result['provider']}") output.append(f"Context window: {result['context_window']:,} tokens") output.append(f"Estimated cost: ${result['estimated_cost']:.4f}") if result.get("reasoning"): output.append(f"\n๐Ÿ“ Reasoning: {result['reasoning']}") if result.get("alternatives"): output.append(f"\n๐Ÿ”„ Alternative models:") for alt in result["alternatives"]: output.append(f" โ€ข {alt['model']} ({alt['provider']})") output.append(f" Cost: ${alt['cost']:.4f}") output.append(f" Context: {alt['context']:,} tokens") return [TextContent(type="text", text="\n".join(output))] except Exception as e: logger.error(f"Error in suggest_model_for_task: {e}") return [TextContent(type="text", text=f"Error suggesting model: {str(e)}")] async def handle_get_current_model_status(self, args: Dict[str, Any]) -> list[TextContent]: """Handle get_current_model_status tool call""" try: from .tools.model_info_tools import ModelInfoTools tools = ModelInfoTools() result = await tools.get_current_model_status() output = [f"๐Ÿค– Current Model Status\n"] output.append(f"Model: {result['current_model']}") output.append(f"Provider: {result['provider']}") # Capabilities cap = result['capabilities'] output.append(f"\n๐Ÿ“Š Capabilities:") output.append(f" Context window: {cap['context_window']:,} tokens") output.append(f" Max output: {cap['max_output']:,} tokens") output.append(f" Usable context: {cap['usable_context']:,} tokens") features = [] if cap['supports_vision']: features.append("๐Ÿ‘๏ธ Vision") if cap['supports_functions']: features.append("๐Ÿ”ง Functions") if cap['supports_json_mode']: features.append("๐Ÿ“‹ JSON mode") if features: output.append(f" Features: {', '.join(features)}") # Cost estimate if result.get('cost_estimate'): cost = result['cost_estimate'] output.append(f"\n๐Ÿ’ฐ Typical cost estimate:") output.append(f" Total: ${cost.get('total', 0):.4f}") output.append(f" (10k tokens in, 2k tokens out)") # Context usage if result.get('context_usage'): usage = result['context_usage'] output.append(f"\n๐Ÿ“ˆ Current context usage:") output.append(f" Tokens used: {usage['tokens_used']:,}") output.append(f" Tokens available: {usage['tokens_available']:,}") output.append(f" Utilization: {usage['utilization']}") # Recommendations if result.get('recommendations'): output.append(f"\n๐Ÿ’ก Recommendations:") for rec in result['recommendations']: output.append(f" โ€ข {rec}") return [TextContent(type="text", text="\n".join(output))] except Exception as e: logger.error(f"Error in get_current_model_status: {e}") return [TextContent(type="text", text=f"Error getting model status: {str(e)}")] async def handle_estimate_operation_cost(self, args: Dict[str, Any]) -> list[TextContent]: """Handle estimate_operation_cost tool call""" try: from .tools.model_info_tools import ModelInfoTools operation = args.get("operation", "general") input_size = args.get("input_size", 10000) output_size = args.get("output_size", 2000) model = args.get("model", None) tools = ModelInfoTools() result = await tools.estimate_operation_cost(operation, input_size, output_size, model) output = [f"๐Ÿ’ฐ Cost Estimate for '{operation}'\n"] output.append(f"Model: {result['model']}") # Token counts tokens = result['tokens'] output.append(f"\n๐Ÿ“Š Token usage:") output.append(f" Input: {tokens['input']:,} tokens") output.append(f" Output: {tokens['output']:,} tokens") output.append(f" Total: {tokens['total']:,} tokens") # Cost breakdown cost = result['cost'] output.append(f"\n๐Ÿ’ต Cost breakdown:") output.append(f" Input cost: ${cost['breakdown']['input']:.4f}") output.append(f" Output cost: ${cost['breakdown']['output']:.4f}") output.append(f" Total cost: ${cost['total']:.4f}") # Cheaper alternatives if result.get('cheaper_alternatives'): output.append(f"\n๐Ÿ”„ Cheaper alternatives:") for alt in result['cheaper_alternatives']: output.append(f" โ€ข {alt['model']} ({alt['provider']})") output.append(f" Cost: ${alt['cost']:.4f}") output.append(f" Savings: ${alt['savings']:.4f} ({alt['savings_percent']:.0f}%)") # Recommendation if result.get('recommendation'): output.append(f"\n{result['recommendation']}") return [TextContent(type="text", text="\n".join(output))] except Exception as e: logger.error(f"Error in estimate_operation_cost: {e}") return [TextContent(type="text", text=f"Error estimating cost: {str(e)}")] async def handle_generate_dependency_graph(self, args: Dict[str, Any]) -> list[TextContent]: """Handle generate_dependency_graph tool call""" try: from .tools.graph_export import GraphExporter from .core.dependency_analyzer import DependencyAnalyzer from pathlib import Path output_format = args.get("output_format", "mermaid") graph_type = args.get("graph_type", "imports") file_pattern = args.get("file_pattern", "*.py") detect_cycles = args.get("detect_cycles", False) # Analyze project dependencies analyzer = DependencyAnalyzer(Path.cwd()) # Analyze files matching pattern files_analyzed = 0 for file_path in Path.cwd().rglob(file_pattern): if file_path.is_file() and not any(part.startswith('.') for part in file_path.parts): try: analyzer.analyze_file(file_path) files_analyzed += 1 except Exception: pass # Skip files that can't be analyzed if files_analyzed == 0: return [TextContent(type="text", text=f"No files found matching pattern: {file_pattern}")] # Create exporter and generate graph exporter = GraphExporter(dependency_analyzer=analyzer) # Generate the graph if output_format == "dot": graph_output = exporter.export_to_dot(graph_type, f"{graph_type.title()} Dependency Graph") elif output_format == "mermaid": graph_output = exporter.export_to_mermaid(graph_type, f"{graph_type.title()} Dependency Graph") elif output_format == "json": graph_output = exporter.export_to_json(graph_type, include_metadata=True) else: return [TextContent(type="text", text=f"Unsupported format: {output_format}")] # Detect circular dependencies if requested cycles_info = "" if detect_cycles: cycles = exporter.detect_circular_dependencies(graph_type) if cycles: cycles_info = f"\n\nโš ๏ธ **Circular Dependencies Detected:**\n" for cycle in cycles: cycles_info += f" โ€ข {' โ†’ '.join(cycle)}\n" else: cycles_info = "\n\nโœ… No circular dependencies detected." # Format output output = [f"๐Ÿ“Š **Dependency Graph** ({output_format.upper()} format)\n"] output.append(f"Files analyzed: {files_analyzed}") output.append(f"Graph type: {graph_type}") output.append(f"\n{graph_output}") if cycles_info: output.append(cycles_info) return [TextContent(type="text", text="\n".join(output))] except Exception as e: logger.error(f"Error in generate_dependency_graph: {e}") return [TextContent(type="text", text=f"Error generating graph: {str(e)}")] async def run(self): """Run the MCP server""" # Create initialization options init_options = InitializationOptions( server_name="scs-mcp", server_version="1.0.0", capabilities=ServerCapabilities() ) async with stdio_server() as (read_stream, write_stream): await self.server.run( read_stream, write_stream, initialization_options=init_options ) async def main(): server = SCSMCPServer() await server.run() if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/stevenjjobson/scs-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server