
MaverickMCP

by wshobson
MIT License
debug_utils.py
""" Debug utilities for backtesting system troubleshooting. This module provides comprehensive debugging tools including: - Request/response logging - Performance profiling - Memory analysis - Error tracking - Debug mode utilities """ import inspect import json import time import traceback from collections.abc import Callable, Generator from contextlib import contextmanager from datetime import UTC, datetime from functools import wraps from typing import Any import psutil from maverick_mcp.utils.structured_logger import ( CorrelationIDGenerator, get_logger_manager, get_performance_logger, get_structured_logger, ) class DebugProfiler: """Comprehensive debug profiler for performance analysis.""" def __init__(self): self.logger = get_structured_logger("maverick_mcp.debug") self.performance_logger = get_performance_logger("debug_profiler") self._profiles: dict[str, dict[str, Any]] = {} def start_profile(self, profile_name: str, **context): """Start a debug profiling session.""" profile_id = f"{profile_name}_{int(time.time() * 1000)}" profile_data = { "profile_name": profile_name, "profile_id": profile_id, "start_time": time.time(), "start_memory": self._get_memory_usage(), "start_cpu": self._get_cpu_usage(), "context": context, "checkpoints": [], } self._profiles[profile_id] = profile_data self.logger.debug( f"Started debug profile: {profile_name}", extra={ "profile_id": profile_id, "start_memory_mb": profile_data["start_memory"], "start_cpu_percent": profile_data["start_cpu"], **context, }, ) return profile_id def checkpoint(self, profile_id: str, checkpoint_name: str, **data): """Add a checkpoint to an active profile.""" if profile_id not in self._profiles: self.logger.warning(f"Profile {profile_id} not found for checkpoint") return profile = self._profiles[profile_id] current_time = time.time() elapsed_ms = (current_time - profile["start_time"]) * 1000 checkpoint_data = { "name": checkpoint_name, "timestamp": current_time, "elapsed_ms": elapsed_ms, "memory_mb": self._get_memory_usage(), "cpu_percent": self._get_cpu_usage(), "data": data, } profile["checkpoints"].append(checkpoint_data) self.logger.debug( f"Profile checkpoint: {checkpoint_name} at {elapsed_ms:.2f}ms", extra={ "profile_id": profile_id, "checkpoint": checkpoint_name, "elapsed_ms": elapsed_ms, "memory_mb": checkpoint_data["memory_mb"], **data, }, ) def end_profile( self, profile_id: str, success: bool = True, **final_data ) -> dict[str, Any]: """End a debug profiling session and return comprehensive results.""" if profile_id not in self._profiles: self.logger.warning(f"Profile {profile_id} not found for ending") return {} profile = self._profiles.pop(profile_id) end_time = time.time() total_duration_ms = (end_time - profile["start_time"]) * 1000 # Calculate memory and CPU deltas end_memory = self._get_memory_usage() end_cpu = self._get_cpu_usage() memory_delta = end_memory - profile["start_memory"] results = { "profile_name": profile["profile_name"], "profile_id": profile_id, "success": success, "total_duration_ms": total_duration_ms, "start_time": profile["start_time"], "end_time": end_time, "memory_stats": { "start_mb": profile["start_memory"], "end_mb": end_memory, "delta_mb": memory_delta, "peak_usage": max(cp["memory_mb"] for cp in profile["checkpoints"]) if profile["checkpoints"] else end_memory, }, "cpu_stats": { "start_percent": profile["start_cpu"], "end_percent": end_cpu, "avg_percent": sum(cp["cpu_percent"] for cp in profile["checkpoints"]) / len(profile["checkpoints"]) if profile["checkpoints"] else end_cpu, }, 
"checkpoints": profile["checkpoints"], "checkpoint_count": len(profile["checkpoints"]), "context": profile["context"], "final_data": final_data, } # Log profile completion log_level = "info" if success else "error" getattr(self.logger, log_level)( f"Completed debug profile: {profile['profile_name']} in {total_duration_ms:.2f}ms", extra={ "profile_results": results, "performance_summary": { "duration_ms": total_duration_ms, "memory_delta_mb": memory_delta, "checkpoint_count": len(profile["checkpoints"]), "success": success, }, }, ) return results @staticmethod def _get_memory_usage() -> float: """Get current memory usage in MB.""" try: process = psutil.Process() return process.memory_info().rss / 1024 / 1024 except (psutil.NoSuchProcess, psutil.AccessDenied): return 0.0 @staticmethod def _get_cpu_usage() -> float: """Get current CPU usage percentage.""" try: process = psutil.Process() return process.cpu_percent(interval=None) except (psutil.NoSuchProcess, psutil.AccessDenied): return 0.0 class RequestResponseLogger: """Detailed request/response logging for debugging.""" def __init__(self, max_payload_size: int = 5000): self.logger = get_structured_logger("maverick_mcp.requests") self.max_payload_size = max_payload_size def log_request(self, operation: str, **request_data): """Log detailed request information.""" correlation_id = CorrelationIDGenerator.get_correlation_id() # Sanitize and truncate request data sanitized_data = self._sanitize_data(request_data) truncated_data = self._truncate_data(sanitized_data) self.logger.info( f"Request: {operation}", extra={ "operation": operation, "correlation_id": correlation_id, "request_data": truncated_data, "request_size": len(json.dumps(request_data, default=str)), "timestamp": datetime.now(UTC).isoformat(), }, ) def log_response( self, operation: str, success: bool, duration_ms: float, **response_data ): """Log detailed response information.""" correlation_id = CorrelationIDGenerator.get_correlation_id() # Sanitize and truncate response data sanitized_data = self._sanitize_data(response_data) truncated_data = self._truncate_data(sanitized_data) log_method = self.logger.info if success else self.logger.error log_method( f"Response: {operation} ({'success' if success else 'failure'}) in {duration_ms:.2f}ms", extra={ "operation": operation, "correlation_id": correlation_id, "success": success, "duration_ms": duration_ms, "response_data": truncated_data, "response_size": len(json.dumps(response_data, default=str)), "timestamp": datetime.now(UTC).isoformat(), }, ) def _sanitize_data(self, data: Any) -> Any: """Remove sensitive information from data.""" if isinstance(data, dict): sanitized = {} for key, value in data.items(): if any( sensitive in key.lower() for sensitive in ["password", "token", "key", "secret"] ): sanitized[key] = "***REDACTED***" else: sanitized[key] = self._sanitize_data(value) return sanitized elif isinstance(data, list | tuple): return [self._sanitize_data(item) for item in data] else: return data def _truncate_data(self, data: Any) -> Any: """Truncate data to prevent log overflow.""" data_str = json.dumps(data, default=str) if len(data_str) > self.max_payload_size: truncated = data_str[: self.max_payload_size] return f"{truncated}... 
class ErrorTracker:
    """Comprehensive error tracking and analysis."""

    def __init__(self):
        self.logger = get_structured_logger("maverick_mcp.errors")
        self._error_stats: dict[str, dict[str, Any]] = {}

    def track_error(
        self,
        error: Exception,
        operation: str,
        context: dict[str, Any],
        severity: str = "error",
    ):
        """Track error with detailed context and statistics."""
        error_type = type(error).__name__
        error_key = f"{operation}_{error_type}"

        # Update error statistics
        if error_key not in self._error_stats:
            self._error_stats[error_key] = {
                "first_seen": datetime.now(UTC),
                "last_seen": datetime.now(UTC),
                "count": 0,
                "operation": operation,
                "error_type": error_type,
                "contexts": [],
            }

        stats = self._error_stats[error_key]
        stats["last_seen"] = datetime.now(UTC)
        stats["count"] += 1

        # Keep only recent contexts (last 10)
        stats["contexts"].append(
            {
                "timestamp": datetime.now(UTC).isoformat(),
                "context": context,
                "error_message": str(error),
            }
        )
        stats["contexts"] = stats["contexts"][-10:]  # Keep only last 10

        # Get stack trace
        stack_trace = traceback.format_exception(
            type(error), error, error.__traceback__
        )

        # Log the error
        correlation_id = CorrelationIDGenerator.get_correlation_id()
        log_data = {
            "operation": operation,
            "correlation_id": correlation_id,
            "error_type": error_type,
            "error_message": str(error),
            "error_count": stats["count"],
            "first_seen": stats["first_seen"].isoformat(),
            "last_seen": stats["last_seen"].isoformat(),
            "context": context,
            "stack_trace": stack_trace,
            "severity": severity,
        }

        if severity == "critical":
            self.logger.critical(
                f"Critical error in {operation}: {error}", extra=log_data
            )
        elif severity == "error":
            self.logger.error(f"Error in {operation}: {error}", extra=log_data)
        elif severity == "warning":
            self.logger.warning(f"Warning in {operation}: {error}", extra=log_data)

    def get_error_summary(self) -> dict[str, Any]:
        """Get comprehensive error statistics summary."""
        if not self._error_stats:
            return {"message": "No errors tracked"}

        summary = {
            "total_error_types": len(self._error_stats),
            "total_errors": sum(
                stats["count"] for stats in self._error_stats.values()
            ),
            "error_breakdown": {},
            "most_common_errors": [],
            "recent_errors": [],
        }

        # Error breakdown by type
        for _error_key, stats in self._error_stats.items():
            summary["error_breakdown"][stats["error_type"]] = (
                summary["error_breakdown"].get(stats["error_type"], 0)
                + stats["count"]
            )

        # Most common errors
        sorted_errors = sorted(
            self._error_stats.items(), key=lambda x: x[1]["count"], reverse=True
        )
        summary["most_common_errors"] = [
            {
                "operation": stats["operation"],
                "error_type": stats["error_type"],
                "count": stats["count"],
                "first_seen": stats["first_seen"].isoformat(),
                "last_seen": stats["last_seen"].isoformat(),
            }
            for _, stats in sorted_errors[:10]
        ]

        # Recent errors
        all_contexts = []
        for stats in self._error_stats.values():
            for context in stats["contexts"]:
                all_contexts.append(
                    {
                        "operation": stats["operation"],
                        "error_type": stats["error_type"],
                        **context,
                    }
                )
        summary["recent_errors"] = sorted(
            all_contexts, key=lambda x: x["timestamp"], reverse=True
        )[:20]

        return summary
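
# Usage sketch (illustrative only): errors are keyed by operation and
# exception type, so repeated failures aggregate into a single entry with a
# running count. The operation name below is a hypothetical example.
#
#     tracker = ErrorTracker()
#     try:
#         raise ValueError("bad symbol")
#     except ValueError as exc:
#         tracker.track_error(exc, "load_prices", {"symbol": "AAPL"})
#     print(tracker.get_error_summary()["total_errors"])
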
class DebugContextManager:
    """Context manager for debug sessions with automatic cleanup."""

    def __init__(
        self,
        operation_name: str,
        enable_profiling: bool = True,
        enable_request_logging: bool = True,
        enable_error_tracking: bool = True,
        **context,
    ):
        self.operation_name = operation_name
        self.enable_profiling = enable_profiling
        self.enable_request_logging = enable_request_logging
        self.enable_error_tracking = enable_error_tracking
        self.context = context

        # Initialize components
        self.profiler = DebugProfiler() if enable_profiling else None
        self.request_logger = (
            RequestResponseLogger() if enable_request_logging else None
        )
        self.error_tracker = ErrorTracker() if enable_error_tracking else None

        self.profile_id = None
        self.start_time = None

    def __enter__(self):
        """Enter debug context."""
        self.start_time = time.time()

        # Set correlation ID if not present
        if not CorrelationIDGenerator.get_correlation_id():
            CorrelationIDGenerator.set_correlation_id()

        # Start profiling
        if self.profiler:
            self.profile_id = self.profiler.start_profile(
                self.operation_name, **self.context
            )

        # Log request
        if self.request_logger:
            self.request_logger.log_request(self.operation_name, **self.context)

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit debug context with cleanup."""
        duration_ms = (time.time() - self.start_time) * 1000 if self.start_time else 0
        success = exc_type is None

        # Track error if occurred
        if not success and self.error_tracker and exc_val:
            self.error_tracker.track_error(
                exc_val, self.operation_name, self.context, severity="error"
            )

        # End profiling
        if self.profiler and self.profile_id:
            self.profiler.end_profile(
                self.profile_id,
                success=success,
                exception_type=exc_type.__name__ if exc_type else None,
            )

        # Log response
        if self.request_logger:
            response_data = {"exception": str(exc_val)} if exc_val else {}
            self.request_logger.log_response(
                self.operation_name, success, duration_ms, **response_data
            )

    def checkpoint(self, name: str, **data):
        """Add a checkpoint during debug session."""
        if self.profiler and self.profile_id:
            self.profiler.checkpoint(self.profile_id, name, **data)


# Decorator for automatic debug wrapping
def debug_operation(
    operation_name: str | None = None,
    enable_profiling: bool = True,
    enable_request_logging: bool = True,
    enable_error_tracking: bool = True,
    **default_context,
):
    """Decorator to automatically wrap operations with debug context."""

    def decorator(func: Callable) -> Callable:
        actual_operation_name = operation_name or func.__name__

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            # Extract additional context from function signature
            sig = inspect.signature(func)
            bound_args = sig.bind(*args, **kwargs)
            bound_args.apply_defaults()

            context = {**default_context}
            # Add non-sensitive parameters to context
            for param_name, param_value in bound_args.arguments.items():
                if not any(
                    sensitive in param_name.lower()
                    for sensitive in ["password", "token", "key", "secret"]
                ):
                    if (
                        isinstance(param_value, str | int | float | bool)
                        or param_value is None
                    ):
                        context[param_name] = param_value

            with DebugContextManager(
                actual_operation_name,
                enable_profiling,
                enable_request_logging,
                enable_error_tracking,
                **context,
            ) as debug_ctx:
                result = await func(*args, **kwargs)
                debug_ctx.checkpoint(
                    "function_completed", result_type=type(result).__name__
                )
                return result

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            # Similar logic for sync functions
            sig = inspect.signature(func)
            bound_args = sig.bind(*args, **kwargs)
            bound_args.apply_defaults()

            context = {**default_context}
            for param_name, param_value in bound_args.arguments.items():
                if not any(
                    sensitive in param_name.lower()
                    for sensitive in ["password", "token", "key", "secret"]
                ):
                    if (
                        isinstance(param_value, str | int | float | bool)
                        or param_value is None
                    ):
                        context[param_name] = param_value

            with DebugContextManager(
                actual_operation_name,
                enable_profiling,
                enable_request_logging,
                enable_error_tracking,
                **context,
            ) as debug_ctx:
                result = func(*args, **kwargs)
                debug_ctx.checkpoint(
                    "function_completed", result_type=type(result).__name__
                )
                return result

        return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper

    return decorator
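
# Usage sketch (illustrative only): the decorator selects the async or sync
# wrapper via inspect.iscoroutinefunction, binds the call's scalar,
# non-sensitive arguments into the log context, and runs the body inside a
# DebugContextManager. The function below is a hypothetical example.
#
#     @debug_operation(operation_name="fetch_quotes")
#     async def fetch_quotes(symbol: str, api_key: str) -> dict:
#         ...  # "symbol" lands in the context; "api_key" is filtered out
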
@contextmanager
def debug_session(
    session_name: str, **context
) -> Generator[DebugContextManager, None, None]:
    """Context manager for manual debug sessions."""
    with DebugContextManager(session_name, **context) as debug_ctx:
        yield debug_ctx


# Global debug utilities
_debug_profiler = DebugProfiler()
_error_tracker = ErrorTracker()


def get_debug_profiler() -> DebugProfiler:
    """Get global debug profiler instance."""
    return _debug_profiler


def get_error_tracker() -> ErrorTracker:
    """Get global error tracker instance."""
    return _error_tracker


def print_debug_summary():
    """Print comprehensive debug summary to console."""
    print("\n" + "=" * 80)
    print("MAVERICK MCP DEBUG SUMMARY")
    print("=" * 80)

    # Performance metrics
    print("\n📊 PERFORMANCE METRICS")
    print("-" * 40)
    try:
        manager = get_logger_manager()
        dashboard_data = manager.create_dashboard_metrics()
        print(
            f"Log Level Counts: {dashboard_data.get('system_metrics', {}).get('log_level_counts', {})}"
        )
        print(
            f"Active Correlation IDs: {dashboard_data.get('system_metrics', {}).get('active_correlation_ids', 0)}"
        )
        if "memory_stats" in dashboard_data:
            memory_stats = dashboard_data["memory_stats"]
            print(
                f"Memory Usage: {memory_stats.get('rss_mb', 0):.1f}MB RSS, {memory_stats.get('cpu_percent', 0):.1f}% CPU"
            )
    except Exception as e:
        print(f"Error getting performance metrics: {e}")

    # Error summary
    print("\n🚨 ERROR SUMMARY")
    print("-" * 40)
    try:
        error_summary = _error_tracker.get_error_summary()
        if "message" in error_summary:
            print(error_summary["message"])
        else:
            print(f"Total Error Types: {error_summary['total_error_types']}")
            print(f"Total Errors: {error_summary['total_errors']}")

            if error_summary["most_common_errors"]:
                print("\nMost Common Errors:")
                for error in error_summary["most_common_errors"][:5]:
                    print(
                        f"  {error['error_type']} in {error['operation']}: {error['count']} times"
                    )
    except Exception as e:
        print(f"Error getting error summary: {e}")

    print("\n" + "=" * 80)


def enable_debug_mode():
    """Enable comprehensive debug mode."""
    import os

    os.environ["MAVERICK_DEBUG"] = "true"
    print("🐛 Debug mode enabled")
    print("  - Verbose logging activated")
    print("  - Request/response logging enabled")
    print("  - Performance profiling enabled")
    print("  - Error tracking enhanced")


def disable_debug_mode():
    """Disable debug mode."""
    import os

    if "MAVERICK_DEBUG" in os.environ:
        del os.environ["MAVERICK_DEBUG"]
    print("🐛 Debug mode disabled")
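
# Usage sketch (illustrative only): a manual session pairs naturally with
# checkpoints and the global summary. The session name and checkpoint data
# below are hypothetical examples.
#
#     with debug_session("nightly_backtest", universe="sp500") as ctx:
#         ctx.checkpoint("universe_loaded", tickers=500)
#         ...  # run the backtest
#     print_debug_summary()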
