Skip to main content
Glama

MCP Git Server

by MementoRC
server_github.py23.1 kB
""" GitHub service implementation for MCP Git Server. This module provides a comprehensive GitHub service that handles GitHub API integration, webhook processing, and GitHub-specific functionality. The service provides high-level interfaces, handles authentication, validation, error recovery, and state management. Design principles: - Complete GitHub integration: API calls, webhooks, CLI operations - State management: Maintains operational state and configuration - Error recovery: Robust error handling and rate limiting - Observability: Comprehensive metrics and debugging support - Testability: Extensive test coverage and mocking support Critical for TDD Compliance: This service implements the interface defined by test specifications. DO NOT modify tests to match this implementation - this implementation must satisfy the test requirements to prevent LLM compliance issues. """ import asyncio import json import logging import time from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field from datetime import datetime from threading import Lock from typing import Any from ..configuration.github_config import GitHubConfig from ..protocols.debugging_protocol import DebuggableComponent logger = logging.getLogger(__name__) @dataclass class GitHubServiceConfig: """Configuration for GitHubService following GitServiceConfig pattern.""" github_config: GitHubConfig = field(default_factory=GitHubConfig) enable_webhooks: bool = field(default=False) enable_cli_operations: bool = field(default=True) enable_rate_limiting: bool = field(default=True) max_concurrent_operations: int = field(default=10) operation_timeout: float = field(default=30.0) retry_attempts: int = field(default=3) retry_delay: float = field(default=1.0) @dataclass class GitHubOperationResult: """Result of GitHub operations following GitOperationResult pattern.""" success: bool operation: str data: dict[str, Any] = field(default_factory=dict) error_message: str | None = None status_code: int | None = None rate_limit_remaining: int | None = None execution_time: float = field(default=0.0) retry_count: int = field(default=0) @dataclass class GitHubServiceState: """Internal state management following GitServiceState pattern.""" is_initialized: bool = field(default=False) is_running: bool = field(default=False) operation_count: int = field(default=0) error_count: int = field(default=0) last_operation_time: datetime | None = None active_operations: int = field(default=0) rate_limit_status: dict[str, Any] = field(default_factory=dict) configuration: GitHubServiceConfig | None = None class GitHubService(DebuggableComponent): """ Comprehensive GitHub service for MCP Git Server. Provides GitHub API integration, webhook handling, and GitHub-specific functionality following the established service patterns. """ def __init__(self, config: GitHubServiceConfig | None = None) -> None: """Initialize GitHub service with configuration.""" self._config = config or GitHubServiceConfig() self._state = GitHubServiceState(configuration=self._config) self._executor = ThreadPoolExecutor( max_workers=self._config.max_concurrent_operations, thread_name_prefix="github-service", ) self._operation_lock = Lock() self._operation_history: list[dict[str, Any]] = [] logger.info("GitHub service initialized") async def start(self) -> None: """Start the GitHub service with token validation.""" if self._state.is_running: logger.warning("GitHub service already running") return try: # Validate GitHub configuration await self._validate_configuration() # Initialize rate limiting if self._config.enable_rate_limiting: await self._initialize_rate_limiting() self._state.is_initialized = True self._state.is_running = True logger.info("GitHub service started successfully") except Exception as e: logger.error(f"Failed to start GitHub service: {e}") raise async def stop(self) -> None: """Graceful shutdown of GitHub service.""" if not self._state.is_running: return try: # Wait for active operations to complete timeout = 30.0 start_time = time.time() while ( self._state.active_operations > 0 and (time.time() - start_time) < timeout ): await asyncio.sleep(0.1) # Shutdown executor self._executor.shutdown(wait=True) self._state.is_running = False logger.info("GitHub service stopped successfully") except Exception as e: logger.error(f"Error during GitHub service shutdown: {e}") raise # GitHub API Operations async def get_pr_checks( self, repo_owner: str, repo_name: str, pr_number: int, conclusion: str | None = None, status: str | None = None, ) -> GitHubOperationResult: """Get check runs for a pull request.""" return await self._execute_github_operation( "get_pr_checks", self._get_pr_checks_impl, repo_owner=repo_owner, repo_name=repo_name, pr_number=pr_number, conclusion=conclusion, status=status, ) async def get_failing_jobs( self, repo_owner: str, repo_name: str, pr_number: int, include_annotations: bool = True, include_logs: bool = True, ) -> GitHubOperationResult: """Get detailed information about failing jobs in a PR.""" return await self._execute_github_operation( "get_failing_jobs", self._get_failing_jobs_impl, repo_owner=repo_owner, repo_name=repo_name, pr_number=pr_number, include_annotations=include_annotations, include_logs=include_logs, ) async def get_workflow_run( self, repo_owner: str, repo_name: str, run_id: int, include_logs: bool = False ) -> GitHubOperationResult: """Get detailed workflow run information.""" return await self._execute_github_operation( "get_workflow_run", self._get_workflow_run_impl, repo_owner=repo_owner, repo_name=repo_name, run_id=run_id, include_logs=include_logs, ) async def get_pr_details( self, repo_owner: str, repo_name: str, pr_number: int, include_files: bool = False, include_reviews: bool = False, ) -> GitHubOperationResult: """Get comprehensive PR details.""" return await self._execute_github_operation( "get_pr_details", self._get_pr_details_impl, repo_owner=repo_owner, repo_name=repo_name, pr_number=pr_number, include_files=include_files, include_reviews=include_reviews, ) async def list_pull_requests( self, repo_owner: str, repo_name: str, state: str = "open", head: str | None = None, base: str | None = None, sort: str = "created", direction: str = "desc", per_page: int = 30, page: int = 1, ) -> GitHubOperationResult: """List pull requests for a repository with filtering and pagination.""" return await self._execute_github_operation( "list_pull_requests", self._list_pull_requests_impl, repo_owner=repo_owner, repo_name=repo_name, state=state, head=head, base=base, sort=sort, direction=direction, per_page=per_page, page=page, ) async def get_pr_status( self, repo_owner: str, repo_name: str, pr_number: int ) -> GitHubOperationResult: """Get the status and check runs for a pull request.""" return await self._execute_github_operation( "get_pr_status", self._get_pr_status_impl, repo_owner=repo_owner, repo_name=repo_name, pr_number=pr_number, ) async def get_pr_files( self, repo_owner: str, repo_name: str, pr_number: int, per_page: int = 30, page: int = 1, include_patch: bool = False, ) -> GitHubOperationResult: """Get files changed in a pull request with pagination support.""" return await self._execute_github_operation( "get_pr_files", self._get_pr_files_impl, repo_owner=repo_owner, repo_name=repo_name, pr_number=pr_number, per_page=per_page, page=page, include_patch=include_patch, ) # Private Implementation Methods async def _execute_github_operation( self, operation_name: str, operation_func, **kwargs ) -> GitHubOperationResult: """Execute a GitHub operation with error handling and metrics.""" start_time = time.time() with self._operation_lock: self._state.operation_count += 1 self._state.active_operations += 1 self._state.last_operation_time = datetime.now() try: # Check rate limits if self._config.enable_rate_limiting: await self._check_rate_limits() # Execute operation result = await operation_func(**kwargs) # Record success execution_time = time.time() - start_time operation_result = GitHubOperationResult( success=True, operation=operation_name, data=result, execution_time=execution_time, ) self._record_operation_history(operation_result) return operation_result except Exception as e: # Record error with self._operation_lock: self._state.error_count += 1 execution_time = time.time() - start_time operation_result = GitHubOperationResult( success=False, operation=operation_name, error_message=str(e), execution_time=execution_time, ) self._record_operation_history(operation_result) logger.error(f"GitHub operation {operation_name} failed: {e}") return operation_result finally: with self._operation_lock: self._state.active_operations -= 1 async def _validate_configuration(self) -> None: """Validate GitHub configuration.""" # TODO: Implement GitHub token validation pass async def _initialize_rate_limiting(self) -> None: """Initialize GitHub API rate limiting.""" # TODO: Implement rate limiting initialization pass async def _check_rate_limits(self) -> None: """Check and enforce GitHub API rate limits.""" # TODO: Implement rate limit checking pass def _record_operation_history(self, result: GitHubOperationResult) -> None: """Record operation in history for debugging.""" history_entry = { "timestamp": datetime.now().isoformat(), "operation": result.operation, "success": result.success, "execution_time": result.execution_time, "error_message": result.error_message, } self._operation_history.append(history_entry) # Keep only last 100 operations if len(self._operation_history) > 100: self._operation_history.pop(0) # GitHub API Implementation Methods (placeholders for extracted logic) async def _get_pr_checks_impl(self, **kwargs) -> dict[str, Any]: """Implementation for getting PR checks.""" # TODO: Move actual GitHub API logic here from server.py return {} async def _get_failing_jobs_impl(self, **kwargs) -> dict[str, Any]: """Implementation for getting failing jobs.""" # TODO: Move actual GitHub API logic here from server.py return {} async def _get_workflow_run_impl(self, **kwargs) -> dict[str, Any]: """Implementation for getting workflow runs.""" # TODO: Move actual GitHub API logic here from server.py return {} async def _get_pr_details_impl(self, **kwargs) -> dict[str, Any]: """Implementation for getting PR details.""" # TODO: Move actual GitHub API logic here from server.py return {} async def _list_pull_requests_impl(self, **kwargs) -> dict[str, Any]: """Implementation for listing pull requests.""" # TODO: Move actual GitHub API logic here from server.py return {} async def _get_pr_status_impl(self, **kwargs) -> dict[str, Any]: """Implementation for getting PR status.""" # TODO: Move actual GitHub API logic here from server.py return {} async def _get_pr_files_impl(self, **kwargs) -> dict[str, Any]: """Implementation for getting PR files.""" # TODO: Move actual GitHub API logic here from server.py return {} # DebuggableComponent Protocol Implementation def get_component_state(self): """Get current component state for debugging.""" from dataclasses import asdict class GitHubServiceComponentState: def __init__(self, state: GitHubServiceState): self._state = state @property def component_id(self) -> str: return f"github_service_{id(self._state)}" @property def component_type(self) -> str: return "GitHubService" @property def state_data(self) -> dict[str, Any]: return asdict(self._state) @property def last_updated(self) -> datetime: return self._state.last_operation_time or datetime.now() return GitHubServiceComponentState(self._state) def validate_component(self): """Validate component configuration and state.""" class GitHubServiceValidationResult: def __init__(self, is_valid: bool, errors: list[str], warnings: list[str]): self._is_valid = is_valid self._errors = errors self._warnings = warnings self._timestamp = datetime.now() @property def is_valid(self) -> bool: return self._is_valid @property def validation_errors(self) -> list[str]: return self._errors @property def validation_warnings(self) -> list[str]: return self._warnings @property def validation_timestamp(self) -> datetime: return self._timestamp errors = [] warnings = [] if not self._state.is_initialized: errors.append("GitHub service not initialized") if self._state.error_count > self._state.operation_count * 0.5: errors.append("High error rate detected") if self._state.active_operations > self._config.max_concurrent_operations: errors.append("Too many concurrent operations") if not self._config.github_config.api_token: warnings.append("No GitHub API token configured") return GitHubServiceValidationResult(len(errors) == 0, errors, warnings) def get_debug_info(self, debug_level: str = "INFO"): """Get debugging information.""" class GitHubServiceDebugInfo: def __init__( self, debug_level: str, state: GitHubServiceState, config: GitHubServiceConfig, operation_history: list[dict[str, Any]], ): self._debug_level = debug_level self._state = state self._config = config self._operation_history = operation_history @property def debug_level(self) -> str: return self._debug_level @property def debug_data(self) -> dict[str, Any]: data = { "service_state": { "initialized": self._state.is_initialized, "running": self._state.is_running, "operations": self._state.operation_count, "errors": self._state.error_count, }, "configuration": { "rate_limiting_enabled": self._config.enable_rate_limiting, "webhooks_enabled": self._config.enable_webhooks, "max_concurrent": self._config.max_concurrent_operations, }, } if self._debug_level in ["DEBUG", "TRACE"]: data["recent_operations"] = self._operation_history[-10:] data["rate_limit_status"] = self._state.rate_limit_status return data @property def stack_trace(self) -> list[str] | None: return None # No stack trace for normal operation @property def performance_metrics(self) -> dict[str, int | float]: return { "success_rate": self._calculate_success_rate(), "average_execution_time": self._calculate_average_execution_time(), "operations_per_minute": self._calculate_operations_per_minute(), } def _calculate_success_rate(self) -> float: if self._state.operation_count == 0: return 1.0 success_count = self._state.operation_count - self._state.error_count return success_count / self._state.operation_count def _calculate_average_execution_time(self) -> float: if not self._operation_history: return 0.0 total_time = sum( op.get("execution_time", 0.0) for op in self._operation_history ) return total_time / len(self._operation_history) def _calculate_operations_per_minute(self) -> float: if not self._state.last_operation_time: return 0.0 # Simple calculation based on recent operations return min(60.0, self._state.operation_count) return GitHubServiceDebugInfo( debug_level, self._state, self._config, self._operation_history ) def inspect_state(self, path: str | None = None) -> dict[str, Any]: """Inspect internal state for debugging.""" state_data = { "service_id": f"github_service_{id(self._state)}", "is_initialized": self._state.is_initialized, "is_running": self._state.is_running, "operation_count": self._state.operation_count, "error_count": self._state.error_count, "active_operations": self._state.active_operations, "configuration": { "github_config": self._config.github_config.model_dump() if self._config.github_config else {}, "enable_webhooks": self._config.enable_webhooks, "enable_cli_operations": self._config.enable_cli_operations, "enable_rate_limiting": self._config.enable_rate_limiting, "max_concurrent_operations": self._config.max_concurrent_operations, "operation_timeout": self._config.operation_timeout, }, "metrics": { "success_rate": self._calculate_success_rate(), "last_operation": self._state.last_operation_time.isoformat() if self._state.last_operation_time else None, }, "rate_limit_status": self._state.rate_limit_status, } if path is None: return state_data # Navigate to specific path using dot notation keys = path.split(".") current: Any = state_data for key in keys: if isinstance(current, dict) and key in current: current = current[key] else: return {"error": f"Path '{path}' not found"} # Ensure we return the expected format if not isinstance(current, dict): current = {"value": current} return {path: current} def get_component_dependencies(self) -> list[str]: """Get component dependencies.""" return [ "GitHubConfig", "ThreadPoolExecutor", "GitHub API", "Network connectivity", ] def export_state_json(self) -> str: """Export state as JSON string.""" state_data = self.inspect_state() # Make datetime objects and other types JSON serializable def json_serializer(obj): if isinstance(obj, datetime): return obj.isoformat() # Handle Pydantic HttpUrl and other complex types if hasattr(obj, "__str__"): return str(obj) raise TypeError(f"Object of type {type(obj)} is not JSON serializable") return json.dumps(state_data, indent=2, default=json_serializer) def health_check(self) -> dict[str, Any]: """Perform health check and return status.""" success_rate = self._calculate_success_rate() # Consider service healthy if initialized, running, and has good success rate is_healthy = ( self._state.is_initialized and self._state.is_running and success_rate >= 0.8 ) return { "healthy": is_healthy, "initialized": self._state.is_initialized, "running": self._state.is_running, "operation_count": self._state.operation_count, "error_count": self._state.error_count, "success_rate": success_rate, "active_operations": self._state.active_operations, } def _calculate_success_rate(self) -> float: """Calculate operation success rate.""" if self._state.operation_count == 0: return 1.0 success_count = self._state.operation_count - self._state.error_count return success_count / self._state.operation_count

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MementoRC/mcp-git'

If you have feedback or need assistance with the MCP directory API, please join our Discord server