#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
监控和反馈机制模块
提供系统监控、性能统计、错误收集和反馈功能
"""
import os
import time
import json
import logging
from typing import Dict, Any, List, Optional
from pathlib import Path
from datetime import datetime, timedelta
import threading
from collections import defaultdict, deque
# Module-level logger named after this module; no handlers or levels are configured here.
logger = logging.getLogger(__name__)
class PerformanceMonitor:
    """Collect per-operation timing/error statistics and sample process resources.

    Recognised ``config`` keys:

    * ``enable_performance_monitoring`` (default ``True``): when false, no
      background thread is started and ``record_operation`` is a no-op.
    * ``max_history_size`` (default ``1000``): maximum number of durations
      retained per operation and maximum length of ``operation_history``.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Initialise the counters and, if enabled, start the sampling thread."""
        self.config = config or {}
        self.enabled = self.config.get('enable_performance_monitoring', True)
        self.max_history = self.config.get('max_history_size', 1000)

        # Performance metrics.
        self.operation_times = defaultdict(list)
        self.operation_counts = defaultdict(int)
        self.error_counts = defaultdict(int)
        self.operation_history = deque(maxlen=self.max_history)

        # Running sum/count of every duration currently retained in
        # ``operation_times``.  They are kept in step by ``record_operation``
        # so the global average does not have to re-scan all samples per call.
        self._retained_duration_sum = 0.0
        self._retained_duration_count = 0

        # System-wide statistics.
        self.system_stats = {
            'start_time': time.time(),
            'total_operations': 0,
            'total_errors': 0,
            'avg_response_time': 0.0,
            'peak_memory_usage': 0
        }

        # Background sampling thread (only started when monitoring is enabled).
        self._stop_event = threading.Event()
        self.monitoring_thread = None
        if self.enabled:
            self._start_monitoring()

    def _start_monitoring(self):
        """Start the daemon thread that samples process memory and CPU usage."""
        self._stop_event.clear()
        self.monitoring_thread = threading.Thread(target=self._monitor_system, daemon=True)
        self.monitoring_thread.start()
        logger.info("性能监控已启动")

    def stop_monitoring(self, timeout: Optional[float] = None) -> None:
        """Ask the sampling thread to exit and wait for it.

        Args:
            timeout: Maximum number of seconds to wait for the thread to
                finish; ``None`` waits without a limit.

        Safe to call when no thread was started or when it already exited.
        Recording operations keeps working after the thread has stopped.
        """
        self._stop_event.set()
        thread = self.monitoring_thread
        # Never join the current thread: that would raise RuntimeError.
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout)

    def _monitor_system(self):
        """Thread body: periodically refresh memory/CPU figures in ``system_stats``.

        Requires ``psutil``; when it cannot be imported a warning is logged and
        the thread exits.  Samples every 30 seconds, backing off to 60 seconds
        after a failed sample, until ``stop_monitoring`` is called.
        """
        try:
            import psutil
            process = psutil.Process()
            while not self._stop_event.is_set():
                try:
                    # Refresh current resource usage.
                    memory_usage = process.memory_info().rss / 1024 / 1024  # MB
                    cpu_usage = process.cpu_percent()
                    self.system_stats['current_memory_usage'] = memory_usage
                    self.system_stats['current_cpu_usage'] = cpu_usage
                    if memory_usage > self.system_stats['peak_memory_usage']:
                        self.system_stats['peak_memory_usage'] = memory_usage
                    delay = 30
                except Exception as e:
                    logger.error("系统监控异常: %s", e)
                    delay = 60
                # Event.wait sleeps like time.sleep but wakes up immediately
                # when stop_monitoring() sets the event.
                self._stop_event.wait(delay)
        except ImportError:
            logger.warning("psutil未安装,无法进行系统资源监控")

    def record_operation(self, operation_name: str, duration: float, success: bool = True):
        """Record one completed operation.

        Args:
            operation_name: Key under which the sample is aggregated.
            duration: Duration of the operation (the unit is whatever the
                caller measures in; it is only summed and averaged here).
            success: ``False`` counts the operation as an error.

        Does nothing when monitoring is disabled.
        """
        if not self.enabled:
            return
        timestamp = time.time()

        # Retain the duration, evicting the oldest sample once the
        # per-operation history is over its limit.
        times = self.operation_times[operation_name]
        times.append(duration)
        self._retained_duration_sum += duration
        self._retained_duration_count += 1
        if len(times) > self.max_history:
            evicted = times.pop(0)
            self._retained_duration_sum -= evicted
            self._retained_duration_count -= 1

        # Counters are cumulative and are not affected by history eviction.
        self.operation_counts[operation_name] += 1
        self.system_stats['total_operations'] += 1
        if not success:
            self.error_counts[operation_name] += 1
            self.system_stats['total_errors'] += 1

        self.operation_history.append({
            'operation': operation_name,
            'duration': duration,
            'success': success,
            'timestamp': timestamp
        })

        self._update_avg_response_time()

    def _update_avg_response_time(self):
        """Refresh ``system_stats['avg_response_time']`` from the running totals.

        The average covers every duration currently retained across all
        operations.  When nothing is retained the previous value is kept.
        """
        if self._retained_duration_count > 0:
            self.system_stats['avg_response_time'] = (
                self._retained_duration_sum / self._retained_duration_count
            )
        else:
            # Nothing retained: reset the sum so float rounding cannot accumulate.
            self._retained_duration_sum = 0.0

    def _single_operation_stats(self, operation_name: str) -> Dict[str, Any]:
        """Return the statistics dict for one operation, or ``{}`` if it is unknown."""
        if operation_name not in self.operation_times:
            return {}
        times = self.operation_times[operation_name]
        count = self.operation_counts[operation_name]
        errors = self.error_counts[operation_name]
        return {
            'operation': operation_name,
            'count': count,
            'errors': errors,
            'success_rate': (count - errors) / max(count, 1),
            'avg_duration': sum(times) / len(times) if times else 0,
            'min_duration': min(times) if times else 0,
            'max_duration': max(times) if times else 0,
            'total_duration': sum(times)
        }

    def get_operation_stats(self, operation_name: Optional[str] = None) -> Dict[str, Any]:
        """Return statistics for one operation, or for all of them.

        Args:
            operation_name: Operation to report on.  ``None`` (or any other
                falsy value) selects every recorded operation.

        Returns:
            For a single operation: a dict with ``operation``, ``count``,
            ``errors``, ``success_rate`` and avg/min/max/total duration, or
            ``{}`` when the operation was never recorded.  Duration figures
            cover only the retained history; ``count`` and ``errors`` are
            cumulative.  For all operations: a mapping of name to that dict.
        """
        if operation_name:
            return self._single_operation_stats(operation_name)
        # Use the helper directly: re-entering this method with a falsy name
        # (e.g. an operation recorded as "") would recurse without end.
        return {
            name: self._single_operation_stats(name)
            for name in list(self.operation_times)
        }

    def get_system_stats(self) -> Dict[str, Any]:
        """Return ``system_stats`` plus derived uptime, throughput and error rate."""
        uptime = time.time() - self.system_stats['start_time']
        total_operations = self.system_stats['total_operations']
        return {
            **self.system_stats,
            'uptime': uptime,
            # Denominators are clamped to 1 to avoid division by zero.
            'operations_per_second': total_operations / max(uptime, 1),
            'error_rate': self.system_stats['total_errors'] / max(total_operations, 1),
            'unique_operations': len(self.operation_times)
        }

    def export_stats(self, file_path: str) -> bool:
        """Write a JSON snapshot of the statistics to ``file_path``.

        The snapshot holds the system stats, per-operation stats and the 50
        most recent operations.  Missing parent directories are created.

        Returns:
            ``True`` on success; ``False`` if anything failed (the error is
            logged, not raised).
        """
        try:
            stats = {
                'timestamp': datetime.now().isoformat(),
                'system_stats': self.get_system_stats(),
                'operation_stats': self.get_operation_stats(),
                'recent_operations': list(self.operation_history)[-50:]
            }
            Path(file_path).parent.mkdir(parents=True, exist_ok=True)
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(stats, f, indent=2, ensure_ascii=False)
            logger.info("统计数据已导出到: %s", file_path)
            return True
        except Exception as e:
            logger.error("导出统计数据失败: %s", e)
            return False
class FeedbackCollector:
    """Store feedback items in memory and persist them to a JSON file.

    Recognised ``config`` keys:

    * ``feedback_file`` (default ``'feedback.json'``): path of the JSON file.
    * ``max_feedback_count`` (default ``1000``): maximum number of items
      kept; the oldest items are dropped first.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Read the configuration and load any previously saved feedback."""
        self.config = config or {}
        self.feedback_file = self.config.get('feedback_file', 'feedback.json')
        self.feedback_data = self._load_feedback_data()
        self.max_feedback = self.config.get('max_feedback_count', 1000)

    def _load_feedback_data(self) -> List[Dict[str, Any]]:
        """Load the feedback list from ``feedback_file``.

        Returns:
            The stored list, or ``[]`` when the file is missing, unreadable,
            or does not contain a JSON list (failures are logged).
        """
        try:
            if os.path.exists(self.feedback_file):
                with open(self.feedback_file, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
                if isinstance(loaded, list):
                    return loaded
                # Anything but a list would break append/slicing later on.
                logger.error(
                    "加载反馈数据失败: %s",
                    f"expected a JSON list, got {type(loaded).__name__}",
                )
        except Exception as e:
            logger.error("加载反馈数据失败: %s", e)
        return []

    def _save_feedback_data(self) -> bool:
        """Write the in-memory feedback list to ``feedback_file``.

        Missing parent directories are created first.

        Returns:
            ``True`` on success, ``False`` on failure (the error is logged).
        """
        try:
            Path(self.feedback_file).parent.mkdir(parents=True, exist_ok=True)
            with open(self.feedback_file, 'w', encoding='utf-8') as f:
                json.dump(self.feedback_data, f, indent=2, ensure_ascii=False)
            return True
        except Exception as e:
            logger.error("保存反馈数据失败: %s", e)
            return False

    def _next_feedback_id(self) -> int:
        """Return an id one greater than the largest integer id currently stored.

        Deriving the id from the list length would hand out the same id again
        and again once old items have been trimmed.
        """
        known_ids = [
            item['id'] for item in self.feedback_data
            if isinstance(item, dict) and isinstance(item.get('id'), int)
        ]
        return max(known_ids, default=0) + 1

    def add_feedback(self, feedback_type: str, message: str, severity: str = 'info',
                     context: Optional[Dict[str, Any]] = None) -> bool:
        """Append a feedback item, trim to ``max_feedback`` and persist.

        Args:
            feedback_type: Category stored under ``'type'``.
            message: Feedback text.
            severity: Severity label stored under ``'severity'``.
            context: Optional extra data; stored as ``{}`` when omitted.

        Returns:
            ``True`` if the item was added and saved, ``False`` otherwise.
        """
        try:
            feedback_item = {
                'id': self._next_feedback_id(),
                'type': feedback_type,
                'message': message,
                'severity': severity,
                'timestamp': datetime.now().isoformat(),
                'context': context or {}
            }
            self.feedback_data.append(feedback_item)

            # Drop the oldest items.  Computing the excess explicitly also
            # handles a limit of 0, where ``data[-0:]`` would keep everything.
            excess = len(self.feedback_data) - self.max_feedback
            if excess > 0:
                self.feedback_data = self.feedback_data[excess:]
            return self._save_feedback_data()
        except Exception as e:
            logger.error("添加反馈失败: %s", e)
            return False

    def get_feedback(self, feedback_type: Optional[str] = None,
                     severity: Optional[str] = None,
                     limit: int = 50) -> List[Dict[str, Any]]:
        """Return matching feedback items, newest first.

        Args:
            feedback_type: Keep only items of this type (falsy = any type).
            severity: Keep only items of this severity (falsy = any).
            limit: Maximum number of items to return.

        Returns:
            A new list; the stored ``feedback_data`` order is never changed.
        """
        matches = [
            item for item in self.feedback_data
            if (not feedback_type or item['type'] == feedback_type)
            and (not severity or item['severity'] == severity)
        ]
        # Sort the copy, never self.feedback_data itself: its insertion order
        # is relied on for trimming and for 'latest_feedback'.
        matches.sort(key=lambda item: item['timestamp'], reverse=True)
        return matches[:limit]

    def get_feedback_summary(self) -> Dict[str, Any]:
        """Return the total count, counts by type and severity, and the latest item.

        Returns ``{'total': 0}`` when there is no feedback.
        """
        total = len(self.feedback_data)
        if total == 0:
            return {'total': 0}

        type_counts = defaultdict(int)
        severity_counts = defaultdict(int)
        for feedback in self.feedback_data:
            type_counts[feedback['type']] += 1
            severity_counts[feedback['severity']] += 1

        return {
            'total': total,
            'by_type': dict(type_counts),
            'by_severity': dict(severity_counts),
            'latest_feedback': self.feedback_data[-1]
        }
class AlertManager:
    """Evaluate threshold rules against a stats dict and track raised alerts.

    Recognised ``config`` keys:

    * ``alert_rules``: initial list of rule dicts with the keys produced by
      ``add_alert_rule`` (``name``, ``condition``, ``threshold``,
      ``severity``, ``message``, ``enabled``).
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Copy the configured rules and start with no alerts."""
        self.config = config or {}
        # Copy the list so add_alert_rule() never mutates the caller's config.
        self.alert_rules = list(self.config.get('alert_rules') or [])
        self.active_alerts = []
        self.alert_history = []
        # Monotonic counter: ids stay unique even while alerts are still
        # active or after they have moved to the history.
        self._next_alert_id = 1

    def add_alert_rule(self, name: str, condition: str, threshold: float,
                       severity: str = 'warning', message: str = ""):
        """Register an enabled rule.

        Args:
            name: Rule name, copied into the alerts it raises.
            condition: Dot-separated path of the metric inside the stats dict.
            threshold: The rule fires when the metric is strictly greater.
            severity: Severity label copied into the alerts.
            message: Alert text; a default mentioning the name and threshold
                is generated when empty.
        """
        rule = {
            'name': name,
            'condition': condition,
            'threshold': threshold,
            'severity': severity,
            'message': message or f"{name} 超过阈值 {threshold}",
            'enabled': True
        }
        self.alert_rules.append(rule)

    def check_alerts(self, stats: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Evaluate every enabled rule against ``stats``.

        Each rule whose metric exceeds its threshold produces a new alert,
        which is also appended to ``active_alerts``.  A rule that cannot be
        evaluated is logged and skipped without affecting the other rules.

        Returns:
            The alerts raised by this call.
        """
        new_alerts = []
        for rule in self.alert_rules:
            try:
                # Rules supplied through config may omit 'enabled'; treat
                # them as enabled rather than failing the whole check.
                if not rule.get('enabled', True):
                    continue

                metric_value = self._get_metric_value(stats, rule['condition'])
                if metric_value > rule['threshold']:
                    alert = {
                        'id': self._next_alert_id,
                        'rule_name': rule['name'],
                        'severity': rule['severity'],
                        'message': rule['message'],
                        'metric': rule['condition'],
                        'value': metric_value,
                        'threshold': rule['threshold'],
                        'timestamp': datetime.now().isoformat()
                    }
                    # Consume the id only once the alert was built successfully.
                    self._next_alert_id += 1
                    new_alerts.append(alert)
                    self.active_alerts.append(alert)
            except Exception as e:
                logger.error("检查告警规则失败: %s", e)
        return new_alerts

    def _get_metric_value(self, stats: Dict[str, Any], metric_path: str) -> float:
        """Resolve a dot-separated path inside ``stats``.

        Returns:
            The value as a float, or ``0.0`` when any path segment is missing
            or the final value is not an ``int``/``float``.
        """
        value = stats
        for key in metric_path.split('.'):
            if isinstance(value, dict) and key in value:
                value = value[key]
            else:
                return 0.0
        return float(value) if isinstance(value, (int, float)) else 0.0

    def get_active_alerts(self) -> List[Dict[str, Any]]:
        """Return a shallow copy of the list of active alerts."""
        return self.active_alerts.copy()

    def acknowledge_alert(self, alert_id: int) -> bool:
        """Mark the active alert with ``alert_id`` as acknowledged.

        The alert gets ``acknowledged``/``acknowledged_at`` fields and is
        moved from ``active_alerts`` to ``alert_history``.

        Returns:
            ``True`` if such an active alert existed, ``False`` otherwise.
        """
        for i, alert in enumerate(self.active_alerts):
            if alert['id'] == alert_id:
                alert['acknowledged'] = True
                alert['acknowledged_at'] = datetime.now().isoformat()
                self.alert_history.append(alert)
                # Safe despite iterating: we return right after the removal.
                self.active_alerts.pop(i)
                return True
        return False
