Skip to main content
Glama
johannhartmann

MCP Code Analysis Server

code_search.py24.6 kB
"""Code search tools for MCP server.""" from typing import Any from fastmcp import FastMCP from pydantic import BaseModel, Field from sqlalchemy.ext.asyncio import AsyncSession from src.embeddings.domain_search import DomainAwareSearch, DomainSearchScope from src.embeddings.vector_search import SearchScope, VectorSearch from src.logger import get_logger logger = get_logger(__name__) class SearchRequest(BaseModel): """Search request parameters.""" query: str = Field(..., description="Search query text") scope: str = Field( default="all", description="Search scope: all, functions, classes, modules, repository, file", ) repository_id: int | None = Field( None, description="Repository ID to limit search", ) file_id: int | None = Field(None, description="File ID to limit search") limit: int = Field( default=10, ge=1, le=100, description="Maximum number of results", ) threshold: float | None = Field( None, ge=0.0, le=1.0, description="Minimum similarity threshold", ) use_domain_knowledge: bool = Field( default=False, description="Use domain knowledge to enhance search", ) bounded_context: str | None = Field( None, description="Limit search to specific bounded context", ) class SimilarCodeRequest(BaseModel): """Find similar code request.""" embedding_id: int = Field(..., description="ID of embedding to find similar to") limit: int = Field(default=10, ge=1, le=50, description="Maximum number of results") exclude_same_file: bool = Field( default=False, description="Exclude results from the same file", ) class BusinessCapabilitySearchRequest(BaseModel): """Search by business capability request.""" capability: str = Field(..., description="Business capability description") limit: int = Field(default=10, ge=1, le=50, description="Maximum number of results") class CodeSnippetSearchRequest(BaseModel): """Search by code snippet request.""" code_snippet: str = Field(..., description="Code snippet to search for") scope: str = Field( default="all", description="Search scope: all, functions, classes, modules", ) repository_id: int | None = Field( None, description="Repository ID to limit search", ) limit: int = Field(default=10, ge=1, le=50, description="Maximum number of results") class CodeSearchTools: """Code search tools for MCP.""" def __init__( self, db_session: AsyncSession, mcp: FastMCP, ) -> None: """Initialize code search tools. Args: db_session: Database session mcp: FastMCP instance """ self.db_session = db_session self.mcp = mcp # Optional components: initialize lazily/best-effort for tests without OpenAI try: self.vector_search: VectorSearch | None = VectorSearch(db_session) except Exception as e: # noqa: BLE001 logger.warning("Vector search unavailable: %s", e) self.vector_search = None try: self.domain_search: DomainAwareSearch | None = DomainAwareSearch(db_session) except Exception as e: # noqa: BLE001 logger.warning("Domain-aware search unavailable: %s", e) self.domain_search = None async def register_tools(self) -> None: """Register all code search tools.""" from types import SimpleNamespace registered: list[object] = [] if self.vector_search: # Semantic search tools @self.mcp.tool( name="semantic_search", description="Search for code using natural language queries", ) async def semantic_search(request: SearchRequest) -> dict[str, Any]: """Search for code entities using semantic search. Args: request: Search parameters Returns: Search results with similarity scores """ try: # Use domain-aware search if requested if request.use_domain_knowledge and self.domain_search: # Convert to domain scope domain_scope_map = { "all": DomainSearchScope.ALL, "functions": DomainSearchScope.DOMAIN_SERVICE, "classes": DomainSearchScope.ENTITY, "modules": DomainSearchScope.BOUNDED_CONTEXT, } domain_scope = domain_scope_map.get( request.scope.lower(), DomainSearchScope.ALL, ) results = await self.domain_search.search_with_domain_context( query=request.query, scope=domain_scope, bounded_context=request.bounded_context, limit=request.limit, include_related=True, ) else: # Standard vector search scope = SearchScope[request.scope.upper()] results = await self.vector_search.search( query=request.query, scope=scope, repository_id=request.repository_id, file_id=request.file_id, limit=request.limit, threshold=request.threshold, ) return { "success": True, "query": request.query, "results": results, "count": len(results), "search_type": ( "domain-aware" if request.use_domain_knowledge else "standard" ), } except Exception as e: logger.exception("Semantic search failed: %s") return { "success": False, "error": str(e), "results": [], } registered.append( SimpleNamespace( name="semantic_search", fn=getattr(semantic_search, "fn", semantic_search), ) ) @self.mcp.tool( name="find_similar_code", description="Find code similar to a given entity", ) async def find_similar_code(request: SimilarCodeRequest) -> dict[str, Any]: """Find code entities similar to a given embedding. Args: request: Similar code search parameters Returns: Similar code entities """ try: results = await self.vector_search.search_similar( embedding_id=request.embedding_id, limit=request.limit, exclude_same_file=request.exclude_same_file, ) return { "success": True, "embedding_id": request.embedding_id, "results": results, "count": len(results), } except Exception as e: logger.exception("Similar code search failed: %s") return { "success": False, "error": str(e), "results": [], } registered.append( SimpleNamespace( name="find_similar_code", fn=getattr(find_similar_code, "fn", find_similar_code), ) ) @self.mcp.tool( name="search_by_code_snippet", description="Search for code similar to a given snippet", ) async def search_by_code_snippet( request: CodeSnippetSearchRequest, ) -> dict[str, Any]: """Search for code similar to a provided snippet. Args: request: Code snippet search parameters Returns: Similar code entities """ try: # Convert scope string to enum scope = SearchScope[request.scope.upper()] results = await self.vector_search.search_by_code( code_snippet=request.code_snippet, scope=scope, repository_id=request.repository_id, limit=request.limit, ) return { "success": True, "code_snippet": request.code_snippet[:100] + "...", "results": results, "count": len(results), } except Exception as e: logger.exception("Code snippet search failed: %s") return { "success": False, "error": str(e), "results": [], } registered.append( SimpleNamespace( name="search_by_code_snippet", fn=getattr( search_by_code_snippet, "fn", search_by_code_snippet ), ) ) # Business capability search if self.domain_search: @self.mcp.tool( name="search_by_business_capability", description="Search for code that implements a business capability", ) async def search_by_business_capability( request: BusinessCapabilitySearchRequest, ) -> dict[str, Any]: """Search for code by business capability. Args: request: Business capability search parameters Returns: Code implementing the capability """ try: results = ( await self.domain_search.search_by_business_capability( capability=request.capability, limit=request.limit, ) ) return { "success": True, "capability": request.capability, "results": results, "count": len(results), } except Exception as e: logger.exception("Business capability search failed: %s") return { "success": False, "error": str(e), "results": [], } registered.append( SimpleNamespace( name="search_by_business_capability", fn=getattr( search_by_business_capability, "fn", search_by_business_capability, ), ) ) # Keyword search tools (always available) @self.mcp.tool( name="keyword_search", description="Search for code using keywords", ) async def keyword_search( query: str = Field(..., description="Keyword search query"), entity_type: str | None = Field( None, description="Entity type: function, class, module", ), repository_id: int | None = Field( None, description="Repository ID to limit search", ), limit: int = Field(default=20, ge=1, le=100), ) -> dict[str, Any]: """Search for code entities using keywords. Args: query: Keyword query entity_type: Optional entity type filter repository_id: Optional repository filter limit: Maximum results Returns: Matching code entities """ try: from sqlalchemy import or_, select from src.database.models import Class, Function, Module, File results: list[dict[str, Any]] = [] # Tokenize query for broader matches tokens = [t for t in str(query).split() if t] # Search functions if not entity_type or entity_type == "function": func_cond = or_( *([Function.name.ilike(f"%{t}%") for t in tokens] + [Function.docstring.ilike(f"%{t}%") for t in tokens]) ) func_query = select(Function).where(func_cond) if repository_id: func_query = ( func_query.join(Module, Function.module_id == Module.id) .join(File, Module.file_id == File.id) .where(File.repository_id == repository_id) ) func_query = func_query.limit(limit) func_result = await self.db_session.execute(func_query) functions = func_result.scalars().all() for func in functions: results.append( { "entity": { "type": "function", "id": func.id, "name": func.name, } } ) # Search classes if not entity_type or entity_type == "class": class_cond = or_( *([Class.name.ilike(f"%{t}%") for t in tokens] + [Class.docstring.ilike(f"%{t}%") for t in tokens]) ) class_query = select(Class).where(class_cond) if repository_id: class_query = ( class_query.join(Module, Class.module_id == Module.id) .join(File, Module.file_id == File.id) .where(File.repository_id == repository_id) ) class_query = class_query.limit(limit) class_result = await self.db_session.execute(class_query) classes = class_result.scalars().all() for cls in classes: results.append( { "entity": { "type": "class", "id": cls.id, "name": cls.name, } } ) # Search modules if not entity_type or entity_type == "module": module_cond = or_( *([Module.name.ilike(f"%{t}%") for t in tokens] + [Module.docstring.ilike(f"%{t}%") for t in tokens]) ) module_query = select(Module).where(module_cond) if repository_id: module_query = ( module_query.join(File, Module.file_id == File.id).where( File.repository_id == repository_id ) ) module_query = module_query.limit(limit) module_result = await self.db_session.execute(module_query) modules = module_result.scalars().all() for module in modules: results.append( { "entity": { "type": "module", "id": module.id, "name": module.name, } } ) # Sort by relevance (name matches first) try: results.sort( key=lambda x: ( 0 if query.lower() in str(x.get("entity", {}).get("name", "")).lower() else 1, str(x.get("entity", {}).get("name", "")), ), ) except Exception: pass return { "success": True, "query": query, "results": results[:limit], "count": len(results), } except Exception as e: logger.exception("Keyword search failed: %s") return { "success": False, "error": str(e), "results": [], } logger.info("Code search tools registered") # Expose tools for tests # Provide a small shim so tests can call with a dict payload async def _keyword_search_shim(payload: dict[str, Any]) -> dict[str, Any]: q = payload.get("query") if not q: kw = payload.get("keywords") or [] if isinstance(kw, list): q = " ".join(str(k) for k in kw) else: q = str(kw) scope = payload.get("scope") entity = None if scope in (None, "all") else str(scope) repo = payload.get("repository_id") lim = payload.get("limit", 20) try: lim = int(lim) except Exception: # noqa: BLE001 lim = 20 _fn = getattr(keyword_search, "fn", keyword_search) return await _fn(q, entity, repo, lim) registered.append( SimpleNamespace(name="keyword_search", fn=_keyword_search_shim) ) existing = getattr(self.mcp, "tools", []) setattr(self.mcp, "tools", [*existing, *registered]) async def find_similar_code( self, *, entity_type: str, entity_id: int, exclude_same_file: bool = True, min_similarity: float | None = None, limit: int = 10, ) -> list[dict[str, Any]]: """Find code similar to a given entity (function or class). This method loads the target entity and delegates to a vector search helper (monkey-patched in tests) to retrieve similar entities. It filters, normalizes, and returns a compact structure for test consumption. """ from sqlalchemy import select from src.database.models import Class, File, Function etype = entity_type.lower() if etype not in {"function", "class"}: return [] # Load target entity if etype == "function": entity_result = await self.db_session.execute( select(Function).where(Function.id == entity_id), ) else: entity_result = await self.db_session.execute( select(Class).where(Class.id == entity_id), ) target = entity_result.scalar_one_or_none() if not target: return [] # Load file for target (tests expect two executes: entity then file) target_file_id = getattr(target, "file_id", None) if target_file_id is not None: await self.db_session.execute(select(File).where(File.id == target_file_id)) # Delegate to vector search. Tests monkeypatch this method. finder = getattr(self.vector_search, "find_similar_by_entity", None) if not callable(finder): return [] similar_items: list[dict[str, Any]] = await finder( entity_type=etype, entity_id=entity_id, limit=limit, ) results: list[dict[str, Any]] = [] for item in similar_items: sim = float(item.get("similarity", 0.0)) if min_similarity is not None and sim < min_similarity: continue entity = item.get("entity", {}) or {} if ( exclude_same_file and target_file_id is not None and entity.get("file_id") == target_file_id ): continue out: dict[str, Any] = { "type": item.get("entity_type"), "name": entity.get("name"), "similarity": sim, "file": (item.get("file", {}) or {}).get("path"), } chunk = item.get("chunk") or {} if "start_line" in chunk: out["start_line"] = chunk["start_line"] if "end_line" in chunk: out["end_line"] = chunk["end_line"] # Include extra fields from entity, e.g., method_count for classes for k, v in entity.items(): if k not in {"id", "name", "file_id"} and k not in out: out[k] = v results.append(out) return results[:limit] async def find_similar_patterns( self, *, pattern_type: str, min_confidence: float = 0.8, limit: int = 10, ) -> list[dict[str, Any]]: """Find similar code patterns using the configured analyzer, if any.""" analyzer = getattr(self, "pattern_analyzer", None) if not analyzer: return [] items = await analyzer.find_similar_patterns( pattern_type=pattern_type, min_confidence=min_confidence, limit=limit, ) filtered = [ i for i in items if i.get("pattern_type") == pattern_type and float(i.get("confidence", 0.0)) >= min_confidence ] filtered.sort(key=lambda x: float(x.get("confidence", 0.0)), reverse=True) return filtered[:limit] async def find_duplicate_code( self, *, min_similarity: float = 0.9, min_lines: int = 5, limit: int | None = None, ) -> list[dict[str, Any]]: """Find duplicate or near-duplicate code using the configured detector.""" detector = getattr(self, "duplicate_detector", None) if not detector: return [] groups = await detector.find_duplicates( min_similarity=min_similarity, min_lines=min_lines, ) groups = [ g for g in groups if float(g.get("similarity", 0.0)) >= min_similarity ] groups.sort(key=lambda g: float(g.get("similarity", 0.0)), reverse=True) if limit is not None: return groups[:limit] return groups async def analyze_code_similarity_metrics( self, *, repository_id: int ) -> dict[str, Any]: """Return code similarity metrics using the configured analyzer, if any.""" analyzer = getattr(self, "similarity_analyzer", None) if not analyzer: return {} metrics = await analyzer.get_similarity_metrics( repository_id=repository_id, ) return dict(metrics) if isinstance(metrics, dict) else metrics

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/johannhartmann/mcpcodeanalysis'

If you have feedback or need assistance with the MCP directory API, please join our Discord server