Skip to main content
Glama

MaverickMCP

by wshobson
MIT License
165
  • Apple
metrics.py29.9 kB
""" Comprehensive Prometheus metrics for MaverickMCP backtesting system. This module provides specialized metrics for monitoring: - Backtesting execution performance and reliability - Strategy performance over time - API rate limiting and failure tracking - Resource usage and optimization - Anomaly detection and alerting """ import threading import time from contextlib import contextmanager from dataclasses import dataclass from datetime import datetime from typing import Any from prometheus_client import ( CollectorRegistry, Counter, Gauge, Histogram, Summary, generate_latest, ) from maverick_mcp.utils.logging import get_logger logger = get_logger(__name__) # Custom registry for backtesting metrics to avoid conflicts BACKTESTING_REGISTRY = CollectorRegistry() # ============================================================================= # BACKTESTING EXECUTION METRICS # ============================================================================= # Backtest execution counters backtest_executions_total = Counter( "maverick_backtest_executions_total", "Total number of backtesting executions", ["strategy_name", "status", "symbol", "timeframe"], registry=BACKTESTING_REGISTRY, ) backtest_execution_duration = Histogram( "maverick_backtest_execution_duration_seconds", "Duration of backtesting executions in seconds", ["strategy_name", "symbol", "timeframe", "data_size"], buckets=( 0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0, float("inf"), ), registry=BACKTESTING_REGISTRY, ) backtest_data_points_processed = Counter( "maverick_backtest_data_points_processed_total", "Total number of data points processed during backtesting", ["strategy_name", "symbol", "timeframe"], registry=BACKTESTING_REGISTRY, ) backtest_memory_usage = Histogram( "maverick_backtest_memory_usage_mb", "Memory usage during backtesting in MB", ["strategy_name", "symbol", "complexity"], buckets=(10, 25, 50, 100, 250, 500, 1000, 2500, 5000, float("inf")), registry=BACKTESTING_REGISTRY, ) # 
# =============================================================================
# STRATEGY PERFORMANCE METRICS
# =============================================================================

# Strategy returns and performance. These are Histograms (not Gauges) so that
# the distribution of observed values across runs can be queried.
strategy_returns = Histogram(
    "maverick_strategy_returns_percent",
    "Strategy returns in percentage",
    ["strategy_name", "symbol", "period"],
    buckets=(-50, -25, -10, -5, -1, 0, 1, 5, 10, 25, 50, 100, float("inf")),
    registry=BACKTESTING_REGISTRY,
)

strategy_sharpe_ratio = Histogram(
    "maverick_strategy_sharpe_ratio",
    "Strategy Sharpe ratio",
    ["strategy_name", "symbol", "period"],
    buckets=(-2, -1, -0.5, 0, 0.5, 1.0, 1.5, 2.0, 3.0, 4.0, float("inf")),
    registry=BACKTESTING_REGISTRY,
)

strategy_max_drawdown = Histogram(
    "maverick_strategy_max_drawdown_percent",
    "Maximum drawdown percentage for strategy",
    ["strategy_name", "symbol", "period"],
    buckets=(0, 5, 10, 15, 20, 30, 40, 50, 75, 100, float("inf")),
    registry=BACKTESTING_REGISTRY,
)

strategy_win_rate = Histogram(
    "maverick_strategy_win_rate_percent",
    "Win rate percentage for strategy",
    ["strategy_name", "symbol", "period"],
    buckets=(0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100),
    registry=BACKTESTING_REGISTRY,
)

strategy_trades_total = Counter(
    "maverick_strategy_trades_total",
    "Total number of trades executed by strategy",
    ["strategy_name", "symbol", "trade_type", "outcome"],
    registry=BACKTESTING_REGISTRY,
)

# Strategy execution latency for signal generation.
# NOTE(review): declared here but not observed anywhere in this module —
# presumably driven by callers elsewhere in the project; confirm.
strategy_execution_latency = Summary(
    "maverick_strategy_execution_latency_seconds",
    "Strategy execution latency for signal generation",
    ["strategy_name", "complexity"],
    registry=BACKTESTING_REGISTRY,
)

