Skip to main content
Glama
__init__.py 14.9 kB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Monitoring and feedback module.

Provides system monitoring, performance statistics, error collection and
feedback collection / alerting facilities.
"""

import os
import time
import json
import logging
from typing import Dict, Any, List, Optional
from pathlib import Path
from datetime import datetime, timedelta
import threading
from collections import defaultdict, deque

# Module-level logger
logger = logging.getLogger(__name__)


class PerformanceMonitor:
    """Performance monitor.

    Records per-operation durations and success/failure counts, keeps a
    bounded history of recent operations and, when enabled, samples the
    process's memory/CPU usage on a background daemon thread (requires
    ``psutil``; sampling is skipped with a warning when it is missing).

    Recognised config keys:
        enable_performance_monitoring: bool, default True. When False no
            thread is started and ``record_operation`` is a no-op.
        max_history_size: int, default 1000. Size of each per-operation
            duration window and of the global operation history.
        monitor_interval: seconds between resource samples, default 30.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        self.config = config or {}
        self.enabled = self.config.get('enable_performance_monitoring', True)
        self.max_history = self.config.get('max_history_size', 1000)
        self.monitor_interval = self.config.get('monitor_interval', 30)

        # Performance metrics. Per-operation duration windows are bounded
        # deques, so the oldest sample is evicted in O(1) (list.pop(0) is O(n)).
        self.operation_times = defaultdict(lambda: deque(maxlen=self.max_history))
        self.operation_counts = defaultdict(int)
        self.error_counts = defaultdict(int)
        self.operation_history = deque(maxlen=self.max_history)

        # Aggregate counters plus process resource usage
        self.system_stats = {
            'start_time': time.time(),
            'total_operations': 0,
            'total_errors': 0,
            'avg_response_time': 0.0,
            'peak_memory_usage': 0
        }

        # Background monitoring thread; the event lets it be stopped.
        self._stop_event = threading.Event()
        self.monitoring_thread = None
        if self.enabled:
            self._start_monitoring()

    def _start_monitoring(self):
        """Start the background resource-sampling daemon thread."""
        self._stop_event.clear()
        self.monitoring_thread = threading.Thread(target=self._monitor_system, daemon=True)
        self.monitoring_thread.start()
        logger.info("性能监控已启动")

    def stop_monitoring(self, timeout: Optional[float] = None):
        """Ask the background sampling thread to exit and wait up to *timeout* seconds."""
        self._stop_event.set()
        thread = self.monitoring_thread
        if thread is not None and thread.is_alive():
            thread.join(timeout)

    def _monitor_system(self):
        """Sample process memory (MB) and CPU usage until stopped.

        Updates ``current_memory_usage``, ``current_cpu_usage`` and
        ``peak_memory_usage`` in ``system_stats``. Returns immediately with a
        warning when ``psutil`` is not installed.
        """
        try:
            import psutil
        except ImportError:
            logger.warning("psutil未安装,无法进行系统资源监控")
            return

        process = psutil.Process()
        while not self._stop_event.is_set():
            try:
                memory_usage = process.memory_info().rss / 1024 / 1024  # MB
                cpu_usage = process.cpu_percent()

                self.system_stats['current_memory_usage'] = memory_usage
                self.system_stats['current_cpu_usage'] = cpu_usage

                if memory_usage > self.system_stats['peak_memory_usage']:
                    self.system_stats['peak_memory_usage'] = memory_usage
                wait_seconds = self.monitor_interval
            except Exception as e:
                logger.error(f"系统监控异常: {e}")
                wait_seconds = 60  # back off after a sampling failure
            # Event.wait acts as a sleep that stop_monitoring() can interrupt.
            self._stop_event.wait(wait_seconds)

    def record_operation(self, operation_name: str, duration: float, success: bool = True):
        """Record one operation's duration and outcome.

        Args:
            operation_name: Name the statistics are grouped by.
            duration: Elapsed time of the operation (presumably seconds —
                the default alert rule compares the average against 5.0).
            success: False counts the call as an error.

        Does nothing when monitoring is disabled.
        """
        if not self.enabled:
            return

        timestamp = time.time()

        # The deque's maxlen drops the oldest sample automatically.
        self.operation_times[operation_name].append(duration)

        self.operation_counts[operation_name] += 1
        self.system_stats['total_operations'] += 1

        if not success:
            self.error_counts[operation_name] += 1
            self.system_stats['total_errors'] += 1

        self.operation_history.append({
            'operation': operation_name,
            'duration': duration,
            'success': success,
            'timestamp': timestamp
        })

        self._update_avg_response_time()

    def _update_avg_response_time(self):
        """Recompute the mean duration over every retained sample of every operation."""
        all_times = [t for times in self.operation_times.values() for t in times]
        if all_times:
            self.system_stats['avg_response_time'] = sum(all_times) / len(all_times)

    def _single_operation_stats(self, operation_name: str) -> Dict[str, Any]:
        """Build the statistics dict for one recorded operation."""
        times = self.operation_times[operation_name]
        count = self.operation_counts[operation_name]
        errors = self.error_counts[operation_name]
        total = sum(times)
        return {
            'operation': operation_name,
            'count': count,
            'errors': errors,
            # count/errors are all-time totals; durations cover only the window.
            'success_rate': (count - errors) / max(count, 1),
            'avg_duration': total / len(times) if times else 0,
            'min_duration': min(times) if times else 0,
            'max_duration': max(times) if times else 0,
            'total_duration': total
        }

    def get_operation_stats(self, operation_name: Optional[str] = None) -> Dict[str, Any]:
        """Return statistics for one operation, or for all of them.

        With a (truthy) *operation_name*, returns that operation's stats, or
        ``{}`` if it was never recorded. Otherwise returns a mapping of every
        recorded operation name to its stats.
        """
        if not operation_name:
            # Use the helper directly, so an operation literally named ""
            # cannot re-enter this branch and recurse forever.
            return {
                name: self._single_operation_stats(name)
                for name in list(self.operation_times)
            }
        if operation_name not in self.operation_times:
            return {}
        return self._single_operation_stats(operation_name)

    def get_system_stats(self) -> Dict[str, Any]:
        """Return aggregate statistics plus uptime-derived rates."""
        uptime = time.time() - self.system_stats['start_time']
        return {
            **self.system_stats,
            'uptime': uptime,
            'operations_per_second': self.system_stats['total_operations'] / max(uptime, 1),
            'error_rate': self.system_stats['total_errors'] / max(self.system_stats['total_operations'], 1),
            'unique_operations': len(self.operation_times)
        }

    def export_stats(self, file_path: str) -> bool:
        """Write system stats, per-operation stats and the last 50 operations as JSON.

        Creates parent directories as needed. Returns True on success, False
        (after logging) on any error.
        """
        try:
            stats = {
                'timestamp': datetime.now().isoformat(),
                'system_stats': self.get_system_stats(),
                'operation_stats': self.get_operation_stats(),
                'recent_operations': list(self.operation_history)[-50:]  # last 50 operations
            }

            Path(file_path).parent.mkdir(parents=True, exist_ok=True)
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(stats, f, indent=2, ensure_ascii=False)

            logger.info(f"统计数据已导出到: {file_path}")
            return True
        except Exception as e:
            logger.error(f"导出统计数据失败: {e}")
            return False


