MCP Git Server

by MementoRC
USAGE_GUIDE.md (17.3 kB)
# MCP Git Server Usage Guide

## Table of Contents

1. [Quick Start](#quick-start)
2. [Installation and Setup](#installation-and-setup)
3. [Basic Usage](#basic-usage)
4. [Advanced Configuration](#advanced-configuration)
5. [Error Handling Patterns](#error-handling-patterns)
6. [Performance Optimization](#performance-optimization)
7. [Monitoring and Debugging](#monitoring-and-debugging)
8. [Best Practices](#best-practices)
9. [Troubleshooting](#troubleshooting)

## Quick Start

### Basic Server Setup

```python
import asyncio

from mcp_server_git.server import MCPGitServer


async def main():
    server = MCPGitServer()
    await server.start()
    print("MCP Git Server is running...")

    # Keep the server running until interrupted
    try:
        await asyncio.Event().wait()
    except (KeyboardInterrupt, asyncio.CancelledError):
        # asyncio.run() cancels the main task on Ctrl+C, so handle both
        print("Shutting down server...")
    finally:
        await server.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```

### Client Connection Example

```python
# Example client connection (illustrative; requires the `websockets` package)
import asyncio
import json

import websockets


async def connect_to_server():
    uri = "ws://localhost:8080"
    async with websockets.connect(uri) as websocket:
        # Send initialization message
        init_message = {
            "type": "initialize",
            "id": "init_123",
            "params": {
                "capabilities": {
                    "cancellation": True,
                    "progress": True
                }
            }
        }
        await websocket.send(json.dumps(init_message))

        # Receive response
        response = await websocket.recv()
        print(f"Server response: {response}")


if __name__ == "__main__":
    asyncio.run(connect_to_server())
```

## Installation and Setup

### Environment Setup

```bash
# Install dependencies
pixi install

# Set environment variables
export MCP_GIT_LOG_LEVEL=INFO
export MCP_GIT_SESSION_TIMEOUT=3600
export MCP_GIT_HEARTBEAT_INTERVAL=30
export MCP_GIT_MAX_CONCURRENT_SESSIONS=100
```

### Development Setup

```python
# Development configuration with enhanced debugging
import logging

from mcp_server_git.server import MCPGitServer
from mcp_server_git.logging_config import setup_logging

# Setup enhanced logging
setup_logging(
    level=logging.DEBUG,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    enable_json_logging=True
)

# Create server with debug options
server = MCPGitServer(debug=True)
```

## Basic Usage

### Message Processing

The server handles the following MCP message types automatically:

```python
# Notification messages are processed by the NotificationInterceptor
notification = {
    "type": "notifications/cancelled",
    "id": "cancel_123",
    "request_id": "operation_456",
    "reason": "User requested cancellation"
}

# Progress notifications
progress = {
    "type": "notifications/progress",
    "id": "progress_123",
    "request_id": "operation_456",
    "progress": {
        "current": 50,
        "total": 100,
        "message": "Processing files..."
    }
}
```

### Session Management

```python
from mcp_server_git.session import SessionManager

# The awaits below must run inside a coroutine

# Create and manage sessions
session_manager = SessionManager()

# Create new session
session = await session_manager.create_session()

# Track session activity
session.record_message()                # Record processed message
session.record_error()                  # Record error occurrence
session.add_operation("git_clone_123")  # Track active operation

# Session cleanup
await session_manager.cleanup_idle_sessions(max_idle_time=3600)
```

### Error Handling

```python
import logging

from mcp_server_git.error_handling import recoverable, ErrorSeverity

logger = logging.getLogger(__name__)


# Automatic retry for transient errors
@recoverable(max_retries=3, backoff_factor=1.5)
async def unreliable_operation():
    # Operation that might fail temporarily;
    # perform_network_request() is a placeholder for your own coroutine
    return await perform_network_request()


# Error classification and handling
async def run_operation():
    try:
        return await unreliable_operation()
    except Exception as e:
        # Log error with appropriate severity
        logger.error(f"Operation failed: {e}", extra={
            "severity": ErrorSeverity.HIGH,
            "operation": "network_request",
            "recoverable": True
        })
```

## Advanced Configuration

### Custom Server Configuration

```python
from mcp_server_git.server import MCPGitServer
from mcp_server_git.session import SessionManager, HeartbeatManager
from mcp_server_git.error_handling import CircuitBreaker


class CustomMCPServer(MCPGitServer):
    def __init__(self):
        super().__init__()

        # Custom session manager
        self.session_manager = SessionManager(
            max_sessions=500,
            session_timeout=7200  # 2 hours
        )

        # Custom heartbeat manager
        self.heartbeat_manager = HeartbeatManager(
            session_manager=self.session_manager,
            heartbeat_interval=15.0,      # More frequent heartbeats
            missed_heartbeat_threshold=5  # More tolerant of missed beats
        )

        # Circuit breakers for different operations
        self.git_circuit = CircuitBreaker(
            name="git_operations",
            failure_threshold=10,
            recovery_timeout=60.0
        )
        self.network_circuit = CircuitBreaker(
            name="network_operations",
            failure_threshold=5,
            recovery_timeout=30.0
        )

    async def start(self):
        await super().start()
        await self.heartbeat_manager.start()
        print("Custom MCP Server started with enhanced configuration")
```

### Performance Tuning

```python
from mcp_server_git.server import MCPGitServer
from mcp_server_git.optimizations import (
    enable_validation_cache,
    set_cache_size,
    PerformanceTimer
)

# Enable and configure validation caching
enable_validation_cache()
set_cache_size(1000)  # Cache up to 1000 validation results


# Performance monitoring for all operations
class PerformanceAwareMCPServer(MCPGitServer):
    async def handle_message(self, message, session):
        with PerformanceTimer(f"message_{message.get('type', 'unknown')}"):
            return await super().handle_message(message, session)
```

## Error Handling Patterns

### Graceful Degradation

```python
import logging

from mcp_server_git.models.enhanced_validation import enhanced_validate_message

logger = logging.getLogger(__name__)


# Intended as a method on an MCPGitServer subclass (note the `self` parameter)
async def handle_message_with_fallback(self, raw_message, session):
    """Handle message with graceful degradation on validation errors."""
    # Try strict validation first
    result = enhanced_validate_message(raw_message, strict_mode=True)
    if result.is_valid:
        return await self.process_validated_message(result.model, session)

    # Fall back to lenient validation
    result = enhanced_validate_message(raw_message, strict_mode=False)
    if result.is_valid:
        # Log warning about non-strict validation
        logger.warning(f"Message validated with fallback: {result.validation_warnings}")
        return await self.process_validated_message(result.model, session)

    # Final fallback: extract critical fields manually
    message_type = raw_message.get("type", "unknown")
    message_id = raw_message.get("id", "unknown")
    logger.error(f"Failed to validate {message_type} message {message_id}")

    # Send error response to client
    await self.send_error_response(session.session_id, {
        "error": "validation_failed",
        "message": f"Could not validate {message_type} message",
        "original_id": message_id
    })
```

### Circuit Breaker Integration

```python
import logging

from mcp_server_git.error_handling import (
    CircuitBreaker,
    CircuitOpenError,
    with_circuit_breaker
)

logger = logging.getLogger(__name__)

# Decorators are evaluated when the class body runs, before any instance
# exists, so the circuit must live at module level rather than on `self`
git_circuit = CircuitBreaker("git_ops", failure_threshold=5)


class RobustGitOperations:
    def __init__(self):
        self.git_circuit = git_circuit

    @with_circuit_breaker(git_circuit)
    async def perform_git_operation(self, operation_type, **kwargs):
        """Perform git operation with circuit breaker protection."""
        try:
            return await self._execute_git_command(operation_type, **kwargs)
        except Exception as e:
            logger.error(f"Git operation failed: {e}")
            raise

    async def safe_git_operation(self, operation_type, **kwargs):
        """Perform git operation with circuit breaker fallback."""
        try:
            return await self.perform_git_operation(operation_type, **kwargs)
        except CircuitOpenError:
            # Circuit is open, return cached result or error
            logger.warning("Git operations circuit is open, using fallback")
            return await self._get_cached_result(operation_type, **kwargs)
```

## Performance Optimization

### Message Validation Caching

```python
import asyncio
import logging

from mcp_server_git.optimizations import get_validation_cache_stats

logger = logging.getLogger(__name__)


# Monitor cache performance
async def log_cache_stats_periodically():
    while True:
        await asyncio.sleep(300)  # Every 5 minutes
        stats = get_validation_cache_stats()
        if stats['total'] > 0:
            hit_rate = stats['hits'] / stats['total']
            logger.info(f"Validation cache hit rate: {hit_rate:.2%}", extra={
                "cache_hits": stats['hits'],
                "cache_misses": stats['misses'],
                "cache_size": stats['current_size']
            })
```
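
To make the statistics above concrete, the following is a minimal, self-contained sketch of a bounded cache that counts hits and misses. It is not the implementation inside `mcp_server_git.optimizations`; the class `CountingLRUCache` and its method names are hypothetical. Only the stat keys (`hits`, `misses`, `total`, `current_size`) mirror the ones used in this guide.

```python
from collections import OrderedDict
from typing import Any, Callable, Hashable


class CountingLRUCache:
    """Hypothetical bounded LRU cache that tracks hits and misses."""

    def __init__(self, max_size: int = 1000) -> None:
        self.max_size = max_size
        self._entries: "OrderedDict[Hashable, Any]" = OrderedDict()
        self.hits = 0
        self.misses = 0

    def get_or_compute(self, key: Hashable, compute: Callable[[], Any]) -> Any:
        if key in self._entries:
            self.hits += 1
            self._entries.move_to_end(key)  # mark as most recently used
            return self._entries[key]
        self.misses += 1
        value = compute()
        self._entries[key] = value
        if len(self._entries) > self.max_size:
            self._entries.popitem(last=False)  # evict least recently used
        return value

    def stats(self) -> dict:
        # Same keys as the stats dictionary used in this guide
        return {
            "hits": self.hits,
            "misses": self.misses,
            "total": self.hits + self.misses,
            "current_size": len(self._entries),
        }


# Simulate validating five messages with a cache that holds two entries
cache = CountingLRUCache(max_size=2)
for message_type in ["ping", "ping", "progress", "cancelled", "ping"]:
    cache.get_or_compute(message_type, lambda: True)

stats = cache.stats()
hit_rate = stats["hits"] / stats["total"]
print(f"hit rate: {hit_rate:.2%}, size: {stats['current_size']}")
```

In this run the last `ping` is a miss because the entry was evicted once the cache filled up. A persistently low hit rate can therefore indicate a cache size that is too small for the variety of messages being validated.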
### Memory Management

```python
import asyncio
import gc
import logging

from mcp_server_git.server import MCPGitServer
from mcp_server_git.optimizations import MemoryMonitor

logger = logging.getLogger(__name__)


class MemoryEfficientServer(MCPGitServer):
    def __init__(self):
        super().__init__()
        self.memory_monitor = MemoryMonitor()
        self._memory_task = None

    async def start(self):
        await super().start()
        # Start memory monitoring; keep a reference so the task
        # is not garbage collected while it is still running
        self._memory_task = asyncio.create_task(self._monitor_memory())

    async def _monitor_memory(self):
        """Monitor memory usage and trigger cleanup when needed."""
        while True:
            current_memory = self.memory_monitor.take_sample("periodic")

            # If memory usage is high, force garbage collection
            if current_memory > 100:  # 100MB threshold
                gc.collect()
                logger.warning(f"High memory usage detected: {current_memory:.2f}MB")

            await asyncio.sleep(60)  # Check every minute
```

## Monitoring and Debugging

### Structured Logging

```python
import structlog

from mcp_server_git.logging_config import setup_structured_logging

# Setup structured logging
setup_structured_logging()
logger = structlog.get_logger()

# Log with structured data
logger.info(
    "Message processed",
    message_type="notifications/cancelled",
    session_id="session_123",
    processing_time_ms=45,
    operation_id="git_clone_456"
)
```

### Metrics Collection

```python
import prometheus_client

from mcp_server_git.server import MCPGitServer
from mcp_server_git.metrics import ServerMetrics


class MonitoredMCPServer(MCPGitServer):
    def __init__(self):
        super().__init__()
        self.metrics = ServerMetrics()

        # Prometheus metrics
        self.message_counter = prometheus_client.Counter(
            'mcp_messages_total',
            'Total number of messages processed',
            ['message_type', 'status']
        )
        self.operation_histogram = prometheus_client.Histogram(
            'mcp_operation_duration_seconds',
            'Time spent processing operations',
            ['operation_type']
        )

    async def handle_message(self, message, session):
        message_type = message.get('type', 'unknown')

        with self.operation_histogram.labels(operation_type=message_type).time():
            try:
                result = await super().handle_message(message, session)
                self.message_counter.labels(
                    message_type=message_type,
                    status='success'
                ).inc()
                return result
            except Exception:
                # Count the failure, then let the error propagate
                self.message_counter.labels(
                    message_type=message_type,
                    status='error'
                ).inc()
                raise
```

### Health Checks

```python
from datetime import datetime

from mcp_server_git.server import MCPGitServer
# CircuitState is assumed to be exported alongside CircuitBreaker
from mcp_server_git.error_handling import CircuitState


# Assumes the server also defines self.git_circuit and self.memory_monitor,
# as in the CustomMCPServer and MemoryEfficientServer examples above
class HealthCheckMCPServer(MCPGitServer):
    def __init__(self):
        super().__init__()
        self.start_time = datetime.now()
        self.last_health_check = datetime.now()

    async def health_check(self):
        """Perform comprehensive health check."""
        health_status = {
            "status": "healthy",
            "uptime_seconds": (datetime.now() - self.start_time).total_seconds(),
            "checks": {}
        }

        # Check session manager
        sessions = await self.session_manager.get_all_sessions()
        health_status["checks"]["sessions"] = {
            "active_count": len(sessions),
            "status": "ok" if len(sessions) < 1000 else "warning"
        }

        # Check circuit breaker status
        health_status["checks"]["circuit_breakers"] = {
            "git_operations": self.git_circuit.state.value,
            "status": "ok" if self.git_circuit.state != CircuitState.OPEN else "error"
        }

        # Check memory usage
        memory_mb = self.memory_monitor.take_sample("health_check")
        health_status["checks"]["memory"] = {
            "usage_mb": memory_mb,
            "status": "ok" if memory_mb < 200 else "warning"
        }

        self.last_health_check = datetime.now()
        return health_status
```

## Best Practices

### Error Handling

1. **Always classify errors by severity**

   ```python
   from mcp_server_git.error_handling import ErrorSeverity, ErrorContext

   # Fragment: error_type, e, session_id and handle_error come from
   # the surrounding error handler
   # Critical errors should terminate sessions
   if error_type == "authentication_failed":
       context = ErrorContext(e, ErrorSeverity.CRITICAL, "auth", session_id)
       await handle_error(context)
   ```

2. **Use circuit breakers for external dependencies**

   ```python
   # Protect against cascading failures
   @with_circuit_breaker(github_api_circuit)
   async def call_github_api():
       return await github_client.get_repo_info()
   ```

3. **Implement graceful degradation**

   ```python
   try:
       # Try primary approach
       result = await primary_operation()
   except Exception:
       # Fall back to secondary approach
       result = await fallback_operation()
   ```

### Performance

1. **Enable validation caching in production**

   ```python
   from mcp_server_git.optimizations import enable_validation_cache

   # Always enable in production
   enable_validation_cache()
   ```

2. **Monitor performance metrics**

   ```python
   # Track all operations
   with PerformanceTimer("operation_name"):
       await operation()
   ```

3. **Use asynchronous operations**

   ```python
   # Process multiple operations concurrently
   tasks = [process_message(msg) for msg in messages]
   results = await asyncio.gather(*tasks, return_exceptions=True)
   ```

### Session Management

1. **Configure appropriate timeouts**

   ```python
   # Configure based on expected usage patterns
   heartbeat_manager = HeartbeatManager(
       session_manager=session_manager,
       heartbeat_interval=30.0,      # Based on network conditions
       missed_heartbeat_threshold=3  # Balance between responsiveness and tolerance
   )
   ```

2. **Clean up resources properly**

   ```python
   async def cleanup_session(self, session_id):
       # Cancel active operations
       session = await self.session_manager.get_session(session_id)
       for op_id in session.active_operations:
           await self.cancel_operation(op_id)

       # Close session
       await self.session_manager.close_session(session_id)
   ```

## Troubleshooting

### Common Issues

#### High Memory Usage

```python
# Debug memory leaks
import logging

from mcp_server_git.optimizations import MemoryMonitor

logger = logging.getLogger(__name__)

monitor = MemoryMonitor()
monitor.take_sample("start")

# ... perform operations ...

monitor.take_sample("end")
growth = monitor.get_memory_growth()

if growth > 50:  # 50MB threshold
    logger.warning(f"Potential memory leak detected: {growth:.2f}MB growth")
    # Log details about active sessions and operations
```

#### Circuit Breaker Tripping

```python
# Check circuit breaker status
# (CircuitState must be imported from the package's error handling module)
if circuit.state == CircuitState.OPEN:
    logger.info(f"Circuit {circuit.name} is open", extra={
        "failure_count": circuit.failure_count,
        "last_failure_time": circuit.last_failure_time,
        "recovery_timeout": circuit.recovery_timeout
    })

    # Wait for recovery or manually reset if needed
    if manual_reset_needed:
        circuit.reset()
```

#### Session Management Issues

```python
# Debug session problems
sessions = await session_manager.get_all_sessions()
for session in sessions:
    if session.idle_time > 3600:  # 1 hour
        logger.warning("Long idle session detected", extra={
            "session_id": session.session_id,
            "idle_time": session.idle_time,
            "message_count": session.message_count,
            "error_count": session.error_count
        })
```

### Debugging Tools

#### Enable Debug Logging

```python
import logging

from mcp_server_git.logging_config import setup_logging

# Enable debug logging
setup_logging(level=logging.DEBUG)

# Enable debug output for specific components
logging.getLogger("mcp_server_git.session").setLevel(logging.DEBUG)
logging.getLogger("mcp_server_git.validation").setLevel(logging.DEBUG)
```

#### Performance Profiling

```python
from mcp_server_git.optimizations import CPUProfiler

# Profile performance bottlenecks
with CPUProfiler("message_processing"):
    await process_messages(messages)

# Analyze results
# python -m pstats message_processing.stats
```

This usage guide collects examples and patterns for the MCP Git Server's session management, error handling, performance, and monitoring features.
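
As background for the Circuit Breaker Tripping section, the sketch below shows one common way a circuit breaker moves between closed, open, and half-open states. It is an illustration under stated assumptions, not the package's `CircuitBreaker`: the names `SketchCircuitBreaker` and `SketchCircuitState` are hypothetical, and only the attribute names (`failure_threshold`, `recovery_timeout`, `failure_count`, `last_failure_time`, `state`, `reset()`) follow the ones used in this guide.

```python
import time
from enum import Enum


class SketchCircuitState(Enum):
    CLOSED = "closed"        # requests flow normally
    OPEN = "open"            # requests are rejected
    HALF_OPEN = "half_open"  # a trial request is allowed


class SketchCircuitBreaker:
    """Hypothetical circuit breaker; the clock is injectable for testing."""

    def __init__(self, name, failure_threshold=5, recovery_timeout=30.0,
                 clock=time.monotonic):
        self.name = name
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock
        self.failure_count = 0
        self.last_failure_time = None
        self._state = SketchCircuitState.CLOSED

    @property
    def state(self):
        # An open circuit becomes half-open once the recovery timeout elapses
        if (self._state is SketchCircuitState.OPEN
                and self._clock() - self.last_failure_time >= self.recovery_timeout):
            self._state = SketchCircuitState.HALF_OPEN
        return self._state

    def allow_request(self):
        return self.state is not SketchCircuitState.OPEN

    def record_success(self):
        # Any success closes the circuit and clears the failure count
        self.failure_count = 0
        self._state = SketchCircuitState.CLOSED

    def record_failure(self):
        was_half_open = self.state is SketchCircuitState.HALF_OPEN
        self.failure_count += 1
        self.last_failure_time = self._clock()
        # A failed trial request, or too many failures, opens the circuit
        if was_half_open or self.failure_count >= self.failure_threshold:
            self._state = SketchCircuitState.OPEN

    def reset(self):
        self.failure_count = 0
        self.last_failure_time = None
        self._state = SketchCircuitState.CLOSED


# Drive the breaker with a fake clock so the example runs instantly
now = [0.0]
breaker = SketchCircuitBreaker("git_ops", failure_threshold=2,
                               recovery_timeout=30.0, clock=lambda: now[0])

breaker.record_failure()
state_after_one = breaker.state      # below the threshold
breaker.record_failure()
state_after_two = breaker.state      # threshold reached
now[0] = 31.0
state_after_timeout = breaker.state  # recovery timeout elapsed
breaker.record_success()
state_after_success = breaker.state  # trial request succeeded

print(state_after_one, state_after_two, state_after_timeout, state_after_success)
```

Under this model, a circuit that stays open usually means either the underlying failures have not stopped or the recovery timeout has not yet elapsed, which is why the troubleshooting snippet logs `failure_count`, `last_failure_time`, and `recovery_timeout` together.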