# =============================================================================
# API RATE LIMITING AND FAILURE METRICS
# =============================================================================

# API call tracking per provider/endpoint/method/status.
api_calls_total = Counter(
    "maverick_api_calls_total",
    "Total API calls made to external providers",
    ["provider", "endpoint", "method", "status_code"],
    registry=BACKTESTING_REGISTRY,
)

api_call_duration = Histogram(
    "maverick_api_call_duration_seconds",
    "API call duration in seconds",
    ["provider", "endpoint"],
    buckets=(0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, float("inf")),
    registry=BACKTESTING_REGISTRY,
)

# Rate limiting metrics.
rate_limit_hits = Counter(
    "maverick_rate_limit_hits_total",
    "Total rate limit hits by provider",
    ["provider", "endpoint", "limit_type"],
    registry=BACKTESTING_REGISTRY,
)

rate_limit_remaining = Gauge(
    "maverick_rate_limit_remaining",
    "Remaining API calls before hitting rate limit",
    ["provider", "endpoint", "window"],
    registry=BACKTESTING_REGISTRY,
)

# Unix timestamp (seconds) at which the provider's rate-limit window resets.
rate_limit_reset_time = Gauge(
    "maverick_rate_limit_reset_timestamp",
    "Timestamp when rate limit resets",
    ["provider", "endpoint"],
    registry=BACKTESTING_REGISTRY,
)

# API failures and errors.
api_failures_total = Counter(
    "maverick_api_failures_total",
    "Total API failures by error type",
    ["provider", "endpoint", "error_type", "error_code"],
    registry=BACKTESTING_REGISTRY,
)

# NOTE(review): declared but never incremented in this module — presumably
# incremented by retry logic elsewhere; confirm.
api_retry_attempts = Counter(
    "maverick_api_retry_attempts_total",
    "Total API retry attempts",
    ["provider", "endpoint", "retry_number"],
    registry=BACKTESTING_REGISTRY,
)

# Circuit breaker metrics; state is encoded numerically (see help string).
circuit_breaker_state = Gauge(
    "maverick_circuit_breaker_state",
    "Circuit breaker state (0=closed, 1=open, 2=half-open)",
    ["provider", "endpoint"],
    registry=BACKTESTING_REGISTRY,
)

circuit_breaker_failures = Counter(
    "maverick_circuit_breaker_failures_total",
    "Circuit breaker failure count",
    ["provider", "endpoint"],
    registry=BACKTESTING_REGISTRY,
)

# =============================================================================
# RESOURCE USAGE AND PERFORMANCE METRICS
# =============================================================================

# VectorBT specific metrics.
vectorbt_memory_usage = Gauge(
    "maverick_vectorbt_memory_usage_mb",
    "VectorBT memory usage in MB",
    ["operation_type"],
    registry=BACKTESTING_REGISTRY,
)

vectorbt_computation_time = Histogram(
    "maverick_vectorbt_computation_time_seconds",
    "VectorBT computation time in seconds",
    ["operation_type", "data_size"],
    buckets=(0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, float("inf")),
    registry=BACKTESTING_REGISTRY,
)

# Database query performance.
database_query_duration = Histogram(
    "maverick_database_query_duration_seconds",
    "Database query execution time",
    ["query_type", "table_name", "operation"],
    buckets=(0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, float("inf")),
    registry=BACKTESTING_REGISTRY,
)

database_connection_pool_usage = Gauge(
    "maverick_database_connection_pool_usage",
    "Database connection pool usage",
    ["pool_type", "status"],
    registry=BACKTESTING_REGISTRY,
)

# Cache performance metrics.
cache_operations_total = Counter(
    "maverick_cache_operations_total",
    "Total cache operations",
    ["cache_type", "operation", "status"],
    registry=BACKTESTING_REGISTRY,
)

# NOTE(review): declared but never observed in this module (see
# track_cache_operation below) — confirm whether it is fed elsewhere.
cache_hit_ratio = Histogram(
    "maverick_cache_hit_ratio",
    "Cache hit ratio percentage",
    ["cache_type", "key_pattern"],
    buckets=(0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 95, 99, 100),
    registry=BACKTESTING_REGISTRY,
)

