MCP Codebase Insight

by tosin2013
Verified
import json import logging from contextlib import asynccontextmanager from typing import AsyncIterator, Dict, Any, Optional, List from datetime import datetime import time from mcp.server import Server from mcp.server.fastmcp import Context, FastMCP from qdrant_client import QdrantClient from sentence_transformers import SentenceTransformer from .core import ( ServerConfig, EmbeddingProvider, VectorStore, CacheManager, HealthMonitor, MetricsCollector, ErrorContext, handle_error ) from .utils.logger import get_logger logger = get_logger(__name__) class CodebaseAnalyzer: """Analyzes code patterns and architecture.""" def __init__( self, vector_store: VectorStore, cache_manager: CacheManager, metrics_collector: MetricsCollector ): self.vector_store = vector_store self.cache_manager = cache_manager self.metrics_collector = metrics_collector async def analyze_patterns(self, code_text: str) -> Dict[str, Any]: """Analyze code patterns in the given text.""" start_time = time.time() try: # Try cache first cached_result = await self.cache_manager.result_cache.get_result( "analyze_patterns", code_text ) if cached_result: await self.metrics_collector.record_cache_access(hit=True) return cached_result await self.metrics_collector.record_cache_access(hit=False) # Get embedding vector = await self.vector_store.embed_text(code_text) await self.metrics_collector.record_vector_query() # Search for similar patterns similar_patterns = await self.vector_store.search( vector, filter_params={"must": [{"key": "type", "match": {"value": "pattern"}}]}, limit=5 ) result = { "patterns_found": len(similar_patterns), "matches": [ { "pattern": p.payload.get("pattern_name", "Unknown"), "description": p.payload.get("description", ""), "similarity": p.score, "examples": p.payload.get("examples", []) } for p in similar_patterns ] } # Cache the result await self.cache_manager.result_cache.store_result( "analyze_patterns", result, code_text ) # Record metrics duration = time.time() - start_time await self.metrics_collector.record_request( tool_name="analyze_patterns", duration=duration, success=True, metadata={ "patterns_found": len(similar_patterns) } ) return result except Exception as e: # Record error metrics duration = time.time() - start_time await self.metrics_collector.record_request( tool_name="analyze_patterns", duration=duration, success=False, error=str(e) ) raise async def detect_architecture(self, codebase_path: str) -> Dict[str, Any]: """Detect architectural patterns in a codebase.""" start_time = time.time() try: # Try cache first cached_result = await self.cache_manager.result_cache.get_result( "detect_architecture", codebase_path ) if cached_result: await self.metrics_collector.record_cache_access(hit=True) return cached_result await self.metrics_collector.record_cache_access(hit=False) # This is a placeholder - actual implementation would analyze # the entire codebase structure result = { "architecture": "layered", "patterns": ["MVC", "Repository"], "components": ["controllers", "models", "views"] } # Cache the result await self.cache_manager.result_cache.store_result( "detect_architecture", result, codebase_path ) # Record metrics duration = time.time() - start_time await self.metrics_collector.record_request( tool_name="detect_architecture", duration=duration, success=True ) return result except Exception as e: # Record error metrics duration = time.time() - start_time await self.metrics_collector.record_request( tool_name="detect_architecture", duration=duration, success=False, error=str(e) ) raise @asynccontextmanager async def server_lifespan(server: Server) -> AsyncIterator[Dict]: """Initialize server components and manage their lifecycle.""" config = ServerConfig.from_env() cache_manager = None health_monitor = None metrics_collector = None try: # Initialize vector store embedding_model = SentenceTransformer(config.embedding_model) embedder = EmbeddingProvider(embedding_model) # Initialize Qdrant client qdrant_client = QdrantClient( url=config.qdrant_url, timeout=config.qdrant_timeout ) vector_store = VectorStore(qdrant_client, embedder, config.collection_name) await vector_store.initialize() # Initialize supporting components cache_manager = CacheManager(config.to_dict()) health_monitor = HealthMonitor(config) metrics_collector = MetricsCollector() # Initialize analyzer analyzer = CodebaseAnalyzer( vector_store=vector_store, cache_manager=cache_manager, metrics_collector=metrics_collector ) yield { "config": config, "vector_store": vector_store, "cache_manager": cache_manager, "health_monitor": health_monitor, "metrics_collector": metrics_collector, "analyzer": analyzer } finally: if vector_store: await vector_store.close() if cache_manager: await cache_manager.clear_all() if metrics_collector: await metrics_collector.reset() # Create FastMCP instance with lifespan management mcp = FastMCP(lifespan=server_lifespan) # Tool Schemas analyze_patterns_schema = { "type": "object", "properties": { "code": { "type": "string", "description": "Code text to analyze for patterns", } }, "required": ["code"], } detect_architecture_schema = { "type": "object", "properties": { "path": { "type": "string", "description": "Path to the codebase to analyze", } }, "required": ["path"], } health_check_schema = { "type": "object", "properties": { "force": { "type": "boolean", "description": "Force a new health check", "default": False } } } metrics_schema = { "type": "object", "properties": {} } # Tool Implementations @mcp.tool(name="analyze-patterns", description="Analyze code for common patterns") async def analyze_patterns(ctx: Context, code: str) -> Dict[str, Any]: """Analyze code text for common patterns.""" analyzer: CodebaseAnalyzer = ctx.request_context.lifespan_context["analyzer"] return await analyzer.analyze_patterns(code) @mcp.tool(name="detect-architecture", description="Detect architectural patterns in a codebase") async def detect_architecture(ctx: Context, path: str) -> Dict[str, Any]: """Detect architectural patterns in a codebase.""" analyzer: CodebaseAnalyzer = ctx.request_context.lifespan_context["analyzer"] return await analyzer.detect_architecture(path) @mcp.tool(name="health-check", description="Check server health status") async def health_check(ctx: Context, force: bool = False) -> Dict[str, Any]: """Check the health status of server components.""" health_monitor: HealthMonitor = ctx.request_context.lifespan_context["health_monitor"] return await health_monitor.check_health(force) @mcp.tool(name="get-metrics", description="Get server performance metrics") async def get_metrics(ctx: Context) -> Dict[str, Any]: """Get server performance metrics.""" metrics_collector: MetricsCollector = ctx.request_context.lifespan_context["metrics_collector"] return await metrics_collector.get_all_metrics()