# mcp_handler.py • 23.5 kB
"""
MCP Protocol Handler - Part 1: Basic Structure
This file handles the MCP protocol (JSON-RPC 2.0).
It's broken into logical sections for easier understanding.
SECTIONS IN THIS FILE:
1. Class initialization (__init__)
2. Startup/shutdown (initialize, cleanup)
3. Main request handler (handle_request)
4. Protocol method handlers (_handle_initialize, _handle_tools_list, etc.)
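5. Tool execution methods (_execute_github_get_repo, _execute_github_search_code, _execute_github_get_file)
6. Helper methods (_format_repo_context, _success_response, _error_response)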
"""
import structlog
from typing import Dict, Any, Optional
import asyncio
from config import settings
from src.github.client import GitHubClient
logger = structlog.get_logger()
# ============================================================================
# SECTION 1: CLASS DEFINITION AND INITIALIZATION
# ============================================================================
class MCPHandler:
    """
    Entry point for MCP protocol traffic coming from Claude.

    What this class is responsible for:
    - Keeping the catalogue of tools and resources the server offers
    - Dispatching each request to the matching handler
    - Wrapping results and failures in properly formatted responses
    - (Future) Executing the RAG retrieval pipeline
    """

    def __init__(self):
        """
        Create the handler with empty capability tables and a GitHub client.

        Prepares:
        - server_info: name and version from settings, plus the MCP spec version
        - capabilities: empty "tools" / "resources" / "prompts" tables
        - github_client: client used for GitHub API access
        """
        # Identity that Claude will see; values come from project settings.
        server_name = settings.mcp_server_name
        server_version = settings.mcp_version
        self.server_info = dict(
            name=server_name,
            version=server_version,
            protocol_version="2024-11-05",  # MCP spec version
        )

        # Capability tables start empty; "tools" and "resources" are filled
        # in by initialize(), "prompts" is optional.
        self.capabilities = {
            section: {} for section in ("tools", "resources", "prompts")
        }

        # GitHub API access
        self.github_client = GitHubClient()

        logger.info("MCP handler created", server_info=self.server_info)
# ============================================================================
# SECTION 2: STARTUP AND SHUTDOWN
# ============================================================================
async def initialize(self):
    """
    Publish the server's menu of capabilities.

    Meant to be awaited once when the server starts. Fills the "tools"
    and "resources" capability tables and logs how many entries each holds.
    """
    logger.info("Registering MCP capabilities...")

    # Tools first, then resources.
    self._register_tools()
    self._register_resources()

    tool_count = len(self.capabilities["tools"])
    resource_count = len(self.capabilities["resources"])
    logger.info("Capabilities registered",
                tools=tool_count,
                resources=resource_count)
def _register_tools(self):
    """
    Fill self.capabilities["tools"] with the tool catalogue.

    Tools are the "functions" Claude can call. Each entry is keyed by the
    tool's unique name and carries:
    - description: what the tool does (Claude uses this to decide when to call it)
    - parameters: the inputs it takes, each with a type, a required flag
      and a description
    """
    def string_param(description, required=True):
        # Every parameter offered by the current tools is a string.
        return {
            "type": "string",
            "required": required,
            "description": description,
        }

    catalogue = {}

    # Tool 1: Get repository info
    catalogue["github_get_repo"] = {
        "description": "Retrieve GitHub repository information including README, structure, and metadata",
        "parameters": {
            "owner": string_param("Repository owner (user or organization)"),
            "repo": string_param("Repository name"),
        },
    }

    # Tool 2: Search code with vector similarity
    catalogue["github_search_code"] = {
        "description": "Search code in GitHub repositories using semantic search",
        "parameters": {
            "query": string_param("Search query (e.g., 'authentication flow')"),
            "repo": string_param(
                "Optional: Limit search to specific repo (format: owner/repo)",
                required=False,
            ),
        },
    }

    # Tool 3: Get specific file
    catalogue["github_get_file"] = {
        "description": "Get specific file content from a GitHub repository",
        "parameters": {
            "owner": string_param("Repository owner"),
            "repo": string_param("Repository name"),
            "path": string_param("File path in repository (e.g., 'src/main.py')"),
        },
    }

    self.capabilities["tools"] = catalogue
def _register_resources(self):
    """
    Fill self.capabilities["resources"] with the resource catalogue.

    Resources are "data sources" addressed by URI. The only entry today
    exposes a GitHub repository through a github://{owner}/{repo} template.
    """
    repo_resource = dict(
        description="GitHub repository as a resource",
        uri_template="github://{owner}/{repo}",
        mime_type="text/plain",
    )
    self.capabilities["resources"] = {"github_repo": repo_resource}
async def cleanup(self):
    """
    Release external resources; meant to be awaited once at server shutdown.

    Currently this only closes the GitHub client. Closing a vector DB is
    planned (see TODO below) but not implemented yet.
    """
    import inspect  # local import keeps this method self-contained

    logger.info("Cleaning up MCP handler...")

    # Close GitHub client. Every other client method used in this file is
    # awaited, so close() may well be a coroutine function too; its
    # definition is not visible here. Await the result only when it is
    # awaitable so both a synchronous and an asynchronous close() work.
    close_result = self.github_client.close()
    if inspect.isawaitable(close_result):
        await close_result

    # TODO: Close vector DB when we add it
    # - vector_db.close()
    logger.info("Cleanup complete")
# ============================================================================
# SECTION 3: MAIN REQUEST HANDLER
# ============================================================================
async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
    """
    Main entry point for all MCP requests.

    Validates the JSON-RPC 2.0 envelope, routes the call and wraps the
    outcome in a JSON-RPC 2.0 response. This method never raises for a bad
    request; every failure is returned as an error response.

    Args:
        request: JSON-RPC 2.0 request from Claude
            {
                "jsonrpc": "2.0",
                "method": "tools/call",
                "params": {...},
                "id": 1
            }

    Returns:
        JSON-RPC 2.0 response
            {
                "jsonrpc": "2.0",
                "id": 1,
                "result": {...}
            }
        or an error response with one of these codes:
            -32600  malformed request (not an object / not JSON-RPC 2.0)
            -32601  unknown method
            -32602  params is not an object, or a handler rejected the arguments
            -32603  any other failure while handling the request
    """
    # Step 1: Validate it's proper JSON-RPC 2.0.
    # A non-object payload (e.g. a batch array or null) has no "id" to echo.
    is_object = isinstance(request, dict)
    request_id = request.get("id") if is_object else None
    if not is_object or not self._validate_request(request):
        return self._error_response(
            request_id,
            -32600,
            "Invalid Request: must be JSON-RPC 2.0"
        )

    # Step 2: Extract the important parts
    method = request["method"]
    params = request.get("params")
    if params is None:
        # "params" may be omitted or explicitly null; handlers expect a mapping.
        params = {}
    if not isinstance(params, dict):
        # Handlers read params by name, so positional (array) params are unsupported.
        return self._error_response(
            request_id,
            -32602,
            "Invalid params: expected an object"
        )

    logger.info("Processing request", method=method, request_id=request_id)

    # Step 3: Route to the appropriate handler
    try:
        result = await self._route_request(method, params)
    except ValueError as e:
        # The router signals an unknown method with
        # ValueError("Method not found: ..."); the remaining ValueErrors that
        # reach this point come from argument validation in the handlers.
        message = str(e)
        code = -32601 if message.startswith("Method not found") else -32602
        logger.error("Error processing request", method=method, error=message)
        return self._error_response(request_id, code, message)
    except Exception as e:
        logger.error("Error processing request", method=method, error=str(e))
        return self._error_response(request_id, -32603, f"Internal error: {str(e)}")
    return self._success_response(request_id, result)
def _validate_request(self, request: Dict[str, Any]) -> bool:
    """
    Validate JSON-RPC 2.0 format.

    A request is accepted only when it is a dict with:
    - "jsonrpc": "2.0"
    - "method": string

    Args:
        request: Decoded request payload. Non-dict payloads (lists, None,
            strings, ...) are rejected instead of raising.

    Returns:
        True when the envelope is well-formed, False otherwise.
    """
    # Guard first: membership / indexing on a list or None would either
    # raise or give a meaningless answer.
    if not isinstance(request, dict):
        return False
    if request.get("jsonrpc") != "2.0":
        return False
    # The method name must be present and must be a string.
    return isinstance(request.get("method"), str)
async def _route_request(self, method: str, params: Dict[str, Any]) -> Any:
    """
    Dispatch a request to the handler registered for its method name.

    Supported MCP methods:
    - initialize: First handshake
    - tools/list: List available tools
    - tools/call: Execute a tool
    - resources/list: List available resources
    - resources/read: Read a resource

    Raises:
        ValueError: If the method name matches none of the above.
    """
    # Ordered (method name, handler attribute) pairs; the handler is looked
    # up on self only once its method name has matched.
    routes = (
        ("initialize", "_handle_initialize"),
        ("tools/list", "_handle_tools_list"),
        ("tools/call", "_handle_tools_call"),
        ("resources/list", "_handle_resources_list"),
        ("resources/read", "_handle_resources_read"),
    )
    for route_name, handler_attr in routes:
        if method == route_name:
            handler = getattr(self, handler_attr)
            return await handler(params)
    raise ValueError(f"Method not found: {method}")
# ============================================================================
# SECTION 4: PROTOCOL METHOD HANDLERS
# ============================================================================
async def _handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]:
    """
    Handle 'initialize' method.

    Called by Claude when it first connects.
    Claude asks: "What can you do? What protocol do you speak?"
    We respond with the protocol version, our server info and the
    capability flags.

    Args:
        params: Request params (currently unused).

    Returns:
        Dict with "protocolVersion", "serverInfo" and "capabilities".
    """
    logger.info("Handling initialize request")
    return {
        # Single source of truth: the version stored in server_info, with
        # the previous literal as the fallback.
        "protocolVersion": self.server_info.get("protocol_version", "2024-11-05"),
        "serverInfo": self.server_info,
        "capabilities": {
            "tools": {"listChanged": True},
            # The request router in this file has no 'resources/subscribe'
            # branch, so subscriptions must not be advertised as available.
            "resources": {"subscribe": False, "listChanged": True},
        }
    }
async def _handle_tools_list(self, params: Dict[str, Any]) -> Dict[str, Any]:
    """
    Handle 'tools/list' method.

    Called by Claude to discover what tools we have.
    Claude asks: "What tools do you offer?"

    Args:
        params: Request params (currently unused).

    Returns:
        {"tools": [...]} where each entry has "name", "description" and an
        "inputSchema" object schema built from the registered parameters.
    """
    logger.info("Listing available tools")

    # Convert our internal tool format to MCP format
    tools = []
    for tool_name, tool_info in self.capabilities["tools"].items():
        parameters = tool_info["parameters"]
        # The internal per-parameter "required" boolean is bookkeeping only.
        # In JSON Schema, "required" is an array on the enclosing object, so
        # the flag is removed from each property and collected below.
        properties = {
            param_name: {
                key: value for key, value in spec.items() if key != "required"
            }
            for param_name, spec in parameters.items()
        }
        required_names = [
            param_name for param_name, spec in parameters.items()
            if spec.get("required", False)
        ]
        tools.append({
            "name": tool_name,
            "description": tool_info["description"],
            "inputSchema": {
                "type": "object",
                "properties": properties,
                "required": required_names
            }
        })
    return {"tools": tools}
async def _handle_tools_call(self, params: Dict[str, Any]) -> Dict[str, Any]:
    """
    Handle 'tools/call' method.

    THIS IS THE MOST IMPORTANT METHOD!
    Called by Claude when it wants to use a tool; reads "name" and
    "arguments" from params and hands the arguments to the matching
    executor. This is where the RAG retrieval will happen!

    Raises:
        ValueError: If the tool name matches no known tool.
    """
    tool_name = params.get("name")
    arguments = params.get("arguments", {})
    logger.info("Executing tool", tool=tool_name, arguments=arguments)

    # Ordered (tool name, executor attribute) pairs; the executor is looked
    # up on self only once its tool name has matched.
    executors = (
        ("github_get_repo", "_execute_github_get_repo"),
        ("github_search_code", "_execute_github_search_code"),
        ("github_get_file", "_execute_github_get_file"),
    )
    for known_tool, executor_attr in executors:
        if tool_name == known_tool:
            execute = getattr(self, executor_attr)
            return await execute(arguments)
    raise ValueError(f"Unknown tool: {tool_name}")
async def _handle_resources_list(self, params: Dict[str, Any]) -> Dict[str, Any]:
    """
    Handle 'resources/list' method.

    Returns every registered resource (data source) as a dict with
    "uri", "name", "description" and "mimeType". The "uri" value is the
    registered URI template, and "mimeType" falls back to "text/plain".
    """
    logger.info("Listing available resources")
    registered = self.capabilities["resources"]
    listing = [
        {
            "uri": details["uri_template"],
            "name": name,
            "description": details["description"],
            "mimeType": details.get("mime_type", "text/plain"),
        }
        for name, details in registered.items()
    ]
    return {"resources": listing}
async def _handle_resources_read(self, params: Dict[str, Any]) -> Dict[str, Any]:
    """
    Handle 'resources/read' method.

    Reads the "uri" entry from params (example: github://anthropics/mcp)
    and answers with a single plain-text content item. The text is a
    placeholder for now; no resource is actually fetched.
    """
    uri = params.get("uri")
    logger.info("Reading resource", uri=uri)

    # TODO: Parse URI and fetch actual resource
    # For now, return placeholder
    placeholder_item = {
        "uri": uri,
        "mimeType": "text/plain",
        "text": f"[PLACEHOLDER] Resource content for {uri}",
    }
    return {"contents": [placeholder_item]}
# ============================================================================
# SECTION 5: TOOL EXECUTION METHODS (RAG PIPELINE WILL GO HERE)
# ============================================================================
async def _execute_github_get_repo(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """
    Execute the 'github_get_repo' tool.

    Steps:
    1. Fetch the repository and its structure through the GitHub client
    2. Format a comprehensive repository context
    3. (TODO) Chunk to 1500 tokens
    4. Return the context as a single text item

    Args:
        arguments: Tool arguments; both 'owner' and 'repo' are mandatory.

    Returns:
        A tool result with one text item. Failures raised while fetching or
        formatting are reported in the text together with "isError": True.

    Raises:
        ValueError: If 'owner' or 'repo' is missing or empty.
    """
    def text_result(text, failed=False):
        # Build the single-text-item tool result; flag failures explicitly.
        payload = {"content": [{"type": "text", "text": text}]}
        if failed:
            payload["isError"] = True
        return payload

    owner = arguments.get("owner")
    repo = arguments.get("repo")
    if not (owner and repo):
        raise ValueError("Both 'owner' and 'repo' parameters are required")

    logger.info("Getting repo", owner=owner, repo=repo)
    try:
        # Repository data first, then its file structure
        repo_info = await self.github_client.get_repository(owner, repo)
        structure = await self.github_client.get_repository_structure(owner, repo)

        # TODO: Apply 1500-token chunking here
        # For now, return full context
        return text_result(self._format_repo_context(repo_info, structure))
    except ValueError as e:
        logger.error("GitHub API error", error=str(e))
        return text_result(f"Error fetching repository: {str(e)}", failed=True)
    except Exception as e:
        logger.error("Unexpected error", error=str(e))
        return text_result(f"Unexpected error: {str(e)}", failed=True)
async def _execute_github_search_code(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """
    Execute the 'github_search_code' tool.

    Steps:
    1. Run the code search through the GitHub client
    2. Format the hits as a numbered Markdown list
    3. (TODO) Add vector similarity ranking
    4. Return the formatted results

    Args:
        arguments: Tool arguments; 'query' is mandatory, 'repo' is optional
            and is passed to the client as given.

    Returns:
        A tool result with one text item. Failures raised while searching or
        formatting are reported in the text together with "isError": True.

    Raises:
        ValueError: If 'query' is missing or empty.
    """
    def text_result(text, failed=False):
        # Build the single-text-item tool result; flag failures explicitly.
        payload = {"content": [{"type": "text", "text": text}]}
        if failed:
            payload["isError"] = True
        return payload

    query = arguments.get("query")
    repo = arguments.get("repo")
    if not query:
        raise ValueError("'query' parameter is required")

    logger.info("Searching code", query=query, repo=repo)
    try:
        hits = await self.github_client.search_code(query, repo)
        if not hits:
            return text_result(f"No code found matching query: '{query}'")

        # Collect the report piece by piece and join once at the end.
        pieces = [f"## Code Search Results for: '{query}'\n"]
        if repo:
            pieces.append(f"Repository: {repo}\n")
        pieces.append(f"\nFound {len(hits)} results:\n\n")
        for position, hit in enumerate(hits, 1):
            pieces.append(
                f"{position}. **{hit['name']}**\n"
                f" - Path: `{hit['path']}`\n"
                f" - Repository: {hit['repository']}\n"
                f" - URL: {hit['url']}\n\n"
            )

        # TODO: Add vector similarity ranking
        # TODO: Fetch and include code snippets
        return text_result("".join(pieces))
    except ValueError as e:
        logger.error("GitHub API error", error=str(e))
        return text_result(f"Error searching code: {str(e)}", failed=True)
    except Exception as e:
        logger.error("Unexpected error", error=str(e))
        return text_result(f"Unexpected error: {str(e)}", failed=True)
async def _execute_github_get_file(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """
    Execute the 'github_get_file' tool.

    Steps:
    1. Fetch the path through the GitHub client
    2. Render a directory listing, or the file with its metadata
    3. (TODO) Add surrounding context from related files
    4. Return the rendered text

    Args:
        arguments: Tool arguments; 'owner', 'repo' and 'path' are mandatory.

    Returns:
        A tool result with one text item. Failures raised while fetching or
        formatting are reported in the text together with "isError": True.

    Raises:
        ValueError: If any of 'owner', 'repo' or 'path' is missing or empty.
    """
    def text_result(text, failed=False):
        # Build the single-text-item tool result; flag failures explicitly.
        payload = {"content": [{"type": "text", "text": text}]}
        if failed:
            payload["isError"] = True
        return payload

    owner = arguments.get("owner")
    repo = arguments.get("repo")
    path = arguments.get("path")
    if not (owner and repo and path):
        raise ValueError("All parameters 'owner', 'repo', and 'path' are required")

    logger.info("Getting file", owner=owner, repo=repo, path=path)
    try:
        file_data = await self.github_client.get_file_content(owner, repo, path)

        # A directory is answered with a bullet list of its entries.
        if file_data["type"] == "directory":
            listing = "\n".join(f"- {entry}" for entry in file_data["files"])
            return text_result(f"## Directory: {path}\n\nContents:\n{listing}")

        # Anything else is rendered as a file: metadata header, then content.
        rendered = (
            f"## File: {file_data['name']}\n"
            f"**Path:** `{file_data['path']}`\n"
            f"**Size:** {file_data['size']} bytes\n"
            f"**URL:** {file_data['url']}\n\n"
            f"### Content:\n```\n{file_data['content']}\n```"
        )

        # TODO: Add surrounding context from related files
        # TODO: Apply 1500-token chunking if content is too large
        return text_result(rendered)
    except ValueError as e:
        logger.error("GitHub API error", error=str(e))
        return text_result(f"Error fetching file: {str(e)}", failed=True)
    except Exception as e:
        logger.error("Unexpected error", error=str(e))
        return text_result(f"Unexpected error: {str(e)}", failed=True)
# ============================================================================
# SECTION 6: HELPER METHODS
# ============================================================================
def _format_repo_context(self, repo_info: Dict[str, Any], structure: Dict[str, Any]) -> str:
    """
    Format repository information into a comprehensive context string.

    The output is Markdown: a title, a metadata block, the README and a
    flat listing of the entries under structure['children'].

    Args:
        repo_info: Repository metadata. Must contain 'full_name', 'stars',
            'forks', 'created_at', 'updated_at' and 'url'. 'description',
            'language' and 'readme' may be missing or None, in which case a
            fallback text is shown. 'topics' is optional.
        structure: Repository file structure; only its 'children' list is
            read. Each child needs 'type' and 'name'; 'size' is optional.

    Returns:
        Formatted context string

    Raises:
        KeyError: If one of the mandatory repo_info or child keys is missing.
    """
    # The icons in the pasted source were mis-encoded into the same stray
    # character for both branches; spelled by name here so they cannot be
    # garbled again.
    file_icon = "\N{PAGE FACING UP}"
    folder_icon = "\N{FILE FOLDER}"

    # Nullable fields: avoid printing the literal "None" and avoid the
    # TypeError that str + None would raise for a missing README.
    description = repo_info.get('description') or "N/A"
    language = repo_info.get('language') or "N/A"
    readme = repo_info.get('readme') or "(no README available)"

    parts = [f"# Repository: {repo_info['full_name']}\n\n"]

    # Metadata
    parts.append(f"**Description:** {description}\n")
    parts.append(f"**Language:** {language}\n")
    parts.append(f"**Stars:** {repo_info['stars']} | **Forks:** {repo_info['forks']}\n")
    parts.append(f"**Created:** {repo_info['created_at']}\n")
    parts.append(f"**Updated:** {repo_info['updated_at']}\n")
    parts.append(f"**URL:** {repo_info['url']}\n")

    # Topics
    if repo_info.get('topics'):
        topics_str = ", ".join(repo_info['topics'])
        parts.append(f"**Topics:** {topics_str}\n")
    parts.append("\n---\n\n")

    # README
    parts.append("## README\n\n")
    parts.append(f"{readme}\n\n")
    parts.append("---\n\n")

    # File Structure
    parts.append("## Repository Structure\n\n")
    for item in structure.get('children') or []:
        if item['type'] == 'file':
            parts.append(f"- {file_icon} {item['name']} ({item.get('size', 0)} bytes)\n")
        else:
            parts.append(f"- {folder_icon} {item['name']}/\n")

    return "".join(parts)
def _success_response(self, request_id: Optional[Any], result: Any) -> Dict[str, Any]:
    """
    Wrap a handler result in a JSON-RPC 2.0 success envelope.

    Shape of the returned dict:
        {
            "jsonrpc": "2.0",
            "id": <request_id>,
            "result": <result>
        }
    """
    envelope: Dict[str, Any] = {"jsonrpc": "2.0"}
    envelope["id"] = request_id
    envelope["result"] = result
    return envelope
def _error_response(self, request_id: Optional[Any], code: int, message: str) -> Dict[str, Any]:
    """
    Wrap a failure in a JSON-RPC 2.0 error envelope.

    Shape of the returned dict:
        {
            "jsonrpc": "2.0",
            "id": <request_id>,
            "error": {
                "code": <code>,
                "message": <message>
            }
        }

    Standard error codes:
        -32600: Invalid Request
        -32601: Method not found
        -32602: Invalid params
        -32603: Internal error
    """
    error_body = dict(code=code, message=message)
    envelope: Dict[str, Any] = {"jsonrpc": "2.0"}
    envelope["id"] = request_id
    envelope["error"] = error_body
    return envelope