aggregator.py
""" Metrics Aggregator for Analytics Engine. Performs aggregation and analysis on parsed metrics data. """ import asyncio from datetime import datetime, timezone, timedelta from typing import Dict, List, Any, Optional, Tuple, Set from dataclasses import dataclass, field from collections import defaultdict from enum import Enum import statistics from ..utils.logging import get_logger from .parser import ParsedMetric, MetricsParser from .writer import MetricType logger = get_logger(__name__) class AggregationType(str, Enum): """Types of aggregations supported.""" HOURLY = "hourly" DAILY = "daily" WEEKLY = "weekly" MONTHLY = "monthly" BY_SESSION = "by_session" BY_USER = "by_user" BY_TOOL = "by_tool" BY_AGENT = "by_agent" BY_PROJECT = "by_project" @dataclass class AggregationResult: """Result of an aggregation operation.""" type: AggregationType start_time: datetime end_time: datetime # Count metrics total_metrics: int = 0 total_sessions: int = 0 total_users: int = 0 total_errors: int = 0 # Time metrics (milliseconds) total_duration_ms: float = 0 avg_duration_ms: float = 0 min_duration_ms: Optional[float] = None max_duration_ms: Optional[float] = None p50_duration_ms: Optional[float] = None p95_duration_ms: Optional[float] = None p99_duration_ms: Optional[float] = None # Token metrics total_tokens: int = 0 avg_tokens_per_session: float = 0 # Success metrics success_count: int = 0 failure_count: int = 0 success_rate: float = 0.0 # Breakdowns metrics_by_type: Dict[str, int] = field(default_factory=dict) tools_usage: Dict[str, Dict[str, Any]] = field(default_factory=dict) agents_usage: Dict[str, Dict[str, Any]] = field(default_factory=dict) errors_by_type: Dict[str, int] = field(default_factory=dict) # Time series data (for charting) time_series: List[Dict[str, Any]] = field(default_factory=list) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for serialization.""" return { "type": self.type.value, "start_time": self.start_time.isoformat(), "end_time": self.end_time.isoformat(), "total_metrics": self.total_metrics, "total_sessions": self.total_sessions, "total_users": self.total_users, "total_errors": self.total_errors, "duration_stats": { "total_ms": self.total_duration_ms, "avg_ms": self.avg_duration_ms, "min_ms": self.min_duration_ms, "max_ms": self.max_duration_ms, "p50_ms": self.p50_duration_ms, "p95_ms": self.p95_duration_ms, "p99_ms": self.p99_duration_ms }, "token_stats": { "total": self.total_tokens, "avg_per_session": self.avg_tokens_per_session }, "success_metrics": { "success_count": self.success_count, "failure_count": self.failure_count, "success_rate": self.success_rate }, "breakdowns": { "by_type": self.metrics_by_type, "tools": self.tools_usage, "agents": self.agents_usage, "errors": self.errors_by_type }, "time_series": self.time_series } class MetricsAggregator: """Aggregates metrics data for analysis and reporting.""" def __init__(self, parser: MetricsParser): """ Initialize aggregator. Args: parser: Metrics parser instance """ self.parser = parser async def aggregate( self, aggregation_type: AggregationType, start_time: datetime, end_time: datetime, filters: Optional[Dict[str, Any]] = None ) -> AggregationResult: """ Perform aggregation on metrics. 
Args: aggregation_type: Type of aggregation to perform start_time: Start of time range end_time: End of time range filters: Optional filters to apply Returns: Aggregation result """ result = AggregationResult( type=aggregation_type, start_time=start_time, end_time=end_time ) # Collect metrics metrics = await self._collect_metrics(start_time, end_time, filters) result.total_metrics = len(metrics) if not metrics: return result # Perform base aggregations await self._aggregate_base_stats(metrics, result) # Perform type-specific aggregations if aggregation_type == AggregationType.HOURLY: await self._aggregate_hourly(metrics, result) elif aggregation_type == AggregationType.DAILY: await self._aggregate_daily(metrics, result) elif aggregation_type == AggregationType.WEEKLY: await self._aggregate_weekly(metrics, result) elif aggregation_type == AggregationType.MONTHLY: await self._aggregate_monthly(metrics, result) elif aggregation_type == AggregationType.BY_SESSION: await self._aggregate_by_session(metrics, result) elif aggregation_type == AggregationType.BY_USER: await self._aggregate_by_user(metrics, result) elif aggregation_type == AggregationType.BY_TOOL: await self._aggregate_by_tool(metrics, result) elif aggregation_type == AggregationType.BY_AGENT: await self._aggregate_by_agent(metrics, result) elif aggregation_type == AggregationType.BY_PROJECT: await self._aggregate_by_project(metrics, result) return result async def _collect_metrics( self, start_time: datetime, end_time: datetime, filters: Optional[Dict[str, Any]] = None ) -> List[ParsedMetric]: """Collect metrics with optional filters.""" all_metrics = [] async for batch in self.parser.stream_metrics(start_time, end_time): for metric in batch: # Apply filters if filters: if "session_id" in filters and metric.session_id != filters["session_id"]: continue if "user_id" in filters and metric.user_id != filters["user_id"]: continue if "type" in filters and metric.type != filters["type"]: continue if "tool_name" in filters and metric.tool_name != filters["tool_name"]: continue if "agent_id" in filters and metric.agent_id != filters["agent_id"]: continue all_metrics.append(metric) return all_metrics async def _aggregate_base_stats( self, metrics: List[ParsedMetric], result: AggregationResult ) -> None: """Aggregate base statistics.""" sessions: Set[str] = set() users: Set[str] = set() durations: List[float] = [] for metric in metrics: # Count by type result.metrics_by_type[metric.type.value] = \ result.metrics_by_type.get(metric.type.value, 0) + 1 # Track sessions and users if metric.session_id: sessions.add(metric.session_id) if metric.user_id: users.add(metric.user_id) # Track errors if metric.type == MetricType.ERROR_OCCURRED: result.total_errors += 1 if metric.error_type: result.errors_by_type[metric.error_type] = \ result.errors_by_type.get(metric.error_type, 0) + 1 # Track durations if metric.duration_ms is not None: durations.append(metric.duration_ms) result.total_duration_ms += metric.duration_ms # Track success/failure if metric.success is not None: if metric.success: result.success_count += 1 else: result.failure_count += 1 # Track tokens if metric.token_count: result.total_tokens += metric.token_count # Track tool usage if metric.tool_name: if metric.tool_name not in result.tools_usage: result.tools_usage[metric.tool_name] = { "count": 0, "success": 0, "failure": 0, "total_duration_ms": 0, "avg_duration_ms": 0 } tool_stats = result.tools_usage[metric.tool_name] tool_stats["count"] += 1 if metric.success: tool_stats["success"] += 
1 else: tool_stats["failure"] += 1 if metric.duration_ms: tool_stats["total_duration_ms"] += metric.duration_ms # Track agent usage if metric.agent_id: if metric.agent_id not in result.agents_usage: result.agents_usage[metric.agent_id] = { "count": 0, "success": 0, "failure": 0, "total_duration_ms": 0, "avg_duration_ms": 0 } agent_stats = result.agents_usage[metric.agent_id] agent_stats["count"] += 1 if metric.success: agent_stats["success"] += 1 else: agent_stats["failure"] += 1 if metric.duration_ms: agent_stats["total_duration_ms"] += metric.duration_ms # Set counts result.total_sessions = len(sessions) result.total_users = len(users) # Calculate duration statistics if durations: result.avg_duration_ms = statistics.mean(durations) result.min_duration_ms = min(durations) result.max_duration_ms = max(durations) # Calculate percentiles sorted_durations = sorted(durations) n = len(sorted_durations) result.p50_duration_ms = sorted_durations[n // 2] result.p95_duration_ms = sorted_durations[int(n * 0.95)] result.p99_duration_ms = sorted_durations[int(n * 0.99)] # Calculate success rate total_outcomes = result.success_count + result.failure_count if total_outcomes > 0: result.success_rate = result.success_count / total_outcomes # Calculate average tokens per session if result.total_sessions > 0: result.avg_tokens_per_session = result.total_tokens / result.total_sessions # Calculate tool/agent averages for tool_stats in result.tools_usage.values(): if tool_stats["count"] > 0 and tool_stats["total_duration_ms"] > 0: tool_stats["avg_duration_ms"] = \ tool_stats["total_duration_ms"] / tool_stats["count"] for agent_stats in result.agents_usage.values(): if agent_stats["count"] > 0 and agent_stats["total_duration_ms"] > 0: agent_stats["avg_duration_ms"] = \ agent_stats["total_duration_ms"] / agent_stats["count"] async def _aggregate_hourly( self, metrics: List[ParsedMetric], result: AggregationResult ) -> None: """Aggregate metrics by hour.""" hourly_data = defaultdict(lambda: { "count": 0, "errors": 0, "duration_ms": 0, "tokens": 0 }) for metric in metrics: # Round to hour hour = metric.timestamp.replace(minute=0, second=0, microsecond=0) hour_data = hourly_data[hour] hour_data["count"] += 1 if metric.type == MetricType.ERROR_OCCURRED: hour_data["errors"] += 1 if metric.duration_ms: hour_data["duration_ms"] += metric.duration_ms if metric.token_count: hour_data["tokens"] += metric.token_count # Convert to time series for hour, data in sorted(hourly_data.items()): result.time_series.append({ "timestamp": hour.isoformat(), "metrics_count": data["count"], "errors": data["errors"], "total_duration_ms": data["duration_ms"], "tokens": data["tokens"] }) async def _aggregate_daily( self, metrics: List[ParsedMetric], result: AggregationResult ) -> None: """Aggregate metrics by day.""" daily_data = defaultdict(lambda: { "count": 0, "sessions": set(), "errors": 0, "duration_ms": 0, "tokens": 0 }) for metric in metrics: # Round to day day = metric.timestamp.replace(hour=0, minute=0, second=0, microsecond=0) day_data = daily_data[day] day_data["count"] += 1 if metric.session_id: day_data["sessions"].add(metric.session_id) if metric.type == MetricType.ERROR_OCCURRED: day_data["errors"] += 1 if metric.duration_ms: day_data["duration_ms"] += metric.duration_ms if metric.token_count: day_data["tokens"] += metric.token_count # Convert to time series for day, data in sorted(daily_data.items()): result.time_series.append({ "timestamp": day.isoformat(), "metrics_count": data["count"], "sessions": len(data["sessions"]), 
"errors": data["errors"], "total_duration_ms": data["duration_ms"], "tokens": data["tokens"] }) async def _aggregate_weekly( self, metrics: List[ParsedMetric], result: AggregationResult ) -> None: """Aggregate metrics by week.""" weekly_data = defaultdict(lambda: { "count": 0, "sessions": set(), "users": set(), "errors": 0, "duration_ms": 0, "tokens": 0 }) for metric in metrics: # Get start of week (Monday) week_start = metric.timestamp - timedelta(days=metric.timestamp.weekday()) week_start = week_start.replace(hour=0, minute=0, second=0, microsecond=0) week_data = weekly_data[week_start] week_data["count"] += 1 if metric.session_id: week_data["sessions"].add(metric.session_id) if metric.user_id: week_data["users"].add(metric.user_id) if metric.type == MetricType.ERROR_OCCURRED: week_data["errors"] += 1 if metric.duration_ms: week_data["duration_ms"] += metric.duration_ms if metric.token_count: week_data["tokens"] += metric.token_count # Convert to time series for week, data in sorted(weekly_data.items()): result.time_series.append({ "week_start": week.isoformat(), "metrics_count": data["count"], "sessions": len(data["sessions"]), "users": len(data["users"]), "errors": data["errors"], "total_duration_ms": data["duration_ms"], "tokens": data["tokens"] }) async def _aggregate_monthly( self, metrics: List[ParsedMetric], result: AggregationResult ) -> None: """Aggregate metrics by month.""" monthly_data = defaultdict(lambda: { "count": 0, "sessions": set(), "users": set(), "errors": 0, "duration_ms": 0, "tokens": 0 }) for metric in metrics: # Round to month month = metric.timestamp.replace(day=1, hour=0, minute=0, second=0, microsecond=0) month_data = monthly_data[month] month_data["count"] += 1 if metric.session_id: month_data["sessions"].add(metric.session_id) if metric.user_id: month_data["users"].add(metric.user_id) if metric.type == MetricType.ERROR_OCCURRED: month_data["errors"] += 1 if metric.duration_ms: month_data["duration_ms"] += metric.duration_ms if metric.token_count: month_data["tokens"] += metric.token_count # Convert to time series for month, data in sorted(monthly_data.items()): result.time_series.append({ "month": month.isoformat(), "metrics_count": data["count"], "sessions": len(data["sessions"]), "users": len(data["users"]), "errors": data["errors"], "total_duration_ms": data["duration_ms"], "tokens": data["tokens"] }) async def _aggregate_by_session( self, metrics: List[ParsedMetric], result: AggregationResult ) -> None: """Aggregate metrics by session.""" session_data = defaultdict(lambda: { "start_time": None, "end_time": None, "metrics_count": 0, "tool_uses": defaultdict(int), "errors": 0, "duration_ms": 0, "tokens": 0 }) for metric in metrics: if not metric.session_id: continue session = session_data[metric.session_id] session["metrics_count"] += 1 # Track time range if session["start_time"] is None or metric.timestamp < session["start_time"]: session["start_time"] = metric.timestamp if session["end_time"] is None or metric.timestamp > session["end_time"]: session["end_time"] = metric.timestamp # Track metrics if metric.tool_name: session["tool_uses"][metric.tool_name] += 1 if metric.type == MetricType.ERROR_OCCURRED: session["errors"] += 1 if metric.duration_ms: session["duration_ms"] += metric.duration_ms if metric.token_count: session["tokens"] += metric.token_count # Convert to time series for session_id, data in session_data.items(): duration_seconds = 0 if data["start_time"] and data["end_time"]: duration_seconds = (data["end_time"] - 
data["start_time"]).total_seconds() result.time_series.append({ "session_id": session_id, "start_time": data["start_time"].isoformat() if data["start_time"] else None, "end_time": data["end_time"].isoformat() if data["end_time"] else None, "duration_seconds": duration_seconds, "metrics_count": data["metrics_count"], "tool_uses": dict(data["tool_uses"]), "errors": data["errors"], "total_duration_ms": data["duration_ms"], "tokens": data["tokens"] }) async def _aggregate_by_user( self, metrics: List[ParsedMetric], result: AggregationResult ) -> None: """Aggregate metrics by user.""" user_data = defaultdict(lambda: { "sessions": set(), "metrics_count": 0, "tool_uses": defaultdict(int), "errors": 0, "duration_ms": 0, "tokens": 0 }) for metric in metrics: if not metric.user_id: continue user = user_data[metric.user_id] user["metrics_count"] += 1 if metric.session_id: user["sessions"].add(metric.session_id) if metric.tool_name: user["tool_uses"][metric.tool_name] += 1 if metric.type == MetricType.ERROR_OCCURRED: user["errors"] += 1 if metric.duration_ms: user["duration_ms"] += metric.duration_ms if metric.token_count: user["tokens"] += metric.token_count # Convert to results for user_id, data in user_data.items(): result.time_series.append({ "user_id": user_id, "sessions": len(data["sessions"]), "metrics_count": data["metrics_count"], "tool_uses": dict(data["tool_uses"]), "errors": data["errors"], "total_duration_ms": data["duration_ms"], "tokens": data["tokens"] }) async def _aggregate_by_tool( self, metrics: List[ParsedMetric], result: AggregationResult ) -> None: """Aggregate metrics by tool.""" # Tool usage is already aggregated in base stats # Create time series from tools_usage for tool_name, stats in result.tools_usage.items(): result.time_series.append({ "tool": tool_name, **stats }) async def _aggregate_by_agent( self, metrics: List[ParsedMetric], result: AggregationResult ) -> None: """Aggregate metrics by agent.""" # Agent usage is already aggregated in base stats # Create time series from agents_usage for agent_id, stats in result.agents_usage.items(): result.time_series.append({ "agent": agent_id, **stats }) async def _aggregate_by_project( self, metrics: List[ParsedMetric], result: AggregationResult ) -> None: """Aggregate metrics by project path.""" project_data = defaultdict(lambda: { "sessions": set(), "users": set(), "metrics_count": 0, "tool_uses": defaultdict(int), "errors": 0, "duration_ms": 0, "tokens": 0 }) for metric in metrics: # Extract project path from session data project_path = None if metric.type == MetricType.SESSION_START: project_path = metric.entry.data.get("project_path") if not project_path: continue project = project_data[project_path] project["metrics_count"] += 1 if metric.session_id: project["sessions"].add(metric.session_id) if metric.user_id: project["users"].add(metric.user_id) if metric.tool_name: project["tool_uses"][metric.tool_name] += 1 if metric.type == MetricType.ERROR_OCCURRED: project["errors"] += 1 if metric.duration_ms: project["duration_ms"] += metric.duration_ms if metric.token_count: project["tokens"] += metric.token_count # Convert to results for project_path, data in project_data.items(): result.time_series.append({ "project_path": project_path, "sessions": len(data["sessions"]), "users": len(data["users"]), "metrics_count": data["metrics_count"], "tool_uses": dict(data["tool_uses"]), "errors": data["errors"], "total_duration_ms": data["duration_ms"], "tokens": data["tokens"] })
