Skip to main content
Glama
enhanced_monitor.py (28.3 kB)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Enhanced monitoring system.

Monitors project status, performance metrics and dependency health in
real time.
"""

import json
import logging
import queue
import subprocess
import sys
import threading
import time
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional

import psutil


@dataclass
class SystemMetrics:
    """One sample of host-level resource usage."""

    timestamp: str  # ISO-8601 local time at which the sample was taken
    cpu_percent: float
    memory_percent: float
    memory_used_gb: float
    memory_total_gb: float
    disk_percent: float
    disk_used_gb: float
    disk_total_gb: float
    network_sent_mb: float  # MB/s averaged since the monitor's baseline
    network_recv_mb: float  # MB/s averaged since the monitor's baseline
    process_count: int
    load_average: List[float]


@dataclass
class DependencyStatus:
    """Result of checking one installed Python dependency."""

    name: str
    version: str
    status: str  # 'ok', 'outdated', 'vulnerable', 'missing', 'error'
    latest_version: Optional[str] = None
    vulnerabilities: Optional[List[str]] = None
    last_checked: Optional[str] = None


@dataclass
class ServiceHealth:
    """Result of probing one monitored service."""

    service_name: str
    status: str  # 'running', 'stopped', 'error'
    response_time_ms: Optional[float] = None
    error_message: Optional[str] = None
    last_check: Optional[str] = None
    uptime_seconds: Optional[float] = None


class EnhancedMonitor:
    """Collects system metrics, dependency status and service health."""

    def __init__(self, project_root: str, config_file: Optional[str] = None):
        """Initialise the monitor.

        Args:
            project_root: Project root directory.
            config_file: Path to the JSON config file. Defaults to
                ``<project_root>/config/monitor_config.json``.
        """
        self.project_root = Path(project_root)
        # Always store a Path: _load_config calls .exists() on it, which a
        # caller-supplied plain string would not support.
        self.config_file = (
            Path(config_file)
            if config_file
            else self.project_root / "config" / "monitor_config.json"
        )

        self.metrics_queue = queue.Queue()
        self.is_monitoring = False
        self.monitor_thread = None
        # Lets stop_monitoring() wake the loop instead of waiting out a sleep.
        self._stop_event = threading.Event()

        # Logging must be ready before the config is loaded, because
        # _load_config reports problems through self.logger.
        self._setup_logging()
        self.config = self._load_config()

        # In-memory stores for collected data.
        self.metrics_history = []
        self.dependency_status = {}
        self.service_health = {}

        # Network counters captured now; throughput is computed against them.
        self.network_baseline = self._get_network_baseline()

    def _setup_logging(self):
        """Configure logging to ``<project_root>/logs/monitor.log`` and stderr."""
        log_dir = self.project_root / "logs"
        log_dir.mkdir(parents=True, exist_ok=True)

        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                # Explicit encoding: the log messages contain non-ASCII text.
                logging.FileHandler(log_dir / "monitor.log", encoding='utf-8'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger("EnhancedMonitor")

    @staticmethod
    def _merge_config(defaults: Dict, overrides: Dict) -> Dict:
        """Return ``defaults`` overlaid with ``overrides``.

        Top-level keys from ``overrides`` win. ``alert_thresholds`` is merged
        one level deeper so that a config file overriding a single threshold
        still gets the default value for the others.
        """
        merged = {**defaults, **overrides}

        thresholds = dict(defaults.get('alert_thresholds', {}))
        user_thresholds = overrides.get('alert_thresholds')
        if isinstance(user_thresholds, dict):
            thresholds.update(user_thresholds)
        merged['alert_thresholds'] = thresholds

        return merged

    def _load_config(self) -> Dict:
        """Load the monitor configuration.

        Returns:
            The config file contents merged over the built-in defaults, or the
            defaults alone when the file is absent or unreadable.
        """
        default_config = {
            "monitoring_interval": 30,  # seconds
            "dependency_check_interval": 3600,  # seconds
            "service_check_interval": 60,  # seconds
            "metrics_retention_hours": 24,
            "alert_thresholds": {
                "cpu_percent": 80,
                "memory_percent": 85,
                "disk_percent": 90,
                "response_time_ms": 5000
            },
            "services_to_monitor": [
                {
                    "name": "mcp_server",
                    "type": "http",
                    "url": "http://localhost:8080/health",
                    "timeout": 5
                }
            ],
            "dependencies_to_monitor": [
                "fastmcp",
                "pandas",
                "numpy",
                "openpyxl",
                "xlsxwriter"
            ]
        }

        if self.config_file.exists():
            try:
                with open(self.config_file, 'r', encoding='utf-8') as f:
                    config = json.load(f)
                if not isinstance(config, dict):
                    raise ValueError("config root must be a JSON object")
                return self._merge_config(default_config, config)
            except (OSError, ValueError) as e:
                # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
                self.logger.warning(f"无法加载配置文件,使用默认配置: {e}")

        return default_config

    def _get_network_baseline(self) -> Dict:
        """Capture the current network I/O counters.

        Returns:
            Dict with ``bytes_sent``, ``bytes_recv`` and ``timestamp``; the
            byte counters are zero when they cannot be read.
        """
        try:
            net_io = psutil.net_io_counters()
            return {
                'bytes_sent': net_io.bytes_sent,
                'bytes_recv': net_io.bytes_recv,
                'timestamp': time.time()
            }
        except Exception:
            # Best effort: a missing baseline must not prevent start-up.
            return {'bytes_sent': 0, 'bytes_recv': 0, 'timestamp': time.time()}

    def collect_system_metrics(self) -> Optional[SystemMetrics]:
        """Collect one sample of system metrics.

        Blocks for about one second while CPU usage is measured.

        Returns:
            The sample, or ``None`` when collection fails (the failure is
            logged).
        """
        try:
            cpu_percent = psutil.cpu_percent(interval=1)

            memory = psutil.virtual_memory()
            memory_used_gb = memory.used / (1024**3)
            memory_total_gb = memory.total / (1024**3)

            disk = psutil.disk_usage('/')
            disk_used_gb = disk.used / (1024**3)
            disk_total_gb = disk.total / (1024**3)

            # Throughput is the average since the baseline captured at
            # construction time, not since the previous sample.
            net_io = psutil.net_io_counters()
            time_diff = time.time() - self.network_baseline['timestamp']
            if time_diff > 0:
                network_sent_mb = (
                    (net_io.bytes_sent - self.network_baseline['bytes_sent'])
                    / (1024**2) / time_diff
                )
                network_recv_mb = (
                    (net_io.bytes_recv - self.network_baseline['bytes_recv'])
                    / (1024**2) / time_diff
                )
            else:
                network_sent_mb = 0
                network_recv_mb = 0

            process_count = len(psutil.pids())

            try:
                load_average = list(psutil.getloadavg())
            except AttributeError:
                # getloadavg is unavailable on some platforms/psutil versions.
                load_average = [0.0, 0.0, 0.0]

            return SystemMetrics(
                timestamp=datetime.now().isoformat(),
                cpu_percent=cpu_percent,
                memory_percent=memory.percent,
                memory_used_gb=round(memory_used_gb, 2),
                memory_total_gb=round(memory_total_gb, 2),
                disk_percent=disk.percent,
                disk_used_gb=round(disk_used_gb, 2),
                disk_total_gb=round(disk_total_gb, 2),
                network_sent_mb=round(network_sent_mb, 2),
                network_recv_mb=round(network_recv_mb, 2),
                process_count=process_count,
                load_average=load_average
            )

        except Exception as e:
            self.logger.error(f"收集系统指标失败: {e}")
            return None

    @staticmethod
    def _pip_command(*args: str) -> List[str]:
        """Build a pip command line bound to the running interpreter.

        ``python -m pip`` inspects the environment this process runs in,
        whereas a bare ``pip`` resolves to whatever is first on PATH.
        """
        base = [sys.executable, '-m', 'pip'] if sys.executable else ['pip']
        return base + list(args)

    def check_dependency_status(self) -> Dict[str, DependencyStatus]:
        """Check every dependency listed in ``dependencies_to_monitor``.

        Returns:
            Mapping of dependency name to its status. The same mapping is
            stored on ``self.dependency_status``.
        """
        self.logger.info("检查依赖状态...")

        dependency_status = {}
        for dep_name in self.config['dependencies_to_monitor']:
            try:
                dependency_status[dep_name] = self._inspect_dependency(dep_name)
            except Exception as e:
                # One broken dependency check must not abort the others.
                self.logger.error(f"检查依赖 {dep_name} 失败: {e}")
                dependency_status[dep_name] = DependencyStatus(
                    name=dep_name,
                    version='error',
                    status='error',
                    last_checked=datetime.now().isoformat()
                )

        self.dependency_status = dependency_status
        return dependency_status

    def _inspect_dependency(self, dep_name: str) -> DependencyStatus:
        """Query pip for one dependency and classify its status."""
        result = subprocess.run(
            self._pip_command('show', dep_name),
            capture_output=True,
            text=True,
            timeout=30
        )

        # `pip show` exits non-zero when the package is not installed.
        if result.returncode != 0:
            return DependencyStatus(
                name=dep_name,
                version='not_installed',
                status='missing',
                last_checked=datetime.now().isoformat()
            )

        version = None
        for line in result.stdout.splitlines():
            if line.startswith('Version:'):
                version = line.split(':', 1)[1].strip()
                break

        if not version:
            return DependencyStatus(
                name=dep_name,
                version='unknown',
                status='error',
                last_checked=datetime.now().isoformat()
            )

        latest_version = self._get_latest_version(dep_name)
        vulnerabilities = self._check_vulnerabilities(dep_name, version)

        # A known vulnerability outranks merely being out of date.
        if vulnerabilities:
            status = 'vulnerable'
        elif latest_version and version != latest_version:
            status = 'outdated'
        else:
            status = 'ok'

        return DependencyStatus(
            name=dep_name,
            version=version,
            status=status,
            latest_version=latest_version,
            vulnerabilities=vulnerabilities,
            last_checked=datetime.now().isoformat()
        )

    def _get_latest_version(self, package_name: str) -> Optional[str]:
        """Look up the newest published version of a package.

        Args:
            package_name: Distribution name.

        Returns:
            The first entry of pip's "Available versions" list, or ``None``
            when it cannot be determined.
        """
        try:
            result = subprocess.run(
                self._pip_command('index', 'versions', package_name),
                capture_output=True,
                text=True,
                timeout=30
            )

            if result.returncode == 0:
                for line in result.stdout.splitlines():
                    if 'Available versions:' in line:
                        versions = line.split(':', 1)[1].strip().split(', ')
                        if versions and versions[0]:
                            return versions[0]
        except Exception as e:
            self.logger.debug(f"获取 {package_name} 最新版本失败: {e}")

        return None

    @staticmethod
    def _parse_safety_output(output: str, package_name: str) -> List[str]:
        """Extract advisories for ``package_name`` from ``safety --json`` output.

        Accepts both report shapes: a JSON object carrying a
        ``vulnerabilities`` list of objects, and the legacy JSON array of
        ``[name, spec, installed, advisory, id]`` rows. Anything that cannot
        be parsed yields an empty list.
        """
        try:
            data = json.loads(output)
        except (TypeError, ValueError):
            return []

        entries = data.get('vulnerabilities', []) if isinstance(data, dict) else data
        if not isinstance(entries, list):
            return []

        wanted = package_name.lower()
        advisories = []
        for entry in entries:
            if isinstance(entry, dict):
                name = entry.get('package_name', '')
                advisory = entry.get('advisory')
            elif isinstance(entry, (list, tuple)) and entry:
                name = entry[0]
                advisory = entry[3] if len(entry) > 3 else None
            else:
                continue

            if str(name).lower() == wanted:
                advisories.append(str(advisory) if advisory else 'Unknown vulnerability')

        return advisories

    def _check_vulnerabilities(self, package_name: str, version: str) -> List[str]:
        """Check a package for known vulnerabilities using ``safety``.

        Args:
            package_name: Distribution name.
            version: Installed version (currently informational only).

        Returns:
            Advisory texts for the package; empty when none are found or the
            scan could not be run.
        """
        try:
            result = subprocess.run(
                ['safety', 'check', '--json'],
                capture_output=True,
                text=True,
                timeout=60
            )
        except (OSError, subprocess.SubprocessError) as e:
            self.logger.debug(f"检查 {package_name} 漏洞失败: {e}")
            return []

        # Do not gate on the exit code: safety exits non-zero precisely when
        # it has vulnerabilities to report.
        return self._parse_safety_output(result.stdout, package_name)

    def check_service_health(self) -> Dict[str, ServiceHealth]:
        """Probe every service listed in ``services_to_monitor``.

        Returns:
            Mapping of service name to its health. The same mapping is stored
            on ``self.service_health``.
        """
        self.logger.info("检查服务健康状态...")

        service_health = {}
        for service_config in self.config['services_to_monitor']:
            service_name = service_config['name']

            try:
                service_type = service_config['type']
                if service_type == 'http':
                    health = self._check_http_service(service_config)
                elif service_type == 'process':
                    health = self._check_process_service(service_config)
                else:
                    health = ServiceHealth(
                        service_name=service_name,
                        status='error',
                        error_message=f"不支持的服务类型: {service_type}",
                        last_check=datetime.now().isoformat()
                    )

                service_health[service_name] = health

            except Exception as e:
                self.logger.error(f"检查服务 {service_name} 失败: {e}")
                service_health[service_name] = ServiceHealth(
                    service_name=service_name,
                    status='error',
                    error_message=str(e),
                    last_check=datetime.now().isoformat()
                )

        self.service_health = service_health
        return service_health

    def _check_http_service(self, service_config: Dict) -> ServiceHealth:
        """Probe an HTTP service with a GET request.

        Args:
            service_config: Service entry with ``name``, ``url`` and an
                optional ``timeout`` in seconds (default 5).

        Returns:
            'running' on HTTP 200, 'error' on any other status code,
            'stopped' when the request itself fails.
        """
        # Imported lazily so the module loads without `requests` installed.
        import requests

        service_name = service_config['name']
        url = service_config['url']
        timeout = service_config.get('timeout', 5)

        try:
            start_time = time.time()
            response = requests.get(url, timeout=timeout)
            response_time_ms = (time.time() - start_time) * 1000

            if response.status_code == 200:
                status = 'running'
                error_message = None
            else:
                status = 'error'
                error_message = f"HTTP {response.status_code}"

            return ServiceHealth(
                service_name=service_name,
                status=status,
                response_time_ms=round(response_time_ms, 2),
                error_message=error_message,
                last_check=datetime.now().isoformat()
            )

        except requests.exceptions.RequestException as e:
            return ServiceHealth(
                service_name=service_name,
                status='stopped',
                error_message=str(e),
                last_check=datetime.now().isoformat()
            )

    def _check_process_service(self, service_config: Dict) -> ServiceHealth:
        """Probe a service by looking for a running process.

        Args:
            service_config: Service entry with ``name`` and an optional
                ``process_name`` (defaults to ``name``). Matching is a
                case-insensitive substring test on the process name.

        Returns:
            'running' with uptime for the first match, 'stopped' when nothing
            matches, 'error' when the process table cannot be read.
        """
        service_name = service_config['name']
        process_name = service_config.get('process_name', service_name)
        wanted = process_name.lower()

        try:
            for proc in psutil.process_iter(['pid', 'name', 'create_time']):
                # psutil reports None for attributes it could not read.
                proc_name = proc.info.get('name') or ''
                if wanted in proc_name.lower():
                    uptime_seconds = time.time() - proc.info['create_time']
                    return ServiceHealth(
                        service_name=service_name,
                        status='running',
                        uptime_seconds=round(uptime_seconds, 2),
                        last_check=datetime.now().isoformat()
                    )

            return ServiceHealth(
                service_name=service_name,
                status='stopped',
                error_message="进程未运行",
                last_check=datetime.now().isoformat()
            )

        except Exception as e:
            return ServiceHealth(
                service_name=service_name,
                status='error',
                error_message=str(e),
                last_check=datetime.now().isoformat()
            )

    def check_alerts(self, metrics: SystemMetrics) -> List[Dict]:
        """Evaluate alert conditions.

        Args:
            metrics: The system metrics sample to evaluate.

        Returns:
            Alert dicts with ``type``, ``message``, ``severity`` and
            ``timestamp``. Also covers the most recently stored service
            health and dependency status.
        """
        alerts = []
        thresholds = self.config['alert_thresholds']

        # (metric attribute / threshold key, alert type, label, severity)
        resource_checks = (
            ('cpu_percent', 'cpu_high', "CPU使用率过高", 'warning'),
            ('memory_percent', 'memory_high', "内存使用率过高", 'warning'),
            ('disk_percent', 'disk_high', "磁盘使用率过高", 'critical'),
        )
        for key, alert_type, label, severity in resource_checks:
            value = getattr(metrics, key)
            if value > thresholds[key]:
                alerts.append({
                    'type': alert_type,
                    'message': f"{label}: {value}%",
                    'severity': severity,
                    'timestamp': metrics.timestamp
                })

        # Slow service responses.
        for service_name, health in self.service_health.items():
            if health.response_time_ms and health.response_time_ms > thresholds['response_time_ms']:
                alerts.append({
                    'type': 'service_slow',
                    'message': f"服务 {service_name} 响应时间过长: {health.response_time_ms}ms",
                    'severity': 'warning',
                    'timestamp': health.last_check
                })

        # Vulnerable dependencies.
        for dep_name, status in self.dependency_status.items():
            if status.status == 'vulnerable':
                alerts.append({
                    'type': 'dependency_vulnerable',
                    'message': f"依赖 {dep_name} 存在安全漏洞",
                    'severity': 'critical',
                    'timestamp': status.last_checked
                })

        return alerts

    def start_monitoring(self):
        """Start the background monitoring thread (no-op if already running)."""
        if self.is_monitoring:
            self.logger.warning("监控已在运行中")
            return

        self._stop_event.clear()
        self.is_monitoring = True
        self.monitor_thread = threading.Thread(target=self._monitoring_loop, daemon=True)
        self.monitor_thread.start()

        self.logger.info("监控系统已启动")

    def stop_monitoring(self):
        """Stop the background monitoring thread, waiting up to 5 seconds."""
        self.is_monitoring = False
        # Wake the loop if it is waiting between iterations.
        self._stop_event.set()

        if self.monitor_thread:
            self.monitor_thread.join(timeout=5)

        self.logger.info("监控系统已停止")

    def _monitoring_loop(self):
        """Main loop of the monitoring thread; runs until monitoring stops."""
        # Zero makes both periodic checks fire on the first iteration.
        last_dependency_check = 0
        last_service_check = 0

        while self.is_monitoring:
            try:
                current_time = time.time()

                metrics = self.collect_system_metrics()
                if metrics:
                    self.metrics_history.append(metrics)
                    self._cleanup_old_metrics()

                    alerts = self.check_alerts(metrics)
                    if alerts:
                        self._handle_alerts(alerts)

                if current_time - last_dependency_check > self.config['dependency_check_interval']:
                    self.check_dependency_status()
                    last_dependency_check = current_time

                if current_time - last_service_check > self.config['service_check_interval']:
                    self.check_service_health()
                    last_service_check = current_time

                delay = self.config['monitoring_interval']

            except Exception as e:
                self.logger.error(f"监控循环出错: {e}")
                delay = 10  # fixed back-off after a failed iteration

            # Interruptible wait: returns early once stop_monitoring() is called.
            self._stop_event.wait(delay)

    def _cleanup_old_metrics(self):
        """Drop samples older than ``metrics_retention_hours``."""
        retention_hours = self.config['metrics_retention_hours']
        cutoff_time = datetime.now() - timedelta(hours=retention_hours)

        self.metrics_history = [
            metrics for metrics in self.metrics_history
            if datetime.fromisoformat(metrics.timestamp) > cutoff_time
        ]

    def _handle_alerts(self, alerts: List[Dict]):
        """Handle raised alerts; currently each one is logged as a warning.

        Args:
            alerts: Alert dicts as produced by ``check_alerts``.
        """
        for alert in alerts:
            self.logger.warning(f"告警: {alert['message']}")
            # Extension point: e-mail, Slack or other notifications.

    def get_monitoring_status(self) -> Dict:
        """Return a JSON-serialisable snapshot of the monitor's state.

        Returns:
            Dict with the running flag, the latest metrics sample (or
            ``None``), dependency status, service health, the number of
            retained samples and the time the snapshot was taken.
        """
        latest_metrics = self.metrics_history[-1] if self.metrics_history else None

        return {
            'is_monitoring': self.is_monitoring,
            'latest_metrics': asdict(latest_metrics) if latest_metrics else None,
            'dependency_status': {
                name: asdict(status) for name, status in self.dependency_status.items()
            },
            'service_health': {
                name: asdict(health) for name, health in self.service_health.items()
            },
            'metrics_count': len(self.metrics_history),
            'last_update': datetime.now().isoformat()
        }

    def generate_monitoring_report(self) -> str:
        """Render the current monitoring status as a Markdown report.

        Returns:
            The report text.
        """
        status = self.get_monitoring_status()

        parts = [
            "\n# 系统监控报告\n\n",
            f"生成时间: {status['last_update']}\n",
            f"监控状态: {'运行中' if status['is_monitoring'] else '已停止'}\n",
            f"指标数据点: {status['metrics_count']} 个\n\n",
            "## 系统指标概览\n\n",
        ]

        metrics = status['latest_metrics']
        if metrics:
            parts.append(
                f"- **CPU使用率**: {metrics['cpu_percent']}%\n"
                f"- **内存使用率**: {metrics['memory_percent']}% "
                f"({metrics['memory_used_gb']:.1f}GB / {metrics['memory_total_gb']:.1f}GB)\n"
                f"- **磁盘使用率**: {metrics['disk_percent']}% "
                f"({metrics['disk_used_gb']:.1f}GB / {metrics['disk_total_gb']:.1f}GB)\n"
                f"- **网络流量**: 发送 {metrics['network_sent_mb']:.2f}MB/s, "
                f"接收 {metrics['network_recv_mb']:.2f}MB/s\n"
                f"- **进程数量**: {metrics['process_count']}\n"
                f"- **系统负载**: {', '.join(map(str, metrics['load_average']))}\n\n"
            )

        dependency_emoji = {
            'ok': '✅',
            'outdated': '⚠️',
            'vulnerable': '🚨',
            'missing': '❌',
            'error': '💥'
        }
        parts.append("## 依赖状态\n\n")
        for name, dep_status in status['dependency_status'].items():
            status_emoji = dependency_emoji.get(dep_status['status'], '❓')
            parts.append(
                f"- {status_emoji} **{name}**: {dep_status['version']} ({dep_status['status']})\n"
            )
            if dep_status['vulnerabilities']:
                parts.append(f"  - 漏洞: {', '.join(dep_status['vulnerabilities'])}\n")

        service_emoji = {
            'running': '✅',
            'stopped': '❌',
            'error': '💥'
        }
        parts.append("\n## 服务健康状态\n\n")
        for name, health in status['service_health'].items():
            status_emoji = service_emoji.get(health['status'], '❓')
            parts.append(f"- {status_emoji} **{name}**: {health['status']}\n")

            if health['response_time_ms']:
                parts.append(f"  - 响应时间: {health['response_time_ms']}ms\n")

            if health['uptime_seconds']:
                uptime_hours = health['uptime_seconds'] / 3600
                parts.append(f"  - 运行时间: {uptime_hours:.1f}小时\n")

            if health['error_message']:
                parts.append(f"  - 错误: {health['error_message']}\n")

        return "".join(parts)

    def save_monitoring_report(self, output_file: Optional[str] = None):
        """Write the Markdown report and a JSON dump of the raw status.

        Args:
            output_file: Report path. Defaults to
                ``<project_root>/monitoring_report.md``. The JSON dump is
                written next to it with a ``.json`` suffix.
        """
        if output_file is None:
            output_path = self.project_root / "monitoring_report.md"
        else:
            output_path = Path(output_file)

        report = self.generate_monitoring_report()
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(report)
        self.logger.info(f"监控报告已保存到: {output_path}")

        # Swap only the final suffix; if the report itself is a .json file,
        # append instead so the dump does not overwrite the report.
        json_path = output_path.with_suffix('.json')
        if json_path == output_path:
            json_path = output_path.with_name(output_path.name + '.json')

        status = self.get_monitoring_status()
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(status, f, indent=2, ensure_ascii=False)
        self.logger.info(f"详细数据已保存到: {json_path}")


def main():
    """Run one full monitoring pass and write the report."""
    project_root = Path.cwd()

    print("🚀 启动增强监控系统...")

    monitor = EnhancedMonitor(str(project_root))

    try:
        print("📊 执行系统指标收集...")
        metrics = monitor.collect_system_metrics()
        if metrics:
            # The report and summary read the latest sample from the history,
            # so the one-shot sample has to be recorded there.
            monitor.metrics_history.append(metrics)

        print("🔍 检查依赖状态...")
        monitor.check_dependency_status()

        print("🏥 检查服务健康状态...")
        monitor.check_service_health()

        print("📝 生成监控报告...")
        monitor.save_monitoring_report()

        status = monitor.get_monitoring_status()
        print("\n📊 监控状态摘要:")

        if status['latest_metrics']:
            m = status['latest_metrics']
            print(f"- CPU: {m['cpu_percent']}%")
            print(f"- 内存: {m['memory_percent']}%")
            print(f"- 磁盘: {m['disk_percent']}%")

        print("\n🔗 依赖状态:")
        for name, dep in status['dependency_status'].items():
            print(f"- {name}: {dep['status']}")

        print("\n🏥 服务状态:")
        for name, health in status['service_health'].items():
            print(f"- {name}: {health['status']}")

        print("\n✅ 监控检查完成!")

    except KeyboardInterrupt:
        print("\n⏹️ 监控已停止")
    except Exception as e:
        print(f"❌ 监控出错: {e}")


if __name__ == "__main__":
    main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Lillard01/chatExcel-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.