Skip to main content
Glama

MCP Git Server

by MementoRC
debug_context.py21.7 kB
""" Debug context management for maintaining debugging state across operations. This module provides context managers and utilities for tracking debugging information across complex operations and maintaining context for LLM analysis. """ import threading import uuid from collections.abc import Iterator from contextlib import contextmanager from dataclasses import dataclass, field from datetime import datetime from typing import Any, Optional from ..protocols.debugging_protocol import DebuggableComponent @dataclass class DebugOperation: """Information about a debugging operation.""" operation_id: str operation_name: str start_time: datetime end_time: datetime | None = None status: str = "running" # running, completed, failed, cancelled metadata: dict[str, Any] = field(default_factory=dict) errors: list[str] = field(default_factory=list) warnings: list[str] = field(default_factory=list) sub_operations: list["DebugOperation"] = field(default_factory=list) @property def duration_seconds(self) -> float | None: """Calculate operation duration in seconds.""" if self.end_time and self.start_time: return (self.end_time - self.start_time).total_seconds() return None @property def is_completed(self) -> bool: """Check if operation is completed.""" return self.status in ("completed", "failed", "cancelled") def add_error(self, error_message: str) -> None: """Add an error to this operation.""" self.errors.append(error_message) if self.status == "running": self.status = "failed" def add_warning(self, warning_message: str) -> None: """Add a warning to this operation.""" self.warnings.append(warning_message) def complete(self, status: str = "completed") -> None: """Mark operation as completed.""" self.end_time = datetime.now() self.status = status def to_dict(self) -> dict[str, Any]: """Convert to dictionary for serialization.""" return { "operation_id": self.operation_id, "operation_name": self.operation_name, "start_time": self.start_time.isoformat(), "end_time": self.end_time.isoformat() if self.end_time else None, "duration_seconds": self.duration_seconds, "status": self.status, "metadata": self.metadata, "errors": self.errors, "warnings": self.warnings, "sub_operations": [sub_op.to_dict() for sub_op in self.sub_operations], } class DebugContext: """ Context manager for debugging operations with hierarchical operation tracking. This class maintains debugging context across complex operations, tracking metadata, errors, warnings, and nested operations for comprehensive debugging. """ # Thread-local storage for current debug context _local_storage = threading.local() # Global registry of all debug contexts _global_contexts: dict[str, "DebugContext"] = {} _global_lock = threading.RLock() def __init__(self, context_name: str, context_id: str | None = None): """ Initialize debug context. Args: context_name: Human-readable name for the context context_id: Optional unique identifier (auto-generated if not provided) """ self.context_id = context_id or str(uuid.uuid4()) self.context_name = context_name self.creation_time = datetime.now() self.metadata: dict[str, Any] = {} self.operations: list[DebugOperation] = [] self.registered_components: dict[str, DebuggableComponent] = {} self.context_errors: list[str] = [] self.context_warnings: list[str] = [] self._lock = threading.RLock() self._current_operation: DebugOperation | None = None # Register globally with DebugContext._global_lock: DebugContext._global_contexts[self.context_id] = self def __enter__(self) -> "DebugContext": """Enter the debug context.""" # Set as current context in thread-local storage if not hasattr(DebugContext._local_storage, "context_stack"): DebugContext._local_storage.context_stack = [] DebugContext._local_storage.context_stack.append(self) return self def __exit__(self, exc_type, exc_val, exc_tb) -> None: """Exit the debug context.""" # Remove from thread-local stack if hasattr(DebugContext._local_storage, "context_stack"): try: DebugContext._local_storage.context_stack.remove(self) except ValueError: pass # Complete any running operation if self._current_operation and not self._current_operation.is_completed: if exc_type: self._current_operation.add_error( f"Context exited with exception: {exc_type.__name__}: {exc_val}" ) self._current_operation.complete("failed") else: self._current_operation.complete("completed") @classmethod def get_current_context(cls) -> Optional["DebugContext"]: """Get the current debug context for this thread.""" if ( hasattr(cls._local_storage, "context_stack") and cls._local_storage.context_stack ): return cls._local_storage.context_stack[-1] return None @classmethod def get_context_by_id(cls, context_id: str) -> Optional["DebugContext"]: """Get a debug context by its ID.""" with cls._global_lock: return cls._global_contexts.get(context_id) @classmethod def list_all_contexts(cls) -> list[str]: """List all registered debug context IDs.""" with cls._global_lock: return list(cls._global_contexts.keys()) def add_metadata(self, key: str, value: Any) -> None: """Add metadata to the debug context.""" with self._lock: self.metadata[key] = value def add_error(self, error_message: str) -> None: """Add a context-level error.""" with self._lock: self.context_errors.append(error_message) if self._current_operation: self._current_operation.add_error(error_message) def add_warning(self, warning_message: str) -> None: """Add a context-level warning.""" with self._lock: self.context_warnings.append(warning_message) if self._current_operation: self._current_operation.add_warning(warning_message) def register_component( self, component_id: str, component: DebuggableComponent ) -> None: """Register a component with this debug context.""" with self._lock: self.registered_components[component_id] = component def start_operation( self, operation_name: str, operation_id: str | None = None ) -> DebugOperation: """ Start a new debug operation. Args: operation_name: Name of the operation operation_id: Optional unique identifier Returns: DebugOperation instance """ with self._lock: operation = DebugOperation( operation_id=operation_id or str(uuid.uuid4()), operation_name=operation_name, start_time=datetime.now(), ) # If there's a current operation, make this a sub-operation if self._current_operation and not self._current_operation.is_completed: self._current_operation.sub_operations.append(operation) else: self.operations.append(operation) self._current_operation = operation return operation def complete_operation(self, status: str = "completed") -> None: """Complete the current operation.""" with self._lock: if self._current_operation: self._current_operation.complete(status) # Find parent operation if this was a sub-operation parent_op = self._find_parent_operation(self._current_operation) self._current_operation = parent_op def _find_parent_operation( self, operation: DebugOperation ) -> DebugOperation | None: """Find the parent operation of a given operation.""" for op in self.operations: if operation in op.sub_operations: return op # Check nested sub-operations parent = self._find_parent_in_subops(op, operation) if parent: return parent return None def _find_parent_in_subops( self, parent: DebugOperation, target: DebugOperation ) -> DebugOperation | None: """Recursively find parent operation in sub-operations.""" for sub_op in parent.sub_operations: if target in sub_op.sub_operations: return sub_op result = self._find_parent_in_subops(sub_op, target) if result: return result return None @contextmanager def operation(self, operation_name: str) -> Iterator[DebugOperation]: """ Context manager for a debug operation. Args: operation_name: Name of the operation Yields: DebugOperation instance """ op = self.start_operation(operation_name) try: yield op self.complete_operation("completed") except Exception as e: op.add_error(f"Operation failed: {str(e)}") self.complete_operation("failed") raise def capture_component_states(self) -> dict[str, dict[str, Any]]: """Capture current state of all registered components.""" with self._lock: states = {} for comp_id, component in self.registered_components.items(): try: state = component.get_component_state() validation = component.validate_component() debug_info = component.get_debug_info() states[comp_id] = { "component_type": state.component_type, "state_data": state.state_data, "last_updated": state.last_updated.isoformat(), "validation": { "is_valid": validation.is_valid, "errors": validation.validation_errors, "warnings": validation.validation_warnings, }, "performance_metrics": debug_info.performance_metrics, } except Exception as e: states[comp_id] = { "error": f"Failed to capture state: {str(e)}", "error_type": type(e).__name__, } return states def generate_context_report(self) -> str: """Generate a comprehensive report of the debug context.""" with self._lock: report_lines = [] report_lines.append(f"# Debug Context Report: {self.context_name}") report_lines.append(f"**Context ID**: {self.context_id}") report_lines.append(f"**Created**: {self.creation_time.isoformat()}") report_lines.append("") # Metadata if self.metadata: report_lines.append("## Context Metadata") for key, value in self.metadata.items(): report_lines.append(f"- **{key}**: {value}") report_lines.append("") # Errors and warnings if self.context_errors: report_lines.append("## Context Errors") for error in self.context_errors: report_lines.append(f"- ❌ {error}") report_lines.append("") if self.context_warnings: report_lines.append("## Context Warnings") for warning in self.context_warnings: report_lines.append(f"- ⚠️ {warning}") report_lines.append("") # Operations report_lines.append("## Operations") if self.operations: for op in self.operations: report_lines.extend(self._format_operation_report(op, indent=0)) else: report_lines.append("No operations recorded.") report_lines.append("") # Registered components if self.registered_components: report_lines.append("## Registered Components") component_states = self.capture_component_states() for comp_id, state in component_states.items(): report_lines.append(f"### {comp_id}") if "error" in state: report_lines.append(f"❌ **Error**: {state['error']}") else: report_lines.append( f"**Type**: {state.get('component_type', 'Unknown')}" ) report_lines.append( f"**Last Updated**: {state.get('last_updated', 'Unknown')}" ) validation = state.get("validation", {}) status = ( "✅ Valid" if validation.get("is_valid", False) else "❌ Invalid" ) report_lines.append(f"**Validation**: {status}") if validation.get("errors"): report_lines.append("**Errors**:") for error in validation["errors"]: report_lines.append(f" - {error}") metrics = state.get("performance_metrics", {}) if metrics: report_lines.append("**Performance**:") for metric, value in metrics.items(): report_lines.append(f" - {metric}: {value}") report_lines.append("") return "\n".join(report_lines) def _format_operation_report( self, operation: DebugOperation, indent: int = 0 ) -> list[str]: """Format an operation for the report.""" lines = [] prefix = " " * indent # Operation header status_emoji = { "completed": "✅", "failed": "❌", "cancelled": "⏹️", "running": "🔄", }.get(operation.status, "❓") lines.append( f"{prefix}{status_emoji} **{operation.operation_name}** ({operation.operation_id})" ) lines.append(f"{prefix} - **Started**: {operation.start_time.isoformat()}") if operation.end_time: lines.append(f"{prefix} - **Completed**: {operation.end_time.isoformat()}") lines.append(f"{prefix} - **Duration**: {operation.duration_seconds:.3f}s") lines.append(f"{prefix} - **Status**: {operation.status}") # Metadata if operation.metadata: lines.append(f"{prefix} - **Metadata**:") for key, value in operation.metadata.items(): lines.append(f"{prefix} - {key}: {value}") # Errors and warnings if operation.errors: lines.append(f"{prefix} - **Errors**:") for error in operation.errors: lines.append(f"{prefix} - {error}") if operation.warnings: lines.append(f"{prefix} - **Warnings**:") for warning in operation.warnings: lines.append(f"{prefix} - {warning}") # Sub-operations if operation.sub_operations: lines.append(f"{prefix} - **Sub-operations**:") for sub_op in operation.sub_operations: lines.extend(self._format_operation_report(sub_op, indent + 2)) return lines def to_dict(self) -> dict[str, Any]: """Convert context to dictionary for serialization.""" with self._lock: return { "context_id": self.context_id, "context_name": self.context_name, "creation_time": self.creation_time.isoformat(), "metadata": self.metadata, "operations": [op.to_dict() for op in self.operations], "registered_components": list(self.registered_components.keys()), "context_errors": self.context_errors, "context_warnings": self.context_warnings, "component_states": self.capture_component_states(), } @contextmanager def debug_operation( operation_name: str, context: DebugContext | None = None ) -> Iterator[DebugOperation]: """ Standalone context manager for debug operations. Args: operation_name: Name of the operation context: Optional debug context (uses current context if not provided) Yields: DebugOperation instance """ ctx = context or DebugContext.get_current_context() if ctx: with ctx.operation(operation_name) as op: yield op else: # Create a minimal operation if no context available op = DebugOperation( operation_id=str(uuid.uuid4()), operation_name=operation_name, start_time=datetime.now(), ) try: yield op op.complete("completed") except Exception as e: op.add_error(f"Operation failed: {str(e)}") op.complete("failed") raise class GlobalDebugContextManager: """ Global manager for debug contexts with cleanup and monitoring capabilities. """ @staticmethod def cleanup_old_contexts(max_age_hours: int = 24) -> int: """ Clean up old debug contexts. Args: max_age_hours: Maximum age in hours before contexts are cleaned up Returns: Number of contexts cleaned up """ cutoff_time = datetime.now().timestamp() - (max_age_hours * 3600) cleaned_up = 0 with DebugContext._global_lock: contexts_to_remove = [] for context_id, context in DebugContext._global_contexts.items(): if context.creation_time.timestamp() < cutoff_time: contexts_to_remove.append(context_id) for context_id in contexts_to_remove: del DebugContext._global_contexts[context_id] cleaned_up += 1 return cleaned_up @staticmethod def get_global_statistics() -> dict[str, Any]: """Get global statistics about debug contexts.""" with DebugContext._global_lock: contexts = list(DebugContext._global_contexts.values()) total_contexts = len(contexts) total_operations = sum(len(ctx.operations) for ctx in contexts) total_errors = sum(len(ctx.context_errors) for ctx in contexts) total_warnings = sum(len(ctx.context_warnings) for ctx in contexts) # Calculate average context age if contexts: now = datetime.now() total_age = sum( (now - ctx.creation_time).total_seconds() for ctx in contexts ) avg_age_seconds = total_age / len(contexts) else: avg_age_seconds = 0 return { "total_contexts": total_contexts, "total_operations": total_operations, "total_errors": total_errors, "total_warnings": total_warnings, "average_context_age_seconds": avg_age_seconds, } @staticmethod def generate_global_report() -> str: """Generate a global report of all debug contexts.""" stats = GlobalDebugContextManager.get_global_statistics() report_lines = [ "# Global Debug Context Report", f"Generated: {datetime.now().isoformat()}", "", "## Statistics", f"- Total active contexts: {stats['total_contexts']}", f"- Total operations: {stats['total_operations']}", f"- Total errors: {stats['total_errors']}", f"- Total warnings: {stats['total_warnings']}", f"- Average context age: {stats['average_context_age_seconds']:.1f} seconds", "", ] with DebugContext._global_lock: contexts = list(DebugContext._global_contexts.values()) if contexts: report_lines.append("## Active Contexts") for ctx in sorted(contexts, key=lambda x: x.creation_time, reverse=True): report_lines.append(f"### {ctx.context_name} ({ctx.context_id})") report_lines.append(f"- Created: {ctx.creation_time.isoformat()}") report_lines.append(f"- Operations: {len(ctx.operations)}") report_lines.append(f"- Errors: {len(ctx.context_errors)}") report_lines.append(f"- Warnings: {len(ctx.context_warnings)}") report_lines.append(f"- Components: {len(ctx.registered_components)}") report_lines.append("") return "\n".join(report_lines)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MementoRC/mcp-git'

If you have feedback or need assistance with the MCP directory API, please join our Discord server