# =============================================================================
# ANOMALY DETECTION METRICS
# =============================================================================

# Performance anomaly detection.
strategy_performance_anomalies = Counter(
    "maverick_strategy_performance_anomalies_total",
    "Detected strategy performance anomalies",
    ["strategy_name", "anomaly_type", "severity"],
    registry=BACKTESTING_REGISTRY,
)

data_quality_issues = Counter(
    "maverick_data_quality_issues_total",
    "Data quality issues detected",
    ["data_source", "issue_type", "symbol"],
    registry=BACKTESTING_REGISTRY,
)

resource_usage_alerts = Counter(
    "maverick_resource_usage_alerts_total",
    "Resource usage threshold alerts",
    ["resource_type", "threshold_type"],
    registry=BACKTESTING_REGISTRY,
)

# Threshold monitoring gauges — exports the configured warning/critical
# thresholds so dashboards can plot them alongside the live metrics.
performance_thresholds = Gauge(
    "maverick_performance_thresholds",
    "Performance monitoring thresholds",
    ["metric_name", "threshold_type"],  # threshold_type: warning, critical
    registry=BACKTESTING_REGISTRY,
)

# =============================================================================
# BUSINESS METRICS FOR TRADING
# =============================================================================

# Portfolio performance metrics.
portfolio_value = Gauge(
    "maverick_portfolio_value_usd",
    "Current portfolio value in USD",
    ["portfolio_id", "currency"],
    registry=BACKTESTING_REGISTRY,
)

portfolio_daily_pnl = Histogram(
    "maverick_portfolio_daily_pnl_usd",
    "Daily PnL in USD",
    ["portfolio_id", "strategy"],
    buckets=(
        -10000,
        -5000,
        -1000,
        -500,
        -100,
        0,
        100,
        500,
        1000,
        5000,
        10000,
        float("inf"),
    ),
    registry=BACKTESTING_REGISTRY,
)

active_positions = Gauge(
    "maverick_active_positions_count",
    "Number of active positions",
    ["portfolio_id", "symbol", "position_type"],
    registry=BACKTESTING_REGISTRY,
)

# =============================================================================
# METRICS COLLECTOR CLASS
# =============================================================================


@dataclass
class PerformanceThreshold:
    """Configuration for performance thresholds."""

    metric_name: str
    # Value at which a "warning" anomaly is raised.
    warning_value: float
    # Value at which a "critical" anomaly is raised.
    critical_value: float
    comparison_type: str = "greater_than"  # greater_than, less_than, equal_to


class BacktestingMetricsCollector:
    """
    Comprehensive metrics collector for backtesting operations.

    Provides high-level interfaces for tracking backtesting performance,
    strategy metrics, API usage, and anomaly detection.
    """
