Skip to main content
Glama

MCP Git Server

by MementoRC
metrics_protocol.py16.4 kB
""" Metrics protocol definitions for performance monitoring and data collection. This module defines protocols for performance metrics collection, operation timing, success/failure tracking, and resource usage monitoring. """ from abc import abstractmethod from collections.abc import Callable, Iterator from dataclasses import dataclass from datetime import datetime, timedelta from enum import Enum from typing import Any, Protocol class MetricType(Enum): """Enumeration of metric types.""" COUNTER = "counter" # Monotonically increasing values GAUGE = "gauge" # Current value at a point in time HISTOGRAM = "histogram" # Distribution of values TIMER = "timer" # Duration measurements RATE = "rate" # Events per unit time class MetricUnit(Enum): """Enumeration of metric units.""" SECONDS = "seconds" MILLISECONDS = "milliseconds" MICROSECONDS = "microseconds" BYTES = "bytes" KILOBYTES = "kilobytes" MEGABYTES = "megabytes" COUNT = "count" PERCENT = "percent" OPERATIONS_PER_SECOND = "ops/sec" @dataclass class MetricValue: """Data structure for metric values.""" name: str value: int | float metric_type: MetricType unit: MetricUnit timestamp: datetime tags: dict[str, str] metadata: dict[str, Any] @dataclass class TimingResult: """Data structure for timing measurements.""" operation_name: str duration: float unit: MetricUnit start_time: datetime end_time: datetime success: bool metadata: dict[str, Any] class MetricCollector(Protocol): """Protocol for collecting and recording metrics.""" @abstractmethod def record_counter( self, name: str, value: int | float = 1, tags: dict[str, str] | None = None, ) -> None: """ Record a counter metric (monotonically increasing). Args: name: Metric name value: Value to add to counter (default 1) tags: Optional tags for metric categorization Example: >>> collector = MyMetricCollector() >>> collector.record_counter("git.commits.created", 1, {"branch": "main"}) >>> collector.record_counter("api.requests.total") """ ... @abstractmethod def record_gauge( self, name: str, value: int | float, tags: dict[str, str] | None = None ) -> None: """ Record a gauge metric (current value). Args: name: Metric name value: Current value tags: Optional tags for metric categorization Example: >>> collector = MyMetricCollector() >>> collector.record_gauge("system.memory.usage", 75.5, {"unit": "percent"}) >>> collector.record_gauge("active.connections", 42) """ ... @abstractmethod def record_histogram( self, name: str, value: int | float, tags: dict[str, str] | None = None ) -> None: """ Record a histogram metric (value distribution). Args: name: Metric name value: Value to record in distribution tags: Optional tags for metric categorization Example: >>> collector = MyMetricCollector() >>> collector.record_histogram("request.size.bytes", 1024) >>> collector.record_histogram("operation.duration", 250.5, {"unit": "ms"}) """ ... @abstractmethod def record_timer( self, name: str, duration: float, unit: MetricUnit = MetricUnit.MILLISECONDS, tags: dict[str, str] | None = None, ) -> None: """ Record a timing metric. Args: name: Metric name duration: Duration value unit: Time unit for duration tags: Optional tags for metric categorization Example: >>> collector = MyMetricCollector() >>> collector.record_timer("git.clone.duration", 15.2, MetricUnit.SECONDS) >>> collector.record_timer("api.response.time", 150.0) """ ... @abstractmethod def increment(self, name: str, tags: dict[str, str] | None = None) -> None: """ Increment a counter by 1. Args: name: Metric name tags: Optional tags for metric categorization Example: >>> collector = MyMetricCollector() >>> collector.increment("errors.validation") >>> collector.increment("requests.processed", {"endpoint": "/status"}) """ ... @abstractmethod def decrement(self, name: str, tags: dict[str, str] | None = None) -> None: """ Decrement a gauge by 1. Args: name: Metric name tags: Optional tags for metric categorization """ ... class PerformanceTimer(Protocol): """Protocol for timing operations and measuring performance.""" @abstractmethod def start_timer( self, operation_name: str, metadata: dict[str, Any] | None = None ) -> str: """ Start timing an operation. Args: operation_name: Name of operation being timed metadata: Optional metadata about the operation Returns: Timer ID for stopping the timer Example: >>> timer = MyPerformanceTimer() >>> timer_id = timer.start_timer("git.push", {"branch": "main"}) >>> # ... perform operation ... >>> result = timer.stop_timer(timer_id) """ ... @abstractmethod def stop_timer(self, timer_id: str, success: bool = True) -> TimingResult: """ Stop a timer and get the result. Args: timer_id: ID returned from start_timer() success: Whether the operation succeeded Returns: TimingResult with duration and metadata Example: >>> timer = MyPerformanceTimer() >>> timer_id = timer.start_timer("database.query") >>> # ... perform database operation ... >>> result = timer.stop_timer(timer_id, success=True) >>> print(f"Query took {result.duration} {result.unit.value}") """ ... @abstractmethod def time_operation( self, operation_name: str, operation: Callable[[], Any], metadata: dict[str, Any] | None = None, ) -> TimingResult: """ Time a callable operation. Args: operation_name: Name of operation operation: Callable to time metadata: Optional metadata about the operation Returns: TimingResult with duration and operation result Example: >>> timer = MyPerformanceTimer() >>> def expensive_operation(): ... # ... do work ... ... return "result" >>> result = timer.time_operation("data.processing", expensive_operation) >>> print(f"Operation took {result.duration}ms") """ ... @abstractmethod def get_timing_stats(self, operation_name: str) -> dict[str, float]: """ Get timing statistics for an operation. Args: operation_name: Name of operation to get stats for Returns: Dictionary with timing statistics (avg, min, max, p95, etc.) Example: >>> timer = MyPerformanceTimer() >>> stats = timer.get_timing_stats("git.clone") >>> print(f"Average: {stats['avg']}ms, P95: {stats['p95']}ms") """ ... class SuccessFailureTracker(Protocol): """Protocol for tracking operation success and failure rates.""" @abstractmethod def record_success( self, operation: str, metadata: dict[str, Any] | None = None ) -> None: """ Record a successful operation. Args: operation: Name of operation that succeeded metadata: Optional metadata about the operation Example: >>> tracker = MySuccessFailureTracker() >>> tracker.record_success("git.commit", {"files": 3}) """ ... @abstractmethod def record_failure( self, operation: str, error_type: str, metadata: dict[str, Any] | None = None ) -> None: """ Record a failed operation. Args: operation: Name of operation that failed error_type: Type or category of error metadata: Optional metadata about the failure Example: >>> tracker = MySuccessFailureTracker() >>> tracker.record_failure("git.push", "authentication_error", {"repo": "origin"}) """ ... @abstractmethod def get_success_rate( self, operation: str, time_window: timedelta | None = None ) -> float: """ Get success rate for an operation. Args: operation: Name of operation time_window: Optional time window to calculate rate for Returns: Success rate as float between 0.0 and 1.0 Example: >>> tracker = MySuccessFailureTracker() >>> rate = tracker.get_success_rate("api.requests", timedelta(hours=1)) >>> print(f"Success rate: {rate:.2%}") """ ... @abstractmethod def get_failure_breakdown( self, operation: str, time_window: timedelta | None = None ) -> dict[str, int]: """ Get breakdown of failure types for an operation. Args: operation: Name of operation time_window: Optional time window to analyze Returns: Dictionary mapping error types to occurrence counts Example: >>> tracker = MySuccessFailureTracker() >>> failures = tracker.get_failure_breakdown("git.clone") >>> # {"network_error": 5, "auth_error": 2, "timeout": 1} """ ... class ResourceMonitor(Protocol): """Protocol for monitoring system resource usage.""" @abstractmethod def get_memory_usage(self) -> dict[str, float]: """ Get current memory usage statistics. Returns: Dictionary with memory statistics (used, available, percent, etc.) Example: >>> monitor = MyResourceMonitor() >>> memory = monitor.get_memory_usage() >>> print(f"Memory usage: {memory['percent']:.1f}%") """ ... @abstractmethod def get_cpu_usage(self) -> dict[str, float]: """ Get current CPU usage statistics. Returns: Dictionary with CPU statistics (percent, load average, etc.) """ ... @abstractmethod def get_disk_usage(self, path: str = "/") -> dict[str, float]: """ Get disk usage statistics for a path. Args: path: Path to check disk usage for Returns: Dictionary with disk statistics (used, free, percent, etc.) """ ... @abstractmethod def get_network_stats(self) -> dict[str, int]: """ Get network usage statistics. Returns: Dictionary with network statistics (bytes sent/received, packets, etc.) """ ... @abstractmethod def start_resource_monitoring(self, interval: float = 60.0) -> None: """ Start continuous resource monitoring. Args: interval: Monitoring interval in seconds """ ... @abstractmethod def stop_resource_monitoring(self) -> None: """Stop continuous resource monitoring.""" ... class MetricsAggregator(Protocol): """Protocol for aggregating and analyzing metrics.""" @abstractmethod def get_metric_summary( self, metric_name: str, time_window: timedelta | None = None ) -> dict[str, float]: """ Get summary statistics for a metric. Args: metric_name: Name of metric to summarize time_window: Optional time window for analysis Returns: Dictionary with summary statistics (count, avg, min, max, etc.) """ ... @abstractmethod def get_metrics_by_tag(self, tag_filter: dict[str, str]) -> list[MetricValue]: """ Get metrics matching tag filters. Args: tag_filter: Dictionary of tag key-value pairs to match Returns: List of MetricValue objects matching the filter """ ... @abstractmethod def export_metrics(self, format: str = "json") -> str: """ Export metrics in specified format. Args: format: Export format (json, csv, prometheus, etc.) Returns: Formatted metrics data """ ... @abstractmethod def get_top_metrics( self, metric_type: MetricType, limit: int = 10 ) -> list[MetricValue]: """ Get top metrics by value for a given type. Args: metric_type: Type of metrics to analyze limit: Maximum number of metrics to return Returns: List of top metrics sorted by value """ ... class MetricsSystem(Protocol): """ Comprehensive metrics system protocol. This protocol combines all metrics collection capabilities into a unified interface for components that need full metrics functionality. """ # Composition of all metrics sub-protocols collector: MetricCollector timer: PerformanceTimer tracker: SuccessFailureTracker resource_monitor: ResourceMonitor aggregator: MetricsAggregator @abstractmethod def initialize_metrics(self, config: dict[str, Any]) -> None: """ Initialize the metrics system with configuration. Args: config: Configuration dictionary with collection settings, etc. Example: >>> metrics_system = MyMetricsSystem() >>> config = { ... "collection_interval": 60, ... "retention_days": 30, ... "export_endpoints": ["prometheus", "grafana"] ... } >>> metrics_system.initialize_metrics(config) """ ... @abstractmethod def shutdown_metrics(self) -> None: """Gracefully shutdown the metrics system.""" ... @abstractmethod def get_system_health(self) -> dict[str, bool | float | int]: """ Get overall system health metrics. Returns: Dictionary with health indicators and key metrics Example: >>> metrics_system = MyMetricsSystem() >>> health = metrics_system.get_system_health() >>> print(f"System healthy: {health['healthy']}") >>> print(f"Error rate: {health['error_rate']:.2%}") """ ... @abstractmethod def create_dashboard_data(self, dashboard_type: str = "overview") -> dict[str, Any]: """ Create data for metrics dashboards. Args: dashboard_type: Type of dashboard (overview, performance, errors, etc.) Returns: Dictionary with dashboard data and visualization configs """ ... class AsyncMetricsSystem(Protocol): """Protocol for asynchronous metrics operations.""" @abstractmethod async def collect_metrics_async(self) -> None: """Async collection of all metrics.""" ... @abstractmethod async def export_metrics_async(self, endpoints: list[str]) -> dict[str, bool]: """ Async export to multiple endpoints. Args: endpoints: List of export endpoints Returns: Dictionary mapping endpoint to success status """ ... @abstractmethod async def metrics_stream( self, filters: dict[str, Any] | None = None ) -> Iterator[MetricValue]: """ Stream metrics as they are collected. Args: filters: Optional filters for metric types, names, etc. Yields: MetricValue objects as they are collected """ ...

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MementoRC/mcp-git'

If you have feedback or need assistance with the MCP directory API, please join our Discord server