#!/usr/bin/env python3
"""
SCS MCP Server - Smart Code Search for Claude Desktop
"""
import json
import sys
import asyncio
import logging
from pathlib import Path
from typing import Dict, Any, List, Optional
from mcp.server import Server, InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent, ServerCapabilities, Resource, Prompt, PromptArgument
# Configure logging to only log errors to stderr
logging.basicConfig(
level=logging.ERROR,
format='%(message)s',
handlers=[logging.StreamHandler(sys.stderr)]
)
logger = logging.getLogger(__name__)
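# The repository root is added to sys.path below so the absolute src.* imports
# resolve when this file is run directly (e.g. as a Claude Desktop MCP command).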
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.core.clean_search import SmartCodeSearch
from src.core.db_wrapper import ThreadSafeDB
from src.core.orchestrator import OrchestrationEngine
from src.core.context_aware_orchestrator import ContextAwareOrchestrator
class SCSMCPServer:
"""Main MCP server implementation"""
def __init__(self):
self.server = Server("scs-mcp")
self.project_instances: Dict[str, SmartCodeSearch] = {}
self.current_project = None
self.orchestrator = None # Will be initialized on first use
self.model_tools = None # Lazy-loaded model info tools
self.setup_tools()
self.setup_handlers()
self.setup_resources()
self.setup_prompts()
def setup_tools(self):
"""Register available tools with the MCP server"""
@self.server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="search",
description="Search code by meaning using AI embeddings",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query"
},
"limit": {
"type": "integer",
"description": "Maximum results",
"default": 10
},
"project_path": {
"type": "string",
"description": "Project path to search",
"default": "."
},
"search_type": {
"type": "string",
"enum": ["semantic", "text", "hybrid"],
"default": "hybrid"
},
"file_type": {
"type": "string",
"enum": ["code", "document", "config", "all"],
"default": "all"
},
"language": {
"type": "string",
"description": "Filter by language"
}
},
"required": ["query"]
}
),
Tool(
name="index",
description="Index a project for semantic search",
inputSchema={
"type": "object",
"properties": {
"project_path": {
"type": "string",
"description": "Path to project to index"
},
"force": {
"type": "boolean",
"description": "Force re-indexing",
"default": False
},
"patterns": {
"type": "array",
"items": {"type": "string"},
"description": "File patterns to include"
}
},
"required": ["project_path"]
}
),
Tool(
name="analyze_symbol",
description="Deep analysis of a specific symbol",
inputSchema={
"type": "object",
"properties": {
"symbol_name": {
"type": "string",
"description": "Name of symbol to analyze"
},
"project_path": {
"type": "string",
"description": "Project path",
"default": "."
},
"include_references": {
"type": "boolean",
"default": True
},
"include_tests": {
"type": "boolean",
"default": True
}
},
"required": ["symbol_name"]
}
),
Tool(
name="find_similar",
description="Find code similar to a given snippet",
inputSchema={
"type": "object",
"properties": {
"code_snippet": {
"type": "string",
"description": "Code snippet to find similar to"
},
"project_path": {
"type": "string",
"description": "Project path",
"default": "."
},
"limit": {
"type": "integer",
"default": 5
}
},
"required": ["code_snippet"]
}
),
Tool(
name="get_context",
description="Get relevant context for current work",
inputSchema={
"type": "object",
"properties": {
"project_path": {
"type": "string",
"description": "Project path",
"default": "."
},
"recent_files": {
"type": "array",
"items": {"type": "string"},
"description": "Recent files being worked on"
},
"recent_symbols": {
"type": "array",
"items": {"type": "string"},
"description": "Recent symbols referenced"
}
}
}
),
# High-value orchestrated tools
Tool(
name="instant_review",
description="Multi-agent code review with instant feedback (920/1000 value)",
inputSchema={
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to file to review"
},
"code": {
"type": "string",
"description": "Code snippet to review (if file_path not provided)"
},
"review_depth": {
"type": "string",
"enum": ["quick", "standard", "deep"],
"default": "standard",
"description": "Depth of review"
}
}
}
),
Tool(
name="debt_orchestrator",
description="Analyze technical debt and calculate ROI for fixes (900/1000 value)",
inputSchema={
"type": "object",
"properties": {
"scope": {
"type": "string",
"enum": ["file", "directory", "project"],
"default": "file",
"description": "Scope of analysis"
},
"path": {
"type": "string",
"description": "Path to analyze",
"default": "."
},
"include_roi": {
"type": "boolean",
"default": True,
"description": "Calculate ROI for debt items"
}
}
}
),
Tool(
name="test_gap_analyzer",
description="Find untested code and generate test suggestions (880/1000 value)",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path to analyze",
"default": "."
},
"include_private": {
"type": "boolean",
"default": False,
"description": "Include private methods"
},
"min_complexity": {
"type": "integer",
"default": 5,
"description": "Minimum complexity to flag"
},
"generate_tests": {
"type": "boolean",
"default": True,
"description": "Generate test suggestions"
}
}
}
),
Tool(
name="import_optimizer",
description="Analyze and optimize imports (650/1000 value)",
inputSchema={
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "File to analyze"
},
"code": {
"type": "string",
"description": "Code to analyze (if file_path not provided)"
},
"auto_fix": {
"type": "boolean",
"default": False,
"description": "Return fixed code"
}
}
}
),
# Context-aware versions for large files
Tool(
name="review_large_file",
description="Review large files with intelligent context management",
inputSchema={
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to large file"
},
"max_tokens": {
"type": "integer",
"default": 50000,
"description": "Maximum tokens per chunk"
},
"focus_areas": {
"type": "array",
"items": {"type": "string"},
"description": "Areas to focus on (errors, complexity, security)"
},
"model": {
"type": "string",
"default": "claude-3-opus",
"description": "Model to use for context limits"
}
},
"required": ["file_path"]
}
),
Tool(
name="analyze_with_context",
description="Run any analysis with automatic context management",
inputSchema={
"type": "object",
"properties": {
"flow": {
"type": "string",
"enum": ["instant_review", "debt_orchestrator", "test_gap_analyzer"],
"description": "Analysis flow to run"
},
"target": {
"type": "string",
"description": "File or directory to analyze"
},
"model": {
"type": "string",
"default": "claude-3-opus",
"description": "Model for context limits"
},
"fallback_models": {
"type": "array",
"items": {"type": "string"},
"description": "Models to try if primary fails"
}
},
"required": ["flow", "target"]
}
),
# Model information tools
Tool(
name="get_available_models",
description="Get list of all available AI models with capabilities and pricing",
inputSchema={
"type": "object",
"properties": {}
}
),
Tool(
name="compare_models",
description="Compare two AI models side by side",
inputSchema={
"type": "object",
"properties": {
"model1": {
"type": "string",
"description": "First model name (e.g., claude-3-opus)"
},
"model2": {
"type": "string",
"description": "Second model name (e.g., gpt-4-turbo)"
}
},
"required": ["model1", "model2"]
}
),
Tool(
name="suggest_model_for_task",
description="Get AI model recommendation for specific task and budget",
inputSchema={
"type": "object",
"properties": {
"task_type": {
"type": "string",
"description": "Type of task (code_review, test_generation, documentation, debugging, quick_task)",
"enum": ["code_review", "test_generation", "documentation", "debugging", "quick_task"]
},
"context_size": {
"type": "integer",
"description": "Estimated tokens needed"
},
"budget": {
"type": "number",
"description": "Maximum cost in USD (optional)"
}
},
"required": ["task_type", "context_size"]
}
),
Tool(
name="get_current_model_status",
description="Get current AI model being used and its status",
inputSchema={
"type": "object",
"properties": {}
}
),
Tool(
name="estimate_operation_cost",
description="Estimate cost for a specific operation",
inputSchema={
"type": "object",
"properties": {
"operation": {
"type": "string",
"description": "Operation name (e.g., code_review, test_generation)"
},
"input_size": {
"type": "integer",
"description": "Input tokens"
},
"output_size": {
"type": "integer",
"description": "Expected output tokens",
"default": 2000
},
"model": {
"type": "string",
"description": "Specific model to use (optional, uses current if not specified)"
}
},
"required": ["operation", "input_size"]
}
),
Tool(
name="generate_dependency_graph",
description="Generate dependency graph in various text formats (DOT, Mermaid, JSON)",
inputSchema={
"type": "object",
"properties": {
"output_format": {
"type": "string",
"description": "Output format: 'dot' (Graphviz), 'mermaid' (GitHub/GitLab), or 'json'",
"enum": ["dot", "mermaid", "json"],
"default": "mermaid"
},
"graph_type": {
"type": "string",
"description": "Type of graph to generate",
"enum": ["imports", "calls", "inheritance"],
"default": "imports"
},
"file_pattern": {
"type": "string",
"description": "File pattern to analyze (e.g., '*.py', 'src/**/*.js')"
},
"detect_cycles": {
"type": "boolean",
"description": "Detect and highlight circular dependencies",
"default": false
}
},
"required": []
}
)
]
def setup_handlers(self):
"""Set up handlers for tool calls"""
@self.server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> list[TextContent]:
try:
if name == "search":
return await self.handle_search(arguments)
elif name == "index":
return await self.handle_index(arguments)
elif name == "analyze_symbol":
return await self.handle_analyze_symbol(arguments)
elif name == "find_similar":
return await self.handle_find_similar(arguments)
elif name == "get_context":
return await self.handle_get_context(arguments)
# Orchestrated tools
elif name == "instant_review":
return await self.handle_instant_review(arguments)
elif name == "debt_orchestrator":
return await self.handle_debt_orchestrator(arguments)
elif name == "test_gap_analyzer":
return await self.handle_test_gap_analyzer(arguments)
elif name == "import_optimizer":
return await self.handle_import_optimizer(arguments)
# Context-aware tools
elif name == "review_large_file":
return await self.handle_review_large_file(arguments)
elif name == "analyze_with_context":
return await self.handle_analyze_with_context(arguments)
# Model info tools
elif name == "get_available_models":
return await self.handle_get_available_models(arguments)
elif name == "compare_models":
return await self.handle_compare_models(arguments)
elif name == "suggest_model_for_task":
return await self.handle_suggest_model_for_task(arguments)
elif name == "get_current_model_status":
return await self.handle_get_current_model_status(arguments)
elif name == "estimate_operation_cost":
return await self.handle_estimate_operation_cost(arguments)
elif name == "generate_dependency_graph":
return await self.handle_generate_dependency_graph(arguments)
else:
return [TextContent(type="text", text=f"Unknown tool: {name}")]
except Exception as e:
logger.error(f"Error in tool {name}: {e}")
return [TextContent(type="text", text=f"Error: {str(e)}")]
def setup_resources(self):
"""Set up resources to provide context about available projects"""
@self.server.list_resources()
async def list_resources() -> list[Resource]:
resources = []
# Add resource for current working directory
cwd = Path.cwd()
resources.append(Resource(
uri=f"project://{cwd.name}",
name=f"Current Project: {cwd.name}",
mimeType="text/plain",
description=f"Working directory: {cwd}"
))
# Add resources for indexed projects
for project_path in self.project_instances.keys():
project_name = Path(project_path).name
resources.append(Resource(
uri=f"project://indexed/{project_name}",
name=f"Indexed: {project_name}",
mimeType="text/plain",
description=f"Indexed project at {project_path}"
))
# Add resource for common directories
common_dirs = [
Path.home() / "Documents",
Path.home() / "Projects",
Path.home() / "Development",
Path("/mnt/c/Users") if Path("/mnt/c").exists() else None
]
for dir_path in common_dirs:
if dir_path and dir_path.exists():
resources.append(Resource(
uri=f"directory://{dir_path.name}",
name=f"Directory: {dir_path.name}",
mimeType="text/plain",
description=f"Available at {dir_path}"
))
return resources
@self.server.read_resource()
async def read_resource(uri: str) -> str:
if uri.startswith("project://"):
parts = uri.replace("project://", "").split("/")
if parts[0] == "indexed" and len(parts) > 1:
# Return info about indexed project
for project_path, instance in self.project_instances.items():
if Path(project_path).name == parts[1]:
return f"Project: {parts[1]}\nPath: {project_path}\nStatus: Indexed and ready for search"
else:
# Return info about current project
cwd = Path.cwd()
return f"Current working directory: {cwd}\nProject name: {cwd.name}"
elif uri.startswith("directory://"):
dir_name = uri.replace("directory://", "")
return f"Directory {dir_name} is available for indexing and search"
return "Resource not found"
def setup_prompts(self):
"""Set up prompts to suggest common actions"""
@self.server.list_prompts()
async def list_prompts() -> list[Prompt]:
return [
Prompt(
name="index_project",
description="Index a project for semantic search",
arguments=[
PromptArgument(
name="project_path",
description="Path to the project to index",
required=True
)
]
),
Prompt(
name="search_code",
description="Search for code by meaning",
arguments=[
PromptArgument(
name="query",
description="What to search for",
required=True
),
PromptArgument(
name="project_path",
description="Project to search in (optional)",
required=False
)
]
),
Prompt(
name="analyze_current_project",
description="Analyze and index the current working directory",
arguments=[]
),
Prompt(
name="find_implementation",
description="Find where something is implemented",
arguments=[
PromptArgument(
name="symbol",
description="Function, class, or variable name",
required=True
)
]
)
]
@self.server.get_prompt()
async def get_prompt(name: str, arguments: Dict[str, Any]) -> list[TextContent]:
if name == "index_project":
project_path = arguments.get("project_path", ".")
return [TextContent(
type="text",
text=f"Please index the project at {project_path} using the 'index' tool, then search for relevant code."
)]
elif name == "search_code":
query = arguments.get("query", "")
project_path = arguments.get("project_path", ".")
return [TextContent(
type="text",
text=f"Search for '{query}' in project {project_path} using the 'search' tool."
)]
elif name == "analyze_current_project":
cwd = Path.cwd()
return [TextContent(
type="text",
text=f"Index the current project at {cwd} and provide an overview of its structure."
)]
elif name == "find_implementation":
symbol = arguments.get("symbol", "")
return [TextContent(
type="text",
text=f"Use 'analyze_symbol' tool to find where '{symbol}' is implemented."
)]
return [TextContent(type="text", text="Unknown prompt")]
def get_or_create_orchestrator(self, project_path: str = ".") -> OrchestrationEngine:
"""Get or create the orchestration engine"""
if self.orchestrator is None:
# Initialize orchestrator with database
db_path = Path(project_path) / ".claude-symbols" / "search.db"
if not db_path.exists():
# Try default location
db_path = Path(".claude-symbols/search.db")
if db_path.exists():
db = ThreadSafeDB(str(db_path))
self.orchestrator = OrchestrationEngine(db, project_path)
else:
raise ValueError(f"Database not found. Please index the project first using the 'index' tool.")
return self.orchestrator
async def handle_instant_review(self, args: Dict[str, Any]) -> list[TextContent]:
"""Handle instant_review orchestrated tool"""
try:
# Get orchestrator
orchestrator = self.get_or_create_orchestrator()
# Prepare inputs
inputs = {}
if "file_path" in args:
file_path = Path(args["file_path"])
if file_path.exists():
with open(file_path, 'r', encoding='utf-8') as f:
inputs["code"] = f.read()
inputs["file_path"] = str(file_path)
else:
return [TextContent(type="text", text=f"File not found: {file_path}")]
elif "code" in args:
inputs["code"] = args["code"]
else:
return [TextContent(type="text", text="Please provide either 'file_path' or 'code' to review")]
# Execute orchestrated review
result = await orchestrator.execute_flow("instant_review", inputs)
# Format results
output = ["๐ **Instant Code Review**\n"]
if "agents" in result:
# Get aggregated review results
if "review_aggregator" in result["agents"]:
review = result["agents"]["review_aggregator"]
# Show quality score and grade
if "summary" in review:
summary = review["summary"]
output.append(f"๐ **Quality Score**: {summary.get('quality_score', 'N/A')}/100 (Grade: {summary.get('grade', 'N/A')})")
output.append(f"๐ด High: {summary.get('high_severity', 0)} | ๐ก Medium: {summary.get('medium_severity', 0)} | ๐ข Low: {summary.get('low_severity', 0)}\n")
# Show issues
if "issues" in review and review["issues"]:
output.append("**Issues Found:**")
for issue in review["issues"][:10]: # Top 10 issues
severity_icon = {"high": "๐ด", "medium": "๐ก", "low": "๐ข"}.get(issue.get("severity", "low"), "โช")
output.append(f"{severity_icon} {issue.get('type', 'Unknown')}: {issue}")
output.append("")
# Show suggestions
if "suggestions" in review and review["suggestions"]:
output.append("**Suggestions:**")
for suggestion in review["suggestions"]:
output.append(f"โข {suggestion}")
# Show priority actions if available
if "priority_ranker" in result["agents"]:
ranker = result["agents"]["priority_ranker"]
if "action_plan" in ranker and ranker["action_plan"]:
output.append("\n**Priority Actions:**")
for action in ranker["action_plan"][:5]:
output.append(f"{action['order']}. {action['action']}")
# Show execution time
if "execution_ms" in result:
output.append(f"\nโฑ๏ธ Execution time: {result['execution_ms']}ms")
return [TextContent(type="text", text="\n".join(output))]
except Exception as e:
logger.error(f"Error in instant_review: {e}")
return [TextContent(type="text", text=f"Error performing code review: {str(e)}")]
async def handle_debt_orchestrator(self, args: Dict[str, Any]) -> list[TextContent]:
"""Handle debt_orchestrator tool"""
try:
orchestrator = self.get_or_create_orchestrator()
# Prepare inputs based on scope
scope = args.get("scope", "file")
path = args.get("path", ".")
inputs = {"path": path, "scope": scope}
# Read code based on scope
if scope == "file":
file_path = Path(path)
if file_path.exists() and file_path.is_file():
with open(file_path, 'r', encoding='utf-8') as f:
inputs["code"] = f.read()
else:
return [TextContent(type="text", text=f"File not found: {path}")]
# Execute debt analysis
result = await orchestrator.execute_flow("debt_orchestrator", inputs)
# Format results
output = ["๐ฐ **Technical Debt Analysis**\n"]
if "agents" in result and "debt_calculator" in result["agents"]:
debt = result["agents"]["debt_calculator"]
# Summary metrics
output.append(f"**Total Debt**: {debt.get('total_debt_hours', 0):.1f} hours (${debt.get('total_debt_cost', 0):,.0f})")
output.append(f"**High Priority**: {debt.get('high_priority_hours', 0):.1f} hours")
output.append(f"**Recommendation**: {debt.get('recommendation', 'N/A')}\n")
# Quick wins
if "quick_wins" in debt and debt["quick_wins"]:
output.append("**Quick Wins** (< 30 min each):")
for item in debt["quick_wins"]:
output.append(f"โข {item['type']}: {item['hours']:.1f}h (${item['cost']:.0f})")
output.append("")
# All debt items
if "debt_items" in debt and debt["debt_items"]:
output.append("**Debt Items by ROI:**")
for item in debt["debt_items"][:10]:
roi_stars = "โญ" * min(5, int(item.get("roi", 0)))
output.append(f"{roi_stars} {item['type']}: {item['hours']:.1f}h (${item['cost']:.0f}) - {item['priority']}")
return [TextContent(type="text", text="\n".join(output))]
except Exception as e:
logger.error(f"Error in debt_orchestrator: {e}")
return [TextContent(type="text", text=f"Error analyzing technical debt: {str(e)}")]
async def handle_test_gap_analyzer(self, args: Dict[str, Any]) -> list[TextContent]:
"""Handle test_gap_analyzer tool"""
try:
orchestrator = self.get_or_create_orchestrator()
path = args.get("path", ".")
inputs = {
"path": path,
"include_private": args.get("include_private", False),
"min_complexity": args.get("min_complexity", 5)
}
# Read code if it's a file
file_path = Path(path)
if file_path.exists() and file_path.is_file():
with open(file_path, 'r', encoding='utf-8') as f:
inputs["code"] = f.read()
# Execute test gap analysis
result = await orchestrator.execute_flow("test_gap_analyzer", inputs)
# Format results
output = ["๐งช **Test Gap Analysis**\n"]
if "agents" in result:
# Show function extraction results
if "function_extractor" in result["agents"]:
functions = result["agents"]["function_extractor"]
output.append(f"**Functions Found**: {functions.get('count', 0)}")
# Show test gaps
if "test_gap_finder" in result["agents"]:
gaps = result["agents"]["test_gap_finder"]
coverage = gaps.get("coverage_percent", 0)
# Coverage indicator
coverage_icon = "๐ข" if coverage >= 80 else "๐ก" if coverage >= 60 else "๐ด"
output.append(f"{coverage_icon} **Coverage**: {coverage}%")
output.append(f"**Tested**: {gaps.get('tested_count', 0)} | **Untested**: {len(gaps.get('untested', []))}\n")
# List untested functions
if "untested" in gaps and gaps["untested"]:
output.append("**Untested Functions:**")
for func in gaps["untested"][:10]:
output.append(f"โข {func.get('name', 'Unknown')} (line {func.get('line', '?')})")
output.append("")
# Show generated tests if available
if "test_generator" in result["agents"] and args.get("generate_tests", True):
test_code = result["agents"]["test_generator"]
if isinstance(test_code, str) and test_code:
output.append("**Generated Test Template:**")
output.append("```python")
output.append(test_code[:500]) # First 500 chars
output.append("```")
return [TextContent(type="text", text="\n".join(output))]
except Exception as e:
logger.error(f"Error in test_gap_analyzer: {e}")
return [TextContent(type="text", text=f"Error analyzing test gaps: {str(e)}")]
async def handle_import_optimizer(self, args: Dict[str, Any]) -> list[TextContent]:
"""Handle import_optimizer tool"""
try:
orchestrator = self.get_or_create_orchestrator()
inputs = {}
if "file_path" in args:
file_path = Path(args["file_path"])
if file_path.exists():
with open(file_path, 'r', encoding='utf-8') as f:
inputs["code"] = f.read()
inputs["file_path"] = str(file_path)
else:
return [TextContent(type="text", text=f"File not found: {file_path}")]
elif "code" in args:
inputs["code"] = args["code"]
else:
return [TextContent(type="text", text="Please provide either 'file_path' or 'code' to analyze")]
# Execute import optimization
result = await orchestrator.execute_flow("import_optimizer", inputs)
# Format results
output = ["๐ฆ **Import Analysis**\n"]
if "agents" in result and "import_analyzer" in result["agents"]:
analysis = result["agents"]["import_analyzer"]
# Summary
output.append(f"**Total Imports**: {analysis.get('count', 0)}")
output.append(f"**Unused Imports**: {analysis.get('unused_count', 0)}\n")
# Show all imports
if "imports" in analysis and analysis["imports"]:
output.append("**All Imports:**")
for imp in analysis["imports"][:20]: # First 20
module = imp.get("module", "Unknown")
line = imp.get("line", "?")
alias = f" as {imp['alias']}" if imp.get("alias") else ""
output.append(f" Line {line}: {module}{alias}")
output.append("")
# Show unused imports
if "unused" in analysis and analysis["unused"]:
output.append("**๐๏ธ Unused Imports (can be removed):**")
for imp in analysis["unused"]:
output.append(f" Line {imp.get('line', '?')}: {imp.get('module', 'Unknown')}")
# Provide auto-fix if requested
if args.get("auto_fix", False) and inputs.get("code"):
output.append("\n**Auto-fix:** Remove the unused imports listed above from your code.")
return [TextContent(type="text", text="\n".join(output))]
except Exception as e:
logger.error(f"Error in import_optimizer: {e}")
return [TextContent(type="text", text=f"Error optimizing imports: {str(e)}")]
def get_or_create_instance(self, project_path: str) -> SmartCodeSearch:
"""Get or create a SmartCodeSearch instance for a project"""
abs_path = str(Path(project_path).absolute())
if abs_path not in self.project_instances:
# Create instance silently
self.project_instances[abs_path] = SmartCodeSearch(abs_path, quiet=True)
return self.project_instances[abs_path]
async def handle_search(self, args: Dict[str, Any]) -> list[TextContent]:
"""Handle search tool call"""
query = args.get("query", "")
project_path = args.get("project_path", ".")
limit = args.get("limit", 10)
search_type = args.get("search_type", "hybrid")
search_instance = self.get_or_create_instance(project_path)
# Run search in thread pool to avoid blocking
loop = asyncio.get_event_loop()
results = await loop.run_in_executor(
None,
search_instance.search,
query,
limit
)
if not results:
return [TextContent(type="text", text="No results found")]
# Format results
formatted_results = []
for result in results:
# Handle both dict and tuple formats
if isinstance(result, dict):
file_path = result.get('file_path', '')
symbol = result.get('name', '')
content = result.get('content', '')
score = result.get('score', 0.0)
line_num = result.get('line_num', 0)
formatted_results.append(
f"๐ {file_path}:{line_num}\n"
f" Symbol: {symbol}\n"
f" Score: {score:.3f}\n"
f" Content:\n{content[:200]}...\n"
)
else:
# Tuple format from clean_search
file_path, symbol, content, score = result
formatted_results.append(
f"๐ {file_path}\n"
f" Symbol: {symbol}\n"
f" Score: {score:.3f}\n"
f" Content:\n{content[:200]}...\n"
)
return [TextContent(type="text", text="\n".join(formatted_results))]
async def handle_index(self, args: Dict[str, Any]) -> list[TextContent]:
"""Handle index tool call"""
project_path = args.get("project_path", ".")
force = args.get("force", False)
search_instance = self.get_or_create_instance(project_path)
# Run indexing in thread pool
loop = asyncio.get_event_loop()
await loop.run_in_executor(
None,
search_instance.index_project,
force
)
return [TextContent(
type="text",
text=f"โ
Project indexed successfully: {project_path}"
)]
async def handle_analyze_symbol(self, args: Dict[str, Any]) -> list[TextContent]:
"""Handle analyze_symbol tool call"""
symbol_name = args.get("symbol_name", "")
project_path = args.get("project_path", ".")
search_instance = self.get_or_create_instance(project_path)
# Search for the symbol
loop = asyncio.get_event_loop()
results = await loop.run_in_executor(
None,
search_instance.search,
symbol_name,
20
)
if not results:
return [TextContent(type="text", text=f"Symbol '{symbol_name}' not found")]
# Analyze results
definitions = []
references = []
tests = []
for result in results:
# Handle both dict and tuple formats
if isinstance(result, dict):
file_path = result.get('file_path', '')
content = result.get('content', '')
else:
file_path, _, content, _ = result
if "def " + symbol_name in content or "class " + symbol_name in content:
definitions.append((file_path, content[:300]))
elif "test" in file_path.lower() and args.get("include_tests", True):
tests.append((file_path, content[:200]))
elif args.get("include_references", True):
references.append((file_path, content[:200]))
output = [f"๐ Analysis of '{symbol_name}':\n"]
if definitions:
output.append("\n๐ Definitions:")
for path, content in definitions[:3]:
output.append(f" {path}:\n {content}")
if references:
output.append(f"\n๐ References ({len(references)} found):")
for path, content in references[:5]:
output.append(f" {path}")
if tests:
output.append(f"\n๐งช Tests ({len(tests)} found):")
for path, content in tests[:3]:
output.append(f" {path}")
return [TextContent(type="text", text="\n".join(output))]
async def handle_find_similar(self, args: Dict[str, Any]) -> list[TextContent]:
"""Handle find_similar tool call"""
code_snippet = args.get("code_snippet", "")
project_path = args.get("project_path", ".")
limit = args.get("limit", 5)
search_instance = self.get_or_create_instance(project_path)
# Use the code snippet as a semantic search query
loop = asyncio.get_event_loop()
results = await loop.run_in_executor(
None,
search_instance.search,
code_snippet,
limit
)
if not results:
return [TextContent(type="text", text="No similar code found")]
formatted_results = ["๐ Similar code found:\n"]
for result in results:
# Handle both dict and tuple formats
if isinstance(result, dict):
file_path = result.get('file_path', '')
content = result.get('content', '')
score = result.get('score', 0.0)
else:
file_path, _, content, score = result
formatted_results.append(
f"๐ {file_path} (similarity: {score:.3f})\n"
f" {content[:200]}...\n"
)
return [TextContent(type="text", text="\n".join(formatted_results))]
async def handle_get_context(self, args: Dict[str, Any]) -> list[TextContent]:
"""Handle get_context tool call"""
project_path = args.get("project_path", ".")
recent_files = args.get("recent_files", [])
recent_symbols = args.get("recent_symbols", [])
search_instance = self.get_or_create_instance(project_path)
context_items = []
# Get context for recent symbols
for symbol in recent_symbols[:3]:
loop = asyncio.get_event_loop()
results = await loop.run_in_executor(
None,
search_instance.search,
symbol,
3
)
for result in results[:2]:
# Handle both dict and tuple formats
if isinstance(result, dict):
file_path = result.get('file_path', '')
else:
file_path, _, _, _ = result
context_items.append(f"Related to '{symbol}': {file_path}")
# Get context for recent files (find related files)
for file_path in recent_files[:3]:
# Extract likely symbols from filename
base_name = Path(file_path).stem
loop = asyncio.get_event_loop()
results = await loop.run_in_executor(
None,
search_instance.search,
base_name,
3
)
for result in results[:2]:
# Handle both dict and tuple formats
if isinstance(result, dict):
found_path = result.get('file_path', '')
else:
found_path, _, _, _ = result
if found_path != file_path:
context_items.append(f"Related to {file_path}: {found_path}")
if not context_items:
return [TextContent(type="text", text="No additional context found")]
# Remove duplicates and limit to 10 items
unique_context = list(set(context_items))[:10]
output = "๐ Relevant context:\n" + "\n".join(unique_context)
return [TextContent(type="text", text=output)]
async def handle_review_large_file(self, args: Dict[str, Any]) -> list[TextContent]:
"""Handle review_large_file tool call with context management"""
file_path = args.get("file_path", "")
focus_areas = args.get("focus_areas", ["errors", "complexity", "best_practices"])
max_file_size = args.get("max_file_size", 1000000) # 1MB default
# Check file exists
path = Path(file_path)
if not path.exists():
return [TextContent(type="text", text=f"Error: File '{file_path}' not found")]
# Check file size
file_size = path.stat().st_size
if file_size > max_file_size * 10: # Absolute max 10MB
return [TextContent(type="text", text=f"Error: File too large ({file_size:,} bytes)")]
# Initialize context-aware orchestrator if not exists
if not hasattr(self, 'context_orchestrator'):
# Open the indexed database directly, mirroring get_or_create_orchestrator
db_path = Path(".claude-symbols") / "search.db"
if not db_path.exists():
return [TextContent(type="text", text="Database not found. Please index the project first using the 'index' tool.")]
db = ThreadSafeDB(str(db_path))
self.context_orchestrator = ContextAwareOrchestrator(db, ".")
# Read file content
try:
content = path.read_text(encoding='utf-8')
except Exception as e:
return [TextContent(type="text", text=f"Error reading file: {str(e)}")]
# Prepare inputs for review
inputs = {
"code": content,
"file_path": str(path),
"focus_areas": focus_areas,
"_file_size": file_size
}
# Execute context-aware review
try:
result = await self.context_orchestrator.execute_flow("instant_review", inputs)
# Format results
output = [f"๐ Large File Review: {path.name}\n"]
output.append(f"๐ File size: {file_size:,} bytes")
# Add context metadata
if "_context_metadata" in result:
meta = result["_context_metadata"]
output.append(f"๐ง Context: {meta['utilization']} used, {meta['items']} items")
if "compression_stats" in meta:
stats = meta["compression_stats"]
if stats.get("items_compressed", 0) > 0:
output.append(f"๐ฆ Compressed {stats['items_compressed']} items")
# Add chunk info if file was chunked
if "total_chunks" in result:
output.append(f"๐ Processed in {result['total_chunks']} chunks")
# Add review results
if "summary" in result:
output.append(f"\n๐ Summary:")
output.append(f" Total issues: {result['summary'].get('total_issues', 0)}")
output.append(f" Total errors: {result['summary'].get('total_errors', 0)}")
# Add key findings
if "all_issues" in result and result["all_issues"]:
output.append("\nโ ๏ธ Key Issues Found:")
for issue in result["all_issues"][:10]: # Top 10 issues
chunk_info = f"[chunk {issue.get('chunk', 1)}] " if "chunk" in issue else ""
output.append(f" {chunk_info}{issue.get('agent', 'unknown')}: {issue.get('description', str(issue))}")
# Add recommendations
if "agents" in result:
for agent_name, agent_results in result["agents"].items():
if isinstance(agent_results, list):
# Multiple chunks
for chunk_result in agent_results[:2]: # First 2 chunks
if isinstance(chunk_result, dict) and "result" in chunk_result:
res = chunk_result["result"]
if isinstance(res, dict) and "recommendations" in res:
output.append(f"\n๐ก {agent_name} recommendations:")
for rec in res["recommendations"][:3]:
output.append(f" โข {rec}")
return [TextContent(type="text", text="\n".join(output))]
except Exception as e:
logger.error(f"Error in large file review: {e}")
return [TextContent(type="text", text=f"Error during review: {str(e)}")]
async def handle_analyze_with_context(self, args: Dict[str, Any]) -> list[TextContent]:
"""Handle analyze_with_context tool call with automatic context management"""
flow = args.get("flow", "instant_review")
code = args.get("code", "")
# The tool schema supplies the file or directory to analyze as "target"; accept "file_path" as a fallback
file_path = args.get("target") or args.get("file_path")
model = args.get("model", "claude-3-opus")
fallback_models = args.get("fallback_models", ["claude-3-opus", "gpt-4-turbo", "gpt-4"])
# Initialize context-aware orchestrator if not exists
if not hasattr(self, 'context_orchestrator'):
# Open the indexed database directly, mirroring get_or_create_orchestrator
db_path = Path(".claude-symbols") / "search.db"
if not db_path.exists():
return [TextContent(type="text", text="Database not found. Please index the project first using the 'index' tool.")]
db = ThreadSafeDB(str(db_path))
self.context_orchestrator = ContextAwareOrchestrator(db, ".", model)
# Prepare inputs
inputs = {"code": code}
if file_path:
path = Path(file_path)
if path.exists():
try:
inputs["code"] = path.read_text(encoding='utf-8')
inputs["file_path"] = str(path)
except Exception as e:
return [TextContent(type="text", text=f"Error reading file: {str(e)}")]
# Execute with model fallback
try:
result = await self.context_orchestrator.execute_with_model_fallback(
flow,
inputs,
fallback_models
)
# Format results
output = [f"๐ Context-Aware Analysis: {flow}\n"]
# Add model info
if "_model_used" in result:
output.append(f"๐ค Model: {result['_model_used']}")
# Add context metadata
if "_context_metadata" in result:
meta = result["_context_metadata"]
output.append(f"๐ง Context usage: {meta['utilization']}")
output.append(f"๐ Items in context: {meta['items']}")
# Show compression stats if any
if "compression_stats" in meta:
stats = meta["compression_stats"]
if stats.get("items_compressed", 0) > 0:
output.append(f"๐ฆ Compressed: {stats['items_compressed']} items")
if stats.get("items_dropped", 0) > 0:
output.append(f"๐๏ธ Dropped: {stats['items_dropped']} low-priority items")
# Add chunk info if applicable
if "total_chunks" in result:
output.append(f"๐ Processed in {result['total_chunks']} chunks")
# Add flow-specific results
if "agents" in result:
output.append(f"\n๐ Analysis Results:")
for agent_name, agent_result in result["agents"].items():
if isinstance(agent_result, list):
# Multiple chunks
output.append(f" {agent_name}: {len(agent_result)} chunks processed")
elif isinstance(agent_result, dict):
# Single result
if "score" in agent_result:
output.append(f" {agent_name}: score {agent_result['score']}")
elif "issues" in agent_result:
output.append(f" {agent_name}: {len(agent_result['issues'])} issues found")
elif "error" in agent_result:
output.append(f" {agent_name}: โ {agent_result['error']}")
# Add summary if available
if "summary" in result:
output.append(f"\n๐ Summary:")
for key, value in result["summary"].items():
output.append(f" {key}: {value}")
# Add execution time
if "execution_ms" in result:
output.append(f"\nโฑ๏ธ Execution time: {result['execution_ms']}ms")
return [TextContent(type="text", text="\n".join(output))]
except Exception as e:
logger.error(f"Error in context-aware analysis: {e}")
return [TextContent(type="text", text=f"Error during analysis: {str(e)}")]
async def handle_get_available_models(self, args: Dict[str, Any]) -> list[TextContent]:
"""Handle get_available_models tool call"""
try:
from .tools.model_info_tools import ModelInfoTools
tools = ModelInfoTools()
result = await tools.get_available_models()
output = ["๐ Available AI Models\n"]
# Summary
summary = result.get("summary", {})
output.append(f"Total models: {summary['total_models']}")
output.append(f"Providers available: {summary['providers_available']}")
if summary['cheapest_model']:
cheap = summary['cheapest_model']
output.append(f"๐ฐ Cheapest: {cheap['name']} ({cheap['provider']}) - ${cheap['cost']:.4f}/1k")
if summary['largest_context']:
large = summary['largest_context']
output.append(f"๐ Largest context: {large['name']} ({large['provider']}) - {large['size']:,} tokens")
# Providers and models
output.append("\n๐ค Models by Provider:")
for provider, info in result.get("providers", {}).items():
output.append(f"\n{provider.upper()} (API key: {'โ
' if info['has_api_key'] else 'โ'})")
for model in info['models']:
output.append(f" โข {model['name']}")
output.append(f" Context: {model['context_window']:,} tokens")
output.append(f" Max output: {model['max_output']:,} tokens")
output.append(f" Cost: ${model['cost']['input_per_1k']:.4f} in / ${model['cost']['output_per_1k']:.4f} out")
features = []
if model['supports_vision']:
features.append("๐๏ธ Vision")
if model['supports_functions']:
features.append("๐ง Functions")
if model['supports_json_mode']:
features.append("๐ JSON mode")
if features:
output.append(f" Features: {', '.join(features)}")
return [TextContent(type="text", text="\n".join(output))]
except Exception as e:
logger.error(f"Error in get_available_models: {e}")
return [TextContent(type="text", text=f"Error getting model information: {str(e)}")]
async def handle_compare_models(self, args: Dict[str, Any]) -> list[TextContent]:
"""Handle compare_models tool call"""
try:
from .tools.model_info_tools import ModelInfoTools
model1 = args.get("model1")
model2 = args.get("model2")
if not model1 or not model2:
return [TextContent(type="text", text="Error: Both model1 and model2 are required")]
tools = ModelInfoTools()
result = await tools.compare_models(model1, model2)
output = [f"๐ Model Comparison: {model1} vs {model2}\n"]
# Add comparison details
if "comparison" in result:
comp = result["comparison"]
if "context_ratio" in comp:
output.append(f"Context window ratio: {comp['context_ratio']:.2f}x")
output.append(f"Larger context: {comp.get('larger_context', 'N/A')}")
if "cost_ratio" in comp:
cost = comp["cost_ratio"]
output.append(f"\nCost comparison:")
output.append(f" Input cost ratio: {cost.get('input', 0):.2f}x")
output.append(f" Output cost ratio: {cost.get('output', 0):.2f}x")
output.append(f" Cheaper model: {comp.get('cheaper', 'N/A')}")
if "vision_support" in comp:
output.append(f"\nVision support:")
for model, has_vision in comp["vision_support"].items():
output.append(f" {model}: {'โ
' if has_vision else 'โ'}")
# Add recommendations
if "recommendations" in result:
output.append(f"\n๐ก Recommendations:")
for rec in result["recommendations"]:
output.append(f" โข {rec}")
return [TextContent(type="text", text="\n".join(output))]
except Exception as e:
logger.error(f"Error in compare_models: {e}")
return [TextContent(type="text", text=f"Error comparing models: {str(e)}")]
async def handle_suggest_model_for_task(self, args: Dict[str, Any]) -> list[TextContent]:
"""Handle suggest_model_for_task tool call"""
try:
from .tools.model_info_tools import ModelInfoTools
task_type = args.get("task_type", "general")
context_size = args.get("context_size", 10000)
budget = args.get("budget", None)
tools = ModelInfoTools()
result = await tools.suggest_model_for_task(task_type, context_size, budget)
output = [f"๐ฏ Model Suggestion for {task_type}\n"]
if result.get("error"):
output.append(f"โ {result['error']}")
if result.get("reason"):
output.append(f"Reason: {result['reason']}")
else:
output.append(f"โ
Suggested model: {result['suggestion']}")
output.append(f"Provider: {result['provider']}")
output.append(f"Context window: {result['context_window']:,} tokens")
output.append(f"Estimated cost: ${result['estimated_cost']:.4f}")
if result.get("reasoning"):
output.append(f"\n๐ Reasoning: {result['reasoning']}")
if result.get("alternatives"):
output.append(f"\n๐ Alternative models:")
for alt in result["alternatives"]:
output.append(f" โข {alt['model']} ({alt['provider']})")
output.append(f" Cost: ${alt['cost']:.4f}")
output.append(f" Context: {alt['context']:,} tokens")
return [TextContent(type="text", text="\n".join(output))]
except Exception as e:
logger.error(f"Error in suggest_model_for_task: {e}")
return [TextContent(type="text", text=f"Error suggesting model: {str(e)}")]
async def handle_get_current_model_status(self, args: Dict[str, Any]) -> list[TextContent]:
"""Handle get_current_model_status tool call"""
try:
from .tools.model_info_tools import ModelInfoTools
tools = ModelInfoTools()
result = await tools.get_current_model_status()
output = [f"๐ค Current Model Status\n"]
output.append(f"Model: {result['current_model']}")
output.append(f"Provider: {result['provider']}")
# Capabilities
cap = result['capabilities']
output.append(f"\n๐ Capabilities:")
output.append(f" Context window: {cap['context_window']:,} tokens")
output.append(f" Max output: {cap['max_output']:,} tokens")
output.append(f" Usable context: {cap['usable_context']:,} tokens")
features = []
if cap['supports_vision']:
features.append("๐๏ธ Vision")
if cap['supports_functions']:
features.append("๐ง Functions")
if cap['supports_json_mode']:
features.append("๐ JSON mode")
if features:
output.append(f" Features: {', '.join(features)}")
# Cost estimate
if result.get('cost_estimate'):
cost = result['cost_estimate']
output.append(f"\n๐ฐ Typical cost estimate:")
output.append(f" Total: ${cost.get('total', 0):.4f}")
output.append(f" (10k tokens in, 2k tokens out)")
# Context usage
if result.get('context_usage'):
usage = result['context_usage']
output.append(f"\n๐ Current context usage:")
output.append(f" Tokens used: {usage['tokens_used']:,}")
output.append(f" Tokens available: {usage['tokens_available']:,}")
output.append(f" Utilization: {usage['utilization']}")
# Recommendations
if result.get('recommendations'):
output.append(f"\n๐ก Recommendations:")
for rec in result['recommendations']:
output.append(f" โข {rec}")
return [TextContent(type="text", text="\n".join(output))]
except Exception as e:
logger.error(f"Error in get_current_model_status: {e}")
return [TextContent(type="text", text=f"Error getting model status: {str(e)}")]
async def handle_estimate_operation_cost(self, args: Dict[str, Any]) -> list[TextContent]:
"""Handle estimate_operation_cost tool call"""
try:
from .tools.model_info_tools import ModelInfoTools
operation = args.get("operation", "general")
input_size = args.get("input_size", 10000)
output_size = args.get("output_size", 2000)
model = args.get("model", None)
tools = ModelInfoTools()
result = await tools.estimate_operation_cost(operation, input_size, output_size, model)
output = [f"๐ฐ Cost Estimate for '{operation}'\n"]
output.append(f"Model: {result['model']}")
# Token counts
tokens = result['tokens']
output.append(f"\n๐ Token usage:")
output.append(f" Input: {tokens['input']:,} tokens")
output.append(f" Output: {tokens['output']:,} tokens")
output.append(f" Total: {tokens['total']:,} tokens")
# Cost breakdown
cost = result['cost']
output.append(f"\n๐ต Cost breakdown:")
output.append(f" Input cost: ${cost['breakdown']['input']:.4f}")
output.append(f" Output cost: ${cost['breakdown']['output']:.4f}")
output.append(f" Total cost: ${cost['total']:.4f}")
# Cheaper alternatives
if result.get('cheaper_alternatives'):
output.append(f"\n๐ Cheaper alternatives:")
for alt in result['cheaper_alternatives']:
output.append(f" โข {alt['model']} ({alt['provider']})")
output.append(f" Cost: ${alt['cost']:.4f}")
output.append(f" Savings: ${alt['savings']:.4f} ({alt['savings_percent']:.0f}%)")
# Recommendation
if result.get('recommendation'):
output.append(f"\n{result['recommendation']}")
return [TextContent(type="text", text="\n".join(output))]
except Exception as e:
logger.error(f"Error in estimate_operation_cost: {e}")
return [TextContent(type="text", text=f"Error estimating cost: {str(e)}")]
async def handle_generate_dependency_graph(self, args: Dict[str, Any]) -> list[TextContent]:
"""Handle generate_dependency_graph tool call"""
try:
from .tools.graph_export import GraphExporter
from .core.dependency_analyzer import DependencyAnalyzer
output_format = args.get("output_format", "mermaid")
graph_type = args.get("graph_type", "imports")
file_pattern = args.get("file_pattern", "*.py")
detect_cycles = args.get("detect_cycles", False)
# Analyze project dependencies
analyzer = DependencyAnalyzer(Path.cwd())
# Analyze files matching pattern
files_analyzed = 0
for file_path in Path.cwd().rglob(file_pattern):
if file_path.is_file() and not any(part.startswith('.') for part in file_path.parts):
try:
analyzer.analyze_file(file_path)
files_analyzed += 1
except Exception:
pass # Skip files that can't be analyzed
if files_analyzed == 0:
return [TextContent(type="text", text=f"No files found matching pattern: {file_pattern}")]
# Create exporter and generate graph
exporter = GraphExporter(dependency_analyzer=analyzer)
# Generate the graph
if output_format == "dot":
graph_output = exporter.export_to_dot(graph_type, f"{graph_type.title()} Dependency Graph")
elif output_format == "mermaid":
graph_output = exporter.export_to_mermaid(graph_type, f"{graph_type.title()} Dependency Graph")
elif output_format == "json":
graph_output = exporter.export_to_json(graph_type, include_metadata=True)
else:
return [TextContent(type="text", text=f"Unsupported format: {output_format}")]
# Detect circular dependencies if requested
cycles_info = ""
if detect_cycles:
cycles = exporter.detect_circular_dependencies(graph_type)
if cycles:
cycles_info = f"\n\nโ ๏ธ **Circular Dependencies Detected:**\n"
for cycle in cycles:
cycles_info += f" โข {' โ '.join(cycle)}\n"
else:
cycles_info = "\n\nโ
No circular dependencies detected."
# Format output
output = [f"๐ **Dependency Graph** ({output_format.upper()} format)\n"]
output.append(f"Files analyzed: {files_analyzed}")
output.append(f"Graph type: {graph_type}")
output.append(f"\n{graph_output}")
if cycles_info:
output.append(cycles_info)
return [TextContent(type="text", text="\n".join(output))]
except Exception as e:
logger.error(f"Error in generate_dependency_graph: {e}")
return [TextContent(type="text", text=f"Error generating graph: {str(e)}")]
async def run(self):
"""Run the MCP server"""
# Create initialization options
init_options = InitializationOptions(
server_name="scs-mcp",
server_version="1.0.0",
capabilities=ServerCapabilities()
)
async with stdio_server() as (read_stream, write_stream):
await self.server.run(
read_stream,
write_stream,
initialization_options=init_options
)
async def main():
server = SCSMCPServer()
await server.run()
if __name__ == "__main__":
asyncio.run(main())