# server.py
"""
MCP Server for semantic code search and indexing
This server implements the Model Context Protocol, making code search
available to any MCP-compatible LLM client.
"""
import asyncio
import os
from typing import Any, Optional
from pathlib import Path
from mcp.server import Server
from mcp.types import Tool, TextContent
from mcpindexer.indexer import MultiRepoIndexer
from mcpindexer.embeddings import EmbeddingStore
# Global state for the server.
# Both are lazily created by get_indexer() on first tool call; they stay
# None until then so importing this module has no side effects beyond
# registering the MCP app below.
_multi_indexer: Optional[MultiRepoIndexer] = None
_embedding_store: Optional[EmbeddingStore] = None
def get_indexer() -> MultiRepoIndexer:
    """Return the process-wide MultiRepoIndexer, creating it on first use.

    The backing EmbeddingStore is opened at the path named by the
    MCP_INDEXER_DB_PATH environment variable (default: ./mcp_index_data)
    and kept in the module-level globals for the lifetime of the server.
    """
    global _multi_indexer, _embedding_store
    # Fast path: already initialized on a previous call.
    if _multi_indexer is not None:
        return _multi_indexer
    # First call: open the embedding store, then wrap it in the indexer.
    store = EmbeddingStore(
        db_path=os.getenv("MCP_INDEXER_DB_PATH", "./mcp_index_data"),
        collection_name="mcp_code_index",
    )
    _embedding_store = store
    _multi_indexer = MultiRepoIndexer(embedding_store=store)
    return _multi_indexer
# Initialize MCP server.
# Single module-level Server instance; the @app.list_tools() and
# @app.call_tool() decorators below register handlers on it.
app = Server("mcpindexer")
@app.list_tools()
async def list_tools() -> list[Tool]:
    """Advertise the full set of tools this MCP server exposes.

    Each tool is described by a name, a human-readable description, and a
    JSON-schema object for its arguments.
    """

    def _tool(name, description, properties, required=None):
        # Assemble one Tool; omit "required" entirely when no argument is
        # mandatory, matching the schema shape clients expect.
        schema = {"type": "object", "properties": properties}
        if required:
            schema["required"] = required
        return Tool(name=name, description=description, inputSchema=schema)

    return [
        _tool(
            "semantic_search",
            "Search for code using natural language queries across indexed repositories",
            {
                "query": {
                    "type": "string",
                    "description": "Natural language search query"
                },
                "repos": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Optional: List of repo names to search. If empty, searches all."
                },
                "language": {
                    "type": "string",
                    "description": "Optional: Filter by language (python, javascript, typescript, ruby, go)"
                },
                "limit": {
                    "type": "integer",
                    "description": "Maximum number of results to return",
                    "default": 10
                }
            },
            ["query"],
        ),
        _tool(
            "find_definition",
            "Find the definition of a function, class, or variable",
            {
                "symbol": {
                    "type": "string",
                    "description": "The symbol name to find"
                },
                "repos": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Optional: List of repo names to search"
                }
            },
            ["symbol"],
        ),
        _tool(
            "find_references",
            "Find all references/usages of a symbol (searches by semantic similarity)",
            {
                "symbol": {
                    "type": "string",
                    "description": "The symbol name to find references for"
                },
                "repos": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Optional: List of repo names to search"
                }
            },
            ["symbol"],
        ),
        _tool(
            "find_related_code",
            "Find architecturally related code for a given file",
            {
                "file_path": {
                    "type": "string",
                    "description": "Path to the file"
                },
                "repo": {
                    "type": "string",
                    "description": "Repository name"
                },
                "limit": {
                    "type": "integer",
                    "description": "Maximum number of results",
                    "default": 10
                }
            },
            ["file_path", "repo"],
        ),
        _tool(
            "get_repo_stats",
            "Get statistics for a repository (files, chunks, dependencies)",
            {
                "repo_name": {
                    "type": "string",
                    "description": "Repository name"
                }
            },
            ["repo_name"],
        ),
        _tool(
            "reindex_repo",
            "Trigger reindexing of a specific repository",
            {
                "repo_name": {
                    "type": "string",
                    "description": "Name of the repository to reindex"
                },
                "force": {
                    "type": "boolean",
                    "description": "Force full reindex (default: false)",
                    "default": False
                }
            },
            ["repo_name"],
        ),
        _tool(
            "add_repo_to_stack",
            "Add a new repository to the user's stack and index it",
            {
                "repo_path": {
                    "type": "string",
                    "description": "Local path to the repository"
                },
                "repo_name": {
                    "type": "string",
                    "description": "Name to identify the repository"
                }
            },
            ["repo_path", "repo_name"],
        ),
        _tool(
            "remove_repo",
            "Remove a repository from the stack",
            {
                "repo_name": {
                    "type": "string",
                    "description": "Name of the repository to remove"
                }
            },
            ["repo_name"],
        ),
        # The remaining tools take no arguments at all.
        _tool(
            "list_repos",
            "List all repositories in the current stack",
            {},
        ),
        _tool(
            "get_cross_repo_dependencies",
            "Find dependencies between indexed repositories",
            {},
        ),
        _tool(
            "suggest_missing_repos",
            "Suggest repositories to add based on dependency analysis",
            {},
        ),
        _tool(
            "get_stack_status",
            "Get overall stack status and statistics",
            {},
        ),
    ]
