Skip to main content
Glama
health_manager.py23.1 kB
# -*- coding: utf-8 -*- """ 服务健康管理器 实现智能的服务健康监控、故障检测和自动恢复 """ import time import threading import requests import subprocess import psutil import logging from typing import Dict, Any, List, Optional, Callable from dataclasses import dataclass, field from enum import Enum from pathlib import Path import json import signal from contextlib import contextmanager # 配置日志 logger = logging.getLogger(__name__) class ServiceStatus(Enum): """服务状态枚举""" HEALTHY = "healthy" UNHEALTHY = "unhealthy" DEGRADED = "degraded" STARTING = "starting" STOPPING = "stopping" STOPPED = "stopped" UNKNOWN = "unknown" class HealthCheckType(Enum): """健康检查类型""" HTTP = "http" TCP = "tcp" PROCESS = "process" CUSTOM = "custom" @dataclass class HealthCheckConfig: """健康检查配置""" check_type: HealthCheckType endpoint: Optional[str] = None port: Optional[int] = None timeout: int = 5 interval: int = 30 retries: int = 3 retry_delay: int = 5 expected_status: int = 200 custom_check: Optional[Callable] = None @dataclass class ServiceConfig: """服务配置""" name: str description: str health_check: HealthCheckConfig start_command: List[str] stop_command: Optional[List[str]] = None working_directory: Optional[str] = None environment: Dict[str, str] = field(default_factory=dict) dependencies: List[str] = field(default_factory=list) max_restart_attempts: int = 3 restart_delay: int = 10 graceful_shutdown_timeout: int = 30 fallback_strategy: Optional[str] = None @dataclass class ServiceMetrics: """服务指标""" cpu_usage: float = 0.0 memory_usage: float = 0.0 response_time: float = 0.0 error_rate: float = 0.0 uptime: float = 0.0 restart_count: int = 0 last_health_check: Optional[float] = None last_error: Optional[str] = None class ServiceInstance: """服务实例""" def __init__(self, config: ServiceConfig): self.config = config self.status = ServiceStatus.STOPPED self.process: Optional[subprocess.Popen] = None self.metrics = ServiceMetrics() self.start_time: Optional[float] = None self.restart_attempts = 0 self.last_restart_time: Optional[float] = None self.health_check_failures = 0 self.lock = threading.Lock() def start(self) -> bool: """启动服务""" with self.lock: if self.status in [ServiceStatus.HEALTHY, ServiceStatus.STARTING]: logger.info(f"服务 {self.config.name} 已在运行") return True try: logger.info(f"启动服务: {self.config.name}") self.status = ServiceStatus.STARTING # 准备环境 env = {**os.environ, **self.config.environment} # 启动进程 self.process = subprocess.Popen( self.config.start_command, cwd=self.config.working_directory, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=os.setsid if hasattr(os, 'setsid') else None ) self.start_time = time.time() self.restart_attempts += 1 self.last_restart_time = time.time() # 等待服务启动 startup_timeout = 30 start_check_time = time.time() while time.time() - start_check_time < startup_timeout: if self._perform_health_check(): self.status = ServiceStatus.HEALTHY self.health_check_failures = 0 logger.info(f"服务 {self.config.name} 启动成功") return True # 检查进程是否还在运行 if self.process.poll() is not None: stdout, stderr = self.process.communicate() logger.error(f"服务 {self.config.name} 启动失败: {stderr.decode()}") self.status = ServiceStatus.STOPPED return False time.sleep(1) logger.warning(f"服务 {self.config.name} 启动超时") self.status = ServiceStatus.UNHEALTHY return False except Exception as e: logger.error(f"启动服务 {self.config.name} 失败: {e}") self.status = ServiceStatus.STOPPED self.metrics.last_error = str(e) return False def stop(self, graceful: bool = True) -> bool: """停止服务""" with self.lock: if self.status == ServiceStatus.STOPPED: return True try: logger.info(f"停止服务: {self.config.name}") self.status = ServiceStatus.STOPPING if self.process: if graceful: # 优雅关闭 self.process.terminate() try: self.process.wait(timeout=self.config.graceful_shutdown_timeout) except subprocess.TimeoutExpired: logger.warning(f"服务 {self.config.name} 优雅关闭超时,强制终止") self.process.kill() self.process.wait() else: # 强制关闭 self.process.kill() self.process.wait() self.process = None self.status = ServiceStatus.STOPPED self.start_time = None logger.info(f"服务 {self.config.name} 已停止") return True except Exception as e: logger.error(f"停止服务 {self.config.name} 失败: {e}") self.metrics.last_error = str(e) return False def restart(self) -> bool: """重启服务""" logger.info(f"重启服务: {self.config.name}") if self.stop(): time.sleep(self.config.restart_delay) return self.start() return False def _perform_health_check(self) -> bool: """执行健康检查""" try: check_start_time = time.time() if self.config.health_check.check_type == HealthCheckType.HTTP: response = requests.get( self.config.health_check.endpoint, timeout=self.config.health_check.timeout ) success = response.status_code == self.config.health_check.expected_status elif self.config.health_check.check_type == HealthCheckType.TCP: import socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(self.config.health_check.timeout) result = sock.connect_ex(('localhost', self.config.health_check.port)) sock.close() success = result == 0 elif self.config.health_check.check_type == HealthCheckType.PROCESS: success = self.process is not None and self.process.poll() is None elif self.config.health_check.check_type == HealthCheckType.CUSTOM: if self.config.health_check.custom_check: success = self.config.health_check.custom_check(self) else: success = False else: success = False # 更新指标 self.metrics.response_time = time.time() - check_start_time self.metrics.last_health_check = time.time() if success: self.health_check_failures = 0 else: self.health_check_failures += 1 return success except Exception as e: logger.warning(f"服务 {self.config.name} 健康检查失败: {e}") self.health_check_failures += 1 self.metrics.last_error = str(e) return False def update_metrics(self): """更新服务指标""" if self.process and self.process.poll() is None: try: # 获取进程信息 proc = psutil.Process(self.process.pid) self.metrics.cpu_usage = proc.cpu_percent() self.metrics.memory_usage = proc.memory_info().rss / (1024 * 1024) # MB # 计算运行时间 if self.start_time: self.metrics.uptime = time.time() - self.start_time self.metrics.restart_count = self.restart_attempts except (psutil.NoSuchProcess, psutil.AccessDenied): pass def get_status_info(self) -> Dict[str, Any]: """获取服务状态信息""" return { "name": self.config.name, "status": self.status.value, "metrics": { "cpu_usage": self.metrics.cpu_usage, "memory_usage": self.metrics.memory_usage, "response_time": self.metrics.response_time, "uptime": self.metrics.uptime, "restart_count": self.metrics.restart_count, "health_check_failures": self.health_check_failures, "last_health_check": self.metrics.last_health_check, "last_error": self.metrics.last_error }, "process_id": self.process.pid if self.process else None, "start_time": self.start_time } class FallbackStrategy: """降级策略""" def __init__(self): self.strategies = {} def register_strategy(self, service_name: str, strategy_func: Callable): """注册降级策略""" self.strategies[service_name] = strategy_func def execute_fallback(self, service_name: str, *args, **kwargs) -> Any: """执行降级策略""" if service_name in self.strategies: logger.info(f"执行服务 {service_name} 的降级策略") return self.strategies[service_name](*args, **kwargs) else: logger.warning(f"服务 {service_name} 没有配置降级策略") return None class ServiceHealthManager: """服务健康管理器""" def __init__(self): self.services: Dict[str, ServiceInstance] = {} self.fallback_strategy = FallbackStrategy() self.monitoring_thread: Optional[threading.Thread] = None self.monitoring_active = False self.alert_callbacks: List[Callable] = [] # 配置文件路径 self.config_file = Path("service_health_config.json") # 加载配置 self._load_configuration() def register_service(self, config: ServiceConfig) -> bool: """注册服务""" try: if config.name in self.services: logger.warning(f"服务 {config.name} 已存在,将覆盖配置") self.services[config.name] = ServiceInstance(config) logger.info(f"服务 {config.name} 注册成功") # 保存配置 self._save_configuration() return True except Exception as e: logger.error(f"注册服务 {config.name} 失败: {e}") return False def start_service(self, service_name: str) -> bool: """启动指定服务""" if service_name not in self.services: logger.error(f"服务 {service_name} 未注册") return False service = self.services[service_name] # 检查依赖服务 for dep_name in service.config.dependencies: if dep_name in self.services: dep_service = self.services[dep_name] if dep_service.status != ServiceStatus.HEALTHY: logger.info(f"启动依赖服务: {dep_name}") if not self.start_service(dep_name): logger.error(f"依赖服务 {dep_name} 启动失败") return False return service.start() def stop_service(self, service_name: str, graceful: bool = True) -> bool: """停止指定服务""" if service_name not in self.services: logger.error(f"服务 {service_name} 未注册") return False return self.services[service_name].stop(graceful) def restart_service(self, service_name: str) -> bool: """重启指定服务""" if service_name not in self.services: logger.error(f"服务 {service_name} 未注册") return False return self.services[service_name].restart() def start_monitoring(self): """启动监控""" if self.monitoring_active: logger.info("监控已在运行") return self.monitoring_active = True self.monitoring_thread = threading.Thread(target=self._monitoring_loop, daemon=True) self.monitoring_thread.start() logger.info("服务监控已启动") def stop_monitoring(self): """停止监控""" self.monitoring_active = False if self.monitoring_thread: self.monitoring_thread.join(timeout=5) logger.info("服务监控已停止") def _monitoring_loop(self): """监控循环""" while self.monitoring_active: try: for service_name, service in self.services.items(): self._check_service_health(service) service.update_metrics() time.sleep(10) # 每10秒检查一次 except Exception as e: logger.error(f"监控循环异常: {e}") time.sleep(5) def _check_service_health(self, service: ServiceInstance): """检查服务健康状态""" if service.status in [ServiceStatus.STOPPED, ServiceStatus.STOPPING]: return # 执行健康检查 is_healthy = service._perform_health_check() if is_healthy: if service.status != ServiceStatus.HEALTHY: service.status = ServiceStatus.HEALTHY logger.info(f"服务 {service.config.name} 恢复健康") self._trigger_alert("service_recovered", service) else: # 健康检查失败 if service.health_check_failures >= service.config.health_check.retries: if service.status == ServiceStatus.HEALTHY: service.status = ServiceStatus.UNHEALTHY logger.warning(f"服务 {service.config.name} 变为不健康状态") self._trigger_alert("service_unhealthy", service) # 尝试自动重启 if service.restart_attempts < service.config.max_restart_attempts: logger.info(f"尝试自动重启服务: {service.config.name}") if service.restart(): self._trigger_alert("service_restarted", service) else: self._trigger_alert("service_restart_failed", service) else: logger.error(f"服务 {service.config.name} 重启次数超限") self._trigger_alert("service_restart_limit_exceeded", service) def _trigger_alert(self, alert_type: str, service: ServiceInstance): """触发告警""" alert_data = { "type": alert_type, "service": service.config.name, "timestamp": time.time(), "status": service.status.value, "metrics": service.get_status_info()["metrics"] } for callback in self.alert_callbacks: try: callback(alert_data) except Exception as e: logger.error(f"告警回调执行失败: {e}") def register_alert_callback(self, callback: Callable): """注册告警回调""" self.alert_callbacks.append(callback) def register_fallback_strategy(self, service_name: str, strategy_func: Callable): """注册降级策略""" self.fallback_strategy.register_strategy(service_name, strategy_func) def execute_fallback(self, service_name: str, *args, **kwargs) -> Any: """执行降级策略""" return self.fallback_strategy.execute_fallback(service_name, *args, **kwargs) def get_service_status(self, service_name: Optional[str] = None) -> Dict[str, Any]: """获取服务状态""" if service_name: if service_name in self.services: return self.services[service_name].get_status_info() else: return {"error": f"服务 {service_name} 未找到"} else: return { service_name: service.get_status_info() for service_name, service in self.services.items() } def _load_configuration(self): """加载配置文件""" if self.config_file.exists(): try: with open(self.config_file, 'r', encoding='utf-8') as f: config_data = json.load(f) for service_data in config_data.get('services', []): # 重建服务配置 health_check = HealthCheckConfig( check_type=HealthCheckType(service_data['health_check']['check_type']), endpoint=service_data['health_check'].get('endpoint'), port=service_data['health_check'].get('port'), timeout=service_data['health_check'].get('timeout', 5), interval=service_data['health_check'].get('interval', 30), retries=service_data['health_check'].get('retries', 3) ) service_config = ServiceConfig( name=service_data['name'], description=service_data['description'], health_check=health_check, start_command=service_data['start_command'], stop_command=service_data.get('stop_command'), working_directory=service_data.get('working_directory'), environment=service_data.get('environment', {}), dependencies=service_data.get('dependencies', []) ) self.register_service(service_config) logger.info(f"从配置文件加载了 {len(config_data.get('services', []))} 个服务") except Exception as e: logger.error(f"加载配置文件失败: {e}") def _save_configuration(self): """保存配置文件""" try: config_data = { "services": [ { "name": service.config.name, "description": service.config.description, "health_check": { "check_type": service.config.health_check.check_type.value, "endpoint": service.config.health_check.endpoint, "port": service.config.health_check.port, "timeout": service.config.health_check.timeout, "interval": service.config.health_check.interval, "retries": service.config.health_check.retries }, "start_command": service.config.start_command, "stop_command": service.config.stop_command, "working_directory": service.config.working_directory, "environment": service.config.environment, "dependencies": service.config.dependencies } for service in self.services.values() ] } with open(self.config_file, 'w', encoding='utf-8') as f: json.dump(config_data, f, indent=2, ensure_ascii=False) except Exception as e: logger.error(f"保存配置文件失败: {e}") def shutdown(self): """关闭管理器""" logger.info("关闭服务健康管理器") # 停止监控 self.stop_monitoring() # 停止所有服务 for service_name in list(self.services.keys()): self.stop_service(service_name) logger.info("服务健康管理器已关闭") # 便捷函数 def create_excel_service_config() -> ServiceConfig: """创建 Excel Go 服务配置""" return ServiceConfig( name="excel-go-service", description="Excel Go 高性能处理服务", health_check=HealthCheckConfig( check_type=HealthCheckType.HTTP, endpoint="http://localhost:8080/api/v1/health", timeout=5, interval=30, retries=3 ), start_command=["./excel-service"], working_directory="excel-service", environment={"PORT": "8080"}, max_restart_attempts=3, restart_delay=10 ) if __name__ == "__main__": # 测试代码 import os # 创建健康管理器 manager = ServiceHealthManager() # 注册 Excel 服务 excel_config = create_excel_service_config() manager.register_service(excel_config) # 注册告警回调 def alert_handler(alert_data): print(f"告警: {alert_data}") manager.register_alert_callback(alert_handler) # 启动监控 manager.start_monitoring() try: # 启动服务 manager.start_service("excel-go-service") # 运行一段时间 time.sleep(60) # 获取状态 status = manager.get_service_status() print(f"服务状态: {status}") finally: manager.shutdown()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Lillard01/chatExcel-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server