MCP Code Analyzer
by emiryasar
- mcp_code_analyzer
- server
import logging
from enum import Enum
from typing import Dict, Any, List, Optional
from pathlib import Path
import json
import sys
import mcp.types as types
from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
import mcp.server.stdio
from ..tools.manager import ToolManager
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Public names exported by a star-import of this module.
__all__ = ["MCPServer", "main"]
class MCPServer:
    """MCP server that exposes code-analysis tools over stdio.

    Attributes:
        analyze_paths: Resolved paths that passed the existence check in
            ``_setup_paths`` (or the working directory as a fallback).
        base_path: Working directory at construction time; relative input
            paths are resolved against it.
        tool_manager: ``ToolManager`` used to look up and execute tools.
        server: The underlying MCP ``Server`` named ``"code-analyzer"``.
    """

    def __init__(self, analyze_paths: List[str]):
        """Validate *analyze_paths*, create the tool manager and register handlers.

        Args:
            analyze_paths: Absolute or relative paths to analyse. Relative
                paths are interpreted against the current working directory.
        """
        self.analyze_paths: List[Path] = []
        self.base_path = Path.cwd()
        self._setup_paths(analyze_paths)
        self.tool_manager = ToolManager()
        self.server = Server("code-analyzer")
        self._setup_handlers()

    def _setup_paths(self, analyze_paths: List[str]) -> None:
        """Resolve each requested path and keep the ones that exist.

        Paths that do not exist or cannot be processed are logged and
        skipped. If nothing survives, ``base_path`` becomes the only entry so
        ``analyze_paths`` is never empty afterwards.
        """
        for path in analyze_paths:
            try:
                path_obj = Path(path)
                if not path_obj.is_absolute():
                    path_obj = self.base_path / path_obj
                normalized_path = path_obj.resolve()
                if normalized_path.exists():
                    self.analyze_paths.append(normalized_path)
                    logger.info("Added valid path: %s", normalized_path)
                else:
                    logger.warning("Path does not exist: %s", path)
            except Exception as e:
                # Best effort: one bad entry must not prevent the server from starting.
                logger.error("Error processing path %s: %s", path, e)
        if not self.analyze_paths:
            self.analyze_paths = [self.base_path]
            logger.warning(
                "No valid paths provided, using current directory: %s", self.base_path
            )

    async def _handle_tool_execution(self, name: str, arguments: Dict[str, Any]) -> Any:
        """Execute tool *name* with logging and uniform error reporting.

        Args:
            name: Registered tool name.
            arguments: Tool arguments; ``path`` / ``file_path`` and
                ``operation`` are read for validation and logging.

        Returns:
            A list of text content items on the normal path, or an
            ``{"error": ...}`` dict when validation fails, the tool is
            unknown, or an exception is raised during execution.
        """
        operation = arguments.get('operation', 'unknown')
        path = arguments.get('path') or arguments.get('file_path')
        logger.info(
            "Starting tool execution - Name: %s, Operation: %s, Path: %s",
            name, operation, path,
        )
        try:
            # Input validation
            if not path and name in ('create_file', 'stream_edit', 'modify_code'):
                logger.error("No path provided for %s", name)
                return {"error": "Path is required for this operation"}

            # Code modification results get their own summary format.
            if name == "code_modifier":
                result = await self.tool_manager.execute_tool(name, arguments)
                return await self._format_modification_result(result)

            tool = self.tool_manager.get_tool(name)
            if not tool:
                logger.error("Tool not found: %s", name)
                return {"error": f"Tool {name} not found"}

            logger.info("Executing %s with arguments: %s", name, arguments)
            result = await self.tool_manager.execute_tool(name, arguments)

            # Log a short outcome summary for dict-shaped results.
            if isinstance(result, dict):
                error = result.get('error')
                if error:
                    logger.error("Tool execution failed - %s: %s", name, error)
                elif result.get('success', False):
                    logger.info("Tool execution successful - %s", name)
            return await self._handle_tool_result(result)
        except Exception as e:
            logger.exception("Error executing tool %s: %s", name, e)
            return {"error": str(e), "details": f"Failed to execute {name}"}

    def _ensure_utf8(self, obj: Any) -> Any:
        """Return *obj* with every string value made safely UTF-8 encodable.

        Strings are round-tripped through UTF-8 with ``errors='replace'``;
        dict values and list items are processed recursively (dict keys are
        left as they are). Any other object is returned unchanged.
        """
        if isinstance(obj, str):
            return obj.encode('utf-8', errors='replace').decode('utf-8')
        if isinstance(obj, dict):
            return {k: self._ensure_utf8(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [self._ensure_utf8(item) for item in obj]
        return obj

    def _convert_to_safe_format(self, obj: Any) -> Any:
        """Convert complex objects into JSON-serializable structures.

        Dicts and lists are converted recursively, ``Enum`` members become
        their ``name``, ``Path`` objects become strings, and any other object
        exposing ``__dict__`` is converted via that dict. Everything else is
        returned unchanged.
        """
        if isinstance(obj, dict):
            return {k: self._convert_to_safe_format(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [self._convert_to_safe_format(item) for item in obj]
        # Enum must be checked before the generic __dict__ branch, because
        # enum members expose a __dict__ as well.
        if isinstance(obj, Enum):
            return obj.name
        if isinstance(obj, Path):
            return str(obj)
        if hasattr(obj, '__dict__'):
            return self._convert_to_safe_format(obj.__dict__)
        return obj

    def _to_json_text(self, result: Any) -> str:
        """Serialise *result* to a JSON string (non-ASCII kept verbatim).

        A plain ``json.dumps`` is tried first so output stays unchanged for
        results that are already serializable. On ``TypeError`` the result is
        passed through ``_convert_to_safe_format`` and ``_ensure_utf8`` and
        dumped again.
        """
        try:
            return json.dumps(result, ensure_ascii=False)
        except TypeError:
            safe_result = self._ensure_utf8(self._convert_to_safe_format(result))
            return json.dumps(safe_result, ensure_ascii=False)

    @staticmethod
    def _text(text: str) -> "types.TextContent":
        """Wrap *text* in a single MCP text content item."""
        return types.TextContent(type="text", text=text)

    async def _handle_tool_result(self, result: Any) -> "List[types.TextContent]":
        """Turn a tool result into a one-element list of text content.

        Dict and list results are emitted as JSON; anything else is emitted
        via ``str()``. Failures are logged and reported as text instead of
        being raised.
        """
        try:
            safe_result = self._convert_to_safe_format(result)
            encoded_result = self._ensure_utf8(safe_result)
            try:
                if isinstance(encoded_result, (dict, list)):
                    result_str = json.dumps(encoded_result, ensure_ascii=False)
                else:
                    result_str = str(encoded_result)
                return [self._text(result_str)]
            except Exception as json_error:
                logger.error("JSON encoding error: %s", json_error)
                return [self._text(str(encoded_result))]
        except Exception as e:
            logger.error("Error handling tool result: %s", e, exc_info=True)
            return [self._text(f"Error processing result: {str(e)}")]

    async def _format_modification_result(self, result: Dict) -> "List[types.TextContent]":
        """Summarise a code-modification result as JSON text content.

        A result containing ``"error"`` is reported as
        ``{"success": false, "error": ...}``. Otherwise the summary carries
        the backup path plus counts of affected code entries and
        dependencies, and per-entry details when any code is affected.
        """
        if "error" in result:
            return [self._text(json.dumps({
                "success": False,
                "error": result["error"]
            }))]

        # ``or []`` also covers keys that are present but set to None.
        affected_code = result.get("affected_code") or []
        dependencies = result.get("dependencies") or []
        formatted_result = {
            "success": True,
            "modification": {
                "backup_path": result.get("backup_path"),
                "affected_files": len(affected_code),
                "dependencies": len(dependencies)
            }
        }
        if affected_code:
            formatted_result["details"] = {
                "affected_code": [
                    {
                        "file": code["file_path"],
                        "reason": code["reason"],
                        "action": code["suggested_action"]
                    }
                    for code in affected_code
                ]
            }
        return [self._text(json.dumps(formatted_result))]

    @staticmethod
    def _tool_specs() -> List[Dict[str, Any]]:
        """Return the static tool table as plain dicts.

        Each entry has the keys ``name``, ``description`` and ``inputSchema``
        (a JSON-schema object). Fresh dicts are built on every call, so
        callers may mutate the result freely.
        """
        def string() -> Dict[str, Any]:
            return {"type": "string"}

        def choice(*values: str) -> Dict[str, Any]:
            return {"type": "string", "enum": list(values)}

        def line_range() -> Dict[str, Any]:
            return {
                "type": "object",
                "properties": {
                    "start": {"type": "number"},
                    "end": {"type": "number"}
                }
            }

        def spec(name: str, description: str,
                 properties: Dict[str, Any], required: List[str]) -> Dict[str, Any]:
            return {
                "name": name,
                "description": description,
                "inputSchema": {
                    "type": "object",
                    "properties": properties,
                    "required": list(required)
                }
            }

        def path_only(name: str, description: str) -> Dict[str, Any]:
            return spec(name, description, {"path": string()}, ["path"])

        return [
            path_only(
                "analyze_project_structure",
                "Directory structure and organization analysis with tree view",
            ),
            path_only(
                "analyze_project_statistics",
                "Project-wide statistics and metrics",
            ),
            path_only(
                "analyze_project_technology",
                "Detect and analyze used technologies and frameworks",
            ),
            # File Operations
            spec(
                "file_operations",
                "File operations with MCP support",
                {
                    "operation": choice("analyze", "create", "modify", "stream"),
                    "path": string(),
                    "content": string(),
                    "section": line_range(),
                    "stream_operation": choice("start", "write", "finish"),
                },
                ["operation", "path"],
            ),
            spec(
                "code_modifier",
                "Safe code modification with impact analysis",
                {
                    "file_path": string(),
                    "operation": choice("modify", "insert", "delete"),
                    "section": line_range(),
                    "content": string(),
                    "description": string(),
                },
                ["file_path", "operation"],
            ),
            spec(
                "manage_changes",
                "Manage code changes and their application",
                {
                    "file_path": string(),
                    "operation": choice("apply", "revert", "status", "history"),
                    "change_ids": {"type": "array", "items": string()},
                    "limit": {"type": "number"},
                },
                ["file_path", "operation"],
            ),
            spec(
                "search_code",
                "Search code with pattern matching",
                {
                    "pattern": string(),
                    "search_type": choice("text", "regex", "ast"),
                    "scope": string(),
                },
                ["pattern"],
            ),
            # Code Analysis Tools
            path_only(
                "analyze_code_structure",
                "Analyze code structure and architecture",
            ),
            spec(
                "validate_code",
                "Validate code quality and standards",
                {
                    "path": string(),
                    "validation_type": choice("syntax", "style", "security", "all"),
                },
                ["path"],
            ),
            spec(
                "check_syntax",
                "Advanced syntax checking and analysis",
                {
                    "path": string(),
                    "check_type": choice("all", "tokens", "ast", "imports", "naming"),
                },
                ["path"],
            ),
            # Search Tools
            spec(
                "search_files",
                "Advanced file search capabilities",
                {
                    "path": string(),
                    "operation": choice("find", "glob", "pattern", "recent"),
                    "pattern": string(),
                    "recursive": {"type": "boolean"},
                },
                ["path", "operation"],
            ),
            spec(
                "search_content",
                "Search within file contents",
                {
                    "path": string(),
                    "operation": choice("search", "analyze", "regex", "similar"),
                    "text": string(),
                    "pattern": string(),
                },
                ["path", "operation"],
            ),
            # Version Control
            spec(
                "version_control",
                "Advanced version control and history management",
                {
                    "path": string(),
                    "operation": choice(
                        "create_version",
                        "restore_version",
                        "get_history",
                        "compare_versions",
                        "get_changes",
                        "cleanup",
                    ),
                    "version_id": string(),
                    "description": string(),
                },
                ["path", "operation"],
            ),
            path_only(
                "analyze_imports",
                "Analyze import statements and dependencies",
            ),
            spec(
                "find_pattern_usages",
                "Find pattern occurrences and analyze usage",
                {
                    "pattern": string(),
                    "pattern_type": choice("all", "code", "variable", "function", "class"),
                },
                ["pattern"],
            ),
            path_only(
                "find_code_patterns",
                "Detect code patterns and anti-patterns",
            ),
            spec(
                "find_references",
                "Find code references",
                {
                    "target": string(),
                    "ref_type": choice("all", "class", "function", "variable"),
                },
                ["target"],
            ),
            spec(
                "preview_changes",
                "Preview code changes",
                {
                    "pattern": string(),
                    "replacement": string(),
                },
                ["pattern", "replacement"],
            ),
        ]

    def _tool_definitions(self) -> "List[types.Tool]":
        """Build the MCP ``Tool`` objects from the static tool table."""
        return [types.Tool(**tool_spec) for tool_spec in self._tool_specs()]

    def _setup_handlers(self):
        """Register the resource, prompt, tool-list and tool-call handlers."""

        @self.server.list_resources()
        async def handle_list_resources() -> List[types.Resource]:
            """Expose one ``memo://insights/<name>`` resource per analysis path."""
            return [
                types.Resource(
                    uri=types.AnyUrl(f"memo://insights/{Path(path).name}"),
                    name=f"Analysis for {Path(path).name}",
                    description=f"Analysis results for {path}",
                    mimeType="text/plain"
                )
                for path in self.analyze_paths
            ]

        @self.server.list_prompts()
        async def handle_list_prompts() -> List[types.Prompt]:
            """Expose one ``analyze-<name>`` prompt per analysis path."""
            return [
                types.Prompt(
                    name=f"analyze-{Path(path).name}",
                    description=f"Analyze code in {path}",
                    arguments=[
                        types.PromptArgument(
                            name="tool",
                            description="Analysis tool to use",
                            required=True
                        )
                    ]
                )
                for path in self.analyze_paths
            ]

        @self.server.list_tools()
        async def handle_list_tools() -> List[types.Tool]:
            """List available analysis tools"""
            return self._tool_definitions()

        @self.server.call_tool()
        async def handle_call_tool(
            name: str, arguments: Optional[Dict[str, Any]]
        ) -> List[types.TextContent]:
            """Execute tool *name* and return its result as text content.

            ``file_operations`` is executed directly on the tool instance with
            the arguments untouched. For every other tool, ``path`` and
            ``file_path`` arguments are resolved first, and a dict result that
            contains ``"error"`` is reported as that error text.
            """
            if not arguments:
                return [self._text("Missing arguments")]
            try:
                # Special handling for file operations
                if name == "file_operations":
                    tool = self.tool_manager.get_tool(name)
                    if not tool:
                        return [self._text(f"Tool {name} not found")]
                    result = await tool.execute(arguments)
                    return [self._text(self._to_json_text(result))]

                # Handle paths for other tools
                for key in ("file_path", "path"):
                    if key in arguments:
                        arguments[key] = self._resolve_path(arguments[key])

                logger.info("Executing tool %s with arguments: %s", name, arguments)
                result = await self.tool_manager.execute_tool(name, arguments)
                if isinstance(result, dict) and "error" in result:
                    return [self._text(str(result["error"]))]
                return [self._text(self._to_json_text(result))]
            except Exception as e:
                logger.error("Error executing tool %s: %s", name, e, exc_info=True)
                return [self._text(f"Error: {str(e)}")]

    def _resolve_path(self, path_str: str) -> str:
        """Resolve *path_str* to a path string for tool execution.

        An empty value or ``"."`` maps to the first analysis path. Relative
        paths are joined onto the first analysis path (without further
        normalisation); absolute paths are returned as given. If the value
        cannot be processed it is returned unchanged.
        """
        if not path_str or path_str == ".":
            return str(self.analyze_paths[0])
        try:
            path_obj = Path(path_str)
            if path_obj.is_absolute():
                return str(path_obj)
            path = str(self.analyze_paths[0] / path_obj)
            logger.info("Resolved path: %s", path)
            return path
        except Exception as e:
            logger.error("Error resolving path: %s", e)
            return path_str

    async def run(self):
        """Run the MCP server over the stdio transport.

        Raises:
            Exception: Any error raised by the underlying server run is
                logged with its traceback and re-raised.
        """
        logger.info("Starting server with paths: %s", self.analyze_paths)
        async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
            try:
                await self.server.run(
                    read_stream,
                    write_stream,
                    InitializationOptions(
                        server_name="code-analyzer",
                        server_version="0.1.0",
                        capabilities=self.server.get_capabilities(
                            notification_options=NotificationOptions(),
                            experimental_capabilities={},
                        ),
                    ),
                )
            except Exception as e:
                logger.error("Server error: %s", e, exc_info=True)
                raise
async def main(analyze_paths: List[str]):
    """Create an ``MCPServer`` for *analyze_paths* and run it to completion."""
    logger.info(f"Starting Code Analyzer with paths: {analyze_paths}")
    await MCPServer(analyze_paths).run()