"""
GitHub service implementation for MCP Git Server.
This module provides a comprehensive GitHub service that handles GitHub API
integration, webhook processing, and other GitHub-specific functionality.
The service exposes high-level interfaces and handles authentication,
validation, error recovery, and state management.
Design principles:
- Complete GitHub integration: API calls, webhooks, CLI operations
- State management: Maintains operational state and configuration
- Error recovery: Robust error handling and rate limiting
- Observability: Comprehensive metrics and debugging support
- Testability: Extensive test coverage and mocking support
Critical for TDD Compliance:
This service implements the interface defined by test specifications.
DO NOT modify tests to match this implementation - this implementation
must satisfy the test requirements to prevent LLM compliance issues.
"""
import asyncio
import json
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from datetime import datetime
from threading import Lock
from typing import Any
from ..configuration.github_config import GitHubConfig
from ..protocols.debugging_protocol import DebuggableComponent
logger = logging.getLogger(__name__)
@dataclass
class GitHubServiceConfig:
"""Configuration for GitHubService following GitServiceConfig pattern."""
github_config: GitHubConfig = field(default_factory=GitHubConfig)
enable_webhooks: bool = field(default=False)
enable_cli_operations: bool = field(default=True)
enable_rate_limiting: bool = field(default=True)
max_concurrent_operations: int = field(default=10)
operation_timeout: float = field(default=30.0)
retry_attempts: int = field(default=3)
retry_delay: float = field(default=1.0)
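# Illustrative sketch (not used elsewhere in this module): how a caller might
# tune the service configuration. The values below are placeholders, not
# recommended defaults.
def _example_service_config() -> GitHubServiceConfig:
    return GitHubServiceConfig(
        enable_webhooks=True,
        max_concurrent_operations=5,
        operation_timeout=60.0,
    )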
@dataclass
class GitHubOperationResult:
"""Result of GitHub operations following GitOperationResult pattern."""
success: bool
operation: str
data: dict[str, Any] = field(default_factory=dict)
error_message: str | None = None
status_code: int | None = None
rate_limit_remaining: int | None = None
execution_time: float = field(default=0.0)
retry_count: int = field(default=0)
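# Illustrative sketch (not used elsewhere in this module): the expected way for
# callers to branch on a GitHubOperationResult after an operation completes.
def _describe_result(result: GitHubOperationResult) -> str:
    if result.success:
        return f"{result.operation} succeeded in {result.execution_time:.2f}s"
    return f"{result.operation} failed: {result.error_message}"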
@dataclass
class GitHubServiceState:
"""Internal state management following GitServiceState pattern."""
is_initialized: bool = field(default=False)
is_running: bool = field(default=False)
operation_count: int = field(default=0)
error_count: int = field(default=0)
last_operation_time: datetime | None = None
active_operations: int = field(default=0)
rate_limit_status: dict[str, Any] = field(default_factory=dict)
configuration: GitHubServiceConfig | None = None
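# Illustrative sketch (not used elsewhere in this module): condensing the
# internal service state into the fields most useful for a log snapshot.
def _summarize_state(state: GitHubServiceState) -> dict[str, Any]:
    return {
        "initialized": state.is_initialized,
        "running": state.is_running,
        "operations": state.operation_count,
        "errors": state.error_count,
        "active_operations": state.active_operations,
    }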
class GitHubService(DebuggableComponent):
"""
Comprehensive GitHub service for MCP Git Server.
Provides GitHub API integration, webhook handling, and GitHub-specific
functionality following the established service patterns.
"""
def __init__(self, config: GitHubServiceConfig | None = None) -> None:
"""Initialize GitHub service with configuration."""
self._config = config or GitHubServiceConfig()
self._state = GitHubServiceState(configuration=self._config)
self._executor = ThreadPoolExecutor(
max_workers=self._config.max_concurrent_operations,
thread_name_prefix="github-service",
)
self._operation_lock = Lock()
self._operation_history: list[dict[str, Any]] = []
logger.info("GitHub service initialized")
async def start(self) -> None:
"""Start the GitHub service with token validation."""
if self._state.is_running:
logger.warning("GitHub service already running")
return
try:
# Validate GitHub configuration
await self._validate_configuration()
# Initialize rate limiting
if self._config.enable_rate_limiting:
await self._initialize_rate_limiting()
self._state.is_initialized = True
self._state.is_running = True
logger.info("GitHub service started successfully")
except Exception as e:
logger.error(f"Failed to start GitHub service: {e}")
raise
async def stop(self) -> None:
"""Graceful shutdown of GitHub service."""
if not self._state.is_running:
return
try:
# Wait for active operations to complete
timeout = 30.0
start_time = time.time()
while (
self._state.active_operations > 0
and (time.time() - start_time) < timeout
):
await asyncio.sleep(0.1)
# Shutdown executor
self._executor.shutdown(wait=True)
self._state.is_running = False
logger.info("GitHub service stopped successfully")
except Exception as e:
logger.error(f"Error during GitHub service shutdown: {e}")
raise
# GitHub API Operations
async def get_pr_checks(
self,
repo_owner: str,
repo_name: str,
pr_number: int,
conclusion: str | None = None,
status: str | None = None,
) -> GitHubOperationResult:
"""Get check runs for a pull request."""
return await self._execute_github_operation(
"get_pr_checks",
self._get_pr_checks_impl,
repo_owner=repo_owner,
repo_name=repo_name,
pr_number=pr_number,
conclusion=conclusion,
status=status,
)
async def get_failing_jobs(
self,
repo_owner: str,
repo_name: str,
pr_number: int,
include_annotations: bool = True,
include_logs: bool = True,
) -> GitHubOperationResult:
"""Get detailed information about failing jobs in a PR."""
return await self._execute_github_operation(
"get_failing_jobs",
self._get_failing_jobs_impl,
repo_owner=repo_owner,
repo_name=repo_name,
pr_number=pr_number,
include_annotations=include_annotations,
include_logs=include_logs,
)
async def get_workflow_run(
self, repo_owner: str, repo_name: str, run_id: int, include_logs: bool = False
) -> GitHubOperationResult:
"""Get detailed workflow run information."""
return await self._execute_github_operation(
"get_workflow_run",
self._get_workflow_run_impl,
repo_owner=repo_owner,
repo_name=repo_name,
run_id=run_id,
include_logs=include_logs,
)
async def get_pr_details(
self,
repo_owner: str,
repo_name: str,
pr_number: int,
include_files: bool = False,
include_reviews: bool = False,
) -> GitHubOperationResult:
"""Get comprehensive PR details."""
return await self._execute_github_operation(
"get_pr_details",
self._get_pr_details_impl,
repo_owner=repo_owner,
repo_name=repo_name,
pr_number=pr_number,
include_files=include_files,
include_reviews=include_reviews,
)
async def list_pull_requests(
self,
repo_owner: str,
repo_name: str,
state: str = "open",
head: str | None = None,
base: str | None = None,
sort: str = "created",
direction: str = "desc",
per_page: int = 30,
page: int = 1,
) -> GitHubOperationResult:
"""List pull requests for a repository with filtering and pagination."""
return await self._execute_github_operation(
"list_pull_requests",
self._list_pull_requests_impl,
repo_owner=repo_owner,
repo_name=repo_name,
state=state,
head=head,
base=base,
sort=sort,
direction=direction,
per_page=per_page,
page=page,
)
async def get_pr_status(
self, repo_owner: str, repo_name: str, pr_number: int
) -> GitHubOperationResult:
"""Get the status and check runs for a pull request."""
return await self._execute_github_operation(
"get_pr_status",
self._get_pr_status_impl,
repo_owner=repo_owner,
repo_name=repo_name,
pr_number=pr_number,
)
async def get_pr_files(
self,
repo_owner: str,
repo_name: str,
pr_number: int,
per_page: int = 30,
page: int = 1,
include_patch: bool = False,
) -> GitHubOperationResult:
"""Get files changed in a pull request with pagination support."""
return await self._execute_github_operation(
"get_pr_files",
self._get_pr_files_impl,
repo_owner=repo_owner,
repo_name=repo_name,
pr_number=pr_number,
per_page=per_page,
page=page,
include_patch=include_patch,
)
# Private Implementation Methods
async def _execute_github_operation(
self, operation_name: str, operation_func, **kwargs
) -> GitHubOperationResult:
"""Execute a GitHub operation with error handling and metrics."""
start_time = time.time()
with self._operation_lock:
self._state.operation_count += 1
self._state.active_operations += 1
self._state.last_operation_time = datetime.now()
try:
# Check rate limits
if self._config.enable_rate_limiting:
await self._check_rate_limits()
# Execute operation
result = await operation_func(**kwargs)
# Record success
execution_time = time.time() - start_time
operation_result = GitHubOperationResult(
success=True,
operation=operation_name,
data=result,
execution_time=execution_time,
)
self._record_operation_history(operation_result)
return operation_result
except Exception as e:
# Record error
with self._operation_lock:
self._state.error_count += 1
execution_time = time.time() - start_time
operation_result = GitHubOperationResult(
success=False,
operation=operation_name,
error_message=str(e),
execution_time=execution_time,
)
self._record_operation_history(operation_result)
logger.error(f"GitHub operation {operation_name} failed: {e}")
return operation_result
finally:
with self._operation_lock:
self._state.active_operations -= 1
async def _validate_configuration(self) -> None:
"""Validate GitHub configuration."""
# TODO: Implement GitHub token validation
pass
async def _initialize_rate_limiting(self) -> None:
"""Initialize GitHub API rate limiting."""
# TODO: Implement rate limiting initialization
pass
async def _check_rate_limits(self) -> None:
"""Check and enforce GitHub API rate limits."""
# TODO: Implement rate limit checking
pass
def _record_operation_history(self, result: GitHubOperationResult) -> None:
"""Record operation in history for debugging."""
history_entry = {
"timestamp": datetime.now().isoformat(),
"operation": result.operation,
"success": result.success,
"execution_time": result.execution_time,
"error_message": result.error_message,
}
self._operation_history.append(history_entry)
        # Keep only the last 100 operations
if len(self._operation_history) > 100:
self._operation_history.pop(0)
# GitHub API Implementation Methods (placeholders for extracted logic)
async def _get_pr_checks_impl(self, **kwargs) -> dict[str, Any]:
"""Implementation for getting PR checks."""
# TODO: Move actual GitHub API logic here from server.py
return {}
async def _get_failing_jobs_impl(self, **kwargs) -> dict[str, Any]:
"""Implementation for getting failing jobs."""
# TODO: Move actual GitHub API logic here from server.py
return {}
async def _get_workflow_run_impl(self, **kwargs) -> dict[str, Any]:
"""Implementation for getting workflow runs."""
# TODO: Move actual GitHub API logic here from server.py
return {}
async def _get_pr_details_impl(self, **kwargs) -> dict[str, Any]:
"""Implementation for getting PR details."""
# TODO: Move actual GitHub API logic here from server.py
return {}
async def _list_pull_requests_impl(self, **kwargs) -> dict[str, Any]:
"""Implementation for listing pull requests."""
# TODO: Move actual GitHub API logic here from server.py
return {}
async def _get_pr_status_impl(self, **kwargs) -> dict[str, Any]:
"""Implementation for getting PR status."""
# TODO: Move actual GitHub API logic here from server.py
return {}
async def _get_pr_files_impl(self, **kwargs) -> dict[str, Any]:
"""Implementation for getting PR files."""
# TODO: Move actual GitHub API logic here from server.py
return {}
# DebuggableComponent Protocol Implementation
def get_component_state(self):
"""Get current component state for debugging."""
from dataclasses import asdict
class GitHubServiceComponentState:
def __init__(self, state: GitHubServiceState):
self._state = state
@property
def component_id(self) -> str:
return f"github_service_{id(self._state)}"
@property
def component_type(self) -> str:
return "GitHubService"
@property
def state_data(self) -> dict[str, Any]:
return asdict(self._state)
@property
def last_updated(self) -> datetime:
return self._state.last_operation_time or datetime.now()
return GitHubServiceComponentState(self._state)
def validate_component(self):
"""Validate component configuration and state."""
class GitHubServiceValidationResult:
def __init__(self, is_valid: bool, errors: list[str], warnings: list[str]):
self._is_valid = is_valid
self._errors = errors
self._warnings = warnings
self._timestamp = datetime.now()
@property
def is_valid(self) -> bool:
return self._is_valid
@property
def validation_errors(self) -> list[str]:
return self._errors
@property
def validation_warnings(self) -> list[str]:
return self._warnings
@property
def validation_timestamp(self) -> datetime:
return self._timestamp
errors = []
warnings = []
if not self._state.is_initialized:
errors.append("GitHub service not initialized")
if self._state.error_count > self._state.operation_count * 0.5:
errors.append("High error rate detected")
if self._state.active_operations > self._config.max_concurrent_operations:
errors.append("Too many concurrent operations")
if not self._config.github_config.api_token:
warnings.append("No GitHub API token configured")
return GitHubServiceValidationResult(len(errors) == 0, errors, warnings)
def get_debug_info(self, debug_level: str = "INFO"):
"""Get debugging information."""
class GitHubServiceDebugInfo:
def __init__(
self,
debug_level: str,
state: GitHubServiceState,
config: GitHubServiceConfig,
operation_history: list[dict[str, Any]],
):
self._debug_level = debug_level
self._state = state
self._config = config
self._operation_history = operation_history
@property
def debug_level(self) -> str:
return self._debug_level
@property
def debug_data(self) -> dict[str, Any]:
data = {
"service_state": {
"initialized": self._state.is_initialized,
"running": self._state.is_running,
"operations": self._state.operation_count,
"errors": self._state.error_count,
},
"configuration": {
"rate_limiting_enabled": self._config.enable_rate_limiting,
"webhooks_enabled": self._config.enable_webhooks,
"max_concurrent": self._config.max_concurrent_operations,
},
}
if self._debug_level in ["DEBUG", "TRACE"]:
data["recent_operations"] = self._operation_history[-10:]
data["rate_limit_status"] = self._state.rate_limit_status
return data
@property
def stack_trace(self) -> list[str] | None:
return None # No stack trace for normal operation
@property
def performance_metrics(self) -> dict[str, int | float]:
return {
"success_rate": self._calculate_success_rate(),
"average_execution_time": self._calculate_average_execution_time(),
"operations_per_minute": self._calculate_operations_per_minute(),
}
def _calculate_success_rate(self) -> float:
if self._state.operation_count == 0:
return 1.0
success_count = self._state.operation_count - self._state.error_count
return success_count / self._state.operation_count
def _calculate_average_execution_time(self) -> float:
if not self._operation_history:
return 0.0
total_time = sum(
op.get("execution_time", 0.0) for op in self._operation_history
)
return total_time / len(self._operation_history)
def _calculate_operations_per_minute(self) -> float:
if not self._state.last_operation_time:
return 0.0
                # Placeholder approximation: returns the total operation count
                # capped at 60, not a true operations-per-minute rate.
return min(60.0, self._state.operation_count)
return GitHubServiceDebugInfo(
debug_level, self._state, self._config, self._operation_history
)
def inspect_state(self, path: str | None = None) -> dict[str, Any]:
"""Inspect internal state for debugging."""
state_data = {
"service_id": f"github_service_{id(self._state)}",
"is_initialized": self._state.is_initialized,
"is_running": self._state.is_running,
"operation_count": self._state.operation_count,
"error_count": self._state.error_count,
"active_operations": self._state.active_operations,
"configuration": {
"github_config": self._config.github_config.model_dump()
if self._config.github_config
else {},
"enable_webhooks": self._config.enable_webhooks,
"enable_cli_operations": self._config.enable_cli_operations,
"enable_rate_limiting": self._config.enable_rate_limiting,
"max_concurrent_operations": self._config.max_concurrent_operations,
"operation_timeout": self._config.operation_timeout,
},
"metrics": {
"success_rate": self._calculate_success_rate(),
"last_operation": self._state.last_operation_time.isoformat()
if self._state.last_operation_time
else None,
},
"rate_limit_status": self._state.rate_limit_status,
}
if path is None:
return state_data
# Navigate to specific path using dot notation
keys = path.split(".")
current: Any = state_data
for key in keys:
if isinstance(current, dict) and key in current:
current = current[key]
else:
return {"error": f"Path '{path}' not found"}
# Ensure we return the expected format
if not isinstance(current, dict):
current = {"value": current}
return {path: current}
def get_component_dependencies(self) -> list[str]:
"""Get component dependencies."""
return [
"GitHubConfig",
"ThreadPoolExecutor",
"GitHub API",
"Network connectivity",
]
def export_state_json(self) -> str:
"""Export state as JSON string."""
state_data = self.inspect_state()
# Make datetime objects and other types JSON serializable
def json_serializer(obj):
if isinstance(obj, datetime):
return obj.isoformat()
            # Fall back to string conversion for Pydantic HttpUrl and other
            # complex types; every object defines __str__, so this handles
            # anything the json module cannot serialize natively.
if hasattr(obj, "__str__"):
return str(obj)
raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
return json.dumps(state_data, indent=2, default=json_serializer)
def health_check(self) -> dict[str, Any]:
"""Perform health check and return status."""
success_rate = self._calculate_success_rate()
        # Consider the service healthy if it is initialized, running, and
        # maintains a good success rate
is_healthy = (
self._state.is_initialized
and self._state.is_running
and success_rate >= 0.8
)
return {
"healthy": is_healthy,
"initialized": self._state.is_initialized,
"running": self._state.is_running,
"operation_count": self._state.operation_count,
"error_count": self._state.error_count,
"success_rate": success_rate,
"active_operations": self._state.active_operations,
}
def _calculate_success_rate(self) -> float:
"""Calculate operation success rate."""
if self._state.operation_count == 0:
return 1.0
success_count = self._state.operation_count - self._state.error_count
return success_count / self._state.operation_count
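# Illustrative usage sketch (assumption: not part of the tested API surface).
# It exercises the lifecycle documented above: start the service, run one
# API operation, inspect validation and health state, then shut down cleanly.
# "octocat", "hello-world", and PR number 1 are placeholder values.
async def _example_lifecycle() -> None:
    service = GitHubService(GitHubServiceConfig(max_concurrent_operations=5))
    await service.start()
    try:
        result = await service.get_pr_checks("octocat", "hello-world", pr_number=1)
        if result.success:
            logger.info("PR checks retrieved in %.2fs", result.execution_time)
        else:
            logger.warning("PR checks failed: %s", result.error_message)
        validation = service.validate_component()
        if not validation.is_valid:
            logger.warning("Validation errors: %s", validation.validation_errors)
        logger.debug("Health: %s", service.health_check())
    finally:
        await service.stop()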