""" def __init__(self): self.logger = get_logger(f"{__name__}.BacktestingMetricsCollector") self._anomaly_thresholds = self._initialize_default_thresholds() self._lock = threading.Lock() # Initialize performance thresholds in Prometheus self._setup_performance_thresholds() self.logger.info("Backtesting metrics collector initialized") def _initialize_default_thresholds(self) -> dict[str, PerformanceThreshold]: """Initialize default performance thresholds for anomaly detection.""" return { "sharpe_ratio_low": PerformanceThreshold( "sharpe_ratio", 0.5, 0.0, "less_than" ), "max_drawdown_high": PerformanceThreshold( "max_drawdown", 20.0, 30.0, "greater_than" ), "win_rate_low": PerformanceThreshold("win_rate", 40.0, 30.0, "less_than"), "execution_time_high": PerformanceThreshold( "execution_time", 60.0, 120.0, "greater_than" ), "api_failure_rate_high": PerformanceThreshold( "api_failure_rate", 5.0, 10.0, "greater_than" ), "memory_usage_high": PerformanceThreshold( "memory_usage", 1000, 2000, "greater_than" ), } def _setup_performance_thresholds(self): """Setup performance threshold gauges.""" for _threshold_name, threshold in self._anomaly_thresholds.items(): performance_thresholds.labels( metric_name=threshold.metric_name, threshold_type="warning" ).set(threshold.warning_value) performance_thresholds.labels( metric_name=threshold.metric_name, threshold_type="critical" ).set(threshold.critical_value) @contextmanager def track_backtest_execution( self, strategy_name: str, symbol: str, timeframe: str, data_points: int = 0 ): """ Context manager for tracking backtest execution metrics. 
        start_time = time.time()
        start_memory = self._get_memory_usage()

        # Determine data size category used as a histogram label.
        data_size = self._categorize_data_size(data_points)

        try:
            yield

            # Success metrics
            duration = time.time() - start_time
            memory_used = self._get_memory_usage() - start_memory

            backtest_executions_total.labels(
                strategy_name=strategy_name,
                status="success",
                symbol=symbol,
                timeframe=timeframe,
            ).inc()

            backtest_execution_duration.labels(
                strategy_name=strategy_name,
                symbol=symbol,
                timeframe=timeframe,
                data_size=data_size,
            ).observe(duration)

            if data_points > 0:
                backtest_data_points_processed.labels(
                    strategy_name=strategy_name, symbol=symbol, timeframe=timeframe
                ).inc(data_points)

            # Only record a memory observation when usage actually grew
            # (the delta can be negative if memory was released mid-run).
            if memory_used > 0:
                complexity = self._categorize_complexity(data_points, duration)
                backtest_memory_usage.labels(
                    strategy_name=strategy_name, symbol=symbol, complexity=complexity
                ).observe(memory_used)

            # Check for performance anomalies
            self._check_execution_anomalies(strategy_name, duration, memory_used)

        except Exception as e:
            # Error metrics. NOTE(review): duration/memory are intentionally
            # not observed on the failure path — only the failure counter.
            backtest_executions_total.labels(
                strategy_name=strategy_name,
                status="failure",
                symbol=symbol,
                timeframe=timeframe,
            ).inc()

            self.logger.error(f"Backtest execution failed for {strategy_name}: {e}")
            raise

    def track_strategy_performance(
        self,
        strategy_name: str,
        symbol: str,
        period: str,
        returns: float,
        sharpe_ratio: float,
        max_drawdown: float,
        win_rate: float,
        total_trades: int,
        winning_trades: int,
    ):
        """
        Track comprehensive strategy performance metrics.

        Args:
            strategy_name: Name of the trading strategy
            symbol: Trading symbol
            period: Performance period (e.g., '1Y', '6M', '3M')
            returns: Total returns in percentage
            sharpe_ratio: Sharpe ratio
            max_drawdown: Maximum drawdown percentage
            win_rate: Win rate percentage
            total_trades: Total number of trades
            winning_trades: Number of winning trades
        """
        # Record performance metrics
        strategy_returns.labels(
            strategy_name=strategy_name, symbol=symbol, period=period
        ).observe(returns)

        strategy_sharpe_ratio.labels(
            strategy_name=strategy_name, symbol=symbol, period=period
        ).observe(sharpe_ratio)

        strategy_max_drawdown.labels(
            strategy_name=strategy_name, symbol=symbol, period=period
        ).observe(max_drawdown)

        strategy_win_rate.labels(
            strategy_name=strategy_name, symbol=symbol, period=period
        ).observe(win_rate)

        # Record trade counts: total, winning, and losing (derived).
        strategy_trades_total.labels(
            strategy_name=strategy_name,
            symbol=symbol,
            trade_type="total",
            outcome="all",
        ).inc(total_trades)

        strategy_trades_total.labels(
            strategy_name=strategy_name,
            symbol=symbol,
            trade_type="winning",
            outcome="profit",
        ).inc(winning_trades)

        losing_trades = total_trades - winning_trades
        strategy_trades_total.labels(
            strategy_name=strategy_name,
            symbol=symbol,
            trade_type="losing",
            outcome="loss",
        ).inc(losing_trades)

        # Check for performance anomalies
        self._check_strategy_anomalies(
            strategy_name, sharpe_ratio, max_drawdown, win_rate
        )

    def track_api_call(
        self,
        provider: str,
        endpoint: str,
        method: str,
        status_code: int,
        duration: float,
        error_type: str | None = None,
        remaining_calls: int | None = None,
        reset_time: datetime | None = None,
    ):
        """
        Track API call metrics including rate limiting and failures.

        Args:
            provider: API provider name (e.g., 'tiingo', 'yahoo')
            endpoint: API endpoint
            method: HTTP method
            status_code: Response status code
            duration: Request duration in seconds
            error_type: Type of error if request failed
            remaining_calls: Remaining API calls before rate limit
            reset_time: When rate limit resets
        """
        # Basic API call tracking
        api_calls_total.labels(
            provider=provider,
            endpoint=endpoint,
            method=method,
            status_code=str(status_code),
        ).inc()

        api_call_duration.labels(provider=provider, endpoint=endpoint).observe(duration)

        # Rate limiting metrics (only when the caller supplies header info).
        if remaining_calls is not None:
            rate_limit_remaining.labels(
                provider=provider, endpoint=endpoint, window="current"
            ).set(remaining_calls)

        if reset_time is not None:
            rate_limit_reset_time.labels(provider=provider, endpoint=endpoint).set(
                reset_time.timestamp()
            )

        # Failure tracking for any 4xx/5xx response.
        if status_code >= 400:
            error_code = self._categorize_error_code(status_code)
            api_failures_total.labels(
                provider=provider,
                endpoint=endpoint,
                error_type=error_type or "unknown",
                error_code=error_code,
            ).inc()

        # Check for rate limiting (HTTP 429 Too Many Requests).
        if status_code == 429:
            rate_limit_hits.labels(
                provider=provider, endpoint=endpoint, limit_type="requests_per_hour"
            ).inc()

        # Check for API anomalies
        self._check_api_anomalies(provider, endpoint, status_code, duration)

    def track_circuit_breaker(
        self, provider: str, endpoint: str, state: str, failure_count: int
    ):
        """Track circuit breaker state and failures.

        Unrecognized state strings are recorded as 0 (closed).
        """
        state_mapping = {"closed": 0, "open": 1, "half-open": 2}
        circuit_breaker_state.labels(provider=provider, endpoint=endpoint).set(
            state_mapping.get(state, 0)
        )

        if failure_count > 0:
            circuit_breaker_failures.labels(provider=provider, endpoint=endpoint).inc(
                failure_count
            )

    def track_resource_usage(
        self,
        operation_type: str,
        memory_mb: float,
        computation_time: float,
        data_size: str = "unknown",
    ):
        """Track resource usage for VectorBT operations."""
        vectorbt_memory_usage.labels(operation_type=operation_type).set(memory_mb)

        vectorbt_computation_time.labels(
            operation_type=operation_type, data_size=data_size
        ).observe(computation_time)

        # Check for resource usage anomalies against the configured
        # memory thresholds (warning below critical, critical at/above it).
        if memory_mb > self._anomaly_thresholds["memory_usage_high"].warning_value:
            resource_usage_alerts.labels(
                resource_type="memory",
                threshold_type="warning"
                if memory_mb
                < self._anomaly_thresholds["memory_usage_high"].critical_value
                else "critical",
            ).inc()

    def track_database_operation(
        self, query_type: str, table_name: str, operation: str, duration: float
    ):
        """Track database operation performance."""
        database_query_duration.labels(
            query_type=query_type, table_name=table_name, operation=operation
        ).observe(duration)

    def track_cache_operation(
        self, cache_type: str, operation: str, hit: bool, key_pattern: str = "general"
    ):
        """Track cache operation performance.

        NOTE(review): `key_pattern` is accepted but unused here, and the
        `cache_hit_ratio` histogram is never observed by this method —
        confirm whether ratio tracking was intended to live here.
        """
        status = "hit" if hit else "miss"
        cache_operations_total.labels(
            cache_type=cache_type, operation=operation, status=status
        ).inc()

    def detect_anomaly(self, anomaly_type: str, severity: str, context: dict[str, Any]):
        """Record detected anomaly and log a warning with its context."""
        strategy_name = context.get("strategy_name", "unknown")

        strategy_performance_anomalies.labels(
            strategy_name=strategy_name, anomaly_type=anomaly_type, severity=severity
        ).inc()

        self.logger.warning(
            f"Anomaly detected: {anomaly_type} (severity: {severity})",
            extra={"context": context},
        )

    def update_portfolio_metrics(
        self,
        portfolio_id: str,
        portfolio_value_usd: float,
        daily_pnl_usd: float,
        strategy: str,
        positions: list[dict[str, Any]],
    ):
        """Update portfolio-related metrics.

        Each entry in `positions` is expected to carry 'symbol', 'type',
        and 'quantity' keys; missing keys fall back to defaults.
        """
        portfolio_value.labels(portfolio_id=portfolio_id, currency="USD").set(
            portfolio_value_usd
        )

        portfolio_daily_pnl.labels(
            portfolio_id=portfolio_id, strategy=strategy
        ).observe(daily_pnl_usd)

        # Update position counts
        for position in positions:
            active_positions.labels(
                portfolio_id=portfolio_id,
                symbol=position.get("symbol", "unknown"),
                position_type=position.get("type", "long"),
            ).set(position.get("quantity", 0))

    def _get_memory_usage(self) -> float:
        """Get current memory usage in MB (0.0 when psutil is unavailable)."""
        try:
            import psutil

            # Resident set size in bytes -> MB.
            process = psutil.Process()
            return process.memory_info().rss / 1024 / 1024
        except ImportError:
            # psutil is optional; memory tracking degrades gracefully.
            return 0.0

    def _categorize_data_size(self, data_points: int) -> str:
        """Categorize data size for metrics labeling."""
        if data_points < 100:
            return "small"
        elif data_points < 1000:
            return "medium"
        elif data_points < 10000:
            return "large"
        else:
            return "xlarge"

    def _categorize_complexity(self, data_points: int, duration: float) -> str:
        """Categorize operation complexity from size and duration (seconds)."""
        if data_points < 1000 and duration < 10:
            return "simple"
        elif data_points < 10000 and duration < 60:
            return "moderate"
        else:
            return "complex"

    def _categorize_error_code(self, status_code: int) -> str:
        """Categorize HTTP error codes into client/server/other buckets."""
        if 400 <= status_code < 500:
            return "client_error"
        elif 500 <= status_code < 600:
            return "server_error"
        else:
            return "other"

    def _check_execution_anomalies(
        self, strategy_name: str, duration: float, memory_mb: float
    ):
        """Check for execution performance anomalies.

        NOTE(review): `memory_mb` is accepted but not checked here —
        memory anomalies are handled in track_resource_usage; confirm.
        """
        threshold = self._anomaly_thresholds["execution_time_high"]
        if duration > threshold.critical_value:
            self.detect_anomaly(
                "execution_time_high",
                "critical",
                {
                    "strategy_name": strategy_name,
                    "duration": duration,
                    "threshold": threshold.critical_value,
                },
            )
        elif duration > threshold.warning_value:
            self.detect_anomaly(
                "execution_time_high",
                "warning",
                {
                    "strategy_name": strategy_name,
                    "duration": duration,
                    "threshold": threshold.warning_value,
                },
            )

    def _check_strategy_anomalies(
        self,
        strategy_name: str,
        sharpe_ratio: float,
        max_drawdown: float,
        win_rate: float,
    ):
        """Check for strategy performance anomalies."""
        # Check Sharpe ratio ("less_than" semantics: lower is worse).
        threshold = self._anomaly_thresholds["sharpe_ratio_low"]
        if sharpe_ratio < threshold.critical_value:
            self.detect_anomaly(
                "sharpe_ratio_low",
                "critical",
                {"strategy_name": strategy_name, "sharpe_ratio": sharpe_ratio},
            )
        elif sharpe_ratio < threshold.warning_value:
            self.detect_anomaly(
                "sharpe_ratio_low",
                "warning",
                {"strategy_name": strategy_name, "sharpe_ratio": sharpe_ratio},
            )

        # Check max drawdown ("greater_than" semantics: higher is worse).
        threshold = self._anomaly_thresholds["max_drawdown_high"]
        if max_drawdown > threshold.critical_value:
            self.detect_anomaly(
                "max_drawdown_high",
                "critical",
                {"strategy_name": strategy_name, "max_drawdown": max_drawdown},
            )
        elif max_drawdown > threshold.warning_value:
            self.detect_anomaly(
                "max_drawdown_high",
                "warning",
                {"strategy_name": strategy_name, "max_drawdown": max_drawdown},
            )

        # Check win rate (lower is worse).
        threshold = self._anomaly_thresholds["win_rate_low"]
        if win_rate < threshold.critical_value:
            self.detect_anomaly(
                "win_rate_low",
                "critical",
                {"strategy_name": strategy_name, "win_rate": win_rate},
            )
        elif win_rate < threshold.warning_value:
            self.detect_anomaly(
                "win_rate_low",
                "warning",
                {"strategy_name": strategy_name, "win_rate": win_rate},
            )

    def _check_api_anomalies(
        self, provider: str, endpoint: str, status_code: int, duration: float
    ):
        """Check for API call anomalies (slow responses, server errors)."""
        # Check API response time
        if duration > 30.0:  # 30 second threshold
            self.detect_anomaly(
                "api_response_slow",
                "warning" if duration < 60.0 else "critical",
                {"provider": provider, "endpoint": endpoint, "duration": duration},
            )

        # Check for repeated failures
        if status_code >= 500:
            self.detect_anomaly(
                "api_server_error",
                "critical",
                {
                    "provider": provider,
                    "endpoint": endpoint,
                    "status_code": status_code,
                },
            )

    def get_metrics_text(self) -> str:
        """Get all backtesting metrics in Prometheus text format."""
        return generate_latest(BACKTESTING_REGISTRY).decode("utf-8")


