"""
监控指标模块
提供系统性能监控和指标收集功能
"""
import time
import psutil
import threading
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, asdict
from collections import deque
import json
import logging
from pathlib import Path
from ..utils import get_system_info
@dataclass
class MetricValue:
    """A single timestamped sample of a named metric.

    ``timestamp`` and ``labels`` are optional: a ``timestamp`` of 0 (the
    default) is replaced with the current ``time.time()``, and ``labels`` of
    ``None`` (the default) is replaced with an empty dict. Callers that pass
    all four fields positionally or by keyword behave exactly as before.
    """
    name: str
    value: float
    timestamp: float = 0.0
    # Optional key/value labels attached to the sample; normalised to {}.
    labels: Optional[Dict[str, str]] = None

    def __post_init__(self):
        # A None default avoids sharing one mutable dict between instances.
        if self.labels is None:
            self.labels = {}
        if self.timestamp == 0:
            self.timestamp = time.time()
@dataclass
class SystemMetrics:
    """Point-in-time snapshot of host resource usage gathered via psutil.

    Attributes:
        cpu_percent: value of ``psutil.cpu_percent(interval=1)``.
        memory_percent: ``psutil.virtual_memory().percent``.
        disk_usage_percent: ``psutil.disk_usage('/').percent``.
        network_io: ``bytes_sent``/``bytes_recv``/``packets_sent``/
            ``packets_recv`` as reported by ``psutil.net_io_counters()``.
        process_count: number of PIDs returned by ``psutil.pids()``.
        timestamp: ``time.time()`` at the end of sampling.
    """
    cpu_percent: float
    memory_percent: float
    disk_usage_percent: float
    network_io: Dict[str, int]
    process_count: int
    timestamp: float

    @classmethod
    def collect(cls) -> 'SystemMetrics':
        """Sample the current system metrics and return them as a new instance.

        ``psutil.cpu_percent(interval=1)`` blocks while it measures CPU usage
        over a one-second window, so this call takes at least that long.
        """
        cpu_percent = psutil.cpu_percent(interval=1)
        # Read the network counters once so all four fields come from the
        # same snapshot (and the syscall is not repeated four times).
        net = psutil.net_io_counters()
        return cls(
            cpu_percent=cpu_percent,
            memory_percent=psutil.virtual_memory().percent,
            disk_usage_percent=psutil.disk_usage('/').percent,
            network_io={
                'bytes_sent': net.bytes_sent,
                'bytes_recv': net.bytes_recv,
                'packets_sent': net.packets_sent,
                'packets_recv': net.packets_recv,
            },
            process_count=len(psutil.pids()),
            timestamp=time.time(),
        )
@dataclass
class ApplicationMetrics:
    """Application-level counters and gauges.

    Every field defaults to zero, so ``ApplicationMetrics()`` yields a fresh
    all-zero record. A ``timestamp`` of 0 (the default) is replaced with the
    current ``time.time()`` in ``__post_init__``. The defaults matter:
    ``MetricsCollector.__init__`` in this module constructs the object
    without a ``timestamp``, which raised ``TypeError`` while the field was
    required.
    """
    requests_total: int = 0
    requests_successful: int = 0
    requests_failed: int = 0
    average_response_time: float = 0.0
    active_connections: int = 0
    cache_hits: int = 0
    cache_misses: int = 0
    files_processed: int = 0
    errors_total: int = 0
    timestamp: float = 0.0

    def __post_init__(self):
        # 0 acts as the "not provided" marker for the creation time.
        if self.timestamp == 0:
            self.timestamp = time.time()
