Skip to main content
Glama

MCP Git Server

by MementoRC
metrics.py4.23 kB
import asyncio import time from collections import defaultdict from typing import Any class MetricsCollector: """ Global metrics collector for MCP Git Server. Aggregates server-wide metrics, performance, and health. Thread-safe for async operations. """ def __init__(self): self._lock = asyncio.Lock() self._metrics = { "messages_processed": 0, "operations": defaultdict(int), "errors": defaultdict(int), "message_durations_ms": [], "operation_durations_ms": [], "active_sessions": 0, "session_events": defaultdict(int), "last_health_check": None, "startup_time": time.time(), } async def record_message(self, message_type: str, duration_ms: float): async with self._lock: self._metrics["messages_processed"] += 1 self._metrics["operations"][message_type] += 1 self._metrics["message_durations_ms"].append(duration_ms) async def record_session_event(self, event_type: str): async with self._lock: self._metrics["session_events"][event_type] += 1 if event_type == "session_started": self._metrics["active_sessions"] += 1 elif event_type in ("session_closed", "session_terminated"): self._metrics["active_sessions"] = max( 0, self._metrics["active_sessions"] - 1 ) async def record_operation( self, operation_type: str, success: bool, duration_ms: float | None = None ): async with self._lock: self._metrics["operations"][operation_type] += 1 if not success: self._metrics["errors"][operation_type] += 1 if duration_ms is not None: self._metrics["operation_durations_ms"].append(duration_ms) async def record_error(self, error_type: str): async with self._lock: self._metrics["errors"][error_type] += 1 async def get_metrics(self) -> dict[str, Any]: async with self._lock: # Compute summary stats durations = self._metrics["message_durations_ms"] op_durations = self._metrics["operation_durations_ms"] avg_msg_duration = sum(durations) / len(durations) if durations else 0 avg_op_duration = ( sum(op_durations) / len(op_durations) if op_durations else 0 ) return { "messages_processed": self._metrics["messages_processed"], "operations": dict(self._metrics["operations"]), "errors": dict(self._metrics["errors"]), "active_sessions": self._metrics["active_sessions"], "session_events": dict(self._metrics["session_events"]), "avg_message_duration_ms": avg_msg_duration, "avg_operation_duration_ms": avg_op_duration, "uptime_sec": time.time() - self._metrics["startup_time"], } async def get_health_status(self) -> dict[str, Any]: async with self._lock: health = { "uptime_sec": time.time() - self._metrics["startup_time"], "active_sessions": self._metrics["active_sessions"], "messages_processed": self._metrics["messages_processed"], "error_count": sum(self._metrics["errors"].values()), "last_health_check": time.time(), } self._metrics["last_health_check"] = health["last_health_check"] return health async def reset(self): async with self._lock: self._metrics = { "messages_processed": 0, "operations": defaultdict(int), "errors": defaultdict(int), "message_durations_ms": [], "operation_durations_ms": [], "active_sessions": 0, "session_events": defaultdict(int), "last_health_check": None, "startup_time": time.time(), } # Singleton instance for global use global_metrics_collector = MetricsCollector()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MementoRC/mcp-git'

If you have feedback or need assistance with the MCP directory API, please join our Discord server