# =============================================================================
# GLOBAL INSTANCES AND CONVENIENCE FUNCTIONS
# =============================================================================

# Global metrics collector instance, created lazily by get_backtesting_metrics.
_metrics_collector: BacktestingMetricsCollector | None = None
_collector_lock = threading.Lock()


def get_backtesting_metrics() -> BacktestingMetricsCollector:
    """Get or create the global backtesting metrics collector.

    Uses double-checked locking so concurrent first calls create exactly
    one collector instance.
    """
    global _metrics_collector
    if _metrics_collector is None:
        with _collector_lock:
            if _metrics_collector is None:
                _metrics_collector = BacktestingMetricsCollector()
    return _metrics_collector


# Convenience functions for common operations
def track_backtest_execution(
    strategy_name: str, symbol: str, timeframe: str, data_points: int = 0
):
    """Convenience function to track backtest execution.

    Returns the collector's context manager; use as `with track_backtest_execution(...)`.
    """
    return get_backtesting_metrics().track_backtest_execution(
        strategy_name, symbol, timeframe, data_points
    )


def track_strategy_performance(
    strategy_name: str,
    symbol: str,
    period: str,
    returns: float,
    sharpe_ratio: float,
    max_drawdown: float,
    win_rate: float,
    total_trades: int,
    winning_trades: int,
):
    """Convenience function to track strategy performance."""
    get_backtesting_metrics().track_strategy_performance(
        strategy_name,
        symbol,
        period,
        returns,
        sharpe_ratio,
        max_drawdown,
        win_rate,
        total_trades,
        winning_trades,
    )


def track_api_call_metrics(
    provider: str,
    endpoint: str,
    method: str,
    status_code: int,
    duration: float,
    error_type: str | None = None,
    remaining_calls: int | None = None,
    reset_time: datetime | None = None,
):
    """Convenience function to track API call metrics."""
    get_backtesting_metrics().track_api_call(
        provider,
        endpoint,
        method,
        status_code,
        duration,
        error_type,
        remaining_calls,
        reset_time,
    )


def track_anomaly_detection(anomaly_type: str, severity: str, context: dict[str, Any]):
    """Convenience function to track detected anomalies."""
    get_backtesting_metrics().detect_anomaly(anomaly_type, severity, context)


def get_metrics_for_prometheus() -> str:
    """Get backtesting metrics in Prometheus format."""
    return get_backtesting_metrics().get_metrics_text()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server