performance.py (12.5 kB)
""" 性能优化工具 提供性能分析、瓶颈识别和优化建议功能。 """ import time import asyncio import functools import threading import psutil import os from typing import Dict, List, Optional, Callable, Any, Union from dataclasses import dataclass, field from collections import defaultdict, deque import statistics from pathlib import Path import json from .logger import default_logger, LogLevel, LoggerCategory @dataclass class PerformanceMetric: """性能指标""" name: str value: float unit: str timestamp: float = field(default_factory=time.time) metadata: Dict[str, Any] = field(default_factory=dict) @dataclass class PerformanceProfile: """性能档案""" operation_name: str total_calls: int = 0 total_time: float = 0.0 min_time: float = float('inf') max_time: float = 0.0 avg_time: float = 0.0 recent_times: deque = field(default_factory=lambda: deque(maxlen=100)) errors: int = 0 memory_usage_mb: float = 0.0 class PerformanceProfiler: """性能分析器""" def __init__(self): self.profiles: Dict[str, PerformanceProfile] = {} self.active_operations: Dict[str, float] = {} self.lock = threading.RLock() self.system_metrics = [] def start_operation(self, operation_name: str) -> str: """开始操作计时""" operation_id = f"{operation_name}_{int(time.time() * 1000)}" with self.lock: self.active_operations[operation_id] = time.time() # 初始化档案 if operation_name not in self.profiles: self.profiles[operation_name] = PerformanceProfile(operation_name) self.profiles[operation_name].total_calls += 1 return operation_id def end_operation(self, operation_id: str, success: bool = True) -> float: """结束操作计时""" with self.lock: if operation_id not in self.active_operations: return 0.0 start_time = self.active_operations[operation_id] duration = time.time() - start_time # 提取操作名称 operation_name = operation_id.rsplit('_', 1)[0] if operation_name in self.profiles: profile = self.profiles[operation_name] profile.total_time += duration profile.min_time = min(profile.min_time, duration) profile.max_time = max(profile.max_time, duration) profile.avg_time = profile.total_time / profile.total_calls profile.recent_times.append(duration) if not success: profile.errors += 1 del self.active_operations[operation_id] return duration def get_profile(self, operation_name: str) -> Optional[PerformanceProfile]: """获取操作档案""" return self.profiles.get(operation_name) def get_all_profiles(self) -> Dict[str, PerformanceProfile]: """获取所有操作档案""" return self.profiles.copy() def collect_system_metrics(self): """收集系统指标""" try: # CPU使用率 cpu_percent = psutil.cpu_percent(interval=1) # 内存使用情况 memory = psutil.virtual_memory() # 磁盘使用情况 disk = psutil.disk_usage('/') # 网络统计 network = psutil.net_io_counters() metric = PerformanceMetric( name="system_resources", value=cpu_percent, unit="percent_cpu", metadata={ "memory_total_gb": memory.total / (1024**3), "memory_used_gb": memory.used / (1024**3), "memory_percent": memory.percent, "disk_total_gb": disk.total / (1024**3), "disk_used_gb": disk.used / (1024**3), "disk_percent": disk.percent, "network_bytes_sent": network.bytes_sent, "network_bytes_recv": network.bytes_recv } ) self.system_metrics.append(metric) # 保留最近100个系统指标 if len(self.system_metrics) > 100: self.system_metrics = self.system_metrics[-100:] except Exception as e: default_logger.warning(f"收集系统指标失败: {e}", category=LoggerCategory.PERFORMANCE) def identify_bottlenecks(self) -> List[Dict[str, Any]]: """识别性能瓶颈""" bottlenecks = [] for name, profile in self.profiles.items(): # 检查平均执行时间过长 if profile.avg_time > 1.0: # 超过1秒 bottlenecks.append({ "type": "slow_operation", "operation": name, "avg_time": 
profile.avg_time, "max_time": profile.max_time, "total_calls": profile.total_calls, "recommendation": "考虑优化算法或添加缓存" }) # 检查错误率过高 if profile.total_calls > 10: error_rate = profile.errors / profile.total_calls if error_rate > 0.1: # 错误率超过10% bottlenecks.append({ "type": "high_error_rate", "operation": name, "error_rate": error_rate, "total_calls": profile.total_calls, "errors": profile.errors, "recommendation": "检查异常处理逻辑,修复错误源" }) # 检查性能波动过大 if len(profile.recent_times) > 10: std_dev = statistics.stdev(profile.recent_times) if std_dev > profile.avg_time * 0.5: # 标准差超过平均值的50% bottlenecks.append({ "type": "performance_variance", "operation": name, "std_dev": std_dev, "avg_time": profile.avg_time, "recommendation": "检查是否存在资源竞争或负载变化" }) return bottlenecks def generate_optimization_report(self) -> Dict[str, Any]: """生成优化报告""" bottlenecks = self.identify_bottlenecks() # 计算总体统计 total_operations = sum(p.total_calls for p in self.profiles.values()) total_errors = sum(p.errors for p in self.profiles.values()) overall_error_rate = total_errors / total_operations if total_operations > 0 else 0 # 找出最慢的操作 slowest_operations = sorted( [(name, profile.avg_time) for name, profile in self.profiles.items()], key=lambda x: x[1], reverse=True )[:5] # 找出调用最频繁的操作 most_frequent_operations = sorted( [(name, profile.total_calls) for name, profile in self.profiles.items()], key=lambda x: x[1], reverse=True )[:5] return { "summary": { "total_operations": total_operations, "total_errors": total_errors, "error_rate": overall_error_rate, "unique_operations": len(self.profiles) }, "bottlenecks": bottlenecks, "slowest_operations": slowest_operations, "most_frequent_operations": most_frequent_operations, "recommendations": self._generate_recommendations(bottlenecks) } def _generate_recommendations(self, bottlenecks: List[Dict[str, Any]]) -> List[str]: """生成优化建议""" recommendations = [] # 基于瓶颈类型的通用建议 bottleneck_types = set(b["type"] for b in bottlenecks) if "slow_operation" in bottleneck_types: recommendations.append("考虑实施缓存机制减少重复计算") recommendations.append("优化算法复杂度,使用更高效的数据结构") recommendations.append("考虑异步处理提高并发性能") if "high_error_rate" in bottleneck_types: recommendations.append("加强输入验证,减少异常情况") recommendations.append("改进错误处理逻辑,提高容错能力") recommendations.append("添加更详细的错误日志,便于诊断") if "performance_variance" in bottleneck_types: recommendations.append("检查是否存在资源竞争或锁竞争") recommendations.append("实施负载均衡,分散处理压力") recommendations.append("添加性能监控,实时跟踪变化") # 基于具体操作的建议 for bottleneck in bottlenecks: operation = bottleneck["operation"] if "file" in operation.lower(): recommendations.append("对于文件操作,考虑使用批量处理和异步I/O") if "network" in operation.lower() or "http" in operation.lower(): recommendations.append("对于网络操作,考虑添加重试机制和连接池") if "database" in operation.lower() or "db" in operation.lower(): recommendations.append("对于数据库操作,考虑添加索引和查询优化") return list(set(recommendations)) # 去重 # 全局性能分析器实例 global_profiler = PerformanceProfiler() def performance_monitor(operation_name: str): """性能监控装饰器""" def decorator(func: Callable) -> Callable: @functools.wraps(func) def sync_wrapper(*args, **kwargs): operation_id = global_profiler.start_operation(operation_name) try: result = func(*args, **kwargs) global_profiler.end_operation(operation_id, success=True) return result except Exception as e: global_profiler.end_operation(operation_id, success=False) raise @functools.wraps(func) async def async_wrapper(*args, **kwargs): operation_id = global_profiler.start_operation(operation_name) try: result = await func(*args, **kwargs) 
global_profiler.end_operation(operation_id, success=True) return result except Exception as e: global_profiler.end_operation(operation_id, success=False) raise # 根据函数类型返回适当的包装器 if asyncio.iscoroutinefunction(func): return async_wrapper else: return sync_wrapper return decorator def memory_usage_monitor(): """内存使用监控""" process = psutil.Process(os.getpid()) memory_info = process.memory_info() return memory_info.rss / (1024 * 1024) # MB async def collect_background_metrics(interval: int = 30): """后台收集系统指标""" while True: global_profiler.collect_system_metrics() await asyncio.sleep(interval) def save_performance_report(filepath: str = "performance_report.json"): """保存性能报告""" report = global_profiler.generate_optimization_report() with open(filepath, 'w', encoding='utf-8') as f: json.dump(report, f, indent=2, ensure_ascii=False) default_logger.info( f"性能报告已保存到: {filepath}", category=LoggerCategory.PERFORMANCE ) def get_current_performance_status() -> Dict[str, Any]: """获取当前性能状态""" return { "active_operations": len(global_profiler.active_operations), "total_profiles": len(global_profiler.profiles), "system_metrics": len(global_profiler.system_metrics), "memory_usage_mb": memory_usage_monitor(), "cpu_percent": psutil.cpu_percent() }
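
A minimal usage sketch follows. The import path utils.performance, the load_document function, and the README.md input are illustrative assumptions, not part of the module; adjust them to the actual package layout.

import asyncio
from utils.performance import (  # assumed import path
    performance_monitor,
    collect_background_metrics,
    save_performance_report,
    get_current_performance_status,
)

@performance_monitor("load_document")
def load_document(path: str) -> str:
    # Each call is timed and recorded in global_profiler under "load_document".
    with open(path, "r", encoding="utf-8") as f:
        return f.read()

async def main() -> None:
    # Sample CPU/memory/disk/network metrics in the background every 10 seconds.
    metrics_task = asyncio.create_task(collect_background_metrics(interval=10))
    for _ in range(3):
        load_document("README.md")  # hypothetical input file
    print(get_current_performance_status())
    save_performance_report("performance_report.json")
    metrics_task.cancel()

if __name__ == "__main__":
    asyncio.run(main())

Because the decorator generates both a sync and an async wrapper and picks one via asyncio.iscoroutinefunction, the same decorator also works on coroutines, and timings from either path land in the shared global_profiler.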
