Skip to main content
Glama

MCP Enhanced Data Retrieval System

by kalpalathika
mcp_handler.py•23.5 kB
""" MCP Protocol Handler - Part 1: Basic Structure This file handles the MCP protocol (JSON-RPC 2.0). It's broken into logical sections for easier understanding. SECTIONS IN THIS FILE: 1. Class initialization (__init__) 2. Startup/shutdown (initialize, cleanup) 3. Main request handler (handle_request) 4. Protocol method handlers (_handle_initialize, _handle_tools_list, etc.) 5. Helper methods (_success_response, _error_response) """ import structlog from typing import Dict, Any, Optional import asyncio from config import settings from src.github.client import GitHubClient logger = structlog.get_logger() # ============================================================================ # SECTION 1: CLASS DEFINITION AND INITIALIZATION # ============================================================================ class MCPHandler: """ Handles MCP protocol requests from Claude. This is the brain that: - Knows what tools are available - Routes requests to the right handler - Formats responses properly - (Future) Executes RAG retrieval pipeline """ def __init__(self): """ Initialize the MCP handler. 
Sets up: - Server information (name, version) - Capabilities (what tools/resources we offer) - GitHub client for API access """ # Server info that Claude will see self.server_info = { "name": settings.mcp_server_name, "version": settings.mcp_version, "protocol_version": "2024-11-05", # MCP spec version } # What our server can do (filled in initialize()) self.capabilities = { "tools": {}, # GitHub tools we'll offer "resources": {}, # GitHub resources we'll offer "prompts": {}, # Pre-built prompts (optional) } # Initialize GitHub client self.github_client = GitHubClient() logger.info("MCP handler created", server_info=self.server_info) # ============================================================================ # SECTION 2: STARTUP AND SHUTDOWN # ============================================================================ async def initialize(self): """ Called once when server starts. Registers all the tools that Claude can use. Think of it as publishing a menu of what we can do. """ logger.info("Registering MCP capabilities...") # Register GitHub tools self._register_tools() # Register GitHub resources self._register_resources() logger.info("Capabilities registered", tools=len(self.capabilities["tools"]), resources=len(self.capabilities["resources"])) def _register_tools(self): """ Register available tools. These are the "functions" that Claude can call. 
Each tool has: - name: Unique identifier - description: What it does (Claude uses this to decide when to call it) - parameters: What inputs it needs """ self.capabilities["tools"] = { # Tool 1: Get repository info "github_get_repo": { "description": "Retrieve GitHub repository information including README, structure, and metadata", "parameters": { "owner": { "type": "string", "required": True, "description": "Repository owner (user or organization)" }, "repo": { "type": "string", "required": True, "description": "Repository name" } } }, # Tool 2: Search code with vector similarity "github_search_code": { "description": "Search code in GitHub repositories using semantic search", "parameters": { "query": { "type": "string", "required": True, "description": "Search query (e.g., 'authentication flow')" }, "repo": { "type": "string", "required": False, "description": "Optional: Limit search to specific repo (format: owner/repo)" } } }, # Tool 3: Get specific file "github_get_file": { "description": "Get specific file content from a GitHub repository", "parameters": { "owner": { "type": "string", "required": True, "description": "Repository owner" }, "repo": { "type": "string", "required": True, "description": "Repository name" }, "path": { "type": "string", "required": True, "description": "File path in repository (e.g., 'src/main.py')" } } } } def _register_resources(self): """ Register available resources. Resources are like "data sources" that can be accessed. Think of them as URIs that point to data. """ self.capabilities["resources"] = { "github_repo": { "description": "GitHub repository as a resource", "uri_template": "github://{owner}/{repo}", "mime_type": "text/plain" } } async def cleanup(self): """ Called once when server shuts down. 
Clean up any resources: - Close database connections - Close GitHub API connections - Save any pending data """ logger.info("Cleaning up MCP handler...") # Close GitHub client self.github_client.close() # TODO: Close vector DB when we add it # - vector_db.close() logger.info("Cleanup complete") # ============================================================================ # SECTION 3: MAIN REQUEST HANDLER # ============================================================================ async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]: """ Main entry point for all MCP requests. This is called by the FastAPI endpoint with every request from Claude. Args: request: JSON-RPC 2.0 request from Claude { "jsonrpc": "2.0", "method": "tools/call", "params": {...}, "id": 1 } Returns: JSON-RPC 2.0 response { "jsonrpc": "2.0", "id": 1, "result": {...} } """ # Step 1: Validate it's proper JSON-RPC 2.0 if not self._validate_request(request): return self._error_response( request.get("id"), -32600, "Invalid Request: must be JSON-RPC 2.0" ) # Step 2: Extract the important parts method = request["method"] params = request.get("params", {}) request_id = request.get("id") logger.info("Processing request", method=method, request_id=request_id) # Step 3: Route to the appropriate handler try: result = await self._route_request(method, params) return self._success_response(request_id, result) except Exception as e: logger.error("Error processing request", method=method, error=str(e)) return self._error_response(request_id, -32603, f"Internal error: {str(e)}") def _validate_request(self, request: Dict[str, Any]) -> bool: """ Validate JSON-RPC 2.0 format. Must have: - "jsonrpc": "2.0" - "method": string """ if "jsonrpc" not in request or request["jsonrpc"] != "2.0": return False if "method" not in request: return False return True async def _route_request(self, method: str, params: Dict[str, Any]) -> Any: """ Route request to the appropriate handler based on method. 
MCP methods: - initialize: First handshake - tools/list: List available tools - tools/call: Execute a tool - resources/list: List available resources - resources/read: Read a resource """ if method == "initialize": return await self._handle_initialize(params) elif method == "tools/list": return await self._handle_tools_list(params) elif method == "tools/call": return await self._handle_tools_call(params) elif method == "resources/list": return await self._handle_resources_list(params) elif method == "resources/read": return await self._handle_resources_read(params) else: raise ValueError(f"Method not found: {method}") # ============================================================================ # SECTION 4: PROTOCOL METHOD HANDLERS # ============================================================================ async def _handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]: """ Handle 'initialize' method. Called by Claude when it first connects. Claude asks: "What can you do? What protocol do you speak?" We respond with our capabilities. """ logger.info("Handling initialize request") return { "protocolVersion": "2024-11-05", "serverInfo": self.server_info, "capabilities": { "tools": {"listChanged": True}, "resources": {"subscribe": True, "listChanged": True}, } } async def _handle_tools_list(self, params: Dict[str, Any]) -> Dict[str, Any]: """ Handle 'tools/list' method. Called by Claude to discover what tools we have. Claude asks: "What tools do you offer?" 
""" logger.info("Listing available tools") # Convert our internal tool format to MCP format tools = [] for tool_name, tool_info in self.capabilities["tools"].items(): tools.append({ "name": tool_name, "description": tool_info["description"], "inputSchema": { "type": "object", "properties": tool_info["parameters"], "required": [ name for name, info in tool_info["parameters"].items() if info.get("required", False) ] } }) return {"tools": tools} async def _handle_tools_call(self, params: Dict[str, Any]) -> Dict[str, Any]: """ Handle 'tools/call' method. THIS IS THE MOST IMPORTANT METHOD! Called by Claude when it wants to use a tool. This is where the RAG retrieval will happen! """ tool_name = params.get("name") arguments = params.get("arguments", {}) logger.info("Executing tool", tool=tool_name, arguments=arguments) # Route to specific tool handler if tool_name == "github_get_repo": return await self._execute_github_get_repo(arguments) elif tool_name == "github_search_code": return await self._execute_github_search_code(arguments) elif tool_name == "github_get_file": return await self._execute_github_get_file(arguments) else: raise ValueError(f"Unknown tool: {tool_name}") async def _handle_resources_list(self, params: Dict[str, Any]) -> Dict[str, Any]: """ Handle 'resources/list' method. Lists available resources (data sources). """ logger.info("Listing available resources") resources = [] for resource_name, resource_info in self.capabilities["resources"].items(): resources.append({ "uri": resource_info["uri_template"], "name": resource_name, "description": resource_info["description"], "mimeType": resource_info.get("mime_type", "text/plain") }) return {"resources": resources} async def _handle_resources_read(self, params: Dict[str, Any]) -> Dict[str, Any]: """ Handle 'resources/read' method. Read a specific resource by URI. 
Example: github://anthropics/mcp """ uri = params.get("uri") logger.info("Reading resource", uri=uri) # TODO: Parse URI and fetch actual resource # For now, return placeholder return { "contents": [{ "uri": uri, "mimeType": "text/plain", "text": f"[PLACEHOLDER] Resource content for {uri}" }] } # ============================================================================ # SECTION 5: TOOL EXECUTION METHODS (RAG PIPELINE WILL GO HERE) # ============================================================================ async def _execute_github_get_repo(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """ Execute the 'github_get_repo' tool. Gets repository information from GitHub API. Steps: 1. Fetch repo from GitHub API 2. Format comprehensive repository context 3. (TODO) Chunk to 1500 tokens 4. Return optimized context """ owner = arguments.get("owner") repo = arguments.get("repo") if not owner or not repo: raise ValueError("Both 'owner' and 'repo' parameters are required") logger.info("Getting repo", owner=owner, repo=repo) try: # Fetch repository data from GitHub repo_info = await self.github_client.get_repository(owner, repo) # Fetch repository structure structure = await self.github_client.get_repository_structure(owner, repo) # Build comprehensive context context_text = self._format_repo_context(repo_info, structure) # TODO: Apply 1500-token chunking here # For now, return full context return { "content": [{ "type": "text", "text": context_text }] } except ValueError as e: logger.error("GitHub API error", error=str(e)) return { "content": [{ "type": "text", "text": f"Error fetching repository: {str(e)}" }], "isError": True } except Exception as e: logger.error("Unexpected error", error=str(e)) return { "content": [{ "type": "text", "text": f"Unexpected error: {str(e)}" }], "isError": True } async def _execute_github_search_code(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """ Execute the 'github_search_code' tool. Searches code using GitHub's search API. 
Steps: 1. Execute GitHub code search 2. Format results 3. (TODO) Add vector similarity ranking 4. Return top results """ query = arguments.get("query") repo = arguments.get("repo") if not query: raise ValueError("'query' parameter is required") logger.info("Searching code", query=query, repo=repo) try: # Execute GitHub code search results = await self.github_client.search_code(query, repo) # Format search results if not results: return { "content": [{ "type": "text", "text": f"No code found matching query: '{query}'" }] } # Build result text result_text = f"## Code Search Results for: '{query}'\n" if repo: result_text += f"Repository: {repo}\n" result_text += f"\nFound {len(results)} results:\n\n" for i, result in enumerate(results, 1): result_text += f"{i}. **{result['name']}**\n" result_text += f" - Path: `{result['path']}`\n" result_text += f" - Repository: {result['repository']}\n" result_text += f" - URL: {result['url']}\n\n" # TODO: Add vector similarity ranking # TODO: Fetch and include code snippets return { "content": [{ "type": "text", "text": result_text }] } except ValueError as e: logger.error("GitHub API error", error=str(e)) return { "content": [{ "type": "text", "text": f"Error searching code: {str(e)}" }], "isError": True } except Exception as e: logger.error("Unexpected error", error=str(e)) return { "content": [{ "type": "text", "text": f"Unexpected error: {str(e)}" }], "isError": True } async def _execute_github_get_file(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """ Execute the 'github_get_file' tool. Gets specific file content from GitHub. Steps: 1. Fetch file from GitHub 2. Format with metadata 3. (TODO) Add surrounding context from related files 4. 
Return with proper formatting """ owner = arguments.get("owner") repo = arguments.get("repo") path = arguments.get("path") if not owner or not repo or not path: raise ValueError("All parameters 'owner', 'repo', and 'path' are required") logger.info("Getting file", owner=owner, repo=repo, path=path) try: # Fetch file content from GitHub file_data = await self.github_client.get_file_content(owner, repo, path) # Handle directory vs file if file_data["type"] == "directory": # List directory contents file_list = "\n".join([f"- {f}" for f in file_data["files"]]) return { "content": [{ "type": "text", "text": f"## Directory: {path}\n\nContents:\n{file_list}" }] } else: # Format file content content_text = f"## File: {file_data['name']}\n" content_text += f"**Path:** `{file_data['path']}`\n" content_text += f"**Size:** {file_data['size']} bytes\n" content_text += f"**URL:** {file_data['url']}\n\n" content_text += f"### Content:\n```\n{file_data['content']}\n```" # TODO: Add surrounding context from related files # TODO: Apply 1500-token chunking if content is too large return { "content": [{ "type": "text", "text": content_text }] } except ValueError as e: logger.error("GitHub API error", error=str(e)) return { "content": [{ "type": "text", "text": f"Error fetching file: {str(e)}" }], "isError": True } except Exception as e: logger.error("Unexpected error", error=str(e)) return { "content": [{ "type": "text", "text": f"Unexpected error: {str(e)}" }], "isError": True } # ============================================================================ # SECTION 6: HELPER METHODS # ============================================================================ def _format_repo_context(self, repo_info: Dict[str, Any], structure: Dict[str, Any]) -> str: """ Format repository information into a comprehensive context string. 
Args: repo_info: Repository metadata from GitHub API structure: Repository file structure Returns: Formatted context string """ context = f"# Repository: {repo_info['full_name']}\n\n" # Metadata context += f"**Description:** {repo_info['description']}\n" context += f"**Language:** {repo_info['language']}\n" context += f"**Stars:** {repo_info['stars']} | **Forks:** {repo_info['forks']}\n" context += f"**Created:** {repo_info['created_at']}\n" context += f"**Updated:** {repo_info['updated_at']}\n" context += f"**URL:** {repo_info['url']}\n" # Topics if repo_info.get('topics'): topics_str = ", ".join(repo_info['topics']) context += f"**Topics:** {topics_str}\n" context += "\n---\n\n" # README context += "## README\n\n" context += repo_info['readme'] + "\n\n" context += "---\n\n" # File Structure context += "## Repository Structure\n\n" if structure.get('children'): for item in structure['children']: if item['type'] == 'file': context += f"- šŸ“„ {item['name']} ({item.get('size', 0)} bytes)\n" else: context += f"- šŸ“ {item['name']}/\n" return context def _success_response(self, request_id: Optional[Any], result: Any) -> Dict[str, Any]: """ Create a JSON-RPC 2.0 success response. Format: { "jsonrpc": "2.0", "id": <request_id>, "result": <result> } """ return { "jsonrpc": "2.0", "id": request_id, "result": result } def _error_response(self, request_id: Optional[Any], code: int, message: str) -> Dict[str, Any]: """ Create a JSON-RPC 2.0 error response. Format: { "jsonrpc": "2.0", "id": <request_id>, "error": { "code": <code>, "message": <message> } } Standard error codes: -32600: Invalid Request -32601: Method not found -32602: Invalid params -32603: Internal error """ return { "jsonrpc": "2.0", "id": request_id, "error": { "code": code, "message": message } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kalpalathika/MCP-Enhanced-Data-Retrieval-System'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.