@app.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
    """Handle tool calls.

    Dispatches on the tool *name* to the matching handler, formats the
    result as human-readable text, and returns it wrapped in a single
    TextContent. Any exception is caught at this boundary and reported
    back to the client as an error message instead of crashing the server.

    Args:
        name: Name of the tool being invoked (one of the tools advertised
            by list_tools).
        arguments: Dict-like tool arguments; missing keys fall back to
            defaults via .get(). NOTE(review): if a client ever passes
            arguments=None this raises and is reported via the generic
            error path — confirm whether that is acceptable.

    Returns:
        A one-element list holding the textual result (or error message).
    """
    try:
        # Every tool operates on the shared, lazily created indexer.
        indexer = get_indexer()
        if name == "semantic_search":
            query = arguments.get("query", "")
            repos = arguments.get("repos", None)
            language = arguments.get("language", None)
            limit = arguments.get("limit", 10)
            # Perform semantic search
            results = indexer.embedding_store.semantic_search(
                query=query,
                n_results=limit,
                repo_filter=repos,
                language_filter=language
            )
            if not results:
                return [TextContent(
                    type="text",
                    text=f"No results found for query: '{query}'"
                )]
            # Format results
            output = [f"Found {len(results)} results for '{query}':\n"]
            for i, result in enumerate(results, 1):
                output.append(f"\n{i}. {result.file_path}")
                if result.symbol_name:
                    output.append(f" Symbol: {result.symbol_name}")
                output.append(f" Repo: {result.repo_name}")
                output.append(f" Lines: {result.metadata.get('start_line', '?')}-{result.metadata.get('end_line', '?')}")
                output.append(f" Relevance: {result.score:.4f}")
                # Code preview — truncated to 200 chars with an ellipsis marker.
                preview = result.code_text[:200].replace('\n', '\n ')
                output.append(f" Code:\n {preview}")
                if len(result.code_text) > 200:
                    output.append(" ...")
            return [TextContent(type="text", text="\n".join(output))]
        elif name == "find_definition":
            symbol = arguments.get("symbol", "")
            repos = arguments.get("repos", None)
            # Find by exact symbol name
            results = indexer.embedding_store.find_by_symbol(
                symbol_name=symbol,
                repo_filter=repos
            )
            if not results:
                return [TextContent(
                    type="text",
                    text=f"No definition found for symbol: '{symbol}'"
                )]
            # Format results
            output = [f"Found {len(results)} definition(s) for '{symbol}':\n"]
            for i, result in enumerate(results, 1):
                output.append(f"\n{i}. {result.file_path}:{result.metadata.get('start_line', '?')}")
                output.append(f" Repo: {result.repo_name}")
                output.append(f" Type: {result.metadata.get('chunk_type', 'unknown')}")
                # chr(10) is '\n'; backslashes are not allowed inside f-string
                # expressions before Python 3.12, hence the chr() workaround.
                output.append(f" Code:\n {result.code_text[:300].replace(chr(10), chr(10) + ' ')}")
            return [TextContent(type="text", text="\n".join(output))]
        elif name == "find_references":
            symbol = arguments.get("symbol", "")
            repos = arguments.get("repos", None)
            # Use semantic search to find references — results are
            # similarity-based, not exact usages, so they are "potential".
            results = indexer.embedding_store.semantic_search(
                query=f"usage of {symbol} function or class",
                n_results=10,
                repo_filter=repos
            )
            if not results:
                return [TextContent(
                    type="text",
                    text=f"No references found for symbol: '{symbol}'"
                )]
            output = [f"Found {len(results)} potential reference(s) to '{symbol}':\n"]
            for i, result in enumerate(results, 1):
                output.append(f"\n{i}. {result.file_path}:{result.metadata.get('start_line', '?')}")
                output.append(f" Repo: {result.repo_name}")
                if result.symbol_name:
                    output.append(f" In: {result.symbol_name}")
            return [TextContent(type="text", text="\n".join(output))]
        elif name == "find_related_code":
            file_path = arguments.get("file_path", "")
            repo = arguments.get("repo", "")
            limit = arguments.get("limit", 10)
            # Find related code by file
            results = indexer.embedding_store.find_related_by_file(
                file_path=file_path,
                repo_name=repo,
                n_results=limit
            )
            if not results:
                return [TextContent(
                    type="text",
                    text=f"No related code found for {file_path}"
                )]
            output = [f"Found {len(results)} related file(s) to {file_path}:\n"]
            for i, result in enumerate(results, 1):
                output.append(f"\n{i}. {result.file_path}")
                if result.symbol_name:
                    output.append(f" Symbol: {result.symbol_name}")
                output.append(f" Similarity: {result.score:.4f}")
            return [TextContent(type="text", text="\n".join(output))]
        elif name == "get_repo_stats":
            repo_name = arguments.get("repo_name", "")
            stats = indexer.get_repo_stats(repo_name)
            if not stats:
                return [TextContent(
                    type="text",
                    text=f"Repository '{repo_name}' not found"
                )]
            output = [f"Statistics for '{repo_name}':\n"]
            output.append(f" Path: {stats.get('repo_path', 'N/A')}")
            output.append(f" Git branch: {stats.get('git_branch', 'N/A')}")
            # Show only the short (8-char) commit hash when one is recorded.
            output.append(f" Git commit: {stats.get('git_commit', 'N/A')[:8] if stats.get('git_commit') else 'N/A'}")
            output.append(f" Files indexed: {stats.get('files_indexed', 0)}")
            output.append(f" Chunks indexed: {stats.get('chunks_indexed', 0)}")
            output.append(f" Languages: {', '.join(stats.get('languages', []))}")
            if 'dependencies' in stats:
                deps = stats['dependencies']
                output.append(f"\n Dependencies:")
                output.append(f" Total: {deps.get('total_dependencies', 0)}")
                output.append(f" Internal: {deps.get('internal_dependencies', 0)}")
                output.append(f" External: {deps.get('external_dependencies', 0)}")
                output.append(f" External packages: {deps.get('external_packages', 0)}")
            return [TextContent(type="text", text="\n".join(output))]
        elif name == "reindex_repo":
            repo_name = arguments.get("repo_name", "")
            force = arguments.get("force", False)
            if repo_name not in indexer.repo_indexers:
                return [TextContent(
                    type="text",
                    text=f"Repository '{repo_name}' not found in stack. Use add_repo_to_stack first."
                )]
            repo_indexer = indexer.repo_indexers[repo_name]
            result = repo_indexer.reindex(force=force)
            output = [f"Reindexed '{repo_name}':\n"]
            output.append(f" Files processed: {result.files_processed}")
            output.append(f" Files skipped: {result.files_skipped}")
            output.append(f" Chunks created: {result.chunks_created}")
            output.append(f" Chunks indexed: {result.chunks_indexed}")
            if result.errors:
                # Cap the error listing at the first five to keep output short.
                output.append(f"\n Errors ({len(result.errors)}):")
                for error in result.errors[:5]:
                    output.append(f" - {error}")
            return [TextContent(type="text", text="\n".join(output))]
        elif name == "add_repo_to_stack":
            repo_path = arguments.get("repo_path", "")
            repo_name = arguments.get("repo_name", "")
            # Verify path exists
            if not Path(repo_path).exists():
                return [TextContent(
                    type="text",
                    text=f"Error: Path '{repo_path}' does not exist"
                )]
            # Add and index the repo
            result = indexer.add_repo(
                repo_path=repo_path,
                repo_name=repo_name,
                auto_index=True
            )
            output = [f"Added and indexed '{repo_name}':\n"]
            output.append(f" Path: {repo_path}")
            output.append(f" Files processed: {result.files_processed}")
            output.append(f" Files skipped: {result.files_skipped}")
            output.append(f" Chunks indexed: {result.chunks_indexed}")
            if result.errors:
                output.append(f"\n Errors: {len(result.errors)}")
            return [TextContent(type="text", text="\n".join(output))]
        elif name == "remove_repo":
            repo_name = arguments.get("repo_name", "")
            deleted_count = indexer.remove_repo(repo_name)
            return [TextContent(
                type="text",
                text=f"Removed '{repo_name}' from stack. Deleted {deleted_count} chunks."
            )]
        elif name == "list_repos":
            repos = indexer.list_repos()
            if not repos:
                return [TextContent(
                    type="text",
                    text="No repositories indexed. Use add_repo_to_stack to add one."
                )]
            output = [f"Indexed repositories ({len(repos)}):\n"]
            for repo in repos:
                # Per-repo stats come from the embedding store, not the indexer.
                stats = indexer.embedding_store.get_repo_stats(repo)
                output.append(f"\n • {repo}")
                output.append(f" Files: {len(stats.get('files', []))}")
                output.append(f" Chunks: {stats.get('chunk_count', 0)}")
                output.append(f" Languages: {', '.join(stats.get('languages', []))}")
            return [TextContent(type="text", text="\n".join(output))]
        elif name == "get_cross_repo_dependencies":
            cross_deps = indexer.get_cross_repo_dependencies()
            if not cross_deps:
                return [TextContent(
                    type="text",
                    text="No cross-repository dependencies found."
                )]
            output = [f"Found {len(cross_deps)} cross-repository dependencies:\n"]
            for dep in cross_deps:
                output.append(f"\n {dep['source_repo']} → {dep['target_repo']}")
                output.append(f" Package: {dep['package']}")
            return [TextContent(type="text", text="\n".join(output))]
        elif name == "suggest_missing_repos":
            suggestions = indexer.suggest_missing_repos()
            if not suggestions:
                return [TextContent(
                    type="text",
                    text="No missing repositories detected. Your stack looks complete!"
                )]
            output = [f"Suggested repositories to add ({len(suggestions)}):\n"]
            for suggestion in suggestions:
                output.append(f"\n • {suggestion}")
            output.append("\n\nThese packages are imported but not indexed. Consider adding them to your stack.")
            return [TextContent(type="text", text="\n".join(output))]
        elif name == "get_stack_status":
            stats = indexer.get_stack_status()
            output = [f"Repository Stack Status:\n"]
            output.append(f"\n Total Repositories: {stats['total_repos']}")
            output.append(f" Total Files Indexed: {stats['total_files_indexed']}")
            output.append(f" Total Chunks Indexed: {stats['total_chunks_indexed']}\n")
            output.append(f"\n Status Breakdown:")
            for status, count in stats['by_status'].items():
                # Skip statuses with zero repos to keep the report compact.
                if count > 0:
                    output.append(f" {status}: {count}")
            return [TextContent(type="text", text="\n".join(output))]
        else:
            return [TextContent(
                type="text",
                text=f"Unknown tool: {name}"
            )]
    except Exception as e:
        # Boundary handler: surface the failure to the client as text so a
        # single bad tool call cannot take down the whole server.
        return [TextContent(
            type="text",
            text=f"Error executing {name}: {str(e)}"
        )]
async def main():
    """Serve the MCP app over stdio until the client disconnects."""
    # Imported here so merely importing this module does not touch stdio.
    from mcp.server.stdio import stdio_server
    async with stdio_server() as streams:
        # streams is (read_stream, write_stream); forward both to the app.
        await app.run(*streams, app.create_initialization_options())


if __name__ == "__main__":
    asyncio.run(main())