class MetricsCollector:
    """Collects system and application metrics into bounded in-memory histories.

    ``start_collection`` launches a daemon thread. Every ``collection_interval``
    seconds it:
    1. samples ``SystemMetrics.collect()`` and the counters in
       ``self.app_metrics``;
    2. appends each sample to a per-metric ``deque`` capped at
       ``max_history_size`` entries;
    3. calls every registered metric callback with
       ``(system_metrics, app_metrics)``.

    The ``record_*`` / ``set_*`` methods update ``self.app_metrics``. An
    internal lock serialises those updates with the collection thread and with
    history reads, so callers on other threads see consistent values.
    """

    def __init__(self, collection_interval: int = 30):
        """Create an idle collector.

        Args:
            collection_interval: seconds to wait between collection cycles.
        """
        self.collection_interval = collection_interval
        self.metrics_history: Dict[str, deque] = {}
        self.max_history_size = 1000
        self.logger = logging.getLogger(__name__)
        self.is_collecting = False
        self.collection_thread: Optional[threading.Thread] = None
        # Re-entrant so locked helpers (e.g. _record_samples -> _add_metric)
        # can call each other.
        self._lock = threading.RLock()
        # Stop signal for the current collection thread. A new Event is created
        # on every start, so a thread left over from a previous run (e.g. one
        # whose join() timed out) cannot be revived by a restart.
        self._stop_event: Optional[threading.Event] = None
        # Application metrics. ``timestamp`` is passed explicitly so that
        # construction does not depend on the dataclass declaring a default.
        self.app_metrics = ApplicationMetrics(
            requests_total=0,
            requests_successful=0,
            requests_failed=0,
            average_response_time=0.0,
            active_connections=0,
            cache_hits=0,
            cache_misses=0,
            files_processed=0,
            errors_total=0,
            timestamp=time.time(),
        )
        # Callables invoked as callback(system_metrics, app_metrics) after each cycle.
        self.metric_callbacks: List[callable] = []

    def start_collection(self) -> None:
        """Start the background collection thread; no-op if already collecting."""
        if self.is_collecting:
            return
        self.is_collecting = True
        self._stop_event = threading.Event()
        self.collection_thread = threading.Thread(
            target=self._collection_loop,
            args=(self._stop_event,),
            daemon=True,
        )
        self.collection_thread.start()
        self.logger.info(f"指标收集已启动,间隔: {self.collection_interval}秒")

    def stop_collection(self) -> None:
        """Signal the collection thread to stop and wait up to 5 s for it to exit."""
        self.is_collecting = False
        if self._stop_event is not None:
            # Wakes the thread immediately instead of after a full interval.
            self._stop_event.set()
        if self.collection_thread:
            self.collection_thread.join(timeout=5)
        self.logger.info("指标收集已停止")

    def _collection_loop(self, stop_event: Optional[threading.Event] = None) -> None:
        """Run collection cycles until stopped.

        Args:
            stop_event: ends the loop when set. If omitted, a private event
                that is never set is used, and only ``is_collecting`` ends
                the loop.
        """
        if stop_event is None:
            stop_event = threading.Event()
        while self.is_collecting and not stop_event.is_set():
            try:
                system_metrics = SystemMetrics.collect()
                self._record_samples(system_metrics)
                self._run_metric_callbacks(system_metrics)
            except Exception as e:
                self.logger.error(f"指标收集出错: {e}")
            # Unlike time.sleep, this returns as soon as the event is set.
            stop_event.wait(self.collection_interval)

    def _record_samples(self, system_metrics: "SystemMetrics") -> None:
        """Append one sample of every tracked system and application metric."""
        with self._lock:
            app = self.app_metrics
            samples = (
                ("system_cpu", system_metrics.cpu_percent),
                ("system_memory", system_metrics.memory_percent),
                ("system_disk", system_metrics.disk_usage_percent),
                ("system_processes", system_metrics.process_count),
                ("app_requests_total", app.requests_total),
                ("app_requests_successful", app.requests_successful),
                ("app_requests_failed", app.requests_failed),
                ("app_response_time", app.average_response_time),
                ("app_active_connections", app.active_connections),
                ("app_cache_hits", app.cache_hits),
                ("app_cache_misses", app.cache_misses),
                ("app_files_processed", app.files_processed),
                ("app_errors_total", app.errors_total),
            )
            for name, value in samples:
                self._add_metric(name, value)

    def _run_metric_callbacks(self, system_metrics: "SystemMetrics") -> None:
        """Invoke each registered callback; one failing callback does not stop the rest."""
        # Deliberately called without holding self._lock so callbacks may use
        # this collector's public API freely.
        for callback in list(self.metric_callbacks):
            try:
                callback(system_metrics, self.app_metrics)
            except Exception as e:
                self.logger.error(f"指标回调执行失败: {e}")

    def _add_metric(self, name: str, value: float) -> None:
        """Append a timestamped sample to the bounded history for ``name``."""
        with self._lock:
            history = self.metrics_history.get(name)
            if history is None:
                history = deque(maxlen=self.max_history_size)
                self.metrics_history[name] = history
            history.append(
                MetricValue(name=name, value=value, timestamp=time.time(), labels={})
            )

    def record_request(self, success: bool, response_time: float) -> None:
        """Count one request and fold ``response_time`` into the running average.

        Args:
            success: whether the request succeeded.
            response_time: value averaged into ``average_response_time``.
        """
        with self._lock:
            m = self.app_metrics
            m.requests_total += 1
            if success:
                m.requests_successful += 1
            else:
                m.requests_failed += 1
            # Running mean: new_avg = (old_avg * (n - 1) + x) / n; n >= 1 here.
            n = m.requests_total
            m.average_response_time = (m.average_response_time * (n - 1) + response_time) / n

    def record_cache_hit(self) -> None:
        """Increment the cache-hit counter."""
        with self._lock:
            self.app_metrics.cache_hits += 1

    def record_cache_miss(self) -> None:
        """Increment the cache-miss counter."""
        with self._lock:
            self.app_metrics.cache_misses += 1

    def record_file_processed(self) -> None:
        """Increment the processed-files counter."""
        with self._lock:
            self.app_metrics.files_processed += 1

    def record_error(self) -> None:
        """Increment the total-errors counter."""
        with self._lock:
            self.app_metrics.errors_total += 1

    def set_active_connections(self, count: int) -> None:
        """Overwrite the active-connections gauge with ``count``."""
        with self._lock:
            self.app_metrics.active_connections = count

    def get_metrics_summary(self) -> Dict[str, Any]:
        """Return current/average/min/max of CPU, memory and disk plus app metrics.

        Returns:
            ``{"timestamp": float, "system": {"cpu"|"memory"|"disk": {...}},
            "application": asdict(app_metrics)}``. System entries are present
            only when that metric has at least one recorded sample.
        """
        timestamp = time.time()
        # Snapshot under the lock; iterating a deque while the collection
        # thread appends to it can raise RuntimeError.
        with self._lock:
            application = asdict(self.app_metrics)
            series = {
                name: [m.value for m in self.metrics_history.get(name, ())]
                for name in ("system_cpu", "system_memory", "system_disk")
            }
        system: Dict[str, Any] = {}
        for name, values in series.items():
            if values:
                system[name.replace("system_", "")] = {
                    "current": values[-1],
                    "average": sum(values) / len(values),
                    "min": min(values),
                    "max": max(values),
                }
        return {"timestamp": timestamp, "system": system, "application": application}

    def get_metrics_history(self, metric_name: str, limit: int = 100) -> List["MetricValue"]:
        """Return up to ``limit`` most recent samples of ``metric_name`` (oldest first).

        A non-positive ``limit`` returns the whole history; an unknown
        metric returns ``[]``.
        """
        with self._lock:
            history = list(self.metrics_history.get(metric_name, ()))
        return history[-limit:] if limit > 0 else history

    def register_callback(self, callback: callable) -> None:
        """Register ``callback(system_metrics, app_metrics)`` to run after each cycle."""
        self.metric_callbacks.append(callback)

    def export_metrics(self, format: str = "json") -> str:
        """Export metrics as ``"json"`` or ``"prometheus"`` text.

        Raises:
            ValueError: for any other ``format``.
        """
        if format == "json":
            return self._export_json()
        elif format == "prometheus":
            return self._export_prometheus()
        else:
            raise ValueError(f"不支持的导出格式: {format}")

    def _export_json(self) -> str:
        """Serialise ``get_metrics_summary()`` as indented, non-ASCII-escaped JSON."""
        summary = self.get_metrics_summary()
        return json.dumps(summary, ensure_ascii=False, indent=2)

    def _export_prometheus(self) -> str:
        """Render the latest system samples and app counters as Prometheus text."""
        lines = []
        with self._lock:
            for name in ("system_cpu", "system_memory", "system_disk"):
                history = self.metrics_history.get(name)
                if history:
                    prometheus_name = name.replace("system_", "folder_docs_system_")
                    lines.append(f"{prometheus_name} {history[-1].value}")
            app = self.app_metrics
            lines.extend([
                f"folder_docs_requests_total {app.requests_total}",
                f"folder_docs_requests_successful {app.requests_successful}",
                f"folder_docs_requests_failed {app.requests_failed}",
                f"folder_docs_response_time {app.average_response_time}",
                f"folder_docs_active_connections {app.active_connections}",
                f"folder_docs_cache_hits {app.cache_hits}",
                f"folder_docs_cache_misses {app.cache_misses}",
                f"folder_docs_files_processed {app.files_processed}",
                f"folder_docs_errors_total {app.errors_total}",
            ])
        # The Prometheus text format requires the last line to end with "\n".
        return "\n".join(lines) + "\n"
