MCP Code Expert System

by tomsiwik
Verified
"""
MCP Code Expert System Server

A server that provides code review capabilities through expert personas
using the Model Context Protocol (MCP).
"""

import json
import os
import sys
from typing import Any, Dict, List

import anyio
import click
import mcp.types as types
from dotenv import load_dotenv
from mcp.server.lowlevel import Server

from knowledge_graph import KnowledgeGraph
from ollama_service import OllamaService
from experts import CodeReviewRequest, get_all_experts

# Load environment variables
load_dotenv()

# Initialize knowledge graph
STORAGE_PATH = os.environ.get("KNOWLEDGE_GRAPH_PATH", "data/knowledge_graph.json")
# A bare filename (e.g. "graph.json") has an empty dirname, and
# os.makedirs("") raises FileNotFoundError, so only create a directory
# when the path actually names one.
_storage_dir = os.path.dirname(STORAGE_PATH)
if _storage_dir:
    os.makedirs(_storage_dir, exist_ok=True)
knowledge_graph = KnowledgeGraph(STORAGE_PATH)

# Initialize Ollama service
ollama_service = OllamaService()

# Initialize experts with shared resources
experts = list(get_all_experts(knowledge_graph, ollama_service))
experts_by_tool = {expert.tool_name: expert for expert in experts}


def _log(message: str) -> None:
    """Write a diagnostic line to stderr.

    Under the stdio transport, stdout carries the MCP JSON-RPC stream, so
    diagnostics must never be printed there.
    """
    print(message, file=sys.stderr, flush=True)


def _as_text_content(payload: Any) -> List[types.TextContent]:
    """Wrap a tool result as the content list the low-level server expects.

    The payload is serialized to JSON. ``default=str`` is a deliberate
    fallback: the value types stored in the knowledge graph and in expert
    responses are not visible from this module, and a non-JSON value should
    degrade to its string form rather than turn a successful call into an
    error.
    """
    return [types.TextContent(type="text", text=json.dumps(payload, default=str))]


# NOTE: click uses the docstring of ``main`` as the ``--help`` text, so it is
# intentionally kept to a single line. ``port`` is only used by the "sse"
# transport; "stdio" talks over the process's stdin/stdout.
@click.command()
@click.option("--port", default=8000, help="Port to listen on for SSE")
@click.option(
    "--transport",
    type=click.Choice(["stdio", "sse"]),
    default="stdio",
    help="Transport type",
)
def main(port: int, transport: str) -> int:
    """Run the MCP Code Expert System server."""
    _log("Starting MCP Code Expert System")
    _log(f"Transport: {transport}")
    _log(f"Ollama service available: {ollama_service.is_available}")
    _log(f"Experts loaded: {', '.join(expert.name for expert in experts)}")

    # Create server
    app = Server("Code Expert System")

    @app.list_tools()
    async def list_tools() -> List[types.Tool]:
        """List available tools: one per expert plus the knowledge graph tools."""
        # Expert tools
        tools = [
            types.Tool(
                name=expert.tool_name,
                description=expert.tool_description,
                inputSchema=expert.input_schema,
            )
            for expert in experts
        ]

        # Knowledge graph tools
        tools.extend([
            types.Tool(
                name="read_graph",
                description="Read the entire knowledge graph",
                inputSchema={
                    "type": "object",
                    "properties": {},
                },
            ),
            types.Tool(
                name="search_nodes",
                description="Search for nodes in the knowledge graph based on a query",
                inputSchema={
                    "type": "object",
                    "required": ["query"],
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "The search query",
                        },
                    },
                },
            ),
            types.Tool(
                name="open_nodes",
                description="Open specific nodes in the knowledge graph by their names",
                inputSchema={
                    "type": "object",
                    "required": ["names"],
                    "properties": {
                        "names": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "List of node names to open",
                        },
                    },
                },
            ),
        ])

        _log(f"Listing {len(tools)} tools")
        for tool in tools:
            _log(f"  - {tool.name}: {tool.description}")

        return tools

    @app.call_tool()
    async def call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
        """Handle tool calls.

        Dispatches to the matching expert or knowledge graph operation and
        returns the result as a single JSON-encoded text content item. Any
        failure is logged and reported to the client as a JSON object with
        ``error`` and ``success: false`` instead of propagating.
        """
        _log(f"Tool call: {name} with arguments: {json.dumps(arguments)[:100]}")

        try:
            # Handle expert tools
            if name in experts_by_tool:
                expert = experts_by_tool[name]
                response = await expert.review_code(CodeReviewRequest(**arguments))
                result = response.model_dump()

            # Handle knowledge graph tools
            elif name == "read_graph":
                result = knowledge_graph.get_all()

            elif name == "search_nodes":
                result = knowledge_graph.search_nodes(arguments.get("query", ""))

            elif name == "open_nodes":
                result = []
                # Use a distinct loop variable so the tool name stays
                # available for the error log below.
                for node_name in arguments.get("names", []):
                    node = knowledge_graph.get_node(node_name)
                    if node:
                        result.append({"name": node_name, **node})

            else:
                raise ValueError(f"Unknown tool: {name}")

            return _as_text_content(result)

        except Exception as e:
            # Top-level boundary for a tool call: report the failure to the
            # client rather than letting it escape the handler.
            _log(f"Error handling tool call {name}: {e}")
            return _as_text_content({
                "error": str(e),
                "success": False,
            })

    # Run with the appropriate transport
    if transport == "sse":
        from mcp.server.sse import SseServerTransport
        from starlette.applications import Starlette
        from starlette.middleware import Middleware
        from starlette.middleware.cors import CORSMiddleware
        from starlette.responses import Response
        from starlette.routing import Mount, Route

        # Create SSE transport
        sse = SseServerTransport("/messages/")

        async def handle_sse(request):
            """Serve one SSE client for the lifetime of its connection."""
            _log("New SSE connection received")
            async with sse.connect_sse(
                request.scope, request.receive, request._send
            ) as streams:
                await app.run(
                    streams[0], streams[1], app.create_initialization_options()
                )
            # Starlette calls whatever the endpoint returns; returning None
            # raises "NoneType is not callable" when the client disconnects.
            return Response()

        # Create Starlette app with CORS middleware
        # NOTE(review): debug=True, wildcard CORS and binding to 0.0.0.0 are
        # kept as in the original; confirm they are intended outside of
        # local development.
        starlette_app = Starlette(
            debug=True,
            routes=[
                Route("/sse", endpoint=handle_sse),
                Mount("/messages/", app=sse.handle_post_message),
            ],
            middleware=[
                Middleware(
                    CORSMiddleware,
                    allow_origins=["*"],
                    allow_methods=["*"],
                    allow_headers=["*"],
                ),
            ],
        )

        import uvicorn

        _log(f"SSE endpoint: http://localhost:{port}/sse")
        uvicorn.run(starlette_app, host="0.0.0.0", port=port)
    else:
        from mcp.server.stdio import stdio_server

        async def arun():
            """Serve a single client over stdin/stdout until it disconnects."""
            async with stdio_server() as streams:
                await app.run(
                    streams[0], streams[1], app.create_initialization_options()
                )

        anyio.run(arun)

    return 0


if __name__ == "__main__":
    main()