OpenAI MCP Server by arthurcolle
claude_code/lib/monitoring
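The module below records tool calls, resource requests, client connections, and errors for the MCP server, keeps a rolling request history and per-minute time series for dashboards, and periodically persists aggregate counts to disk. A short usage sketch follows the listing.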
#!/usr/bin/env python3
"""Module for tracking MCP server metrics."""

import os
import time
import json
import logging
import threading
from typing import Dict, List, Any
from datetime import datetime, timedelta
from collections import deque, Counter

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ServerMetrics:
    """Tracks MCP server metrics for visualization."""

    def __init__(self, history_size: int = 100, save_interval: int = 60):
        """Initialize the server metrics tracker.

        Args:
            history_size: Number of data points to keep in history
            save_interval: How often to save metrics to disk (in seconds)
        """
        self._start_time = time.time()
        self._lock = threading.RLock()
        self._history_size = history_size
        self._save_interval = save_interval
        self._save_path = os.path.expanduser("~/.config/claude_code/metrics.json")

        # Ensure directory exists
        os.makedirs(os.path.dirname(self._save_path), exist_ok=True)

        # Metrics
        self._request_history = deque(maxlen=history_size)
        self._tool_calls = Counter()
        self._resource_calls = Counter()
        self._connections = 0
        self._active_connections = set()
        self._errors = Counter()

        # Time series data for charts (one-minute buckets)
        self._time_series = {
            "tool_calls": deque([(time.time(), 0)] * 10, maxlen=10),
            "resource_calls": deque([(time.time(), 0)] * 10, maxlen=10)
        }

        # Start auto-save thread
        self._running = True
        self._save_thread = threading.Thread(target=self._auto_save, daemon=True)
        self._save_thread.start()

        # Load previous metrics if available
        self._load_metrics()

    def _auto_save(self):
        """Periodically save metrics to disk."""
        while self._running:
            time.sleep(self._save_interval)
            try:
                self.save_metrics()
            except Exception as e:
                logger.error(f"Error saving metrics: {e}")

    def _load_metrics(self):
        """Load metrics from disk if available."""
        try:
            if os.path.exists(self._save_path):
                with open(self._save_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                with self._lock:
                    # Load previous tool and resource calls
                    self._tool_calls = Counter(data.get("tool_calls", {}))
                    self._resource_calls = Counter(data.get("resource_calls", {}))
                    # Don't load time-sensitive data like connections and history
                logger.info(f"Loaded metrics from {self._save_path}")
        except Exception as e:
            logger.error(f"Error loading metrics: {e}")

    def save_metrics(self):
        """Save metrics to disk."""
        try:
            with self._lock:
                data = {
                    "tool_calls": dict(self._tool_calls),
                    "resource_calls": dict(self._resource_calls),
                    "total_connections": self._connections,
                    "last_saved": time.time()
                }
            with open(self._save_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2)
            logger.debug(f"Metrics saved to {self._save_path}")
        except Exception as e:
            logger.error(f"Error saving metrics: {e}")

    def log_tool_call(self, tool_name: str, success: bool = True):
        """Log a tool call.

        Args:
            tool_name: The name of the tool that was called
            success: Whether the call was successful
        """
        with self._lock:
            self._tool_calls[tool_name] += 1

            # Add to request history
            timestamp = time.time()
            self._request_history.append({
                "type": "tool",
                "name": tool_name,
                "success": success,
                "timestamp": timestamp
            })

            # Update time series: bump the current bucket or open a new one
            current_time = time.time()
            last_time, count = self._time_series["tool_calls"][-1]
            if current_time - last_time < 60:  # Less than a minute
                self._time_series["tool_calls"][-1] = (last_time, count + 1)
            else:
                self._time_series["tool_calls"].append((current_time, 1))

    def log_resource_request(self, resource_uri: str, success: bool = True):
        """Log a resource request.

        Args:
            resource_uri: The URI of the requested resource
            success: Whether the request was successful
        """
        with self._lock:
            self._resource_calls[resource_uri] += 1

            # Add to request history
            timestamp = time.time()
            self._request_history.append({
                "type": "resource",
                "uri": resource_uri,
                "success": success,
                "timestamp": timestamp
            })

            # Update time series: bump the current bucket or open a new one
            current_time = time.time()
            last_time, count = self._time_series["resource_calls"][-1]
            if current_time - last_time < 60:  # Less than a minute
                self._time_series["resource_calls"][-1] = (last_time, count + 1)
            else:
                self._time_series["resource_calls"].append((current_time, 1))

    def log_connection(self, client_id: str, connected: bool = True):
        """Log a client connection or disconnection.

        Args:
            client_id: Client identifier
            connected: True for connection, False for disconnection
        """
        with self._lock:
            if connected:
                self._connections += 1
                self._active_connections.add(client_id)
            else:
                self._active_connections.discard(client_id)

            # Add to request history
            timestamp = time.time()
            self._request_history.append({
                "type": "connection",
                "client_id": client_id,
                "action": "connect" if connected else "disconnect",
                "timestamp": timestamp
            })

    def log_error(self, error_type: str, message: str):
        """Log an error.

        Args:
            error_type: Type of error
            message: Error message
        """
        with self._lock:
            self._errors[error_type] += 1

            # Add to request history
            timestamp = time.time()
            self._request_history.append({
                "type": "error",
                "error_type": error_type,
                "message": message,
                "timestamp": timestamp
            })

    def get_uptime(self) -> str:
        """Get the server uptime as a human-readable string.

        Returns:
            Uptime string (e.g., "2 hours 15 minutes")
        """
        uptime_seconds = time.time() - self._start_time
        uptime = timedelta(seconds=int(uptime_seconds))

        days = uptime.days
        hours, remainder = divmod(uptime.seconds, 3600)
        minutes, seconds = divmod(remainder, 60)

        parts = []
        if days > 0:
            parts.append(f"{days} {'day' if days == 1 else 'days'}")
        if hours > 0 or days > 0:
            parts.append(f"{hours} {'hour' if hours == 1 else 'hours'}")
        if minutes > 0 or hours > 0 or days > 0:
            parts.append(f"{minutes} {'minute' if minutes == 1 else 'minutes'}")
        if not parts:
            return f"{seconds} seconds"
        return " ".join(parts)

    def get_active_connections_count(self) -> int:
        """Get the number of active connections.

        Returns:
            Number of active connections
        """
        with self._lock:
            return len(self._active_connections)

    def get_total_connections(self) -> int:
        """Get the total number of connections since startup.

        Returns:
            Total connection count
        """
        with self._lock:
            return self._connections

    def get_recent_activity(self, count: int = 10) -> List[Dict[str, Any]]:
        """Get recent activity.

        Args:
            count: Number of recent events to return

        Returns:
            List of recent activity events
        """
        with self._lock:
            # Copy events so adding formatted timestamps does not mutate the stored history
            recent = [dict(event) for event in list(self._request_history)[-count:]]

        # Format timestamps
        for event in recent:
            ts = event["timestamp"]
            event["formatted_time"] = datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S")

        return recent

    def get_tool_usage_stats(self) -> Dict[str, int]:
        """Get statistics on tool usage.

        Returns:
            Dictionary mapping tool names to call counts
        """
        with self._lock:
            return dict(self._tool_calls)

    def get_resource_usage_stats(self) -> Dict[str, int]:
        """Get statistics on resource usage.

        Returns:
            Dictionary mapping resource URIs to request counts
        """
        with self._lock:
            return dict(self._resource_calls)

    def get_error_stats(self) -> Dict[str, int]:
        """Get statistics on errors.

        Returns:
            Dictionary mapping error types to counts
        """
        with self._lock:
            return dict(self._errors)

    def get_time_series_data(self) -> Dict[str, List[Dict[str, Any]]]:
        """Get time series data for charts.

        Returns:
            Dictionary with time series data
        """
        with self._lock:
            result = {}
            # Convert deques to lists of dictionaries
            for series_name, series_data in self._time_series.items():
                result[series_name] = [
                    {"timestamp": ts, "value": val, "formatted_time": datetime.fromtimestamp(ts).strftime("%H:%M:%S")}
                    for ts, val in series_data
                ]
            return result

    def get_all_metrics(self) -> Dict[str, Any]:
        """Get all metrics data.

        Returns:
            Dictionary with all metrics
        """
        return {
            "uptime": self.get_uptime(),
            "active_connections": self.get_active_connections_count(),
            "total_connections": self.get_total_connections(),
            "recent_activity": self.get_recent_activity(20),
            "tool_usage": self.get_tool_usage_stats(),
            "resource_usage": self.get_resource_usage_stats(),
            "errors": self.get_error_stats(),
            "time_series": self.get_time_series_data()
        }

    def reset_stats(self):
        """Reset all statistics but keep the start time."""
        with self._lock:
            self._request_history.clear()
            self._tool_calls.clear()
            self._resource_calls.clear()
            self._connections = 0
            self._active_connections.clear()
            self._errors.clear()

            # Reset time series to ten empty one-minute buckets
            current_time = time.time()
            self._time_series = {
                "tool_calls": deque([(current_time - (600 - i * 60), 0) for i in range(10)], maxlen=10),
                "resource_calls": deque([(current_time - (600 - i * 60), 0) for i in range(10)], maxlen=10)
            }

    def shutdown(self):
        """Shut down the metrics tracker and save data."""
        self._running = False
        self.save_metrics()


# Singleton instance
_metrics_instance = None
_metrics_lock = threading.Lock()


def get_metrics() -> ServerMetrics:
    """Get or create the singleton metrics instance.

    Returns:
        ServerMetrics instance
    """
    global _metrics_instance
    with _metrics_lock:
        if _metrics_instance is None:
            _metrics_instance = ServerMetrics()
    return _metrics_instance
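

# Illustrative usage sketch (not part of the original module): shows how a server
# might exercise the logging and reporting API above. The tool name, resource URI,
# and client id below are made-up placeholders, not identifiers defined elsewhere
# in this repository.
if __name__ == "__main__":
    metrics = get_metrics()

    # Record a few hypothetical events
    metrics.log_connection("client-1", connected=True)
    metrics.log_tool_call("example_tool", success=True)
    metrics.log_resource_request("resource://example", success=True)
    metrics.log_error("timeout", "example error message")

    # Dump the aggregated view that a dashboard would consume
    print(json.dumps(metrics.get_all_metrics(), indent=2))

    # Flush counters to ~/.config/claude_code/metrics.json before exiting
    metrics.log_connection("client-1", connected=False)
    metrics.shutdown()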