class AlertManager:
    """Evaluates threshold alert rules once per metrics-collection cycle.

    On construction it registers ``_check_alerts`` as a metric callback on
    ``collector``, so rules are evaluated on the collector's thread after
    every cycle. A triggered alert dict is appended to ``active_alerts``.
    When its condition clears it is marked ``status = "resolved"`` rather
    than removed, so ``active_alerts`` doubles as the alert history.
    """

    def __init__(self, collector: MetricsCollector):
        self.collector = collector
        self.alert_rules = []
        self.active_alerts = []
        self.alert_callbacks: List[callable] = []
        self.logger = logging.getLogger(__name__)
        # rule name -> time its condition was first seen breached in the current
        # uninterrupted streak; drives each rule's "duration" requirement.
        # Initialised before registering the callback, which may fire immediately.
        self._breach_started: Dict[str, float] = {}
        # Register the metric callback.
        self.collector.register_callback(self._check_alerts)
        # Default alert rules.
        self._setup_default_rules()

    def _setup_default_rules(self) -> None:
        """Install the built-in CPU, memory, error-rate and cache-hit-rate rules."""
        self.alert_rules = [
            {
                "name": "high_cpu_usage",
                "metric": "system_cpu",
                "condition": "gt",
                "threshold": 80.0,
                "duration": 300,  # seconds (5 minutes)
                "severity": "warning",
                "message": "CPU使用率过高"
            },
            {
                "name": "high_memory_usage",
                "metric": "system_memory",
                "condition": "gt",
                "threshold": 85.0,
                "duration": 300,
                "severity": "warning",
                "message": "内存使用率过高"
            },
            {
                "name": "high_error_rate",
                "metric": "app_requests_failed",
                "condition": "rate_gt",
                "threshold": 0.1,  # 10% failed requests
                "duration": 600,  # seconds (10 minutes)
                "severity": "critical",
                "message": "错误率过高"
            },
            {
                "name": "low_cache_hit_rate",
                "metric": "app_cache_hit_rate",
                "condition": "lt",
                "threshold": 0.7,  # 70% hit rate
                "duration": 1800,  # seconds (30 minutes)
                "severity": "info",
                "message": "缓存命中率过低"
            }
        ]

    def _check_alerts(self, system_metrics: SystemMetrics, app_metrics: ApplicationMetrics) -> None:
        """Metric callback: trigger or resolve an alert for every rule.

        A failure in one rule is logged and does not stop the others.
        """
        current_time = time.time()
        for rule in self.alert_rules:
            try:
                if self._evaluate_rule(rule, current_time, system_metrics, app_metrics):
                    self._trigger_alert(rule, current_time)
                else:
                    self._resolve_alert(rule["name"], current_time)
            except Exception as e:
                self.logger.error(f"告警规则 {rule['name']} 评估失败: {e}")

    def _evaluate_rule(
        self,
        rule: Dict[str, Any],
        current_time: float,
        system_metrics: Optional[SystemMetrics] = None,
        app_metrics: Optional[ApplicationMetrics] = None,
    ) -> bool:
        """Return True when the rule's condition has held for at least its ``duration``.

        Args:
            rule: rule dict with ``name``, ``metric``, ``condition`` (``gt``,
                ``lt`` or ``rate_gt``), ``threshold`` and optional ``duration``
                (seconds; default 0).
            current_time: evaluation time in ``time.time()`` seconds.
            system_metrics: snapshot from the current cycle, if available.
            app_metrics: application metrics; defaults to the collector's.
        """
        rule_name = rule["name"]
        condition = rule["condition"]
        threshold = rule["threshold"]
        duration = rule.get("duration", 0)
        if app_metrics is None:
            app_metrics = self.collector.app_metrics

        current_value = self._current_value(rule["metric"], condition, system_metrics, app_metrics)
        if current_value is None:
            breached = False  # no data: never alert
        elif condition in ("gt", "rate_gt"):
            breached = current_value > threshold
        elif condition == "lt":
            breached = current_value < threshold
        else:
            breached = False

        if not breached:
            # The streak is broken; the duration clock restarts on the next breach.
            self._breach_started.pop(rule_name, None)
            return False
        started = self._breach_started.setdefault(rule_name, current_time)
        return current_time - started >= duration

    def _current_value(
        self,
        metric_name: str,
        condition: str,
        system_metrics: Optional[SystemMetrics],
        app_metrics: ApplicationMetrics,
    ) -> Optional[float]:
        """Resolve the value a rule compares to its threshold, or None if unavailable."""
        if metric_name == "app_cache_hit_rate":
            total_cache = app_metrics.cache_hits + app_metrics.cache_misses
            # With no cache traffic, report "no data" instead of a 0% hit rate,
            # which would otherwise trip "lt" rules on an idle cache.
            return app_metrics.cache_hits / total_cache if total_cache else None
        if condition == "rate_gt":
            # Failed/total ratio of the cumulative request counters.
            total_requests = app_metrics.requests_total
            return app_metrics.requests_failed / total_requests if total_requests else 0.0
        # Rule metric names (e.g. "system_cpu") match the names under which the
        # collector stores samples, not the SystemMetrics attribute names.
        history = self.collector.get_metrics_history(metric_name, limit=1)
        if history:
            return history[-1].value
        value = getattr(app_metrics, metric_name, None)
        if value is None and system_metrics is not None:
            value = getattr(system_metrics, metric_name, None)
        return value

    def _trigger_alert(self, rule: Dict[str, Any], timestamp: float) -> None:
        """Create an active alert for ``rule`` unless one is already active."""
        alert_id = rule["name"]
        # Only an *active* alert suppresses a new one; resolved entries stay in
        # the list as history and must not block the rule from firing again.
        for alert in self.active_alerts:
            if alert["id"] == alert_id and alert["status"] == "active":
                return
        alert = {
            "id": alert_id,
            "rule": rule["name"],
            "severity": rule["severity"],
            "message": rule["message"],
            "triggered_at": timestamp,
            "resolved_at": None,
            "status": "active"
        }
        self.active_alerts.append(alert)
        self.logger.warning(f"告警触发: {rule['message']}")
        for callback in list(self.alert_callbacks):
            try:
                callback(alert)
            except Exception as e:
                self.logger.error(f"告警回调执行失败: {e}")

    def _resolve_alert(self, rule_name: str, timestamp: float) -> None:
        """Mark the active alert for ``rule_name`` (if any) as resolved at ``timestamp``."""
        for alert in self.active_alerts:
            if alert["id"] == rule_name and alert["status"] == "active":
                alert["status"] = "resolved"
                alert["resolved_at"] = timestamp
                self.logger.info(f"告警已解决: {alert['message']}")
                break

    def get_active_alerts(self) -> List[Dict[str, Any]]:
        """Return alerts whose status is ``"active"``."""
        return [alert for alert in self.active_alerts if alert["status"] == "active"]

    def get_alert_history(self, limit: int = 100) -> List[Dict[str, Any]]:
        """Return up to ``limit`` alerts (active and resolved), newest trigger first.

        A non-positive ``limit`` returns all of them.
        """
        history = sorted(self.active_alerts, key=lambda x: x["triggered_at"], reverse=True)
        return history[:limit] if limit > 0 else history

    def register_alert_callback(self, callback: callable) -> None:
        """Register ``callback(alert_dict)`` to be called when an alert triggers."""
        self.alert_callbacks.append(callback)
