Python MCP Server

from __future__ import annotations import os import re import ast from typing import Any, Dict, List, Optional from pathlib import Path from code_grapher import CodeGrapher # Load token limit from environment variable or use default import os default_token_limit = 8000 try: import dotenv dotenv.load_dotenv() token_limit = int(os.getenv('TOKEN_LIMIT', default_token_limit)) except (ImportError, ValueError): token_limit = default_token_limit def get_python_code(target_file: str, root_repo_path: Optional[str] = None) -> Dict[str, Any]: """Return the code of the target file and related Python files. Analyzes the target file and its imports to find the most relevant Python files in the codebase. Returns the code in an LLM-friendly format with proper context. Always includes README.md files (or variants) as additional files. Args: target_file: Path to the Python file to analyze. root_repo_path: Root directory of the repository. If None, will use the directory of the target file. Returns: A dictionary containing the target file's code and related files. """ # Initialize CodeGrapher with token limit token_limit = 8000 # Default token limit code_grapher = CodeGrapher(token_limit=token_limit) # Set root_repo_path if not provided if root_repo_path is None: if os.path.isabs(target_file): root_repo_path = os.path.dirname(target_file) else: root_repo_path = os.path.dirname(os.path.abspath(target_file)) # Ensure absolute path for target file if not os.path.isabs(target_file): target_file = os.path.join(root_repo_path, target_file) target_file = os.path.abspath(target_file) # Make sure it's a Python file if not target_file.endswith('.py'): raise ValueError(f"The target file must be a Python file (.py): {target_file}") # Find and include README files readme_files = find_readme_files(root_repo_path) # Extract the code graph for the target file result = code_grapher.extract_code(target_file, project_root=root_repo_path) if "error" in result: raise ValueError(result["error"]) # Format the target file code target_file_rel = os.path.relpath(target_file, root_repo_path) target_code = { "file_path": target_file_rel, "code": result["main_object"]["code"], "type": "target", "docstring": result["main_object"]["docstring"] or "" } # Calculate token count for target file target_token_count = code_grapher._count_tokens(result["main_object"]["code"]) current_token_count = target_token_count token_limit = code_grapher.token_limit # Format the related files related_files = [] # Create a list to hold files that import the target files_importing_target = [] # Create a list to hold files that are imported by the target files_imported_by_target = [] for obj in result["referenced_objects"]: # Get relative path for better readability rel_path = os.path.relpath(obj["file"], root_repo_path) file_token_count = code_grapher._count_tokens(obj["code"]) file_data = { "file_path": rel_path, "object_name": obj["name"], "object_type": obj["type"], "code": obj["code"], "docstring": obj["docstring"] or "", "truncated": obj.get("truncated", False), "token_count": file_token_count } # Categorize if this file is imported by the target or imports the target # We'll determine this based on the reference type or relationship # This is a placeholder logic that should be adapted based on your actual data structure if "referenced_from" in obj and obj["referenced_from"] == target_file: # This file is imported by the target files_imported_by_target.append(file_data) else: # This file imports the target files_importing_target.append(file_data) # Find additional Python files in the same directory and related modules target_dir = os.path.dirname(target_file) additional_files = [] # Get all Python files from the directory structure all_python_files = code_grapher.find_all_python_files(root_repo_path) # Add files from the same directory first (if not already included) included_paths = {target_file} | {obj["file"] for obj in result["referenced_objects"]} # Find potential imports in the target file that weren't resolved potential_imports = set() project_imports = set() # Track full project-specific import paths try: with open(target_file, 'r', encoding='utf-8') as f: content = f.read() tree = ast.parse(content) # Extract import names for node in ast.walk(tree): if isinstance(node, ast.Import): for name in node.names: import_name = name.name potential_imports.add(import_name.split('.')[0]) # Store the full import name for project-specific imports project_imports.add(import_name) elif isinstance(node, ast.ImportFrom): if node.module: module_name = node.module potential_imports.add(module_name.split('.')[0]) # Store the full module name for project-specific imports project_imports.add(module_name) # Also add the imported objects for name in node.names: full_import = f"{module_name}.{name.name}" project_imports.add(full_import) print(f"Potential imports found in {target_file}: {potential_imports}") print(f"Project-specific imports: {project_imports}") except Exception as e: print(f"Error analyzing imports in {target_file}: {e}") # Process files by relevance, with special handling for project imports for py_file in all_python_files: if py_file not in included_paths: # Skip if we've already included enough files if len(additional_files) >= 15: # Increased limit for more context break # Calculate relevance score (higher is more relevant) relevance = 0 # Files in same directory are highly relevant if os.path.dirname(py_file) == target_dir: relevance += 3 # Files that match potential import names are relevant basename = os.path.basename(py_file).replace('.py', '') if basename in potential_imports: relevance += 4 print(f"Found matching import: {basename} in {py_file}") # Handle project-specific imports by mapping file paths to import paths rel_path = os.path.relpath(py_file, root_repo_path) module_path = rel_path.replace('/', '.').replace('.py', '') # Check if this file matches any of the project imports is_import_related = False matching_import = None for project_import in project_imports: # Check if the module path might match a project import if project_import.startswith(module_path) or module_path.endswith(project_import): relevance += 5 is_import_related = True matching_import = project_import print(f"Found matching project import: {project_import} -> {module_path} in {py_file}") # Only include files with some relevance if relevance > 0: try: with open(py_file, 'r', encoding='utf-8') as f: content = f.read() # Parse the file to get docstring try: tree = ast.parse(content) docstring = ast.get_docstring(tree) or "" except: docstring = "" rel_path = os.path.relpath(py_file, root_repo_path) token_count = code_grapher._count_tokens(content) # If file is related by import, add to files_imported_by_target # so it will be included in referenced_files if is_import_related or basename in potential_imports: files_imported_by_target.append({ "file_path": rel_path, "object_name": matching_import or basename, "object_type": "module", "code": content, "docstring": docstring, "truncated": False, "token_count": token_count }) print(f"Added imported file: {rel_path} (import: {matching_import or basename})") else: # Otherwise add to additional_files additional_files.append({ "file_path": rel_path, "code": content, "type": "related_by_directory" if os.path.dirname(py_file) == target_dir else "related", "docstring": docstring, "relevance": relevance, "token_count": token_count }) print(f"Added related file: {rel_path} (relevance: {relevance})") except Exception as e: print(f"Error reading file {py_file}: {e}") # Sort additional files by relevance (but we'll use token count later when adding files) if additional_files: # Only sort if there are files to sort additional_files.sort(key=lambda x: x.pop('relevance', 0), reverse=True) # Add README files to additional files and track token count readme_files_data = [] for readme_path in readme_files: try: with open(readme_path, 'r', encoding='utf-8') as f: readme_content = f.read() rel_path = os.path.relpath(readme_path, root_repo_path) readme_token_count = code_grapher._count_tokens(readme_content) readme_files_data.append({ "file_path": rel_path, "code": readme_content, "type": "readme", "docstring": "Project documentation", "token_count": readme_token_count }) print(f"Added README file: {rel_path} (tokens: {readme_token_count})") # Add README token count to current count current_token_count += readme_token_count except Exception as e: print(f"Error reading README file {readme_path}: {e}") # Begin building the final list of referenced files with target file already counted final_referenced_files = [] # Sort imported files by size (smallest to largest) if any exist if files_imported_by_target: files_imported_by_target.sort(key=lambda x: x["token_count"]) # Sort files that import the target by size (smallest to largest) if any exist if files_importing_target: files_importing_target.sort(key=lambda x: x["token_count"]) # Log information about the number of files found print(f"Found {len(files_imported_by_target)} files imported by target") print(f"Found {len(files_importing_target)} files importing target") print(f"Found {len(additional_files)} additional related files") # Add files that the target imports, from smallest to largest for file_data in files_imported_by_target: # Check if we have enough token budget if current_token_count + file_data["token_count"] <= token_limit: final_referenced_files.append({ "file_path": file_data["file_path"], "object_name": file_data["object_name"], "object_type": file_data["object_type"], "code": file_data["code"], "docstring": file_data["docstring"], "truncated": file_data["truncated"] }) current_token_count += file_data["token_count"] print(f"Added imported file: {file_data['file_path']} (tokens: {file_data['token_count']})") # Add files that import the target, from smallest to largest for file_data in files_importing_target: # Check if we have enough token budget if current_token_count + file_data["token_count"] <= token_limit: final_referenced_files.append({ "file_path": file_data["file_path"], "object_name": file_data["object_name"], "object_type": file_data["object_type"], "code": file_data["code"], "docstring": file_data["docstring"], "truncated": file_data["truncated"] }) current_token_count += file_data["token_count"] print(f"Added file importing target: {file_data['file_path']} (tokens: {file_data['token_count']})") # Format the additional files without the token counts final_additional_files = [] for readme_file in readme_files_data: final_additional_files.append({ "file_path": readme_file["file_path"], "code": readme_file["code"], "type": readme_file["type"], "docstring": readme_file["docstring"] }) # Add other additional files if there's token budget left for file_data in additional_files: # Check if we have enough token budget if current_token_count + file_data["token_count"] <= token_limit: final_additional_files.append({ "file_path": file_data["file_path"], "code": file_data["code"], "type": file_data["type"], "docstring": file_data["docstring"] }) current_token_count += file_data["token_count"] # Format the response as LLM-friendly content llm_friendly_format = { "target_file": target_code, "referenced_files": final_referenced_files, "additional_files": final_additional_files, "total_files": 1 + len(final_referenced_files) + len(final_additional_files), "token_count": current_token_count, "token_limit": token_limit, # Include metadata about the original numbers before token filtering "metadata": { "original_imported_files_count": len(files_imported_by_target), "original_importing_files_count": len(files_importing_target), "original_additional_files_count": len(additional_files), "readme_files_count": len(readme_files_data) } } return llm_friendly_format def find_readme_files(root_path: str) -> List[str]: """Find README files in the repository. Searches for README.md and variants in the repository root and subdirectories. Args: root_path: Root directory of the repository. Returns: List of paths to README files. """ readme_files = [] readme_patterns = ['README.md', 'README.txt', 'README', 'readme.md', 'Readme.md'] # Check for README in the root directory first for pattern in readme_patterns: readme_path = os.path.join(root_path, pattern) if os.path.isfile(readme_path): readme_files.append(readme_path) print(f"Found README file in root: {readme_path}") break # Only include one README from the root directory return readme_files # Simple JSON-RPC handler to expose the tool def handle_mcp_request(request_data: Dict[str, Any]) -> Dict[str, Any]: """Handle MCP protocol JSON-RPC requests. This function implements a minimal MCP server that can list and call tools. Args: request_data: A JSON-RPC request following the MCP protocol. Returns: A JSON-RPC response following the MCP protocol. """ # Extract the method and params from the request method = request_data.get("method", "") params = request_data.get("params", {}) req_id = request_data.get("id") # Handle tool listing if method == "tools/list": return { "jsonrpc": "2.0", "id": req_id, "result": { "tools": [ { "name": "get_python_code", "description": "Return the code of a target Python file and related files based on import/export proximity.", "inputSchema": { "type": "object", "properties": { "target_file": { "type": "string", "description": "Path to the Python file to analyze." }, "root_repo_path": { "type": "string", "description": "Root directory of the repository. If not provided, the directory of the target file will be used." } }, "required": ["target_file"] } } ] } } # Handle tool calling elif method == "tools/call": tool_name = params.get("name") args = params.get("arguments", {}) if tool_name == "get_python_code": try: target_file = args.get("target_file") root_repo_path = args.get("root_repo_path") if not target_file: return { "jsonrpc": "2.0", "id": req_id, "error": { "code": -32602, "message": "Missing required argument: target_file" } } result = get_python_code(target_file, root_repo_path) # Convert to MCP-friendly format return { "jsonrpc": "2.0", "id": req_id, "result": { "content": [ { "type": "text", "text": f"Python code analysis for {target_file}" }, { "type": "resource", "resource": { "uri": f"resource://python-code/{os.path.basename(target_file)}", "mimeType": "application/json", "data": result } } ], "isError": False } } except Exception as e: return { "jsonrpc": "2.0", "id": req_id, "result": { "content": [ { "type": "text", "text": f"Error processing Python code: {str(e)}" } ], "isError": True } } else: return { "jsonrpc": "2.0", "id": req_id, "error": { "code": -32601, "message": f"Unknown tool: {tool_name}" } } # Handle capability negotiation elif method == "initialize": return { "jsonrpc": "2.0", "id": req_id, "result": { "capabilities": { "tools": { "listChanged": False # We don't support dynamic tool changes } } } } # Handle unknown methods else: return { "jsonrpc": "2.0", "id": req_id, "error": { "code": -32601, "message": f"Method not found: {method}" } }