class FeedbackCollector:
    """Feedback collector persisted to a JSON file.

    Recognised config keys:
        feedback_file: path of the JSON file, default 'feedback.json'.
        max_feedback_count: only the newest N items are kept, default 1000.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        self.config = config or {}
        self.feedback_file = self.config.get('feedback_file', 'feedback.json')
        self.max_feedback = self.config.get('max_feedback_count', 1000)
        self.feedback_data = self._load_feedback_data()
        # Highest id handed out so far. Ids keep increasing after old items
        # are trimmed, so they never repeat (len()+1 would).
        self._last_id = max(
            (item['id'] for item in self.feedback_data if isinstance(item.get('id'), int)),
            default=0,
        )

    def _load_feedback_data(self) -> List[Dict[str, Any]]:
        """Load stored feedback; return [] if the file is missing or unreadable."""
        try:
            if os.path.exists(self.feedback_file):
                with open(self.feedback_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                if isinstance(data, list):
                    # Ignore anything that is not a feedback record.
                    return [item for item in data if isinstance(item, dict)]
                logger.error(f"加载反馈数据失败: 文件内容不是列表 ({self.feedback_file})")
        except Exception as e:
            logger.error(f"加载反馈数据失败: {e}")
        return []

    def _save_feedback_data(self) -> bool:
        """Persist all feedback; return True on success, False (after logging) on error."""
        try:
            path = Path(self.feedback_file)
            path.parent.mkdir(parents=True, exist_ok=True)
            # Write to a sibling temp file, then atomically swap it in, so a
            # failed dump can never leave a truncated feedback file behind.
            tmp_path = path.with_name(path.name + '.tmp')
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(self.feedback_data, f, indent=2, ensure_ascii=False)
            os.replace(tmp_path, path)
            return True
        except Exception as e:
            logger.error(f"保存反馈数据失败: {e}")
            return False

    def add_feedback(self, feedback_type: str, message: str, severity: str = 'info',
                     context: Optional[Dict[str, Any]] = None) -> bool:
        """Append a feedback item, trim to the newest ``max_feedback`` items, and save.

        Returns the result of saving (False on any error).
        """
        try:
            feedback_id = self._last_id + 1
            feedback_item = {
                'id': feedback_id,
                'type': feedback_type,
                'message': message,
                'severity': severity,
                'timestamp': datetime.now().isoformat(),
                'context': context or {}
            }
            self._last_id = feedback_id
            self.feedback_data.append(feedback_item)

            # Drop the oldest items. An explicit count also handles
            # max_feedback == 0, where a [-0:] slice would keep everything.
            excess = len(self.feedback_data) - self.max_feedback
            if excess > 0:
                del self.feedback_data[:excess]

            return self._save_feedback_data()
        except Exception as e:
            logger.error(f"添加反馈失败: {e}")
            return False

    def get_feedback(self, feedback_type: Optional[str] = None, severity: Optional[str] = None,
                     limit: int = 50) -> List[Dict[str, Any]]:
        """Return up to *limit* feedback items, newest first, optionally filtered.

        Builds a new list, so the stored chronological order is never
        modified. (Sorting ``feedback_data`` in place would make later trimming
        drop the newest items.)
        """
        matches = [
            item for item in self.feedback_data
            if (not feedback_type or item.get('type') == feedback_type)
            and (not severity or item.get('severity') == severity)
        ]
        matches.sort(key=lambda x: x.get('timestamp', ''), reverse=True)
        return matches[:limit]

    def get_feedback_summary(self) -> Dict[str, Any]:
        """Return total count, counts by type and severity, and the latest item."""
        total = len(self.feedback_data)
        if total == 0:
            return {'total': 0}

        type_counts = defaultdict(int)
        severity_counts = defaultdict(int)
        for feedback in self.feedback_data:
            type_counts[feedback.get('type')] += 1
            severity_counts[feedback.get('severity')] += 1

        return {
            'total': total,
            'by_type': dict(type_counts),
            'by_severity': dict(severity_counts),
            'latest_feedback': self.feedback_data[-1] if self.feedback_data else None
        }


class AlertManager:
    """Threshold-based alert manager.

    Each rule names a dotted metric path into a stats dict; an alert fires
    when that metric is strictly greater than the rule's threshold.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        self.config = config or {}
        # Copy, so add_alert_rule() does not mutate the caller's config list.
        self.alert_rules = list(self.config.get('alert_rules', []))
        self.active_alerts = []
        self.alert_history = []
        # Monotonic id source. Deriving ids from list lengths ignored active
        # alerts and produced duplicate ids across calls.
        self._last_alert_id = 0

    def add_alert_rule(self, name: str, condition: str, threshold: float,
                       severity: str = 'warning', message: str = ""):
        """Add an enabled rule; *condition* is a dotted path into the stats dict."""
        rule = {
            'name': name,
            'condition': condition,
            'threshold': threshold,
            'severity': severity,
            'message': message or f"{name} 超过阈值 {threshold}",
            'enabled': True
        }
        self.alert_rules.append(rule)

    def check_alerts(self, stats: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Evaluate all enabled rules against *stats* and return newly fired alerts.

        Fired alerts are also appended to ``active_alerts``. NOTE: a rule fires
        again on every call while its condition holds, so ``active_alerts`` grows
        until alerts are acknowledged. A broken rule is logged and skipped.
        """
        new_alerts = []

        for rule in self.alert_rules:
            try:
                # Rules supplied via config may omit 'enabled'; treat them as enabled.
                if not rule.get('enabled', True):
                    continue

                metric_value = self._get_metric_value(stats, rule['condition'])

                if metric_value > rule['threshold']:
                    alert_id = self._last_alert_id + 1
                    alert = {
                        'id': alert_id,
                        'rule_name': rule['name'],
                        'severity': rule['severity'],
                        'message': rule['message'],
                        'metric': rule['condition'],
                        'value': metric_value,
                        'threshold': rule['threshold'],
                        'timestamp': datetime.now().isoformat()
                    }
                    self._last_alert_id = alert_id
                    new_alerts.append(alert)
                    self.active_alerts.append(alert)
            except Exception as e:
                logger.error(f"检查告警规则失败: {e}")

        return new_alerts

    def _get_metric_value(self, stats: Dict[str, Any], metric_path: str) -> float:
        """Resolve a dotted path such as 'a.b' in nested dicts.

        Returns 0.0 if any key is missing or the final value is not numeric.
        """
        value = stats
        for key in metric_path.split('.'):
            if isinstance(value, dict) and key in value:
                value = value[key]
            else:
                return 0.0
        return float(value) if isinstance(value, (int, float)) else 0.0

    def get_active_alerts(self) -> List[Dict[str, Any]]:
        """Return a shallow copy of the unacknowledged alerts."""
        return self.active_alerts.copy()

    def acknowledge_alert(self, alert_id: int) -> bool:
        """Mark an active alert acknowledged and move it to the history.

        Returns False if no active alert has *alert_id*.
        """
        for i, alert in enumerate(self.active_alerts):
            if alert['id'] == alert_id:
                alert['acknowledged'] = True
                alert['acknowledged_at'] = datetime.now().isoformat()
                self.alert_history.append(alert)
                self.active_alerts.pop(i)
                return True
        return False


# Lazily created module-wide singleton instances
_performance_monitor: Optional[PerformanceMonitor] = None
_feedback_collector: Optional[FeedbackCollector] = None
_alert_manager: Optional[AlertManager] = None


def get_performance_monitor(config: Optional[Dict[str, Any]] = None) -> PerformanceMonitor:
    """Return the shared PerformanceMonitor; *config* is used only on the first call."""
    global _performance_monitor
    if _performance_monitor is None:
        _performance_monitor = PerformanceMonitor(config)
    return _performance_monitor


def get_feedback_collector(config: Optional[Dict[str, Any]] = None) -> FeedbackCollector:
    """Return the shared FeedbackCollector; *config* is used only on the first call."""
    global _feedback_collector
    if _feedback_collector is None:
        _feedback_collector = FeedbackCollector(config)
    return _feedback_collector


def get_alert_manager(config: Optional[Dict[str, Any]] = None) -> AlertManager:
    """Return the shared AlertManager, adding the default rules on first creation.

    *config* is used only on the first call.
    """
    global _alert_manager
    if _alert_manager is None:
        _alert_manager = AlertManager(config)

        # Default alert rules
        _alert_manager.add_alert_rule(
            "高错误率",
            "error_rate",
            0.1,  # 10%
            "warning",
            "系统错误率过高,请检查日志"
        )

        _alert_manager.add_alert_rule(
            "高响应时间",
            "avg_response_time",
            5.0,  # 5 seconds
            "warning",
            "平均响应时间过长,请检查性能"
        )

    return _alert_manager


# Convenience functions
def record_operation(operation_name: str, duration: float, success: bool = True):
    """Record an operation on the shared PerformanceMonitor."""
    get_performance_monitor().record_operation(operation_name, duration, success)


def add_feedback(feedback_type: str, message: str, severity: str = 'info',
                 context: Optional[Dict[str, Any]] = None) -> bool:
    """Add feedback via the shared FeedbackCollector; return whether it was saved."""
    return get_feedback_collector().add_feedback(feedback_type, message, severity, context)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kscz0000/Zhiwen-Assistant-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.