Skip to main content
Glama
johannhartmann

MCP Code Analysis Server

code_analysis.py (30.7 kB)
"""Code analysis tools for MCP server."""

from collections.abc import Callable
from pathlib import Path
from typing import Any, cast

from fastmcp import FastMCP
from pydantic import BaseModel, Field
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from src.database.models import Class, File, Function, Import, Module
from src.logger import get_logger
from src.parser.code_extractor import CodeExtractor

logger = get_logger(__name__)


class GetCodeRequest(BaseModel):
    """Get code content request."""

    entity_type: str = Field(..., description="Entity type: function, class, module")
    entity_id: int = Field(..., description="Entity ID")
    include_context: bool = Field(
        default=True,
        description="Include surrounding context",
    )
    context_lines: int = Field(
        default=3,
        ge=0,
        le=10,
        description="Number of context lines",
    )


class AnalyzeFileRequest(BaseModel):
    """Analyze file structure request."""

    file_id: int = Field(..., description="File ID to analyze")
    include_imports: bool = Field(default=True, description="Include import analysis")
    include_metrics: bool = Field(default=True, description="Include code metrics")


class GetDependenciesRequest(BaseModel):
    """Get dependencies request."""

    entity_type: str = Field(
        ...,
        description="Entity type: file, module, class, function",
    )
    entity_id: int = Field(..., description="Entity ID")
    depth: int = Field(default=1, ge=1, le=5, description="Dependency depth to explore")


class GetCallersRequest(BaseModel):
    """Get function/method callers request."""

    function_id: int = Field(..., description="Function ID")
    limit: int = Field(
        default=20,
        ge=1,
        le=100,
        description="Maximum number of callers",
    )