# Module-wide singletons behind the convenience functions defined below.
# Creating the AlertManager registers its rule check as a metric callback
# on the collector.
global_metrics_collector: MetricsCollector = MetricsCollector()
global_alert_manager: AlertManager = AlertManager(global_metrics_collector)
def start_metrics_collection(interval: int = 30) -> None:
    """Start background sampling on the module-wide collector.

    Args:
        interval: seconds between collection cycles. It is applied to the
            global collector before starting; previously this argument was
            silently ignored. If collection is already running, the new
            interval is picked up from the next wait onward.
    """
    global_metrics_collector.collection_interval = interval
    global_metrics_collector.start_collection()
def stop_metrics_collection() -> None:
    """Stop background sampling on the module-wide collector."""
    collector = global_metrics_collector
    collector.stop_collection()
def record_request(success: bool, response_time: float) -> None:
    """Count one handled request on the module-wide collector.

    Args:
        success: whether the request succeeded.
        response_time: value folded into the collector's average response time.
    """
    global_metrics_collector.record_request(success=success, response_time=response_time)
def record_cache_hit() -> None:
    """Increment the cache-hit counter of the module-wide collector."""
    collector = global_metrics_collector
    collector.record_cache_hit()
def record_cache_miss() -> None:
    """Increment the cache-miss counter of the module-wide collector."""
    collector = global_metrics_collector
    collector.record_cache_miss()
def get_metrics_summary() -> Dict[str, Any]:
    """Return the module-wide collector's metrics summary dict."""
    summary = global_metrics_collector.get_metrics_summary()
    return summary
def get_active_alerts() -> List[Dict[str, Any]]:
    """Return the module-wide alert manager's alerts whose status is "active"."""
    manager = global_alert_manager
    return manager.get_active_alerts()