Skip to main content
Glama

USPTO Final Petition Decisions MCP Server

by john-walkoe
health_check.py13.2 kB
""" Health check system for monitoring application health Provides comprehensive health checks for all system components including API connectivity, circuit breaker status, cache health, and dependencies. """ import asyncio import time import logging from typing import Dict, Any, List, Optional from enum import Enum from ..api.fpd_client import FPDClient from ..config.field_manager import FieldManager from ..shared.cache import CacheManager from ..shared.structured_logging import StructuredLogger logger = logging.getLogger(__name__) class HealthStatus(Enum): """Health check status levels""" HEALTHY = "healthy" DEGRADED = "degraded" UNHEALTHY = "unhealthy" class HealthCheck: """Individual health check result""" def __init__( self, name: str, status: HealthStatus, message: str, response_time_ms: Optional[float] = None, details: Optional[Dict[str, Any]] = None ): self.name = name self.status = status self.message = message self.response_time_ms = response_time_ms self.details = details or {} self.timestamp = time.time() def to_dict(self) -> Dict[str, Any]: """Convert health check to dictionary""" result = { "name": self.name, "status": self.status.value, "message": self.message, "timestamp": self.timestamp } if self.response_time_ms is not None: result["response_time_ms"] = self.response_time_ms if self.details: result["details"] = self.details return result class HealthChecker: """Comprehensive health check system""" def __init__( self, api_client: Optional[FPDClient] = None, field_manager: Optional[FieldManager] = None, cache_manager: Optional[CacheManager] = None, structured_logger: Optional[StructuredLogger] = None ): """ Initialize health checker Args: api_client: FPD API client for connectivity checks field_manager: Field manager for configuration checks cache_manager: Cache manager for cache health checks structured_logger: Structured logger for health events """ self.api_client = api_client self.field_manager = field_manager self.cache_manager = cache_manager self.structured_logger = structured_logger or StructuredLogger("health_checker") async def check_api_connectivity(self) -> HealthCheck: """Check USPTO API connectivity""" start_time = time.time() try: if not self.api_client: return HealthCheck( name="api_connectivity", status=HealthStatus.UNHEALTHY, message="API client not configured" ) # Try a minimal API call to test connectivity result = await self.api_client.search_petitions( query="*", limit=1, offset=0 ) response_time_ms = (time.time() - start_time) * 1000 if "error" in result: return HealthCheck( name="api_connectivity", status=HealthStatus.UNHEALTHY, message=f"API error: {result.get('error', 'Unknown error')}", response_time_ms=response_time_ms, details={"error_details": result} ) return HealthCheck( name="api_connectivity", status=HealthStatus.HEALTHY, message="API connectivity OK", response_time_ms=response_time_ms, details={"result_count": result.get("recordTotalQuantity", 0)} ) except Exception as e: response_time_ms = (time.time() - start_time) * 1000 return HealthCheck( name="api_connectivity", status=HealthStatus.UNHEALTHY, message=f"API connectivity failed: {str(e)}", response_time_ms=response_time_ms, details={"exception": str(e)} ) def check_circuit_breakers(self) -> HealthCheck: """Check circuit breaker status""" try: if not self.api_client: return HealthCheck( name="circuit_breakers", status=HealthStatus.UNHEALTHY, message="API client not configured" ) breaker_status = self.api_client.get_circuit_breaker_status() # Check if any circuit breakers are open open_breakers = [] degraded_breakers = [] for breaker_name, status in breaker_status.items(): if status["state"] == "open": open_breakers.append(breaker_name) elif status["state"] == "half_open": degraded_breakers.append(breaker_name) if open_breakers: return HealthCheck( name="circuit_breakers", status=HealthStatus.UNHEALTHY, message=f"Circuit breakers open: {', '.join(open_breakers)}", details=breaker_status ) elif degraded_breakers: return HealthCheck( name="circuit_breakers", status=HealthStatus.DEGRADED, message=f"Circuit breakers in recovery: {', '.join(degraded_breakers)}", details=breaker_status ) else: return HealthCheck( name="circuit_breakers", status=HealthStatus.HEALTHY, message="All circuit breakers closed", details=breaker_status ) except Exception as e: return HealthCheck( name="circuit_breakers", status=HealthStatus.UNHEALTHY, message=f"Circuit breaker check failed: {str(e)}", details={"exception": str(e)} ) def check_cache_health(self) -> HealthCheck: """Check cache system health""" try: if not self.cache_manager: return HealthCheck( name="cache", status=HealthStatus.DEGRADED, message="Cache manager not configured - caching disabled" ) cache_stats = self.cache_manager.get_stats() # Determine health based on cache statistics if cache_stats.get("cache_type") == "TTLCache": current_size = cache_stats.get("current_size", 0) max_size = cache_stats.get("max_size", 100) utilization = (current_size / max_size) * 100 if max_size > 0 else 0 if utilization > 90: status = HealthStatus.DEGRADED message = f"Cache near capacity: {utilization:.1f}% full" else: status = HealthStatus.HEALTHY message = f"Cache healthy: {utilization:.1f}% full" else: # SimpleCache valid_items = cache_stats.get("valid_items", 0) total_items = cache_stats.get("total_items", 0) if total_items > 0: valid_ratio = (valid_items / total_items) * 100 if valid_ratio < 50: status = HealthStatus.DEGRADED message = f"Many expired items in cache: {valid_ratio:.1f}% valid" else: status = HealthStatus.HEALTHY message = f"Cache healthy: {valid_ratio:.1f}% valid items" else: status = HealthStatus.HEALTHY message = "Cache empty but healthy" return HealthCheck( name="cache", status=status, message=message, details=cache_stats ) except Exception as e: return HealthCheck( name="cache", status=HealthStatus.UNHEALTHY, message=f"Cache health check failed: {str(e)}", details={"exception": str(e)} ) def check_configuration(self) -> HealthCheck: """Check configuration health""" try: if not self.field_manager: return HealthCheck( name="configuration", status=HealthStatus.UNHEALTHY, message="Field manager not configured" ) # Check if field sets are available field_sets = self.field_manager.get_predefined_sets() required_sets = ["petitions_minimal", "petitions_balanced"] missing_sets = [] for required_set in required_sets: if required_set not in field_sets: missing_sets.append(required_set) if missing_sets: return HealthCheck( name="configuration", status=HealthStatus.UNHEALTHY, message=f"Missing required field sets: {', '.join(missing_sets)}", details={"available_sets": list(field_sets.keys())} ) # Check field set completeness minimal_fields = field_sets["petitions_minimal"].get("fields", []) balanced_fields = field_sets["petitions_balanced"].get("fields", []) if len(minimal_fields) < 5: return HealthCheck( name="configuration", status=HealthStatus.DEGRADED, message="Minimal field set has too few fields", details={"minimal_field_count": len(minimal_fields)} ) return HealthCheck( name="configuration", status=HealthStatus.HEALTHY, message="Configuration healthy", details={ "field_sets": len(field_sets), "minimal_fields": len(minimal_fields), "balanced_fields": len(balanced_fields) } ) except Exception as e: return HealthCheck( name="configuration", status=HealthStatus.UNHEALTHY, message=f"Configuration check failed: {str(e)}", details={"exception": str(e)} ) async def run_all_checks(self) -> Dict[str, Any]: """Run all health checks and return comprehensive status""" start_time = time.time() # Run all health checks checks = await asyncio.gather( self.check_api_connectivity(), asyncio.create_task(asyncio.coroutine(lambda: self.check_circuit_breakers)()), asyncio.create_task(asyncio.coroutine(lambda: self.check_cache_health)()), asyncio.create_task(asyncio.coroutine(lambda: self.check_configuration)()), return_exceptions=True ) # Convert results to health checks health_checks = [] for check in checks: if isinstance(check, HealthCheck): health_checks.append(check) elif isinstance(check, Exception): health_checks.append(HealthCheck( name="unknown", status=HealthStatus.UNHEALTHY, message=f"Health check failed: {str(check)}" )) # Determine overall status overall_status = HealthStatus.HEALTHY unhealthy_count = 0 degraded_count = 0 for check in health_checks: if check.status == HealthStatus.UNHEALTHY: unhealthy_count += 1 overall_status = HealthStatus.UNHEALTHY elif check.status == HealthStatus.DEGRADED: degraded_count += 1 if overall_status == HealthStatus.HEALTHY: overall_status = HealthStatus.DEGRADED total_time_ms = (time.time() - start_time) * 1000 result = { "status": overall_status.value, "timestamp": time.time(), "total_check_time_ms": total_time_ms, "summary": { "total_checks": len(health_checks), "healthy": len([c for c in health_checks if c.status == HealthStatus.HEALTHY]), "degraded": degraded_count, "unhealthy": unhealthy_count }, "checks": [check.to_dict() for check in health_checks] } # Log health check result self.structured_logger.log_health_check( component="overall_system", status=overall_status.value, details=result["summary"], response_time_ms=total_time_ms ) return result

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/john-walkoe/uspto_fpd_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server