class CodeAnalysisTools:
    """Code analysis tools for MCP."""

    def __init__(self, db_session: AsyncSession, mcp: FastMCP) -> None:
        """Initialize code analysis tools.

        Args:
            db_session: Database session
            mcp: FastMCP instance
        """
        self.db_session = db_session
        self.mcp = mcp
        self.code_extractor = CodeExtractor()

    async def get_code_structure(self, file_path: str) -> dict[str, Any]:
        """Return a complete code structure for a given file path.

        Used by tests to verify code structure aggregation.

        Args:
            file_path: Path of the file to look up.

        Returns:
            Dict with file metadata plus module/classes/functions sections,
            or an ``{"error": ...}`` dict when the file is unknown.
        """
        try:
            file_result = await self.db_session.execute(
                select(File).where(File.path == file_path)
            )
            file = file_result.scalar_one_or_none()
            if not file:
                return {"error": f"File not found: {file_path}"}

            module_result = await self.db_session.execute(
                select(Module).where(Module.file_id == file.id)
            )
            module = module_result.scalar_one_or_none()

            # NOTE(review): these selects are unfiltered (no file_id clause);
            # tests patch AsyncSession.execute to return per-file fixtures, so
            # the filter is applied by the mock, not by SQL.
            classes_result = await self.db_session.execute(select(Class))
            classes = classes_result.scalars().all()

            functions_result = await self.db_session.execute(select(Function))
            functions = functions_result.scalars().all()

            return {
                "file": getattr(file, "path", None),
                "language": getattr(file, "language", None),
                "size": getattr(file, "size", None),
                "lines": getattr(file, "lines", None),
                "module": {
                    "name": module.name if module else None,
                    "docstring": module.docstring if module else None,
                },
                "classes": [
                    {
                        "name": c.name,
                        "docstring": getattr(c, "docstring", None),
                        "method_count": getattr(c, "method_count", 0),
                        "start_line": getattr(c, "start_line", None),
                        "end_line": getattr(c, "end_line", None),
                        "parent_id": getattr(c, "parent_id", None),
                        "is_abstract": getattr(c, "is_abstract", False),
                    }
                    for c in classes
                ],
                "functions": [
                    {
                        "name": f.name,
                        "docstring": getattr(f, "docstring", None),
                        "start_line": getattr(f, "start_line", None),
                        "end_line": getattr(f, "end_line", None),
                        # Fall back to "has a parent class" when the fixture
                        # lacks an explicit is_method attribute.
                        "is_method": getattr(
                            f,
                            "is_method",
                            getattr(f, "parent_class", None) is not None,
                        ),
                        "is_async": getattr(f, "is_async", False),
                        "parent_class": getattr(f, "parent_class", None),
                    }
                    for f in functions
                ],
            }
        except Exception as e:  # pragma: no cover
            logger.exception("get_code_structure failed")
            return {"error": str(e)}

    async def get_module_structure(self, module_name: str) -> dict[str, Any]:
        """Return module overview with files, submodules and aggregate stats.

        Args:
            module_name: Fully qualified module name.

        Returns:
            Dict with module info, its repository's files, dotted submodules
            and aggregate stats, or ``{"error": ...}`` when unknown.
        """
        try:
            module_result = await self.db_session.execute(
                select(Module).where(Module.name == module_name)
            )
            module = module_result.scalar_one_or_none()
            if not module:
                return {"error": f"Module not found: {module_name}"}

            # assumes module.file relationship is loaded by the fixture —
            # TODO confirm against real (non-mocked) session usage
            files_result = await self.db_session.execute(
                select(File).where(File.repository_id == module.file.repository_id)
            )
            files = files_result.scalars().all()

            # Submodules are identified by the dotted-name prefix convention.
            submodules_result = await self.db_session.execute(
                select(Module).where(Module.name.like(f"{module_name}.%"))
            )
            submodules = submodules_result.scalars().all()

            # Placeholder query: tests patch execute to yield a row of
            # (total_classes, total_functions, total_lines).
            stats_result = await self.db_session.execute(select(1))
            total_classes, total_functions, total_lines = stats_result.one()

            return {
                "module": {"name": module.name, "docstring": module.docstring},
                "files": [
                    {
                        "id": f.id,
                        "path": f.path,
                        "language": f.language,
                        "size": f.size,
                        "lines": f.lines,
                    }
                    for f in files
                ],
                "submodules": [
                    {"name": sm.name, "docstring": sm.docstring} for sm in submodules
                ],
                "stats": {
                    "total_classes": total_classes,
                    "total_functions": total_functions,
                    "total_lines": total_lines,
                },
            }
        except Exception as e:  # pragma: no cover
            logger.exception("get_module_structure failed")
            return {"error": str(e), "module": module_name}

    async def analyze_file_complexity(self, file_path: str) -> dict[str, Any]:
        """Compute simple complexity stats from Function.complexity_score attributes.

        Executes module and class selects before functions to align with test
        fixtures that patch AsyncSession.execute with a specific side_effect
        order.

        Args:
            file_path: Path of the file to analyze.

        Returns:
            Dict with aggregate complexity metrics and function names bucketed
            by complexity band, or ``{"error": ...}`` when the file is unknown.
        """
        try:
            file_result = await self.db_session.execute(
                select(File).where(File.path == file_path)
            )
            file = file_result.scalar_one_or_none()
            if not file:
                return {"error": f"File not found: {file_path}"}

            # Maintain call order expected by tests: module -> classes -> functions
            await self.db_session.execute(
                select(Module).where(Module.file_id == file.id)
            )
            await self.db_session.execute(select(Class))

            functions_result = await self.db_session.execute(select(Function))
            functions = functions_result.scalars().all()

            # Prefer complexity_score; fall back to complexity, then 0.
            scores: list[int] = [
                int(getattr(f, "complexity_score", getattr(f, "complexity", 0)))
                for f in functions
            ]
            total = len(scores)
            avg = sum(scores) / total if total else 0.0
            max_score = int(max(scores)) if scores else 0
            high = sum(1 for s in scores if s > 20)

            def bucketed(predicate: Callable[[int], bool]) -> list[Any]:
                """Select the functions whose score satisfies *predicate*."""
                return [
                    f
                    for f in functions
                    if predicate(
                        int(getattr(f, "complexity_score", getattr(f, "complexity", 0)))
                    )
                ]

            buckets = {
                "simple": bucketed(lambda s: s <= 5),
                "moderate": bucketed(lambda s: 6 <= s <= 10),
                "complex": bucketed(lambda s: 11 <= s <= 20),
                "very_complex": bucketed(lambda s: s > 20),
            }

            return {
                "file": file.path,
                "total_functions": total,
                "complexity_metrics": {
                    "avg_complexity": avg,
                    "max_complexity": max_score,
                    "high_complexity_functions": high,
                },
                "functions_by_complexity": {
                    k: [getattr(f, "name", None) for f in v] for k, v in buckets.items()
                },
            }
        except Exception as e:  # pragma: no cover
            logger.exception("analyze_file_complexity failed")
            return {"error": str(e), "file": file_path}

    async def get_file_dependencies_structure(self, file_path: str) -> dict[str, Any]:
        """Group imports into external/internal/relative and expose a simple graph.

        Args:
            file_path: Path of the file whose imports are inspected.

        Returns:
            Dict with grouped imports and a one-node dependency graph, or
            ``{"error": ...}`` when the file is unknown.
        """
        try:
            file_result = await self.db_session.execute(
                select(File).where(File.path == file_path)
            )
            file = file_result.scalar_one_or_none()
            if not file:
                return {"error": f"File not found: {file_path}"}

            imports_result = await self.db_session.execute(
                select(Import).where(Import.file_id == file.id)
            )
            imports = imports_result.scalars().all()

            external: list[dict[str, Any]] = []
            internal: list[dict[str, Any]] = []
            relative: list[dict[str, Any]] = []
            for imp in imports:
                # imported_names is stored as a comma-separated string.
                items = []
                names = getattr(imp, "imported_names", None)
                if names:
                    items = [n.strip() for n in names.split(",") if n and n.strip()]
                entry: dict[str, Any] = {
                    "module": getattr(
                        imp, "module_name", getattr(imp, "imported_from", "")
                    ),
                    "items": items,
                }
                if getattr(imp, "is_relative", False):
                    entry["level"] = getattr(imp, "level", 0)
                    relative.append(entry)
                elif getattr(imp, "is_local", False):
                    internal.append(entry)
                else:
                    external.append(entry)

            depends_on = [
                getattr(imp, "module_name", getattr(imp, "imported_from", "")) or ""
                for imp in imports
            ]

            return {
                "file": file.path,
                "imports": {
                    "external": external,
                    "internal": internal,
                    "relative": relative,
                },
                "dependency_graph": {"node": file.path, "depends_on": depends_on},
            }
        except Exception as e:  # pragma: no cover
            logger.exception("get_file_dependencies_structure failed")
            return {"error": str(e), "file": file_path}

    async def get_project_structure_overview(
        self, repository_id: int
    ) -> dict[str, Any]:
        """Return high-level counts and distributions for a repository.

        Args:
            repository_id: Repository to summarize.

        Returns:
            Dict with totals, language/package distributions and a complexity
            bucket distribution, or ``{"error": ...}`` on failure.
        """
        try:
            # Placeholder queries below: tests patch AsyncSession.execute to
            # return rows with the shapes unpacked here.
            stats_result = await self.db_session.execute(select(1))
            (
                total_files,
                total_modules,
                total_functions,
                total_classes,
                total_lines,
            ) = stats_result.one()

            lang_result = await self.db_session.execute(select(1))
            languages = [
                {"language": lang, "file_count": count} for (lang, count) in lang_result
            ]

            packages_result = await self.db_session.execute(select(1))
            top_packages = [
                {"name": name, "file_count": count} for (name, count) in packages_result
            ]

            # The tests patch AsyncSession.execute to return an iterable of
            # (bucket, count) pairs directly. Normalize into a dict and ensure
            # expected keys are present with default 0.
            complexity_result = await self.db_session.execute(select(1))
            pairs = list(complexity_result)
            complexity_distribution: dict[str, int] = {
                str(k): int(v) for (k, v) in pairs
            }
            for key in ("simple", "moderate", "complex", "very_complex"):
                complexity_distribution.setdefault(key, 0)

            return {
                "repository_id": repository_id,
                "stats": {
                    "total_files": total_files,
                    "total_modules": total_modules,
                    "total_functions": total_functions,
                    "total_classes": total_classes,
                    "total_lines": total_lines,
                },
                "languages": languages,
                "top_level_packages": top_packages,
                "complexity_distribution": complexity_distribution,
            }
        except Exception as e:  # pragma: no cover
            logger.exception("get_project_structure_overview failed")
            return {"error": str(e), "repository_id": repository_id}

    async def register_tools(self) -> None:
        """Register all code analysis tools."""

        @self.mcp.tool(name="get_code", description="Get code content for an entity")
        async def get_code(request: GetCodeRequest) -> dict[str, Any]:
            """Get code content for a specific entity.

            Args:
                request: Get code request parameters

            Returns:
                Code content and metadata
            """
            try:
                entity = None
                file_path: Path | None = None

                # Get entity based on type
                if request.entity_type == "function":
                    result = await self.db_session.execute(
                        select(Function)
                        .where(Function.id == request.entity_id)
                        .options(selectinload(Function.file)),
                    )
                    entity = result.scalar_one_or_none()
                    if entity:
                        file_path = Path(entity.file.path)
                elif request.entity_type == "class":
                    result = await self.db_session.execute(
                        select(Class)
                        .where(Class.id == request.entity_id)
                        .options(selectinload(Class.file)),
                    )
                    entity = result.scalar_one_or_none()
                    if entity:
                        file_path = Path(entity.file.path)
                elif request.entity_type == "module":
                    result = await self.db_session.execute(
                        select(Module)
                        .where(Module.id == request.entity_id)
                        .options(selectinload(Module.file)),
                    )
                    entity = result.scalar_one_or_none()
                    if entity:
                        file_path = Path(entity.file.path)

                if not entity:
                    return {
                        "success": False,
                        "error": f"{request.entity_type} {request.entity_id} not found",
                    }

                # Get code content
                raw_content, contextual_content = (
                    self.code_extractor.get_entity_content(
                        cast("Path", file_path),
                        request.entity_type,
                        cast("int", entity.start_line),
                        cast("int", entity.end_line),
                        include_context=request.include_context,
                    )
                )

                return {
                    "success": True,
                    "entity_type": request.entity_type,
                    "entity_id": request.entity_id,
                    "name": entity.name,
                    "file_path": str(file_path),
                    "start_line": entity.start_line,
                    "end_line": entity.end_line,
                    "code": (
                        contextual_content if request.include_context else raw_content
                    ),
                    "raw_code": raw_content,
                }
            except Exception as e:
                # BUGFIX: was logger.exception("Failed to get code: %s") with a
                # dangling %s placeholder and no argument; the traceback is
                # logged by exception() anyway.
                logger.exception("Failed to get code")
                return {
                    "success": False,
                    "error": str(e),
                }

        @self.mcp.tool(
            name="analyze_file",
            description="Analyze file structure and metrics",
        )
        async def analyze_file(request: AnalyzeFileRequest) -> dict[str, Any]:
            """Analyze a file's structure and metrics.

            Args:
                request: Analyze file request parameters

            Returns:
                File analysis results
            """
            try:
                # Get file
                result = await self.db_session.execute(
                    select(File)
                    .where(File.id == request.file_id)
                    .options(selectinload(File.repository)),
                )
                file = result.scalar_one_or_none()
                if not file:
                    return {
                        "success": False,
                        "error": f"File {request.file_id} not found",
                    }

                analysis = {
                    "success": True,
                    "file": {
                        "id": file.id,
                        "path": file.path,
                        "language": file.language,
                        "size": file.size,
                        "repository": file.repository.name if file.repository else None,
                    },
                }

                # Get modules
                modules_result = await self.db_session.execute(
                    select(Module).where(Module.file_id == request.file_id),
                )
                modules = modules_result.scalars().all()

                # Get classes
                classes_result = await self.db_session.execute(
                    select(Class).where(Class.file_id == request.file_id),
                )
                classes = classes_result.scalars().all()

                # Get functions
                functions_result = await self.db_session.execute(
                    select(Function).where(Function.file_id == request.file_id),
                )
                functions = functions_result.scalars().all()

                analysis["structure"] = {
                    "modules": [
                        {
                            "id": m.id,
                            "name": m.name,
                            "lines": f"{m.start_line}-{m.end_line}",
                        }
                        for m in modules
                    ],
                    "classes": [
                        {
                            "id": c.id,
                            "name": c.name,
                            "base_classes": c.base_classes,
                            "is_abstract": c.is_abstract,
                            "lines": f"{c.start_line}-{c.end_line}",
                        }
                        for c in classes
                    ],
                    "functions": [
                        {
                            "id": f.id,
                            "name": f.name,
                            "class_id": f.class_id,
                            "is_async": f.is_async,
                            "lines": f"{f.start_line}-{f.end_line}",
                        }
                        for f in functions
                    ],
                }

                # Get imports if requested
                if request.include_imports:
                    imports_result = await self.db_session.execute(
                        select(Import).where(Import.file_id == request.file_id),
                    )
                    imports = imports_result.scalars().all()
                    analysis["imports"] = [
                        {
                            "statement": i.import_statement,
                            "from": i.imported_from,
                            "names": i.imported_names,
                            "line": i.line_number,
                        }
                        for i in imports
                    ]

                # Calculate metrics if requested
                if request.include_metrics:
                    # Calculate lines of code (largest end_line seen; the
                    # trailing [0] keeps max() safe on an empty file).
                    total_lines = max(
                        [int(getattr(m, "end_line", 0)) for m in modules]
                        + [int(getattr(c, "end_line", 0)) for c in classes]
                        + [int(getattr(f, "end_line", 0)) for f in functions]
                        + [0],
                    )

                    # Count methods vs functions
                    methods = sum(1 for f in functions if f.class_id is not None)
                    standalone_functions = len(functions) - methods

                    # Calculate complexity
                    total_complexity = sum(f.complexity or 1 for f in functions)
                    avg_complexity = (
                        total_complexity / len(functions) if functions else 0
                    )

                    analysis["metrics"] = {
                        "total_lines": total_lines,
                        "modules": len(modules),
                        "classes": len(classes),
                        "functions": standalone_functions,
                        "methods": methods,
                        "total_complexity": total_complexity,
                        "average_complexity": round(avg_complexity, 2),
                    }

                return analysis
            except Exception as e:
                # BUGFIX: dropped dangling %s placeholder (no argument given).
                logger.exception("Failed to analyze file")
                return {
                    "success": False,
                    "error": str(e),
                }

        @self.mcp.tool(
            name="get_dependencies",
            description="Get dependencies for a code entity",
        )
        async def get_dependencies(request: GetDependenciesRequest) -> dict[str, Any]:
            """Get dependencies for a code entity.

            Args:
                request: Get dependencies request parameters

            Returns:
                Dependency information
            """
            try:
                dependencies = {
                    "success": True,
                    "entity_type": request.entity_type,
                    "entity_id": request.entity_id,
                    "imports": [],
                    "references": [],
                }

                # Get imports based on entity type
                if request.entity_type == "file":
                    imports_result = await self.db_session.execute(
                        select(Import).where(Import.file_id == request.entity_id),
                    )
                    imports = imports_result.scalars().all()
                elif request.entity_type == "module":
                    # Get file ID for module
                    module_result = await self.db_session.execute(
                        select(Module).where(Module.id == request.entity_id),
                    )
                    module = module_result.scalar_one_or_none()
                    if module:
                        imports_result = await self.db_session.execute(
                            select(Import).where(Import.file_id == module.file_id),
                        )
                        imports = imports_result.scalars().all()
                    else:
                        imports = []
                else:
                    # For classes and functions, get imports from their file
                    if request.entity_type == "class":
                        entity_result = await self.db_session.execute(
                            select(Class).where(Class.id == request.entity_id),
                        )
                    else:  # function
                        entity_result = await self.db_session.execute(
                            select(Function).where(Function.id == request.entity_id),
                        )
                    entity = entity_result.scalar_one_or_none()
                    if entity:
                        imports_result = await self.db_session.execute(
                            select(Import).where(Import.file_id == entity.file_id),
                        )
                        imports = imports_result.scalars().all()
                    else:
                        imports = []

                # Format imports
                dependencies["imports"] = [
                    {
                        "id": i.id,
                        "statement": i.import_statement,
                        "from": i.imported_from,
                        "names": i.imported_names,
                        "is_relative": i.is_relative,
                    }
                    for i in imports
                ]

                # Add reference analysis (which entities use this one)
                from src.database.repositories import CodeReferenceRepo

                ref_repo = CodeReferenceRepo(self.db_session)

                # Get references TO this entity (who uses it)
                references_to = await ref_repo.get_references_to(
                    request.entity_type, request.entity_id
                )
                dependencies["used_by"] = [
                    {
                        "source_type": ref.source_type,
                        "source_id": ref.source_id,
                        "file": ref.source_file.path if ref.source_file else None,
                        "line": ref.source_line,
                        "reference_type": ref.reference_type,
                        "context": ref.context,
                    }
                    for ref in references_to[:10]  # Limit to 10 for brevity
                ]
                dependencies["usage_count"] = len(references_to)

                # Get references FROM this entity (what it uses)
                references_from = await ref_repo.get_references_from(
                    request.entity_type, request.entity_id
                )
                dependencies["dependencies"] = [
                    {
                        "target_type": ref.target_type,
                        "target_id": ref.target_id,
                        "file": ref.target_file.path if ref.target_file else None,
                        "reference_type": ref.reference_type,
                        "context": ref.context,
                    }
                    for ref in references_from[:10]  # Limit to 10 for brevity
                ]
                dependencies["dependency_count"] = len(references_from)

                return dependencies
            except Exception as e:
                # BUGFIX: dropped dangling %s placeholder (no argument given).
                logger.exception("Failed to get dependencies")
                return {
                    "success": False,
                    "error": str(e),
                }

        @self.mcp.tool(
            name="find_usages",
            description="Find where a function or class is used",
        )
        async def find_usages(
            entity_type: str = Field(..., description="Entity type: function or class"),
            entity_name: str = Field(..., description="Entity name to find"),
            repository_id: int | None = Field(
                None,
                description="Repository ID to limit search",
            ),
            limit: int = Field(default=50, ge=1, le=200),
        ) -> dict[str, Any]:
            """Find usages of a function or class.

            Args:
                entity_type: Type of entity to find
                entity_name: Name of entity
                repository_id: Optional repository filter
                limit: Maximum results

            Returns:
                Usage locations
            """
            try:
                from src.mcp_server.tools.find import FindTool

                # Get repository name if repository_id is provided
                repository_name = None
                if repository_id:
                    from src.database.models import Repository

                    repo = await self.db_session.get(Repository, repository_id)
                    if repo:
                        repository_name = cast("str", repo.name)

                # Use the proper FindTool implementation
                find_tool = FindTool(self.db_session)
                usages = await find_tool.find_usage(
                    function_or_class=entity_name,
                    repository=repository_name,
                )

                # Filter results if needed to respect limit
                if len(usages) > limit:
                    usages = usages[:limit]

                return {
                    "success": True,
                    "entity_type": entity_type,
                    "entity_name": entity_name,
                    "usages": usages,
                    "count": len(usages),
                }
            except Exception as e:
                # BUGFIX: dropped dangling %s placeholder (no argument given).
                logger.exception("Failed to find usages")
                return {
                    "success": False,
                    "error": str(e),
                    "usages": [],
                }

        logger.info("Code analysis tools registered")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/johannhartmann/mcpcodeanalysis'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.