Skip to main content
Glama
metrics.py13.2 kB
""" Metrics collection for incremental updates. Tracks update operations to provide insights into: - Stability trends over time - Performance characteristics - Edge repair effectiveness - Error patterns Usage: collector = UpdateMetricsCollector() # After each update collector.record_update(result) # Get statistics stats = collector.get_statistics() print(f"Average stability: {stats['avg_stability']:.1f}%") # Export for analysis collector.export_to_json("metrics.json") """ import json import time from dataclasses import dataclass, asdict from typing import List, Dict, Optional from pathlib import Path import statistics @dataclass class UpdateMetric: """Single update operation metric.""" timestamp: float file_path: str success: bool # Frame changes frames_deleted: int frames_added: int frames_stable: int stability_percentage: float # Edge changes edges_deleted: int edges_added: int # Performance parse_time_ms: float diff_time_ms: float database_time_ms: float total_time_ms: float # Errors/warnings error_count: int warning_count: int class UpdateMetricsCollector: """ Collect and analyze incremental update metrics. Thread-safe for single-process usage. Not designed for multi-process scenarios. """ def __init__(self, max_history: int = 1000): """ Initialize metrics collector. Args: max_history: Maximum number of metrics to keep in memory """ self.max_history = max_history self.metrics: List[UpdateMetric] = [] self._file_metrics: Dict[str, List[UpdateMetric]] = {} def record_update(self, result) -> None: """ Record an update result. Args: result: UpdateResult from IncrementalUpdater """ metric = UpdateMetric( timestamp=time.time(), file_path=result.file_path, success=result.success, frames_deleted=result.frames_deleted, frames_added=result.frames_added, frames_stable=result.frames_stable, stability_percentage=result.stability_percentage, edges_deleted=result.edges_deleted, edges_added=result.edges_added, parse_time_ms=result.parse_time_ms, diff_time_ms=result.diff_time_ms, database_time_ms=result.database_time_ms, total_time_ms=result.total_time_ms, error_count=len(result.errors) if result.errors else 0, warning_count=len(result.warnings) if result.warnings else 0 ) # Add to global history self.metrics.append(metric) # Add to per-file history if result.file_path not in self._file_metrics: self._file_metrics[result.file_path] = [] self._file_metrics[result.file_path].append(metric) # Trim if exceeds max history if len(self.metrics) > self.max_history: oldest = self.metrics[0] self.metrics.pop(0) # Also trim from file metrics if oldest.file_path in self._file_metrics: file_list = self._file_metrics[oldest.file_path] if file_list and file_list[0].timestamp == oldest.timestamp: file_list.pop(0) def get_statistics(self) -> Dict: """ Get aggregated statistics across all updates. Returns: Dictionary with statistics """ if not self.metrics: return { 'total_updates': 0, 'success_rate': 0.0, 'avg_stability': 0.0, 'avg_time_ms': 0.0 } successful = [m for m in self.metrics if m.success] stats = { 'total_updates': len(self.metrics), 'successful_updates': len(successful), 'failed_updates': len(self.metrics) - len(successful), 'success_rate': (len(successful) / len(self.metrics)) * 100, # Stability metrics (successful only) 'avg_stability': statistics.mean(m.stability_percentage for m in successful) if successful else 0.0, 'median_stability': statistics.median(m.stability_percentage for m in successful) if successful else 0.0, 'min_stability': min(m.stability_percentage for m in successful) if successful else 0.0, 'max_stability': max(m.stability_percentage for m in successful) if successful else 0.0, # Performance metrics (successful only) 'avg_time_ms': statistics.mean(m.total_time_ms for m in successful) if successful else 0.0, 'median_time_ms': statistics.median(m.total_time_ms for m in successful) if successful else 0.0, 'p95_time_ms': self._percentile([m.total_time_ms for m in successful], 95) if successful else 0.0, 'p99_time_ms': self._percentile([m.total_time_ms for m in successful], 99) if successful else 0.0, # Frame change metrics 'avg_frames_changed': statistics.mean( m.frames_deleted + m.frames_added for m in successful ) if successful else 0.0, # Edge repair metrics 'total_edges_added': sum(m.edges_added for m in successful), 'total_edges_deleted': sum(m.edges_deleted for m in successful), 'avg_edges_per_update': statistics.mean(m.edges_added for m in successful) if successful else 0.0, # Error metrics 'total_errors': sum(m.error_count for m in self.metrics), 'total_warnings': sum(m.warning_count for m in self.metrics), } return stats def get_file_statistics(self, file_path: str) -> Optional[Dict]: """ Get statistics for a specific file. Args: file_path: Path to file Returns: Statistics dict or None if file not found """ if file_path not in self._file_metrics: return None metrics = self._file_metrics[file_path] if not metrics: return None successful = [m for m in metrics if m.success] return { 'file_path': file_path, 'update_count': len(metrics), 'avg_stability': statistics.mean(m.stability_percentage for m in successful) if successful else 0.0, 'avg_time_ms': statistics.mean(m.total_time_ms for m in successful) if successful else 0.0, 'last_updated': max(m.timestamp for m in metrics), } def get_stability_trend(self, window: int = 10) -> List[float]: """ Get stability percentage trend over recent updates. Args: window: Number of recent updates to include Returns: List of stability percentages (most recent first) """ recent = [m for m in self.metrics[-window:] if m.success] return [m.stability_percentage for m in reversed(recent)] def get_performance_trend(self, window: int = 10) -> List[float]: """ Get performance (total time) trend over recent updates. Args: window: Number of recent updates to include Returns: List of total_time_ms values (most recent first) """ recent = [m for m in self.metrics[-window:] if m.success] return [m.total_time_ms for m in reversed(recent)] def get_most_frequently_updated_files(self, top_n: int = 10) -> List[tuple]: """ Get most frequently updated files. Args: top_n: Number of files to return Returns: List of (file_path, update_count) tuples """ file_counts = { file_path: len(metrics) for file_path, metrics in self._file_metrics.items() } sorted_files = sorted( file_counts.items(), key=lambda x: x[1], reverse=True ) return sorted_files[:top_n] def get_slowest_updates(self, top_n: int = 10) -> List[UpdateMetric]: """ Get slowest update operations. Args: top_n: Number of updates to return Returns: List of UpdateMetric sorted by total_time_ms descending """ successful = [m for m in self.metrics if m.success] sorted_metrics = sorted( successful, key=lambda m: m.total_time_ms, reverse=True ) return sorted_metrics[:top_n] def get_least_stable_updates(self, top_n: int = 10) -> List[UpdateMetric]: """ Get updates with lowest stability. Args: top_n: Number of updates to return Returns: List of UpdateMetric sorted by stability_percentage ascending """ successful = [m for m in self.metrics if m.success] sorted_metrics = sorted( successful, key=lambda m: m.stability_percentage ) return sorted_metrics[:top_n] def export_to_json(self, file_path: str) -> None: """ Export metrics to JSON file. Args: file_path: Output file path """ data = { 'exported_at': time.time(), 'total_metrics': len(self.metrics), 'metrics': [asdict(m) for m in self.metrics], 'statistics': self.get_statistics() } with open(file_path, 'w') as f: json.dump(data, f, indent=2) def load_from_json(self, file_path: str) -> None: """ Load metrics from JSON file. Args: file_path: Input file path """ with open(file_path, 'r') as f: data = json.load(f) self.metrics = [ UpdateMetric(**metric_dict) for metric_dict in data['metrics'] ] # Rebuild file metrics index self._file_metrics.clear() for metric in self.metrics: if metric.file_path not in self._file_metrics: self._file_metrics[metric.file_path] = [] self._file_metrics[metric.file_path].append(metric) def print_summary(self) -> None: """Print human-readable summary to stdout.""" stats = self.get_statistics() print("Incremental Update Metrics Summary") print("=" * 60) print(f"Total updates: {stats['total_updates']}") print(f"Success rate: {stats['success_rate']:.1f}%") print() print("Stability:") print(f" Average: {stats['avg_stability']:.1f}%") print(f" Median: {stats['median_stability']:.1f}%") print(f" Range: {stats['min_stability']:.1f}% - {stats['max_stability']:.1f}%") print() print("Performance:") print(f" Average time: {stats['avg_time_ms']:.1f}ms") print(f" Median time: {stats['median_time_ms']:.1f}ms") print(f" 95th percentile: {stats['p95_time_ms']:.1f}ms") print(f" 99th percentile: {stats['p99_time_ms']:.1f}ms") print() print("Edge Repair:") print(f" Total edges added: {stats['total_edges_added']}") print(f" Avg per update: {stats['avg_edges_per_update']:.1f}") print() if stats['total_errors'] > 0 or stats['total_warnings'] > 0: print("Issues:") print(f" Errors: {stats['total_errors']}") print(f" Warnings: {stats['total_warnings']}") @staticmethod def _percentile(values: List[float], percentile: int) -> float: """Calculate percentile of values.""" if not values: return 0.0 sorted_values = sorted(values) index = int(len(sorted_values) * percentile / 100) return sorted_values[min(index, len(sorted_values) - 1)] # Global instance for convenience _global_collector: Optional[UpdateMetricsCollector] = None def get_global_collector() -> UpdateMetricsCollector: """Get or create global metrics collector.""" global _global_collector if _global_collector is None: _global_collector = UpdateMetricsCollector() return _global_collector def record_update(result) -> None: """Convenience function to record update in global collector.""" get_global_collector().record_update(result) def get_statistics() -> Dict: """Convenience function to get statistics from global collector.""" return get_global_collector().get_statistics() def print_summary() -> None: """Convenience function to print summary from global collector.""" get_global_collector().print_summary()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/y3i12/nabu_nisaba'

If you have feedback or need assistance with the MCP directory API, please join our Discord server