# Global instances: module-wide singletons, each left as None until the
# matching get_*() accessor below creates it on first use.
_performance_monitor: Optional[PerformanceMonitor] = None
_feedback_collector: Optional[FeedbackCollector] = None
_alert_manager: Optional[AlertManager] = None
def get_performance_monitor(config: Optional[Dict[str, Any]] = None) -> PerformanceMonitor:
    """Return the shared PerformanceMonitor, creating it on the first call.

    ``config`` is only used when the instance is created; later calls return
    the existing instance and ignore it.
    """
    global _performance_monitor
    if _performance_monitor is not None:
        return _performance_monitor
    _performance_monitor = PerformanceMonitor(config)
    return _performance_monitor
def get_feedback_collector(config: Optional[Dict[str, Any]] = None) -> FeedbackCollector:
    """Return the shared FeedbackCollector, creating it on the first call.

    ``config`` is only used when the instance is created; later calls return
    the existing instance and ignore it.
    """
    global _feedback_collector
    if _feedback_collector is not None:
        return _feedback_collector
    _feedback_collector = FeedbackCollector(config)
    return _feedback_collector
def get_alert_manager(config: Optional[Dict[str, Any]] = None) -> AlertManager:
    """Return the shared AlertManager, creating it on the first call.

    On creation two default rules are registered: error rate above 0.1 (10%)
    and average response time above 5.0 (seconds), both with severity
    "warning".  ``config`` is ignored once the instance exists.
    """
    global _alert_manager
    if _alert_manager is not None:
        return _alert_manager

    _alert_manager = AlertManager(config)
    # Default rules as (name, metric path, threshold, severity, message).
    default_rules = (
        ("高错误率", "error_rate", 0.1, "warning", "系统错误率过高,请检查日志"),
        ("高响应时间", "avg_response_time", 5.0, "warning", "平均响应时间过长,请检查性能"),
    )
    for rule_args in default_rules:
        _alert_manager.add_alert_rule(*rule_args)
    return _alert_manager
# Convenience functions
def record_operation(operation_name: str, duration: float, success: bool = True):
    """Record an operation on the shared performance monitor (convenience wrapper)."""
    monitor = get_performance_monitor()
    monitor.record_operation(operation_name, duration, success)
def add_feedback(feedback_type: str, message: str, severity: str = 'info',
                 context: Optional[Dict[str, Any]] = None) -> bool:
    """Add feedback through the shared feedback collector (convenience wrapper).

    Returns whatever the collector's ``add_feedback`` returns.
    """
    collector = get_feedback_collector()
    return collector.add_feedback(feedback_type, message, severity, context)