Model Control Plane (MCP) Server

import os import requests import json from typing import Dict, Any, Optional, List, Union import sys import tempfile import subprocess from datetime import datetime def analyze_repo(repo_url: str, base_url: str = "http://localhost:8000", capture_output: bool = False) -> Dict[str, Any]: """ Analyze a Git repository and return useful information about it. Args: repo_url: URL of the Git repository to analyze base_url: URL of the MCP server capture_output: If True, capture and return the output instead of printing Returns: Dict containing repository analysis information """ results = {} # Helper function to log or print results def log_result(key: str, value: Any): if capture_output: results[key] = value else: print(f"\n=== {key} ===") if isinstance(value, (dict, list)): print(json.dumps(value, indent=2)) else: print(value) # Clone repo to temporary directory try: temp_dir = tempfile.mkdtemp(prefix="mcp_git_analysis_") log_result("temporary_directory", temp_dir) # Clone the repository clone_start = log_result("cloning_repo", f"Cloning {repo_url} into {temp_dir}")["git", "clone", "--depth", "1", repo_url, temp_dir], check=True, capture_output=True) clone_duration = ( - clone_start).total_seconds() log_result("clone_duration_seconds", clone_duration) # Get basic repository info repo_info = {} # Get commit count (approximate with depth=1) try: commit_count_cmd = ["git", "rev-list", "--count", "HEAD"], cwd=temp_dir, check=True, capture_output=True, text=True ) repo_info["commit_count"] = int(commit_count_cmd.stdout.strip()) except (subprocess.SubprocessError, ValueError): repo_info["commit_count"] = "unknown" # Get last commit info try: last_commit_cmd = ["git", "log", "-1", "--pretty=format:%h|%an|%ad|%s"], cwd=temp_dir, check=True, capture_output=True, text=True ) commit_parts = last_commit_cmd.stdout.split("|") repo_info["last_commit"] = { "hash": commit_parts[0], "author": commit_parts[1], "date": commit_parts[2], "message": commit_parts[3] if len(commit_parts) > 3 else "" } except subprocess.SubprocessError: repo_info["last_commit"] = "unknown" # Get file types and count file_types = {} try: for root, _, files in os.walk(temp_dir): if ".git" in root: continue for file in files: ext = os.path.splitext(file)[1].lower() if ext: file_types[ext] = file_types.get(ext, 0) + 1 else: file_types["no_extension"] = file_types.get("no_extension", 0) + 1 # Sort by count file_types = {k: v for k, v in sorted(file_types.items(), key=lambda item: item[1], reverse=True)} repo_info["file_types"] = file_types repo_info["file_count"] = sum(file_types.values()) # Determine primary language lang_map = { ".py": "Python", ".js": "JavaScript", ".ts": "TypeScript", ".java": "Java", ".c": "C", ".cpp": "C++", ".cs": "C#", ".go": "Go", ".rs": "Rust", ".rb": "Ruby", ".php": "PHP", ".html": "HTML", ".css": "CSS", ".sh": "Shell", ".md": "Markdown" } # Find the most common extension that maps to a language primary_lang = "Unknown" for ext, count in file_types.items(): if ext in lang_map: primary_lang = lang_map[ext] break repo_info["primary_language"] = primary_lang except Exception as e: repo_info["file_count"] = "error" repo_info["error"] = str(e) log_result("repository_info", repo_info) # Add repository URL to results results["repository"] = repo_url results["timestamp"] = # Enhance with commit history try: # Get recent commits (limited to avoid excessive data) commit_history_cmd = ["git", "log", "--pretty=format:%h|%an|%ad|%s", "-n", "5"], cwd=temp_dir, check=True, capture_output=True, text=True ) commit_history = [] for line in commit_history_cmd.stdout.strip().split('\n'): if line: parts = line.split('|') commit_history.append({ "hash": parts[0], "author": parts[1], "date": parts[2], "message": parts[3] if len(parts) > 3 else "" }) if commit_history: results["commit_history"] = commit_history except Exception as e: log_result("commit_history_error", str(e)) # Also try server-side analysis if the MCP server is available try: # First check if the server is responding at all try: models_response = requests.get(f"{base_url}/v1/models", timeout=5) if models_response.status_code != 200: log_result("server_connection_error", f"MCP server returned status {models_response.status_code}") raise Exception(f"MCP server returned status {models_response.status_code}") except requests.RequestException as e: log_result("server_connection_error", f"Failed to connect to MCP server: {str(e)}") raise e # Test repository analysis through MCP API analyze_payload = {"repo_url": repo_url} try: response ="{base_url}/v1/git/analyze", json=analyze_payload, timeout=10) if response.status_code == 200: server_analysis = response.json() log_result("server_analysis", server_analysis) elif response.status_code == 404: # Endpoint doesn't exist - handle gracefully log_result("server_analysis_status", "Git analysis endpoint not available (404)") log_result("server_analysis_error", "The /v1/git/analyze endpoint is not implemented in this MCP server version") else: log_result("server_analysis_error", f"Error: {response.status_code} - {response.text}") except requests.RequestException as e: log_result("server_analysis_error", f"Request failed: {str(e)}") except Exception as e: log_result("server_connection_error", str(e)) # Clean up try: import shutil shutil.rmtree(temp_dir) log_result("cleanup", "Removed temporary directory") except Exception as e: log_result("cleanup_error", str(e)) except Exception as e: log_result("error", str(e)) # Return the collected results if in capture mode if capture_output: return results else: return {"status": "completed", "output": "printed to console"} def test_git_integration(base_url: str = "http://localhost:8000", repo_url: str = None): """Test the Git integration in the MCP server""" if not repo_url: # If no repo_url is provided as an argument, check if it was passed from CLI if len(sys.argv) > 1: repo_url = sys.argv[1] else: print("Error: Please provide a Git repository URL") print("Usage: python <git-repo-url>") sys.exit(1) print(f"Testing Git integration with repository: {repo_url}") print(f"MCP server: {base_url}") # Run the repository analysis analyze_repo(repo_url, base_url, capture_output=False) print("\nGit integration test completed.") if __name__ == "__main__": test_git_integration()