
MaverickMCP

by wshobson
MIT License
structured_logger.py (30.2 kB)
""" Enhanced structured logging infrastructure for backtesting system. This module provides comprehensive structured logging capabilities with: - Correlation ID generation and tracking across async boundaries - Request context propagation - JSON formatting for log aggregation - Performance metrics logging - Resource usage tracking - Debug mode with verbose logging - Async logging to avoid blocking operations - Log rotation and compression - Multiple output handlers (console, file, remote) """ import asyncio import gc import json import logging import logging.handlers import os import sys import threading import time import traceback import uuid from collections.abc import Callable from concurrent.futures import ThreadPoolExecutor from contextvars import ContextVar from datetime import UTC, datetime from functools import wraps from pathlib import Path from typing import Any import psutil # Context variables for request tracking across async boundaries correlation_id_var: ContextVar[str | None] = ContextVar("correlation_id", default=None) request_start_var: ContextVar[float | None] = ContextVar("request_start", default=None) user_id_var: ContextVar[str | None] = ContextVar("user_id", default=None) tool_name_var: ContextVar[str | None] = ContextVar("tool_name", default=None) operation_context_var: ContextVar[dict[str, Any] | None] = ContextVar( "operation_context", default=None ) # Global logger registry for performance metrics aggregation _performance_logger_registry: dict[str, "PerformanceMetricsLogger"] = {} _log_level_counts: dict[str, int] = { "DEBUG": 0, "INFO": 0, "WARNING": 0, "ERROR": 0, "CRITICAL": 0, } # Thread pool for async logging operations _async_log_executor: ThreadPoolExecutor | None = None _async_log_lock = threading.Lock() class CorrelationIDGenerator: """Enhanced correlation ID generation with backtesting context.""" @staticmethod def generate_correlation_id(prefix: str = "bt") -> str: """Generate a unique correlation ID with backtesting prefix.""" timestamp = int(time.time() * 1000) % 1000000 # Last 6 digits of timestamp random_part = uuid.uuid4().hex[:8] return f"{prefix}-{timestamp}-{random_part}" @staticmethod def set_correlation_id( correlation_id: str | None = None, prefix: str = "bt" ) -> str: """Set correlation ID in context with automatic generation.""" if not correlation_id: correlation_id = CorrelationIDGenerator.generate_correlation_id(prefix) correlation_id_var.set(correlation_id) return correlation_id @staticmethod def get_correlation_id() -> str | None: """Get current correlation ID from context.""" return correlation_id_var.get() @staticmethod def propagate_context(target_context: dict[str, Any]) -> dict[str, Any]: """Propagate correlation context to target dict.""" target_context.update( { "correlation_id": correlation_id_var.get(), "user_id": user_id_var.get(), "tool_name": tool_name_var.get(), "operation_context": operation_context_var.get(), } ) return target_context class EnhancedStructuredFormatter(logging.Formatter): """Enhanced JSON formatter with performance metrics and resource tracking.""" def __init__( self, include_performance: bool = True, include_resources: bool = True ): super().__init__() self.include_performance = include_performance self.include_resources = include_resources self._process = psutil.Process() def format(self, record: logging.LogRecord) -> str: """Format log record with comprehensive structured data.""" # Base structured log data log_data = { "timestamp": datetime.now(UTC).isoformat(), "level": record.levelname, "logger": 
record.name, "message": record.getMessage(), "module": record.module, "function": record.funcName, "line": record.lineno, "thread": record.thread, "process_id": record.process, } # Add correlation context CorrelationIDGenerator.propagate_context(log_data) # Add performance metrics if enabled if self.include_performance: request_start = request_start_var.get() if request_start: log_data["duration_ms"] = int((time.time() - request_start) * 1000) # Add resource usage if enabled if self.include_resources: try: memory_info = self._process.memory_info() log_data["memory_rss_mb"] = round(memory_info.rss / 1024 / 1024, 2) log_data["memory_vms_mb"] = round(memory_info.vms / 1024 / 1024, 2) log_data["cpu_percent"] = self._process.cpu_percent(interval=None) except (psutil.NoSuchProcess, psutil.AccessDenied): # Process might have ended or access denied pass # Add exception information if record.exc_info: log_data["exception"] = { "type": record.exc_info[0].__name__ if record.exc_info[0] else "Unknown", "message": str(record.exc_info[1]), "traceback": traceback.format_exception(*record.exc_info), } # Add extra fields from the record extra_fields = {} for key, value in record.__dict__.items(): if key not in { "name", "msg", "args", "created", "filename", "funcName", "levelname", "levelno", "lineno", "module", "msecs", "pathname", "process", "processName", "relativeCreated", "thread", "threadName", "exc_info", "exc_text", "stack_info", "getMessage", "message", }: # Sanitize sensitive data if self._is_sensitive_field(key): extra_fields[key] = "***MASKED***" else: extra_fields[key] = self._serialize_value(value) if extra_fields: log_data["extra"] = extra_fields return json.dumps(log_data, default=str, ensure_ascii=False) def _is_sensitive_field(self, field_name: str) -> bool: """Check if field contains sensitive information.""" sensitive_keywords = { "password", "token", "key", "secret", "auth", "credential", "bearer", "session", "cookie", "api_key", "access_token", "refresh_token", "private", "confidential", } return any(keyword in field_name.lower() for keyword in sensitive_keywords) def _serialize_value(self, value: Any) -> Any: """Safely serialize complex values for JSON output.""" if isinstance(value, str | int | float | bool) or value is None: return value elif isinstance(value, dict): return {k: self._serialize_value(v) for k, v in value.items()} elif isinstance(value, list | tuple): return [self._serialize_value(item) for item in value] else: return str(value) class AsyncLogHandler(logging.Handler): """Non-blocking async log handler to prevent performance impact.""" def __init__(self, target_handler: logging.Handler, max_queue_size: int = 10000): super().__init__() self.target_handler = target_handler self.max_queue_size = max_queue_size self._queue: list[logging.LogRecord] = [] self._queue_lock = threading.Lock() self._shutdown = False # Start background thread for processing logs self._worker_thread = threading.Thread(target=self._process_logs, daemon=True) self._worker_thread.start() def emit(self, record: logging.LogRecord): """Queue log record for async processing.""" if self._shutdown: return with self._queue_lock: if len(self._queue) < self.max_queue_size: self._queue.append(record) # If queue is full, drop oldest records elif self._queue: self._queue.pop(0) self._queue.append(record) def _process_logs(self): """Background thread to process queued log records.""" while not self._shutdown: records_to_process = [] with self._queue_lock: if self._queue: records_to_process = self._queue[:] 
self._queue.clear() for record in records_to_process: try: self.target_handler.emit(record) except Exception: # Silently ignore errors to prevent infinite recursion pass # Brief sleep to prevent busy waiting time.sleep(0.01) def close(self): """Close the handler and wait for queue to flush.""" self._shutdown = True self._worker_thread.join(timeout=5.0) self.target_handler.close() super().close() class PerformanceMetricsLogger: """Comprehensive performance metrics logging for backtesting operations.""" def __init__(self, logger_name: str = "maverick_mcp.performance"): self.logger = logging.getLogger(logger_name) self.metrics: dict[str, list[float]] = { "execution_times": [], "memory_usage": [], "cpu_usage": [], "operation_counts": [], } self._start_times: dict[str, float] = {} self._lock = threading.Lock() # Register for global aggregation _performance_logger_registry[logger_name] = self def start_operation(self, operation_id: str, operation_type: str, **context): """Start tracking a performance-critical operation.""" start_time = time.time() with self._lock: self._start_times[operation_id] = start_time # Set request context request_start_var.set(start_time) if "tool_name" in context: tool_name_var.set(context["tool_name"]) self.logger.info( f"Started {operation_type} operation", extra={ "operation_id": operation_id, "operation_type": operation_type, "start_time": start_time, **context, }, ) def end_operation(self, operation_id: str, success: bool = True, **metrics): """End tracking of a performance-critical operation.""" end_time = time.time() with self._lock: start_time = self._start_times.pop(operation_id, end_time) duration_ms = (end_time - start_time) * 1000 # Collect system metrics try: process = psutil.Process() memory_mb = process.memory_info().rss / 1024 / 1024 cpu_percent = process.cpu_percent(interval=None) except (psutil.NoSuchProcess, psutil.AccessDenied): memory_mb = 0 cpu_percent = 0 # Update internal metrics with self._lock: self.metrics["execution_times"].append(duration_ms) self.metrics["memory_usage"].append(memory_mb) self.metrics["cpu_usage"].append(cpu_percent) self.metrics["operation_counts"].append(1) log_level = logging.INFO if success else logging.ERROR self.logger.log( log_level, f"{'Completed' if success else 'Failed'} operation in {duration_ms:.2f}ms", extra={ "operation_id": operation_id, "duration_ms": duration_ms, "memory_mb": memory_mb, "cpu_percent": cpu_percent, "success": success, **metrics, }, ) def log_business_metric(self, metric_name: str, value: int | float, **context): """Log business-specific metrics like strategies processed, success rates.""" self.logger.info( f"Business metric: {metric_name} = {value}", extra={ "metric_name": metric_name, "metric_value": value, "metric_type": "business", **context, }, ) def get_performance_summary(self) -> dict[str, Any]: """Get aggregated performance metrics summary.""" with self._lock: if not self.metrics["execution_times"]: return {"message": "No performance data available"} execution_times = self.metrics["execution_times"] memory_usage = self.metrics["memory_usage"] cpu_usage = self.metrics["cpu_usage"] return { "operations_count": len(execution_times), "execution_time_stats": { "avg_ms": sum(execution_times) / len(execution_times), "min_ms": min(execution_times), "max_ms": max(execution_times), "total_ms": sum(execution_times), }, "memory_stats": { "avg_mb": sum(memory_usage) / len(memory_usage) if memory_usage else 0, "peak_mb": max(memory_usage) if memory_usage else 0, }, "cpu_stats": { "avg_percent": 
sum(cpu_usage) / len(cpu_usage) if cpu_usage else 0, "peak_percent": max(cpu_usage) if cpu_usage else 0, }, "timestamp": datetime.now(UTC).isoformat(), } class DebugModeManager: """Manages debug mode configuration and verbose logging.""" def __init__(self): self._debug_enabled = os.getenv("MAVERICK_DEBUG", "false").lower() in ( "true", "1", "on", ) self._verbose_modules: set = set() self._debug_filters: dict[str, Any] = {} def is_debug_enabled(self, module_name: str = "") -> bool: """Check if debug mode is enabled globally or for specific module.""" if not self._debug_enabled: return False if not module_name: return True # Check if specific module debug is enabled return module_name in self._verbose_modules or not self._verbose_modules def enable_verbose_logging(self, module_pattern: str): """Enable verbose logging for specific module pattern.""" self._verbose_modules.add(module_pattern) def add_debug_filter(self, filter_name: str, filter_config: dict[str, Any]): """Add custom debug filter configuration.""" self._debug_filters[filter_name] = filter_config def should_log_request_response(self, operation_name: str) -> bool: """Check if request/response should be logged for operation.""" if not self._debug_enabled: return False # Check specific filters for _filter_name, config in self._debug_filters.items(): if config.get("log_request_response") and operation_name in config.get( "operations", [] ): return True return True # Default to true in debug mode class StructuredLoggerManager: """Central manager for structured logging configuration.""" def __init__(self): self.debug_manager = DebugModeManager() self.performance_loggers: dict[str, PerformanceMetricsLogger] = {} self._configured = False def setup_structured_logging( self, log_level: str = "INFO", log_format: str = "json", log_file: str | None = None, enable_async: bool = True, enable_rotation: bool = True, max_log_size: int = 10 * 1024 * 1024, # 10MB backup_count: int = 5, console_output: str = "stdout", # stdout, stderr remote_handler_config: dict[str, Any] | None = None, ): """Setup comprehensive structured logging infrastructure.""" if self._configured: return # Configure root logger root_logger = logging.getLogger() root_logger.setLevel(getattr(logging, log_level.upper())) # Clear existing handlers for handler in root_logger.handlers[:]: root_logger.removeHandler(handler) handlers = [] # Console handler console_stream = sys.stdout if console_output == "stdout" else sys.stderr console_handler = logging.StreamHandler(console_stream) if log_format == "json": console_formatter = EnhancedStructuredFormatter( include_performance=True, include_resources=True ) else: console_formatter = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) console_handler.setFormatter(console_formatter) handlers.append(console_handler) # File handler with rotation if specified if log_file: log_path = Path(log_file) log_path.parent.mkdir(parents=True, exist_ok=True) if enable_rotation: file_handler = logging.handlers.RotatingFileHandler( log_file, maxBytes=max_log_size, backupCount=backup_count ) else: file_handler = logging.FileHandler(log_file) file_handler.setFormatter(EnhancedStructuredFormatter()) handlers.append(file_handler) # Remote handler if configured (for log aggregation) if remote_handler_config: remote_handler = self._create_remote_handler(remote_handler_config) if remote_handler: handlers.append(remote_handler) # Wrap handlers with async processing if enabled if enable_async: handlers = [AsyncLogHandler(handler) for 
handler in handlers] # Add all handlers to root logger for handler in handlers: root_logger.addHandler(handler) # Set specific logger levels to reduce noise logging.getLogger("urllib3").setLevel(logging.WARNING) logging.getLogger("requests").setLevel(logging.WARNING) logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("asyncio").setLevel(logging.WARNING) # Enable debug mode loggers if configured if self.debug_manager.is_debug_enabled(): self._setup_debug_loggers() self._configured = True def _create_remote_handler(self, config: dict[str, Any]) -> logging.Handler | None: """Create remote handler for log aggregation (placeholder for future implementation).""" # This would implement remote logging to services like ELK, Splunk, etc. # For now, return None as it's not implemented return None def _setup_debug_loggers(self): """Setup additional loggers for debug mode.""" debug_logger = logging.getLogger("maverick_mcp.debug") debug_logger.setLevel(logging.DEBUG) request_logger = logging.getLogger("maverick_mcp.requests") request_logger.setLevel(logging.DEBUG) def get_performance_logger(self, logger_name: str) -> PerformanceMetricsLogger: """Get or create performance logger for specific component.""" if logger_name not in self.performance_loggers: self.performance_loggers[logger_name] = PerformanceMetricsLogger( logger_name ) return self.performance_loggers[logger_name] def get_logger(self, name: str) -> logging.Logger: """Get structured logger with correlation support.""" return logging.getLogger(name) def create_dashboard_metrics(self) -> dict[str, Any]: """Create comprehensive metrics for performance dashboard.""" global _log_level_counts dashboard_data = { "system_metrics": { "timestamp": datetime.now(UTC).isoformat(), "log_level_counts": _log_level_counts.copy(), "active_correlation_ids": len( [cid for cid in [correlation_id_var.get()] if cid] ), }, "performance_metrics": {}, "memory_stats": {}, } # Aggregate performance metrics from all loggers for logger_name, perf_logger in _performance_logger_registry.items(): dashboard_data["performance_metrics"][logger_name] = ( perf_logger.get_performance_summary() ) # System memory stats try: process = psutil.Process() memory_info = process.memory_info() dashboard_data["memory_stats"] = { "rss_mb": round(memory_info.rss / 1024 / 1024, 2), "vms_mb": round(memory_info.vms / 1024 / 1024, 2), "cpu_percent": process.cpu_percent(interval=None), "gc_stats": { "generation_0": gc.get_count()[0], "generation_1": gc.get_count()[1], "generation_2": gc.get_count()[2], }, } except (psutil.NoSuchProcess, psutil.AccessDenied): dashboard_data["memory_stats"] = {"error": "Unable to collect memory stats"} return dashboard_data # Global instance _logger_manager: StructuredLoggerManager | None = None def get_logger_manager() -> StructuredLoggerManager: """Get global logger manager instance.""" global _logger_manager if _logger_manager is None: _logger_manager = StructuredLoggerManager() return _logger_manager def with_structured_logging( operation_name: str, include_performance: bool = True, log_params: bool = True, log_result: bool = False, ): """Decorator for automatic structured logging of operations.""" def decorator(func: Callable) -> Callable: @wraps(func) async def async_wrapper(*args, **kwargs): # Generate correlation ID if not present correlation_id = CorrelationIDGenerator.get_correlation_id() if not correlation_id: correlation_id = CorrelationIDGenerator.set_correlation_id() # Setup operation context operation_id = 
f"{operation_name}_{int(time.time() * 1000)}" tool_name_var.set(operation_name) logger = get_logger_manager().get_logger(f"maverick_mcp.{operation_name}") perf_logger = None if include_performance: perf_logger = get_logger_manager().get_performance_logger( f"performance.{operation_name}" ) perf_logger.start_operation( operation_id=operation_id, operation_type=operation_name, tool_name=operation_name, ) # Log operation start extra_data = { "operation_id": operation_id, "correlation_id": correlation_id, } if log_params: # Sanitize parameters safe_kwargs = { k: "***MASKED***" if "password" in k.lower() or "token" in k.lower() else v for k, v in kwargs.items() } extra_data["parameters"] = safe_kwargs logger.info(f"Starting {operation_name}", extra=extra_data) try: # Execute the function result = await func(*args, **kwargs) # Log success success_data = {"operation_id": operation_id, "success": True} if log_result and result is not None: # Limit result size for logging result_str = str(result) success_data["result"] = ( result_str[:1000] + "..." if len(result_str) > 1000 else result_str ) logger.info(f"Completed {operation_name}", extra=success_data) if perf_logger: perf_logger.end_operation(operation_id, success=True) return result except Exception as e: # Log error logger.error( f"Failed {operation_name}: {str(e)}", exc_info=True, extra={ "operation_id": operation_id, "error_type": type(e).__name__, "success": False, }, ) if perf_logger: perf_logger.end_operation(operation_id, success=False, error=str(e)) raise @wraps(func) def sync_wrapper(*args, **kwargs): # Similar logic for sync functions correlation_id = CorrelationIDGenerator.get_correlation_id() if not correlation_id: correlation_id = CorrelationIDGenerator.set_correlation_id() operation_id = f"{operation_name}_{int(time.time() * 1000)}" tool_name_var.set(operation_name) logger = get_logger_manager().get_logger(f"maverick_mcp.{operation_name}") perf_logger = None if include_performance: perf_logger = get_logger_manager().get_performance_logger( f"performance.{operation_name}" ) perf_logger.start_operation( operation_id=operation_id, operation_type=operation_name, tool_name=operation_name, ) extra_data = { "operation_id": operation_id, "correlation_id": correlation_id, } if log_params: safe_kwargs = { k: "***MASKED***" if any( sensitive in k.lower() for sensitive in ["password", "token", "key", "secret"] ) else v for k, v in kwargs.items() } extra_data["parameters"] = safe_kwargs logger.info(f"Starting {operation_name}", extra=extra_data) try: result = func(*args, **kwargs) success_data = {"operation_id": operation_id, "success": True} if log_result and result is not None: result_str = str(result) success_data["result"] = ( result_str[:1000] + "..." 
if len(result_str) > 1000 else result_str ) logger.info(f"Completed {operation_name}", extra=success_data) if perf_logger: perf_logger.end_operation(operation_id, success=True) return result except Exception as e: logger.error( f"Failed {operation_name}: {str(e)}", exc_info=True, extra={ "operation_id": operation_id, "error_type": type(e).__name__, "success": False, }, ) if perf_logger: perf_logger.end_operation(operation_id, success=False, error=str(e)) raise return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper return decorator # Convenience functions def get_structured_logger(name: str) -> logging.Logger: """Get structured logger instance.""" return get_logger_manager().get_logger(name) def get_performance_logger(component: str) -> PerformanceMetricsLogger: """Get performance logger for specific component.""" return get_logger_manager().get_performance_logger(component) def setup_backtesting_logging( log_level: str = "INFO", enable_debug: bool = False, log_file: str | None = None ): """Setup logging specifically configured for backtesting operations.""" # Set debug environment if requested if enable_debug: os.environ["MAVERICK_DEBUG"] = "true" # Setup structured logging manager = get_logger_manager() manager.setup_structured_logging( log_level=log_level, log_format="json", log_file=log_file or "logs/backtesting.log", enable_async=True, enable_rotation=True, console_output="stderr", # Use stderr for MCP compatibility ) # Configure debug filters for backtesting if enable_debug: manager.debug_manager.add_debug_filter( "backtesting", { "log_request_response": True, "operations": [ "run_backtest", "optimize_parameters", "get_historical_data", ], }, ) # Update log level counts (for dashboard metrics) class LogLevelCounterFilter(logging.Filter): """Filter to count log levels for dashboard metrics.""" def filter(self, record: logging.LogRecord) -> bool: global _log_level_counts _log_level_counts[record.levelname] = ( _log_level_counts.get(record.levelname, 0) + 1 ) return True # Add the counter filter to root logger logging.getLogger().addFilter(LogLevelCounterFilter())
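
For orientation, here is a minimal usage sketch of the public helpers above. It assumes the module is importable as structured_logger (the actual package path inside MaverickMCP may differ), and the run_backtest coroutine is a hypothetical stand-in for a real tool, not part of the source file.

# Minimal usage sketch; adjust the import path to wherever
# structured_logger.py lives in the MaverickMCP package.
import asyncio

from structured_logger import (
    get_logger_manager,
    get_performance_logger,
    setup_backtesting_logging,
    with_structured_logging,
)

# Configure JSON logging to stderr plus a rotating file under logs/.
setup_backtesting_logging(log_level="INFO", log_file="logs/backtesting.log")


@with_structured_logging("run_backtest", log_params=True, log_result=True)
async def run_backtest(symbol: str, auth_token: str = "") -> dict:
    """Hypothetical tool; 'auth_token' is masked in the logged parameters."""
    await asyncio.sleep(0.1)  # stand-in for real backtesting work
    return {"symbol": symbol, "total_return": 0.12}


async def main() -> None:
    # Emits "Starting run_backtest" / "Completed run_backtest" records
    # with a shared correlation ID and per-operation timing.
    await run_backtest(symbol="AAPL", auth_token="secret-value")

    # Business metrics go through the same per-component performance logger
    # the decorator registered under "performance.run_backtest".
    perf = get_performance_logger("performance.run_backtest")
    perf.log_business_metric("strategies_processed", 1, symbol="AAPL")

    # Aggregated view consumed by the dashboard helpers.
    metrics = get_logger_manager().create_dashboard_metrics()
    print(metrics["system_metrics"]["log_level_counts"])


if __name__ == "__main__":
    asyncio.run(main())

Because the handlers are wrapped in AsyncLogHandler, records are flushed by a background thread, so output may trail the calls by a few milliseconds; the counts printed at the end come from the module-level LogLevelCounterFilter installed at import time.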
