"""
性能优化工具
提供性能分析、瓶颈识别和优化建议功能。
"""
import time
import asyncio
import functools
import threading
import psutil
import os
from typing import Dict, List, Optional, Callable, Any, Union
from dataclasses import dataclass, field
from collections import defaultdict, deque
import statistics
from pathlib import Path
import json
from .logger import default_logger, LogLevel, LoggerCategory
@dataclass
class PerformanceMetric:
"""性能指标"""
name: str
value: float
unit: str
timestamp: float = field(default_factory=time.time)
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class PerformanceProfile:
"""性能档案"""
operation_name: str
total_calls: int = 0
total_time: float = 0.0
min_time: float = float('inf')
max_time: float = 0.0
avg_time: float = 0.0
recent_times: deque = field(default_factory=lambda: deque(maxlen=100))
errors: int = 0
memory_usage_mb: float = 0.0
class PerformanceProfiler:
"""性能分析器"""
def __init__(self):
self.profiles: Dict[str, PerformanceProfile] = {}
self.active_operations: Dict[str, float] = {}
self.lock = threading.RLock()
self.system_metrics = []
def start_operation(self, operation_name: str) -> str:
"""开始操作计时"""
operation_id = f"{operation_name}_{int(time.time() * 1000)}"
with self.lock:
self.active_operations[operation_id] = time.time()
# 初始化档案
if operation_name not in self.profiles:
self.profiles[operation_name] = PerformanceProfile(operation_name)
self.profiles[operation_name].total_calls += 1
return operation_id
def end_operation(self, operation_id: str, success: bool = True) -> float:
"""结束操作计时"""
with self.lock:
if operation_id not in self.active_operations:
return 0.0
start_time = self.active_operations[operation_id]
duration = time.time() - start_time
# 提取操作名称
operation_name = operation_id.rsplit('_', 1)[0]
if operation_name in self.profiles:
profile = self.profiles[operation_name]
profile.total_time += duration
profile.min_time = min(profile.min_time, duration)
profile.max_time = max(profile.max_time, duration)
profile.avg_time = profile.total_time / profile.total_calls
profile.recent_times.append(duration)
if not success:
profile.errors += 1
del self.active_operations[operation_id]
return duration
def get_profile(self, operation_name: str) -> Optional[PerformanceProfile]:
"""获取操作档案"""
return self.profiles.get(operation_name)
def get_all_profiles(self) -> Dict[str, PerformanceProfile]:
"""获取所有操作档案"""
return self.profiles.copy()
def collect_system_metrics(self):
"""收集系统指标"""
try:
# CPU使用率
cpu_percent = psutil.cpu_percent(interval=1)
# 内存使用情况
memory = psutil.virtual_memory()
# 磁盘使用情况
disk = psutil.disk_usage('/')
# 网络统计
network = psutil.net_io_counters()
metric = PerformanceMetric(
name="system_resources",
value=cpu_percent,
unit="percent_cpu",
metadata={
"memory_total_gb": memory.total / (1024**3),
"memory_used_gb": memory.used / (1024**3),
"memory_percent": memory.percent,
"disk_total_gb": disk.total / (1024**3),
"disk_used_gb": disk.used / (1024**3),
"disk_percent": disk.percent,
"network_bytes_sent": network.bytes_sent,
"network_bytes_recv": network.bytes_recv
}
)
self.system_metrics.append(metric)
# 保留最近100个系统指标
if len(self.system_metrics) > 100:
self.system_metrics = self.system_metrics[-100:]
except Exception as e:
default_logger.warning(f"收集系统指标失败: {e}", category=LoggerCategory.PERFORMANCE)
def identify_bottlenecks(self) -> List[Dict[str, Any]]:
"""识别性能瓶颈"""
bottlenecks = []
for name, profile in self.profiles.items():
# 检查平均执行时间过长
if profile.avg_time > 1.0: # 超过1秒
bottlenecks.append({
"type": "slow_operation",
"operation": name,
"avg_time": profile.avg_time,
"max_time": profile.max_time,
"total_calls": profile.total_calls,
"recommendation": "考虑优化算法或添加缓存"
})
# 检查错误率过高
if profile.total_calls > 10:
error_rate = profile.errors / profile.total_calls
if error_rate > 0.1: # 错误率超过10%
bottlenecks.append({
"type": "high_error_rate",
"operation": name,
"error_rate": error_rate,
"total_calls": profile.total_calls,
"errors": profile.errors,
"recommendation": "检查异常处理逻辑,修复错误源"
})
# 检查性能波动过大
if len(profile.recent_times) > 10:
std_dev = statistics.stdev(profile.recent_times)
if std_dev > profile.avg_time * 0.5: # 标准差超过平均值的50%
bottlenecks.append({
"type": "performance_variance",
"operation": name,
"std_dev": std_dev,
"avg_time": profile.avg_time,
"recommendation": "检查是否存在资源竞争或负载变化"
})
return bottlenecks
def generate_optimization_report(self) -> Dict[str, Any]:
"""生成优化报告"""
bottlenecks = self.identify_bottlenecks()
# 计算总体统计
total_operations = sum(p.total_calls for p in self.profiles.values())
total_errors = sum(p.errors for p in self.profiles.values())
overall_error_rate = total_errors / total_operations if total_operations > 0 else 0
# 找出最慢的操作
slowest_operations = sorted(
[(name, profile.avg_time) for name, profile in self.profiles.items()],
key=lambda x: x[1],
reverse=True
)[:5]
# 找出调用最频繁的操作
most_frequent_operations = sorted(
[(name, profile.total_calls) for name, profile in self.profiles.items()],
key=lambda x: x[1],
reverse=True
)[:5]
return {
"summary": {
"total_operations": total_operations,
"total_errors": total_errors,
"error_rate": overall_error_rate,
"unique_operations": len(self.profiles)
},
"bottlenecks": bottlenecks,
"slowest_operations": slowest_operations,
"most_frequent_operations": most_frequent_operations,
"recommendations": self._generate_recommendations(bottlenecks)
}
def _generate_recommendations(self, bottlenecks: List[Dict[str, Any]]) -> List[str]:
"""生成优化建议"""
recommendations = []
# 基于瓶颈类型的通用建议
bottleneck_types = set(b["type"] for b in bottlenecks)
if "slow_operation" in bottleneck_types:
recommendations.append("考虑实施缓存机制减少重复计算")
recommendations.append("优化算法复杂度,使用更高效的数据结构")
recommendations.append("考虑异步处理提高并发性能")
if "high_error_rate" in bottleneck_types:
recommendations.append("加强输入验证,减少异常情况")
recommendations.append("改进错误处理逻辑,提高容错能力")
recommendations.append("添加更详细的错误日志,便于诊断")
if "performance_variance" in bottleneck_types:
recommendations.append("检查是否存在资源竞争或锁竞争")
recommendations.append("实施负载均衡,分散处理压力")
recommendations.append("添加性能监控,实时跟踪变化")
# 基于具体操作的建议
for bottleneck in bottlenecks:
operation = bottleneck["operation"]
if "file" in operation.lower():
recommendations.append("对于文件操作,考虑使用批量处理和异步I/O")
if "network" in operation.lower() or "http" in operation.lower():
recommendations.append("对于网络操作,考虑添加重试机制和连接池")
if "database" in operation.lower() or "db" in operation.lower():
recommendations.append("对于数据库操作,考虑添加索引和查询优化")
return list(set(recommendations)) # 去重
# 全局性能分析器实例
global_profiler = PerformanceProfiler()
def performance_monitor(operation_name: str):
"""性能监控装饰器"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
operation_id = global_profiler.start_operation(operation_name)
try:
result = func(*args, **kwargs)
global_profiler.end_operation(operation_id, success=True)
return result
except Exception as e:
global_profiler.end_operation(operation_id, success=False)
raise
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
operation_id = global_profiler.start_operation(operation_name)
try:
result = await func(*args, **kwargs)
global_profiler.end_operation(operation_id, success=True)
return result
except Exception as e:
global_profiler.end_operation(operation_id, success=False)
raise
# 根据函数类型返回适当的包装器
if asyncio.iscoroutinefunction(func):
return async_wrapper
else:
return sync_wrapper
return decorator
def memory_usage_monitor():
"""内存使用监控"""
process = psutil.Process(os.getpid())
memory_info = process.memory_info()
return memory_info.rss / (1024 * 1024) # MB
async def collect_background_metrics(interval: int = 30):
"""后台收集系统指标"""
while True:
global_profiler.collect_system_metrics()
await asyncio.sleep(interval)
def save_performance_report(filepath: str = "performance_report.json"):
"""保存性能报告"""
report = global_profiler.generate_optimization_report()
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
default_logger.info(
f"性能报告已保存到: {filepath}",
category=LoggerCategory.PERFORMANCE
)
def get_current_performance_status() -> Dict[str, Any]:
"""获取当前性能状态"""
return {
"active_operations": len(global_profiler.active_operations),
"total_profiles": len(global_profiler.profiles),
"system_metrics": len(global_profiler.system_metrics),
"memory_usage_mb": memory_usage_monitor(),
"cpu_percent": psutil.cpu_percent()
}