"""
Agent Orchestration Platform - Tool Execution Monitoring
This module implements comprehensive monitoring and metrics collection for FastMCP tool execution,
providing real-time performance tracking, error analysis, and operational intelligence.
Architecture Integration:
- Design Patterns: Observer pattern for metrics collection, Decorator pattern for tool instrumentation
- Security Model: Secure metrics collection with audit integration and privacy protection
- Performance Profile: O(1) metrics collection with efficient aggregation and minimal overhead
Technical Decisions:
- Async Monitoring: Non-blocking metrics collection with background processing
- Decorator-Based Instrumentation: Transparent tool monitoring without modifying tool logic
- Statistical Analysis: Comprehensive performance statistics with trend analysis
- Error Classification: Intelligent error categorization for operational insights
Dependencies & Integration:
- External: None beyond standard library for maximum reliability
- Internal: Monitoring system for metrics storage, audit logging for security events
Quality Assurance:
- Test Coverage: Property-based testing for metrics accuracy and performance impact
- Error Handling: Graceful degradation with monitoring system isolation
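Usage Sketch (illustrative only; `echo_tool` is a hypothetical tool function):
    monitor = get_global_monitor()
    @monitor_tool_execution(monitor)
    def echo_tool(message: str) -> str: ...
    stats = monitor.get_tool_performance_stats("echo")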
Author: Adder_5 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
import asyncio
import functools
import statistics
import threading
import time
from collections import defaultdict, deque
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum, auto
from typing import Any, Callable, Dict, List, Optional, TypeVar, cast
# Import monitoring infrastructure
from src.core.monitoring import ServerMonitoring
F = TypeVar("F", bound=Callable[..., Any])
class ExecutionStatus(Enum):
"""Tool execution status levels."""
SUCCESS = "success"
ERROR = "error"
TIMEOUT = "timeout"
VALIDATION_FAILED = "validation_failed"
SECURITY_VIOLATION = "security_violation"
class ErrorCategory(Enum):
"""Error category classification for analysis."""
USER_INPUT = auto()
SYSTEM_ERROR = auto()
NETWORK_ERROR = auto()
SECURITY_ERROR = auto()
VALIDATION_ERROR = auto()
TIMEOUT_ERROR = auto()
UNKNOWN_ERROR = auto()
@dataclass
class ToolExecutionMetrics:
"""Comprehensive metrics for a single tool execution."""
tool_name: str
execution_id: str
start_time: datetime
end_time: Optional[datetime]
duration_ms: Optional[float]
status: ExecutionStatus
error_message: Optional[str]
error_category: Optional[ErrorCategory]
input_size_bytes: int
output_size_bytes: int
memory_usage_mb: Optional[float]
cpu_time_ms: Optional[float]
user_id: Optional[str]
session_id: Optional[str]
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
"""Convert execution metrics to dictionary for serialization."""
return {
"tool_name": self.tool_name,
"execution_id": self.execution_id,
"start_time": self.start_time.isoformat(),
"end_time": self.end_time.isoformat() if self.end_time else None,
"duration_ms": self.duration_ms,
"status": self.status.value,
"error_message": self.error_message,
"error_category": self.error_category.name if self.error_category else None,
"input_size_bytes": self.input_size_bytes,
"output_size_bytes": self.output_size_bytes,
"memory_usage_mb": self.memory_usage_mb,
"cpu_time_ms": self.cpu_time_ms,
"user_id": self.user_id,
"session_id": self.session_id,
"metadata": self.metadata,
}
@dataclass
class ToolPerformanceStats:
"""Performance statistics for a tool over time."""
tool_name: str
total_executions: int
successful_executions: int
failed_executions: int
avg_duration_ms: float
min_duration_ms: float
max_duration_ms: float
p95_duration_ms: float
p99_duration_ms: float
success_rate: float
error_rate: float
avg_input_size_bytes: float
avg_output_size_bytes: float
last_execution: Optional[datetime]
error_distribution: Dict[str, int]
trend_7d: List[float] # Daily success rates for last 7 days
def to_dict(self) -> Dict[str, Any]:
"""Convert performance stats to dictionary for serialization."""
return {
"tool_name": self.tool_name,
"total_executions": self.total_executions,
"successful_executions": self.successful_executions,
"failed_executions": self.failed_executions,
"avg_duration_ms": self.avg_duration_ms,
"min_duration_ms": self.min_duration_ms,
"max_duration_ms": self.max_duration_ms,
"p95_duration_ms": self.p95_duration_ms,
"p99_duration_ms": self.p99_duration_ms,
"success_rate": self.success_rate,
"error_rate": self.error_rate,
"avg_input_size_bytes": self.avg_input_size_bytes,
"avg_output_size_bytes": self.avg_output_size_bytes,
"last_execution": (
self.last_execution.isoformat() if self.last_execution else None
),
"error_distribution": self.error_distribution,
"trend_7d": self.trend_7d,
}
class ToolExecutionContext:
"""Context manager for tool execution monitoring."""
def __init__(
self,
tool_name: str,
monitor: "ToolExecutionMonitor",
user_id: Optional[str] = None,
session_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
):
"""Initialize execution context."""
self.tool_name = tool_name
self.monitor = monitor
self.user_id = user_id
self.session_id = session_id
self.metadata = metadata or {}
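        # The microsecond timestamp is assumed unique enough per tool; concurrent
        # calls landing in the same microsecond would share an execution_id.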
self.execution_id = f"{tool_name}_{int(time.time() * 1000000)}"
self.metrics: Optional[ToolExecutionMetrics] = None
self.start_time: Optional[datetime] = None
def __enter__(self) -> "ToolExecutionContext":
"""Start execution monitoring."""
self.start_time = datetime.utcnow()
self.metrics = ToolExecutionMetrics(
tool_name=self.tool_name,
execution_id=self.execution_id,
start_time=self.start_time,
end_time=None,
duration_ms=None,
status=ExecutionStatus.SUCCESS,
error_message=None,
error_category=None,
input_size_bytes=0,
output_size_bytes=0,
memory_usage_mb=None,
cpu_time_ms=None,
user_id=self.user_id,
session_id=self.session_id,
metadata=self.metadata,
)
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""Complete execution monitoring."""
if self.metrics and self.start_time:
end_time = datetime.utcnow()
duration = (end_time - self.start_time).total_seconds() * 1000
self.metrics.end_time = end_time
self.metrics.duration_ms = duration
# Handle exceptions
if exc_type is not None:
self.metrics.status = ExecutionStatus.ERROR
self.metrics.error_message = (
str(exc_val) if exc_val else "Unknown error"
)
self.metrics.error_category = self._classify_error(exc_type, exc_val)
# Record metrics
self.monitor._record_execution_metrics(self.metrics)
def set_input_size(self, size_bytes: int) -> None:
"""Set input size for the execution."""
if self.metrics:
self.metrics.input_size_bytes = size_bytes
def set_output_size(self, size_bytes: int) -> None:
"""Set output size for the execution."""
if self.metrics:
self.metrics.output_size_bytes = size_bytes
def set_memory_usage(self, memory_mb: float) -> None:
"""Set memory usage for the execution."""
if self.metrics:
self.metrics.memory_usage_mb = memory_mb
def add_metadata(self, key: str, value: Any) -> None:
"""Add metadata to the execution."""
if self.metrics:
self.metrics.metadata[key] = value
def _classify_error(self, exc_type: type, exc_val: Exception) -> ErrorCategory:
"""Classify error type for analysis."""
if exc_type.__name__ in ["ValidationError", "ValueError", "TypeError"]:
return ErrorCategory.VALIDATION_ERROR
elif exc_type.__name__ in ["TimeoutError", "asyncio.TimeoutError"]:
return ErrorCategory.TIMEOUT_ERROR
elif exc_type.__name__ in [
"PermissionError",
"SecurityError",
"AuthenticationError",
]:
return ErrorCategory.SECURITY_ERROR
elif exc_type.__name__ in ["ConnectionError", "NetworkError", "HTTPError"]:
return ErrorCategory.NETWORK_ERROR
elif "user" in str(exc_val).lower() or "input" in str(exc_val).lower():
return ErrorCategory.USER_INPUT
elif exc_type.__name__ in ["SystemError", "OSError", "RuntimeError"]:
return ErrorCategory.SYSTEM_ERROR
else:
return ErrorCategory.UNKNOWN_ERROR
class ToolExecutionMonitor:
"""
Comprehensive tool execution monitor with performance tracking and analysis.
Implements detailed monitoring of tool executions with statistical analysis,
error classification, and performance optimization insights.
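    Example (illustrative sketch; `run_query` is a hypothetical callable):
        monitor = ToolExecutionMonitor()
        with monitor.create_execution_context("query", user_id="agent-1") as ctx:
            result = run_query("SELECT 1")
            ctx.set_output_size(len(str(result).encode("utf-8")))
        stats = monitor.get_tool_performance_stats("query")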
"""
def __init__(self, monitoring_system: Optional[ServerMonitoring] = None):
"""Initialize tool execution monitor."""
self.monitoring_system = monitoring_system
self._execution_history: deque = deque(maxlen=10000)
self._tool_stats: Dict[str, Dict[str, Any]] = defaultdict(dict)
self._lock = threading.Lock()
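        # A single lock guards execution history, per-tool stats, and histograms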
# Performance tracking
        # Histogram bucket upper bounds, in milliseconds
        self._duration_buckets = [1, 5, 10, 50, 100, 500, 1000, 5000, 10000]
self._duration_histograms: Dict[str, List[int]] = defaultdict(
lambda: [0] * len(self._duration_buckets)
)
# Error tracking
self._error_history: deque = deque(maxlen=1000)
self._recent_errors: Dict[str, List[datetime]] = defaultdict(list)
def create_execution_context(
self,
tool_name: str,
user_id: Optional[str] = None,
session_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> ToolExecutionContext:
"""Create execution context for monitoring a tool execution."""
return ToolExecutionContext(
tool_name=tool_name,
monitor=self,
user_id=user_id,
session_id=session_id,
metadata=metadata,
)
def _record_execution_metrics(self, metrics: ToolExecutionMetrics) -> None:
"""Record execution metrics for analysis."""
with self._lock:
# Add to execution history
self._execution_history.append(metrics)
# Update tool statistics
self._update_tool_stats(metrics)
# Update duration histogram
self._update_duration_histogram(metrics)
# Record error if applicable
if metrics.status != ExecutionStatus.SUCCESS:
self._error_history.append(metrics)
self._recent_errors[metrics.tool_name].append(metrics.start_time)
# Keep only recent errors (last 24 hours)
cutoff_time = datetime.utcnow() - timedelta(hours=24)
self._recent_errors[metrics.tool_name] = [
error_time
for error_time in self._recent_errors[metrics.tool_name]
if error_time > cutoff_time
]
# Record metrics in monitoring system
if self.monitoring_system:
self._record_in_monitoring_system(metrics)
def _update_tool_stats(self, metrics: ToolExecutionMetrics) -> None:
"""Update tool performance statistics."""
tool_name = metrics.tool_name
if tool_name not in self._tool_stats:
self._tool_stats[tool_name] = {
"total_executions": 0,
"successful_executions": 0,
"failed_executions": 0,
"durations": [],
"input_sizes": [],
"output_sizes": [],
"error_counts": defaultdict(int),
"last_execution": None,
}
stats = self._tool_stats[tool_name]
stats["total_executions"] += 1
if metrics.status == ExecutionStatus.SUCCESS:
stats["successful_executions"] += 1
else:
stats["failed_executions"] += 1
if metrics.error_category:
stats["error_counts"][metrics.error_category.name] += 1
        if metrics.duration_ms is not None:
            stats["durations"].append(metrics.duration_ms)
            # Keep only recent durations for performance
            if len(stats["durations"]) > 1000:
                stats["durations"] = stats["durations"][-500:]
        stats["input_sizes"].append(metrics.input_size_bytes)
        stats["output_sizes"].append(metrics.output_size_bytes)
        # Bound the size samples as well so per-tool memory does not grow
        # without limit over long-running sessions
        if len(stats["input_sizes"]) > 1000:
            stats["input_sizes"] = stats["input_sizes"][-500:]
            stats["output_sizes"] = stats["output_sizes"][-500:]
stats["last_execution"] = metrics.end_time or metrics.start_time
def _update_duration_histogram(self, metrics: ToolExecutionMetrics) -> None:
"""Update duration histogram for performance analysis."""
if metrics.duration_ms is None:
return
tool_name = metrics.tool_name
duration = metrics.duration_ms
# Find appropriate bucket
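        # Durations above the largest bound land in the final bucket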
bucket_index = len(self._duration_buckets) - 1
for i, bucket_limit in enumerate(self._duration_buckets):
if duration <= bucket_limit:
bucket_index = i
break
self._duration_histograms[tool_name][bucket_index] += 1
def _record_in_monitoring_system(self, metrics: ToolExecutionMetrics) -> None:
"""Record metrics in the monitoring system."""
try:
tool_name = metrics.tool_name
# Record execution count
self.monitoring_system.metrics_collector.increment_counter(
f"tool.{tool_name}.executions.total"
)
# Record success/failure
if metrics.status == ExecutionStatus.SUCCESS:
self.monitoring_system.metrics_collector.increment_counter(
f"tool.{tool_name}.executions.success"
)
else:
self.monitoring_system.metrics_collector.increment_counter(
f"tool.{tool_name}.executions.failure"
)
# Record duration
if metrics.duration_ms is not None:
self.monitoring_system.metrics_collector.record_timer(
f"tool.{tool_name}.duration", metrics.duration_ms
)
# Record data sizes
self.monitoring_system.metrics_collector.set_gauge(
f"tool.{tool_name}.input_size_bytes", metrics.input_size_bytes
)
self.monitoring_system.metrics_collector.set_gauge(
f"tool.{tool_name}.output_size_bytes", metrics.output_size_bytes
)
except Exception:
# Best effort metrics recording
pass
def get_tool_performance_stats(
self, tool_name: str
) -> Optional[ToolPerformanceStats]:
"""Get comprehensive performance statistics for a tool."""
with self._lock:
if tool_name not in self._tool_stats:
return None
stats = self._tool_stats[tool_name]
durations = stats["durations"]
if not durations:
return None
# Calculate performance statistics
total_executions = stats["total_executions"]
successful_executions = stats["successful_executions"]
failed_executions = stats["failed_executions"]
avg_duration = statistics.mean(durations)
min_duration = min(durations)
max_duration = max(durations)
# Calculate percentiles
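            # Nearest-rank approximation: index floor(n * q) of the sorted sample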
sorted_durations = sorted(durations)
p95_duration = (
sorted_durations[int(len(sorted_durations) * 0.95)]
if sorted_durations
else 0
)
p99_duration = (
sorted_durations[int(len(sorted_durations) * 0.99)]
if sorted_durations
else 0
)
success_rate = (
(successful_executions / total_executions) * 100
if total_executions > 0
else 0
)
error_rate = (
(failed_executions / total_executions) * 100
if total_executions > 0
else 0
)
avg_input_size = (
statistics.mean(stats["input_sizes"]) if stats["input_sizes"] else 0
)
avg_output_size = (
statistics.mean(stats["output_sizes"]) if stats["output_sizes"] else 0
)
# Calculate 7-day trend
trend_7d = self._calculate_success_trend(tool_name, days=7)
return ToolPerformanceStats(
tool_name=tool_name,
total_executions=total_executions,
successful_executions=successful_executions,
failed_executions=failed_executions,
avg_duration_ms=avg_duration,
min_duration_ms=min_duration,
max_duration_ms=max_duration,
p95_duration_ms=p95_duration,
p99_duration_ms=p99_duration,
success_rate=success_rate,
error_rate=error_rate,
avg_input_size_bytes=avg_input_size,
avg_output_size_bytes=avg_output_size,
last_execution=stats["last_execution"],
error_distribution=dict(stats["error_counts"]),
trend_7d=trend_7d,
)
def _calculate_success_trend(self, tool_name: str, days: int = 7) -> List[float]:
"""Calculate success rate trend for the specified number of days."""
cutoff_time = datetime.utcnow() - timedelta(days=days)
# Get executions for the time period
tool_executions = [
metrics
for metrics in self._execution_history
if metrics.tool_name == tool_name and metrics.start_time > cutoff_time
]
if not tool_executions:
return [0.0] * days
# Group by day and calculate success rates
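        # Days with no recorded executions are reported as 0.0, which is
        # indistinguishable from a day where every execution failed.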
daily_rates = []
for day in range(days):
day_start = cutoff_time + timedelta(days=day)
day_end = day_start + timedelta(days=1)
day_executions = [
metrics
for metrics in tool_executions
if day_start <= metrics.start_time < day_end
]
if day_executions:
successful = sum(
1
for metrics in day_executions
if metrics.status == ExecutionStatus.SUCCESS
)
success_rate = (successful / len(day_executions)) * 100
daily_rates.append(success_rate)
else:
daily_rates.append(0.0)
return daily_rates
    def get_all_tool_stats(self) -> Dict[str, ToolPerformanceStats]:
        """Get performance statistics for all tools."""
        stats = {}
        # Snapshot tool names under the lock; get_tool_performance_stats
        # re-acquires the lock per tool, so it must not be called while held.
        with self._lock:
            tool_names = list(self._tool_stats.keys())
        for tool_name in tool_names:
            tool_stats = self.get_tool_performance_stats(tool_name)
            if tool_stats:
                stats[tool_name] = tool_stats
        return stats
def get_recent_errors(
self, tool_name: Optional[str] = None, hours: int = 24
) -> List[ToolExecutionMetrics]:
"""Get recent errors for analysis."""
cutoff_time = datetime.utcnow() - timedelta(hours=hours)
with self._lock:
recent_errors = [
metrics
for metrics in self._error_history
if metrics.start_time > cutoff_time
and (tool_name is None or metrics.tool_name == tool_name)
]
return recent_errors
def get_duration_histogram(self, tool_name: str) -> Dict[str, int]:
"""Get duration histogram for performance analysis."""
with self._lock:
histogram = self._duration_histograms.get(
tool_name, [0] * len(self._duration_buckets)
)
return {
f"<= {self._duration_buckets[i]}ms": count
for i, count in enumerate(histogram)
}
def reset_stats(self, tool_name: Optional[str] = None) -> None:
"""Reset statistics for specified tool or all tools."""
with self._lock:
if tool_name:
if tool_name in self._tool_stats:
del self._tool_stats[tool_name]
if tool_name in self._duration_histograms:
self._duration_histograms[tool_name] = [0] * len(
self._duration_buckets
)
else:
self._tool_stats.clear()
self._duration_histograms.clear()
self._execution_history.clear()
self._error_history.clear()
self._recent_errors.clear()
def monitor_tool_execution(
monitor: ToolExecutionMonitor,
user_id: Optional[str] = None,
session_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> Callable[[F], F]:
"""
Decorator for automatic tool execution monitoring.
Provides transparent monitoring of tool execution with comprehensive
metrics collection and performance analysis.
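    Example (sketch; `fetch_data_tool` is a hypothetical async tool):
        monitor = get_global_monitor()
        @monitor_tool_execution(monitor, user_id="agent-1")
        async def fetch_data_tool(url: str) -> str:
            ...
    Metrics for the wrapped function are recorded under the derived
    tool name "fetch-data".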
"""
def decorator(func: F) -> F:
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
# Extract tool name from function
tool_name = func.__name__.replace("_tool", "").replace("_", "-")
# Create execution context
with monitor.create_execution_context(
tool_name=tool_name,
user_id=user_id,
session_id=session_id,
metadata=metadata,
) as context:
# Calculate input size
input_str = str(args) + str(kwargs)
context.set_input_size(len(input_str.encode("utf-8")))
# Execute function
result = await func(*args, **kwargs)
# Calculate output size
output_str = str(result)
context.set_output_size(len(output_str.encode("utf-8")))
return result
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
# Extract tool name from function
tool_name = func.__name__.replace("_tool", "").replace("_", "-")
# Create execution context
with monitor.create_execution_context(
tool_name=tool_name,
user_id=user_id,
session_id=session_id,
metadata=metadata,
) as context:
# Calculate input size
input_str = str(args) + str(kwargs)
context.set_input_size(len(input_str.encode("utf-8")))
# Execute function
result = func(*args, **kwargs)
# Calculate output size
output_str = str(result)
context.set_output_size(len(output_str.encode("utf-8")))
return result
# Return appropriate wrapper based on function type
if asyncio.iscoroutinefunction(func):
return cast(F, async_wrapper)
else:
return cast(F, sync_wrapper)
return decorator
# Global tool execution monitor
_global_monitor: Optional[ToolExecutionMonitor] = None
def get_global_monitor() -> ToolExecutionMonitor:
"""Get or create global tool execution monitor."""
global _global_monitor
if _global_monitor is None:
_global_monitor = ToolExecutionMonitor()
return _global_monitor
def set_global_monitor(monitor: ToolExecutionMonitor) -> None:
"""Set global tool execution monitor."""
global _global_monitor
_global_monitor = monitor