Skip to main content
Glama

MaverickMCP

by wshobson
MIT License
165
  • Apple
circuit_breaker_services.py10.2 kB
""" Service-specific circuit breakers for external APIs. Provides pre-configured circuit breakers for different external services. """ import logging import pandas as pd import requests from requests.exceptions import ConnectionError, HTTPError, RequestException, Timeout from maverick_mcp.config.settings import get_settings from maverick_mcp.utils.circuit_breaker import ( CircuitBreakerConfig, EnhancedCircuitBreaker, FailureDetectionStrategy, ) from maverick_mcp.utils.fallback_strategies import ( ECONOMIC_DATA_FALLBACK, MARKET_DATA_FALLBACK, NEWS_FALLBACK, STOCK_DATA_FALLBACK_CHAIN, ) logger = logging.getLogger(__name__) settings = get_settings() # Service-specific configurations YFINANCE_CONFIG = CircuitBreakerConfig( name="yfinance", failure_threshold=3, failure_rate_threshold=0.5, timeout_threshold=30.0, recovery_timeout=120, # 2 minutes success_threshold=2, window_size=300, # 5 minutes detection_strategy=FailureDetectionStrategy.COMBINED, expected_exceptions=(Exception,), # yfinance can throw various exceptions ) FINVIZ_CONFIG = CircuitBreakerConfig( name="finviz", failure_threshold=5, failure_rate_threshold=0.6, timeout_threshold=20.0, recovery_timeout=180, # 3 minutes success_threshold=3, window_size=300, detection_strategy=FailureDetectionStrategy.COMBINED, expected_exceptions=(Exception,), ) FRED_CONFIG = CircuitBreakerConfig( name="fred_api", failure_threshold=5, failure_rate_threshold=0.5, timeout_threshold=15.0, recovery_timeout=300, # 5 minutes success_threshold=3, window_size=600, # 10 minutes detection_strategy=FailureDetectionStrategy.COMBINED, expected_exceptions=(Exception,), ) EXTERNAL_API_CONFIG = CircuitBreakerConfig( name="external_api", failure_threshold=3, failure_rate_threshold=0.4, timeout_threshold=10.0, recovery_timeout=60, success_threshold=2, window_size=300, detection_strategy=FailureDetectionStrategy.COMBINED, expected_exceptions=(RequestException, HTTPError, Timeout, ConnectionError), ) TIINGO_CONFIG = CircuitBreakerConfig( name="tiingo", failure_threshold=3, failure_rate_threshold=0.5, timeout_threshold=15.0, recovery_timeout=120, success_threshold=2, window_size=300, detection_strategy=FailureDetectionStrategy.COMBINED, expected_exceptions=(Exception,), ) HTTP_CONFIG = CircuitBreakerConfig( name="http_general", failure_threshold=5, failure_rate_threshold=0.6, timeout_threshold=30.0, recovery_timeout=60, success_threshold=3, window_size=300, detection_strategy=FailureDetectionStrategy.FAILURE_RATE, expected_exceptions=(RequestException, HTTPError, Timeout, ConnectionError), ) class StockDataCircuitBreaker(EnhancedCircuitBreaker): """Circuit breaker for stock data APIs (yfinance).""" def __init__(self): """Initialize with yfinance configuration.""" super().__init__(YFINANCE_CONFIG) self.fallback_chain = STOCK_DATA_FALLBACK_CHAIN def fetch_with_fallback( self, fetch_func: callable, symbol: str, start_date: str, end_date: str, **kwargs, ) -> pd.DataFrame: """ Fetch stock data with circuit breaker and fallback. Args: fetch_func: The function to fetch data (e.g., yfinance call) symbol: Stock symbol start_date: Start date end_date: End date **kwargs: Additional arguments for fetch_func Returns: DataFrame with stock data """ try: # Try primary fetch through circuit breaker return self.call_sync(fetch_func, symbol, start_date, end_date, **kwargs) except Exception as e: logger.warning( f"Primary stock data fetch failed for {symbol}: {e}. " f"Attempting fallback strategies." ) # Execute fallback chain return self.fallback_chain.execute_sync( symbol, start_date, end_date, **kwargs ) async def fetch_with_fallback_async( self, fetch_func: callable, symbol: str, start_date: str, end_date: str, **kwargs, ) -> pd.DataFrame: """Async version of fetch_with_fallback.""" try: return await self.call_async( fetch_func, symbol, start_date, end_date, **kwargs ) except Exception as e: logger.warning( f"Primary stock data fetch failed for {symbol}: {e}. " f"Attempting fallback strategies." ) return await self.fallback_chain.execute_async( symbol, start_date, end_date, **kwargs ) class MarketDataCircuitBreaker(EnhancedCircuitBreaker): """Circuit breaker for market data APIs (finviz, External API).""" def __init__(self, service_name: str = "market_data"): """Initialize with market data configuration.""" if service_name == "finviz": config = FINVIZ_CONFIG elif service_name == "external_api": config = EXTERNAL_API_CONFIG else: config = FINVIZ_CONFIG # Default super().__init__(config) self.fallback = MARKET_DATA_FALLBACK def fetch_with_fallback( self, fetch_func: callable, mover_type: str = "gainers", **kwargs ) -> dict: """Fetch market data with circuit breaker and fallback.""" try: return self.call_sync(fetch_func, mover_type, **kwargs) except Exception as e: logger.warning( f"Market data fetch failed for {mover_type}: {e}. " f"Returning fallback data." ) return self.fallback.execute_sync(mover_type, **kwargs) class EconomicDataCircuitBreaker(EnhancedCircuitBreaker): """Circuit breaker for economic data APIs (FRED).""" def __init__(self): """Initialize with FRED configuration.""" super().__init__(FRED_CONFIG) self.fallback = ECONOMIC_DATA_FALLBACK def fetch_with_fallback( self, fetch_func: callable, series_id: str, start_date: str, end_date: str, **kwargs, ) -> pd.Series: """Fetch economic data with circuit breaker and fallback.""" try: return self.call_sync(fetch_func, series_id, start_date, end_date, **kwargs) except Exception as e: logger.warning( f"Economic data fetch failed for {series_id}: {e}. " f"Using fallback values." ) return self.fallback.execute_sync(series_id, start_date, end_date, **kwargs) class NewsDataCircuitBreaker(EnhancedCircuitBreaker): """Circuit breaker for news/sentiment APIs.""" def __init__(self): """Initialize with news API configuration.""" # Use a generic config for news APIs config = CircuitBreakerConfig( name="news_api", failure_threshold=3, failure_rate_threshold=0.6, timeout_threshold=10.0, recovery_timeout=300, success_threshold=2, window_size=600, detection_strategy=FailureDetectionStrategy.COMBINED, expected_exceptions=(Exception,), ) super().__init__(config) self.fallback = NEWS_FALLBACK def fetch_with_fallback(self, fetch_func: callable, symbol: str, **kwargs) -> dict: """Fetch news data with circuit breaker and fallback.""" try: return self.call_sync(fetch_func, symbol, **kwargs) except Exception as e: logger.warning( f"News data fetch failed for {symbol}: {e}. Returning empty news data." ) return self.fallback.execute_sync(symbol, **kwargs) class HttpCircuitBreaker(EnhancedCircuitBreaker): """General circuit breaker for HTTP requests.""" def __init__(self): """Initialize with HTTP configuration.""" super().__init__(HTTP_CONFIG) def request_with_circuit_breaker( self, method: str, url: str, session: requests.Session | None = None, **kwargs ) -> requests.Response: """ Make HTTP request with circuit breaker protection. Args: method: HTTP method (GET, POST, etc.) url: URL to request session: Optional requests session **kwargs: Additional arguments for requests Returns: Response object """ def make_request(): # Ensure timeout is set if "timeout" not in kwargs: kwargs["timeout"] = self.config.timeout_threshold if session: return session.request(method, url, **kwargs) else: return requests.request(method, url, **kwargs) return self.call_sync(make_request) # Global instances for reuse stock_data_breaker = StockDataCircuitBreaker() market_data_breaker = MarketDataCircuitBreaker() economic_data_breaker = EconomicDataCircuitBreaker() news_data_breaker = NewsDataCircuitBreaker() http_breaker = HttpCircuitBreaker() def get_service_circuit_breaker(service: str) -> EnhancedCircuitBreaker: """ Get a circuit breaker for a specific service. Args: service: Service name (yfinance, finviz, fred, news, http) Returns: Configured circuit breaker for the service """ service_breakers = { "yfinance": stock_data_breaker, "finviz": market_data_breaker, "fred": economic_data_breaker, "external_api": MarketDataCircuitBreaker("external_api"), "tiingo": EnhancedCircuitBreaker(TIINGO_CONFIG), "news": news_data_breaker, "http": http_breaker, } breaker = service_breakers.get(service) if not breaker: logger.warning( f"No specific circuit breaker for service '{service}', using HTTP breaker" ) return http_breaker return breaker

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server