# performance_tracker.py
"""
Performance Tracking System for ACP Capabilities
Phase 2: tracks capability performance and success metrics, and provides
analytics for adaptive learning and heuristic optimization.
"""
import asyncio
import json
import logging
import os
import psutil
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict, field
from contextlib import asynccontextmanager
from collections import defaultdict, deque
from ..utils.config import Config
from ..utils.error_handler import ErrorHandler
logger = logging.getLogger(__name__)
@dataclass
class PerformanceSnapshot:
"""Snapshot of performance metrics at a point in time"""
timestamp: datetime
cpu_percent: float
memory_mb: int
memory_percent: float
disk_io_read: int
disk_io_write: int
network_io_sent: int
network_io_recv: int
open_files: int
thread_count: int
@dataclass
class ExecutionMetrics:
"""Detailed metrics for a single capability execution"""
capability_id: str
execution_id: str
start_time: datetime
end_time: datetime
duration: float
success: bool
error_type: Optional[str] = None
error_message: Optional[str] = None
# Resource usage
peak_memory_mb: int = 0
avg_cpu_percent: float = 0.0
disk_operations: int = 0
network_calls: int = 0
# Performance snapshots
snapshots: List[PerformanceSnapshot] = field(default_factory=list)
# Custom metrics
custom_metrics: Dict[str, Any] = field(default_factory=dict)
@dataclass
class CapabilityPerformance:
"""Aggregated performance data for a capability"""
capability_id: str
total_executions: int
successful_executions: int
failed_executions: int
success_rate: float
# Timing metrics
avg_duration: float
min_duration: float
max_duration: float
recent_durations: deque # Last 20 durations
# Resource metrics
avg_peak_memory_mb: float
max_peak_memory_mb: int
avg_cpu_percent: float
# Error analysis
error_frequency: Dict[str, int]
common_errors: List[str] # Top 5 error types
# Performance trend
performance_trend: List[float] # Success rate over time
last_execution: datetime
# Health indicators
health_score: float # 0-100 overall health
performance_grade: str # A, B, C, D, F
class PerformanceTracker:
"""Tracks and analyzes capability performance"""
def __init__(self, config: Config):
self.config = config
self.error_handler = ErrorHandler()
# Storage paths
workspace_root = config.get('workspace_root', os.getcwd())
self.metrics_dir = Path(workspace_root) / ".katamari" / "acp" / "performance"
self.executions_file = self.metrics_dir / "executions.jsonl"
self.performance_file = self.metrics_dir / "performance.json"
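        # Storage layout (for reference):
        #   executions.jsonl  - append-only JSON Lines log, one ExecutionMetrics per line
        #   performance.json  - aggregated CapabilityPerformance keyed by capability_id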
# Create directories
self.metrics_dir.mkdir(parents=True, exist_ok=True)
# In-memory storage
self._active_executions: Dict[str, ExecutionMetrics] = {}
self._capability_performance: Dict[str, CapabilityPerformance] = {}
self._performance_history: Dict[str, deque] = defaultdict(lambda: deque(maxlen=100))
# Monitoring settings
self.snapshot_interval = 1.0 # seconds
self.max_snapshots_per_execution = 60
self.performance_window = 50 # executions for trend analysis
# System monitoring
self._monitoring_active = False
self._monitoring_tasks: Dict[str, asyncio.Task] = {}
logger.info("Performance tracker initialized")
@asynccontextmanager
async def track_execution(self, capability_id: str, execution_id: str):
"""Context manager for tracking capability execution"""
# Start tracking
start_time = datetime.now()
metrics = ExecutionMetrics(
capability_id=capability_id,
execution_id=execution_id,
start_time=start_time,
end_time=start_time,
duration=0.0,
success=False
)
self._active_executions[execution_id] = metrics
# Start monitoring
monitor_task = asyncio.create_task(
self._monitor_execution(execution_id, capability_id)
)
self._monitoring_tasks[execution_id] = monitor_task
        try:
            yield metrics
            # No exception escaped the wrapped block, so record the execution as
            # successful (callers can still override via the yielded metrics object).
            metrics.success = True
        except Exception as e:
            # Record error details before re-raising
            metrics.success = False
            metrics.error_type = type(e).__name__
            metrics.error_message = str(e)
            raise
finally:
# Stop monitoring
if execution_id in self._monitoring_tasks:
self._monitoring_tasks[execution_id].cancel()
del self._monitoring_tasks[execution_id]
# Finalize metrics
metrics.end_time = datetime.now()
metrics.duration = (metrics.end_time - metrics.start_time).total_seconds()
# Store and analyze
await self._store_execution_metrics(metrics)
await self._update_capability_performance(metrics.capability_id)
# Remove from active
if execution_id in self._active_executions:
del self._active_executions[execution_id]
async def get_capability_performance(self, capability_id: str) -> Optional[CapabilityPerformance]:
"""Get performance data for a capability"""
if capability_id in self._capability_performance:
return self._capability_performance[capability_id]
# Load from disk
await self._load_performance_data()
return self._capability_performance.get(capability_id)
async def get_performance_summary(self, days_back: int = 30) -> Dict[str, Any]:
"""Get overall performance summary"""
try:
cutoff_date = datetime.now() - timedelta(days=days_back)
# Load recent executions
recent_executions = await self._load_recent_executions(cutoff_date)
if not recent_executions:
return {"total_executions": 0}
# Calculate summary statistics
total_executions = len(recent_executions)
successful_executions = sum(1 for e in recent_executions if e.success)
success_rate = successful_executions / total_executions
durations = [e.duration for e in recent_executions]
avg_duration = sum(durations) / len(durations)
# Capability breakdown
capability_stats: Dict[str, Dict[str, int]] = defaultdict(lambda: {"total": 0, "successful": 0})
for execution in recent_executions:
cap = execution.capability_id
capability_stats[cap]["total"] += 1
if execution.success:
capability_stats[cap]["successful"] += 1
# Error analysis
error_counts = defaultdict(int)
for execution in recent_executions:
if execution.error_type:
error_counts[execution.error_type] += 1
return {
"period_days": days_back,
"total_executions": total_executions,
"successful_executions": successful_executions,
"success_rate": success_rate,
"average_duration": avg_duration,
"capability_breakdown": dict(capability_stats),
"common_errors": dict(sorted(error_counts.items(), key=lambda x: x[1], reverse=True)[:10]),
"capabilities_tracked": len(capability_stats)
}
except Exception as e:
logger.error(f"Failed to get performance summary: {e}")
return {"error": str(e)}
async def get_performance_trends(self, capability_id: str, days_back: int = 7) -> Dict[str, Any]:
"""Get performance trends for a specific capability"""
try:
performance = await self.get_capability_performance(capability_id)
if not performance:
return {"error": "Capability not found"}
# Get recent executions for trend analysis
cutoff_date = datetime.now() - timedelta(days=days_back)
recent_executions = await self._load_capability_executions(capability_id, cutoff_date)
if not recent_executions:
return {"error": "No recent executions"}
# Calculate trends
daily_stats: Dict[str, Dict[str, Any]] = defaultdict(lambda: {"total": 0, "successful": 0, "durations": []})
for execution in recent_executions:
day_key = execution.start_time.strftime("%Y-%m-%d")
daily_stats[day_key]["total"] += 1
if execution.success:
daily_stats[day_key]["successful"] += 1
daily_stats[day_key]["durations"].append(execution.duration)
# Build trend data
trend_data = []
for date in sorted(daily_stats.keys()):
stats = daily_stats[date]
success_rate = stats["successful"] / stats["total"] if stats["total"] > 0 else 0
durations = stats["durations"]
avg_duration = sum(durations) / len(durations) if durations else 0
trend_data.append({
"date": date,
"executions": stats["total"],
"success_rate": success_rate,
"avg_duration": avg_duration
})
return {
"capability_id": capability_id,
"period_days": days_back,
"trend_data": trend_data,
"current_health_score": performance.health_score,
"performance_grade": performance.performance_grade
}
except Exception as e:
logger.error(f"Failed to get performance trends: {e}")
return {"error": str(e)}
async def add_custom_metric(self, execution_id: str, metric_name: str, value: Any) -> bool:
"""Add custom metric to active execution"""
try:
if execution_id in self._active_executions:
self._active_executions[execution_id].custom_metrics[metric_name] = value
return True
return False
except Exception as e:
logger.error(f"Failed to add custom metric: {e}")
return False
async def _monitor_execution(self, execution_id: str, capability_id: str) -> None:
"""Monitor execution and collect performance snapshots"""
try:
process = psutil.Process()
while execution_id in self._active_executions:
                # Collect per-process metrics; io_counters() is unavailable on some
                # platforms (e.g. macOS), so fall back to zeros when it is missing.
                try:
                    io = process.io_counters()
                    disk_read, disk_write = io.read_bytes, io.write_bytes
                except (AttributeError, psutil.AccessDenied):
                    disk_read = disk_write = 0
                # psutil exposes no per-process network counters, so record
                # system-wide totals rather than mislabelling disk I/O as network I/O.
                net = psutil.net_io_counters()
                snapshot = PerformanceSnapshot(
                    timestamp=datetime.now(),
                    cpu_percent=process.cpu_percent(),
                    memory_mb=process.memory_info().rss // 1024 // 1024,
                    memory_percent=process.memory_percent(),
                    disk_io_read=disk_read,
                    disk_io_write=disk_write,
                    network_io_sent=net.bytes_sent if net else 0,
                    network_io_recv=net.bytes_recv if net else 0,
                    open_files=len(process.open_files()),
                    thread_count=process.num_threads()
                )
# Add to execution metrics
if execution_id in self._active_executions:
metrics = self._active_executions[execution_id]
metrics.snapshots.append(snapshot)
# Update peak metrics
metrics.peak_memory_mb = max(metrics.peak_memory_mb, snapshot.memory_mb)
# Limit snapshots
if len(metrics.snapshots) > self.max_snapshots_per_execution:
metrics.snapshots = metrics.snapshots[-self.max_snapshots_per_execution:]
# Wait for next snapshot
await asyncio.sleep(self.snapshot_interval)
except asyncio.CancelledError:
pass
except Exception as e:
logger.error(f"Error monitoring execution {execution_id}: {e}")
async def _store_execution_metrics(self, metrics: ExecutionMetrics) -> None:
"""Store execution metrics to disk"""
try:
# Calculate derived metrics
if metrics.snapshots:
metrics.avg_cpu_percent = sum(s.cpu_percent for s in metrics.snapshots) / len(metrics.snapshots)
            # Append to the JSON Lines log
            with open(self.executions_file, 'a') as f:
                # asdict() already converts nested snapshot dataclasses to dicts;
                # only datetimes need converting, and default=str keeps the write
                # from failing if custom_metrics holds other non-JSON values.
                metrics_dict = asdict(metrics)
                metrics_dict['start_time'] = metrics.start_time.isoformat()
                metrics_dict['end_time'] = metrics.end_time.isoformat()
                for snapshot in metrics_dict['snapshots']:
                    snapshot['timestamp'] = snapshot['timestamp'].isoformat()
                f.write(json.dumps(metrics_dict, default=str) + '\n')
except Exception as e:
logger.error(f"Failed to store execution metrics: {e}")
async def _update_capability_performance(self, capability_id: str) -> None:
"""Update aggregated performance data for a capability"""
try:
# Load all executions for this capability
executions = await self._load_capability_executions(capability_id)
if not executions:
return
# Calculate aggregated metrics
total_executions = len(executions)
successful_executions = sum(1 for e in executions if e.success)
failed_executions = total_executions - successful_executions
success_rate = successful_executions / total_executions
durations = [e.duration for e in executions]
avg_duration = sum(durations) / len(durations)
min_duration = min(durations)
max_duration = max(durations)
recent_durations = deque([e.duration for e in executions[-20:]], maxlen=20)
# Resource metrics
peak_memories = [e.peak_memory_mb for e in executions if e.peak_memory_mb > 0]
avg_peak_memory = sum(peak_memories) / len(peak_memories) if peak_memories else 0
max_peak_memory = max(peak_memories) if peak_memories else 0
avg_cpus = [e.avg_cpu_percent for e in executions if e.avg_cpu_percent > 0]
avg_cpu = sum(avg_cpus) / len(avg_cpus) if avg_cpus else 0
# Error analysis
error_frequency = defaultdict(int)
for execution in executions:
if execution.error_type:
error_frequency[execution.error_type] += 1
            common_errors = [
                error_type for error_type, _ in
                sorted(error_frequency.items(), key=lambda x: x[1], reverse=True)[:5]
            ]
# Performance trend
performance_trend = [1.0 if e.success else 0.0 for e in executions[-self.performance_window:]]
# Health score calculation
health_score = self._calculate_health_score(
success_rate, avg_duration, avg_peak_memory, len(error_frequency)
)
performance_grade = self._get_performance_grade(health_score)
# Create performance object
performance = CapabilityPerformance(
capability_id=capability_id,
total_executions=total_executions,
successful_executions=successful_executions,
failed_executions=failed_executions,
success_rate=success_rate,
avg_duration=avg_duration,
min_duration=min_duration,
max_duration=max_duration,
recent_durations=recent_durations,
avg_peak_memory_mb=avg_peak_memory,
max_peak_memory_mb=max_peak_memory,
avg_cpu_percent=avg_cpu,
error_frequency=dict(error_frequency),
common_errors=common_errors,
performance_trend=performance_trend,
last_execution=max(e.start_time for e in executions),
health_score=health_score,
performance_grade=performance_grade
)
# Cache and store
self._capability_performance[capability_id] = performance
await self._save_performance_data()
except Exception as e:
logger.error(f"Failed to update capability performance: {e}")
async def _load_performance_data(self) -> None:
"""Load performance data from disk"""
try:
if not self.performance_file.exists():
return
with open(self.performance_file, 'r') as f:
data = json.load(f)
for capability_id, perf_data in data.items():
                # Convert stored lists back into bounded deques
recent_durations = deque(perf_data['recent_durations'], maxlen=20)
performance_trend = deque(perf_data['performance_trend'], maxlen=self.performance_window)
performance = CapabilityPerformance(
capability_id=perf_data['capability_id'],
total_executions=perf_data['total_executions'],
successful_executions=perf_data['successful_executions'],
failed_executions=perf_data['failed_executions'],
success_rate=perf_data['success_rate'],
avg_duration=perf_data['avg_duration'],
min_duration=perf_data['min_duration'],
max_duration=perf_data['max_duration'],
recent_durations=recent_durations,
avg_peak_memory_mb=perf_data['avg_peak_memory_mb'],
max_peak_memory_mb=perf_data['max_peak_memory_mb'],
avg_cpu_percent=perf_data['avg_cpu_percent'],
error_frequency=perf_data['error_frequency'],
common_errors=perf_data['common_errors'],
performance_trend=list(performance_trend),
last_execution=datetime.fromisoformat(perf_data['last_execution']),
health_score=perf_data['health_score'],
performance_grade=perf_data['performance_grade']
)
self._capability_performance[capability_id] = performance
except Exception as e:
logger.error(f"Failed to load performance data: {e}")
async def _save_performance_data(self) -> None:
"""Save performance data to disk"""
try:
            data = {}
            for capability_id, performance in self._capability_performance.items():
                perf_data = asdict(performance)
                # asdict() keeps recent_durations as a deque, which json cannot
                # serialize, so convert it to a plain list before writing.
                perf_data['recent_durations'] = list(performance.recent_durations)
                perf_data['last_execution'] = performance.last_execution.isoformat()
                data[capability_id] = perf_data
with open(self.performance_file, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
logger.error(f"Failed to save performance data: {e}")
async def _load_recent_executions(self, since: datetime) -> List[ExecutionMetrics]:
"""Load recent executions from disk"""
executions = []
if not self.executions_file.exists():
return executions
try:
with open(self.executions_file, 'r') as f:
for line in f:
try:
data = json.loads(line.strip())
start_time = datetime.fromisoformat(data['start_time'])
if start_time >= since:
# Reconstruct ExecutionMetrics
snapshots = []
for snap_data in data['snapshots']:
snapshot = PerformanceSnapshot(
timestamp=datetime.fromisoformat(snap_data['timestamp']),
cpu_percent=snap_data['cpu_percent'],
memory_mb=snap_data['memory_mb'],
memory_percent=snap_data['memory_percent'],
disk_io_read=snap_data['disk_io_read'],
disk_io_write=snap_data['disk_io_write'],
network_io_sent=snap_data['network_io_sent'],
network_io_recv=snap_data['network_io_recv'],
open_files=snap_data['open_files'],
thread_count=snap_data['thread_count']
)
snapshots.append(snapshot)
execution = ExecutionMetrics(
capability_id=data['capability_id'],
execution_id=data['execution_id'],
start_time=start_time,
end_time=datetime.fromisoformat(data['end_time']),
duration=data['duration'],
success=data['success'],
error_type=data.get('error_type'),
error_message=data.get('error_message'),
peak_memory_mb=data.get('peak_memory_mb', 0),
avg_cpu_percent=data.get('avg_cpu_percent', 0.0),
disk_operations=data.get('disk_operations', 0),
network_calls=data.get('network_calls', 0),
snapshots=snapshots,
custom_metrics=data.get('custom_metrics', {})
)
executions.append(execution)
except (json.JSONDecodeError, KeyError, ValueError) as e:
logger.warning(f"Skipping invalid execution line: {e}")
continue
except Exception as e:
logger.error(f"Failed to load recent executions: {e}")
return executions
async def _load_capability_executions(self, capability_id: str, since: Optional[datetime] = None) -> List[ExecutionMetrics]:
"""Load executions for a specific capability"""
all_executions = await self._load_recent_executions(since or datetime.min)
return [e for e in all_executions if e.capability_id == capability_id]
def _calculate_health_score(
self,
success_rate: float,
avg_duration: float,
avg_memory_mb: float,
error_types: int
) -> float:
"""Calculate overall health score (0-100)"""
try:
# Success rate weight: 40%
success_score = success_rate * 40
# Duration score (faster is better): 25%
# Normalize duration (assuming 10 seconds as baseline)
duration_score = max(0, 25 - (avg_duration / 10) * 25)
# Memory score (lower is better): 20%
# Normalize memory (assuming 500MB as baseline)
memory_score = max(0, 20 - (avg_memory_mb / 500) * 20)
# Error diversity score (fewer error types is better): 15%
error_score = max(0, 15 - error_types * 3)
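            # Worked example (illustrative): success_rate=0.9, avg_duration=5s,
            # avg_memory=250MB, 2 error types ->
            #   36.0 + 12.5 + 10.0 + 9.0 = 67.5, which maps to grade D below.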
total_score = success_score + duration_score + memory_score + error_score
return min(100, max(0, total_score))
except Exception as e:
logger.error(f"Failed to calculate health score: {e}")
return 50.0 # Default to middle score
def _get_performance_grade(self, health_score: float) -> str:
"""Get performance grade from health score"""
if health_score >= 90:
return "A"
elif health_score >= 80:
return "B"
elif health_score >= 70:
return "C"
elif health_score >= 60:
return "D"
else:
return "F"