metrics.py (17.3 kB)
""" 监控指标模块 提供系统性能监控和指标收集功能 """ import time import psutil import threading from typing import Dict, Any, Optional, List from dataclasses import dataclass, asdict from collections import deque import json import logging from pathlib import Path from ..utils import get_system_info @dataclass class MetricValue: """指标值""" name: str value: float timestamp: float labels: Dict[str, str] def __post_init__(self): if self.labels is None: self.labels = {} if self.timestamp == 0: self.timestamp = time.time() @dataclass class SystemMetrics: """系统指标""" cpu_percent: float memory_percent: float disk_usage_percent: float network_io: Dict[str, int] process_count: int timestamp: float @classmethod def collect(cls) -> 'SystemMetrics': """收集系统指标""" return cls( cpu_percent=psutil.cpu_percent(interval=1), memory_percent=psutil.virtual_memory().percent, disk_usage_percent=psutil.disk_usage('/').percent, network_io={ 'bytes_sent': psutil.net_io_counters().bytes_sent, 'bytes_recv': psutil.net_io_counters().bytes_recv, 'packets_sent': psutil.net_io_counters().packets_sent, 'packets_recv': psutil.net_io_counters().packets_recv }, process_count=len(psutil.pids()), timestamp=time.time() ) @dataclass class ApplicationMetrics: """应用指标""" requests_total: int requests_successful: int requests_failed: int average_response_time: float active_connections: int cache_hits: int cache_misses: int files_processed: int errors_total: int timestamp: float def __post_init__(self): if self.timestamp == 0: self.timestamp = time.time() class MetricsCollector: """指标收集器""" def __init__(self, collection_interval: int = 30): self.collection_interval = collection_interval self.metrics_history: Dict[str, deque] = {} self.max_history_size = 1000 self.logger = logging.getLogger(__name__) self.is_collecting = False self.collection_thread: Optional[threading.Thread] = None # 应用指标 self.app_metrics = ApplicationMetrics( requests_total=0, requests_successful=0, requests_failed=0, average_response_time=0.0, active_connections=0, cache_hits=0, cache_misses=0, files_processed=0, errors_total=0 ) # 指标回调 self.metric_callbacks: List[callable] = [] def start_collection(self) -> None: """开始指标收集""" if self.is_collecting: return self.is_collecting = True self.collection_thread = threading.Thread( target=self._collection_loop, daemon=True ) self.collection_thread.start() self.logger.info(f"指标收集已启动,间隔: {self.collection_interval}秒") def stop_collection(self) -> None: """停止指标收集""" self.is_collecting = False if self.collection_thread: self.collection_thread.join(timeout=5) self.logger.info("指标收集已停止") def _collection_loop(self) -> None: """收集循环""" while self.is_collecting: try: # 收集系统指标 system_metrics = SystemMetrics.collect() self._add_metric("system_cpu", system_metrics.cpu_percent) self._add_metric("system_memory", system_metrics.memory_percent) self._add_metric("system_disk", system_metrics.disk_usage_percent) self._add_metric("system_processes", system_metrics.process_count) # 收集应用指标 self._add_metric("app_requests_total", self.app_metrics.requests_total) self._add_metric("app_requests_successful", self.app_metrics.requests_successful) self._add_metric("app_requests_failed", self.app_metrics.requests_failed) self._add_metric("app_response_time", self.app_metrics.average_response_time) self._add_metric("app_active_connections", self.app_metrics.active_connections) self._add_metric("app_cache_hits", self.app_metrics.cache_hits) self._add_metric("app_cache_misses", self.app_metrics.cache_misses) self._add_metric("app_files_processed", 
self.app_metrics.files_processed) self._add_metric("app_errors_total", self.app_metrics.errors_total) # 调用指标回调 for callback in self.metric_callbacks: try: callback(system_metrics, self.app_metrics) except Exception as e: self.logger.error(f"指标回调执行失败: {e}") # 等待下一次收集 time.sleep(self.collection_interval) except Exception as e: self.logger.error(f"指标收集出错: {e}") time.sleep(self.collection_interval) def _add_metric(self, name: str, value: float) -> None: """添加指标值""" if name not in self.metrics_history: self.metrics_history[name] = deque(maxlen=self.max_history_size) metric = MetricValue( name=name, value=value, timestamp=time.time(), labels={} ) self.metrics_history[name].append(metric) def record_request(self, success: bool, response_time: float) -> None: """记录请求""" self.app_metrics.requests_total += 1 if success: self.app_metrics.requests_successful += 1 else: self.app_metrics.requests_failed += 1 # 更新平均响应时间 if self.app_metrics.requests_total > 0: self.app_metrics.average_response_time = ( (self.app_metrics.average_response_time * (self.app_metrics.requests_total - 1) + response_time) / self.app_metrics.requests_total ) def record_cache_hit(self) -> None: """记录缓存命中""" self.app_metrics.cache_hits += 1 def record_cache_miss(self) -> None: """记录缓存未命中""" self.app_metrics.cache_misses += 1 def record_file_processed(self) -> None: """记录文件处理""" self.app_metrics.files_processed += 1 def record_error(self) -> None: """记录错误""" self.app_metrics.errors_total += 1 def set_active_connections(self, count: int) -> None: """设置活跃连接数""" self.app_metrics.active_connections = count def get_metrics_summary(self) -> Dict[str, Any]: """获取指标摘要""" summary = { "timestamp": time.time(), "system": {}, "application": asdict(self.app_metrics) } # 系统指标摘要 for name in ["system_cpu", "system_memory", "system_disk"]: if name in self.metrics_history and self.metrics_history[name]: values = [m.value for m in self.metrics_history[name]] summary["system"][name.replace("system_", "")] = { "current": values[-1] if values else 0, "average": sum(values) / len(values) if values else 0, "min": min(values) if values else 0, "max": max(values) if values else 0 } return summary def get_metrics_history(self, metric_name: str, limit: int = 100) -> List[MetricValue]: """获取指标历史""" if metric_name not in self.metrics_history: return [] history = list(self.metrics_history[metric_name]) return history[-limit:] if limit > 0 else history def register_callback(self, callback: callable) -> None: """注册指标回调""" self.metric_callbacks.append(callback) def export_metrics(self, format: str = "json") -> str: """导出指标""" if format == "json": return self._export_json() elif format == "prometheus": return self._export_prometheus() else: raise ValueError(f"不支持的导出格式: {format}") def _export_json(self) -> str: """导出JSON格式""" summary = self.get_metrics_summary() return json.dumps(summary, ensure_ascii=False, indent=2) def _export_prometheus(self) -> str: """导出Prometheus格式""" lines = [] # 系统指标 for name in ["system_cpu", "system_memory", "system_disk"]: if name in self.metrics_history and self.metrics_history[name]: latest = self.metrics_history[name][-1] prometheus_name = name.replace("system_", "folder_docs_system_") lines.append(f"{prometheus_name} {latest.value}") # 应用指标 lines.extend([ f"folder_docs_requests_total {self.app_metrics.requests_total}", f"folder_docs_requests_successful {self.app_metrics.requests_successful}", f"folder_docs_requests_failed {self.app_metrics.requests_failed}", f"folder_docs_response_time {self.app_metrics.average_response_time}", 
f"folder_docs_active_connections {self.app_metrics.active_connections}", f"folder_docs_cache_hits {self.app_metrics.cache_hits}", f"folder_docs_cache_misses {self.app_metrics.cache_misses}", f"folder_docs_files_processed {self.app_metrics.files_processed}", f"folder_docs_errors_total {self.app_metrics.errors_total}" ]) return "\n".join(lines) class AlertManager: """告警管理器""" def __init__(self, collector: MetricsCollector): self.collector = collector self.alert_rules = [] self.active_alerts = [] self.alert_callbacks: List[callable] = [] self.logger = logging.getLogger(__name__) # 注册指标回调 self.collector.register_callback(self._check_alerts) # 默认告警规则 self._setup_default_rules() def _setup_default_rules(self) -> None: """设置默认告警规则""" self.alert_rules = [ { "name": "high_cpu_usage", "metric": "system_cpu", "condition": "gt", "threshold": 80.0, "duration": 300, # 5分钟 "severity": "warning", "message": "CPU使用率过高" }, { "name": "high_memory_usage", "metric": "system_memory", "condition": "gt", "threshold": 85.0, "duration": 300, "severity": "warning", "message": "内存使用率过高" }, { "name": "high_error_rate", "metric": "app_requests_failed", "condition": "rate_gt", "threshold": 0.1, # 10%错误率 "duration": 600, # 10分钟 "severity": "critical", "message": "错误率过高" }, { "name": "low_cache_hit_rate", "metric": "app_cache_hit_rate", "condition": "lt", "threshold": 0.7, # 70%命中率 "duration": 1800, # 30分钟 "severity": "info", "message": "缓存命中率过低" } ] def _check_alerts(self, system_metrics: SystemMetrics, app_metrics: ApplicationMetrics) -> None: """检查告警条件""" current_time = time.time() for rule in self.alert_rules: try: if self._evaluate_rule(rule, current_time): self._trigger_alert(rule, current_time) else: self._resolve_alert(rule["name"], current_time) except Exception as e: self.logger.error(f"告警规则 {rule['name']} 评估失败: {e}") def _evaluate_rule(self, rule: Dict[str, Any], current_time: float) -> bool: """评估告警规则""" metric_name = rule["metric"] condition = rule["condition"] threshold = rule["threshold"] duration = rule.get("duration", 0) # 特殊处理需要计算的指标 if metric_name == "app_cache_hit_rate": total_cache = app_metrics.cache_hits + app_metrics.cache_misses if total_cache == 0: current_value = 0.0 else: current_value = app_metrics.cache_hits / total_cache elif condition == "rate_gt": # 错误率 total_requests = app_metrics.requests_total if total_requests == 0: current_value = 0.0 else: current_value = app_metrics.requests_failed / total_requests else: # 获取当前指标值 current_value = getattr(app_metrics, metric_name, None) if current_value is None: current_value = getattr(system_metrics, metric_name, 0) # 评估条件 if condition == "gt": return current_value > threshold elif condition == "lt": return current_value < threshold elif condition == "rate_gt": return current_value > threshold return False def _trigger_alert(self, rule: Dict[str, Any], timestamp: float) -> None: """触发告警""" alert_id = rule["name"] # 检查是否已经存在活跃告警 for alert in self.active_alerts: if alert["id"] == alert_id: return # 告警已存在 # 创建新告警 alert = { "id": alert_id, "rule": rule["name"], "severity": rule["severity"], "message": rule["message"], "triggered_at": timestamp, "resolved_at": None, "status": "active" } self.active_alerts.append(alert) self.logger.warning(f"告警触发: {rule['message']}") # 调用告警回调 for callback in self.alert_callbacks: try: callback(alert) except Exception as e: self.logger.error(f"告警回调执行失败: {e}") def _resolve_alert(self, rule_name: str, timestamp: float) -> None: """解决告警""" for alert in self.active_alerts: if alert["id"] == rule_name and alert["status"] == 
"active": alert["status"] = "resolved" alert["resolved_at"] = timestamp self.logger.info(f"告警已解决: {alert['message']}") break def get_active_alerts(self) -> List[Dict[str, Any]]: """获取活跃告警""" return [alert for alert in self.active_alerts if alert["status"] == "active"] def get_alert_history(self, limit: int = 100) -> List[Dict[str, Any]]: """获取告警历史""" history = sorted(self.active_alerts, key=lambda x: x["triggered_at"], reverse=True) return history[:limit] if limit > 0 else history def register_alert_callback(self, callback: callable) -> None: """注册告警回调""" self.alert_callbacks.append(callback) # 全局指标收集器实例 global_metrics_collector = MetricsCollector() global_alert_manager = AlertManager(global_metrics_collector) def start_metrics_collection(interval: int = 30) -> None: """启动指标收集""" global_metrics_collector.start_collection() def stop_metrics_collection() -> None: """停止指标收集""" global_metrics_collector.stop_collection() def record_request(success: bool, response_time: float) -> None: """记录请求""" global_metrics_collector.record_request(success, response_time) def record_cache_hit() -> None: """记录缓存命中""" global_metrics_collector.record_cache_hit() def record_cache_miss() -> None: """记录缓存未命中""" global_metrics_collector.record_cache_miss() def get_metrics_summary() -> Dict[str, Any]: """获取指标摘要""" return global_metrics_collector.get_metrics_summary() def get_active_alerts() -> List[Dict[str, Any]]: """获取活跃告警""" return global_alert_manager.get_active_alerts()
