Skip to main content
Glama

MCP Git Server

by MementoRC
optimizations.py14.3 kB
"""Performance optimizations for MCP Git Server.""" import json import logging import threading import time from collections.abc import Callable from functools import ( _CacheInfo, # Import the internal CacheInfo type for type hinting lru_cache, wraps, ) from typing import ( Any, Protocol, runtime_checkable, ) from .models.validation import ValidationResult logger = logging.getLogger(__name__) # --- CPU Profiling Utilities --- class CPUProfiler: """ Context manager and utility for CPU profiling using cProfile. Can be used in production or test to profile code blocks. """ def __init__(self, profile_name: str = "cpu_profile", enabled: bool = True): self.profile_name = profile_name self.enabled = enabled self.profiler = None self.stats_output = None def __enter__(self): if self.enabled: import cProfile self.profiler = cProfile.Profile() self.profiler.enable() return self def __exit__(self, exc_type, exc_val, exc_tb): if self.enabled and self.profiler: import pstats self.profiler.disable() import io s = io.StringIO() ps = pstats.Stats(self.profiler, stream=s).sort_stats("cumulative") ps.print_stats(30) # Print top 30 functions self.stats_output = s.getvalue() logger.info(f"CPU Profile [{self.profile_name}]:\n{self.stats_output}") def get_stats(self) -> str | None: return self.stats_output def profile_cpu_block(name: str = "cpu_profile", enabled: bool = True): """ Decorator/context for profiling a function or code block. """ def decorator(func): @wraps(func) def wrapper(*args, **kwargs): with CPUProfiler(profile_name=name, enabled=enabled): return func(*args, **kwargs) return wrapper return decorator # --- Memory Leak Detection Utilities --- class MemoryLeakDetector: """ Utility for detecting memory leaks by tracking object counts and memory usage. """ def __init__(self): import gc import tracemalloc self.gc = gc self.tracemalloc = tracemalloc self.snapshots: list[tuple[float, int, int]] = [] self.tracemalloc.start() def take_snapshot(self, label: str = ""): self.gc.collect() current, peak = self.tracemalloc.get_traced_memory() obj_count = len(self.gc.get_objects()) timestamp = time.time() self.snapshots.append((timestamp, current, obj_count)) logger.info( f"[MemoryLeakDetector] {label}: {current / 1024 / 1024:.2f} MB, {obj_count} objects" ) def report_growth(self) -> dict[str, float]: if len(self.snapshots) < 2: return {"memory_growth_mb": 0.0, "object_growth": 0} start = self.snapshots[0] end = self.snapshots[-1] mem_growth = (end[1] - start[1]) / 1024 / 1024 obj_growth = end[2] - start[2] logger.info( f"[MemoryLeakDetector] Memory growth: {mem_growth:.2f} MB, Object growth: {obj_growth}" ) return {"memory_growth_mb": mem_growth, "object_growth": obj_growth} def stop(self): self.tracemalloc.stop() # --- Performance Regression Detection --- class PerformanceRegressionMonitor: """ Tracks and detects performance regressions based on historical baselines. """ def __init__(self): self.baselines: dict[str, float] = {} self.regressions: list[str] = [] def set_baseline(self, test_name: str, value: float): self.baselines[test_name] = value def check(self, test_name: str, value: float, threshold: float = 1.2) -> bool: """ Returns True if regression detected (value is threshold*baseline or worse). """ baseline = self.baselines.get(test_name) if baseline is None: logger.info( f"[RegressionMonitor] No baseline for {test_name}, setting to {value:.4f}" ) self.set_baseline(test_name, value) return False if value > baseline * threshold: logger.warning( f"[RegressionMonitor] Regression detected in {test_name}: {value:.4f} vs baseline {baseline:.4f}" ) self.regressions.append(test_name) return True logger.info( f"[RegressionMonitor] {test_name}: {value:.4f} (baseline {baseline:.4f})" ) return False def get_regressions(self) -> list[str]: return self.regressions # --- Production Performance Monitoring --- class PerformanceMonitor: """ Thread-safe, lightweight performance monitor for production. Tracks operation timings, counts, and can emit periodic reports. """ def __init__(self, name: str, report_interval: float = 60.0): self.name = name self.report_interval = report_interval self.lock = threading.Lock() self.timings: list[float] = [] self.count = 0 self.last_report = time.time() def record(self, duration: float): with self.lock: self.timings.append(duration) self.count += 1 now = time.time() if now - self.last_report > self.report_interval: self.report() self.last_report = now def report(self): with self.lock: if not self.timings: return avg = sum(self.timings) / len(self.timings) p95 = ( sorted(self.timings)[int(0.95 * len(self.timings)) - 1] if len(self.timings) > 1 else avg ) logger.info( f"[PerfMonitor:{self.name}] Count: {self.count}, Avg: {avg:.6f}s, P95: {p95:.6f}s" ) self.timings.clear() self.count = 0 def get_stats(self) -> dict[str, Any]: with self.lock: if not self.timings: return {"count": 0, "avg": 0.0, "p95": 0.0} avg = sum(self.timings) / len(self.timings) p95 = ( sorted(self.timings)[int(0.95 * len(self.timings)) - 1] if len(self.timings) > 1 else avg ) return {"count": self.count, "avg": avg, "p95": p95} # Global production monitor for message processing message_perf_monitor = PerformanceMonitor("message_processing", report_interval=60.0) # Define a protocol for the LRU cached function to include cache_info and cache_clear @runtime_checkable class LRUCachedFunction(Protocol): def __call__(self, *args: Any, **kwargs: Any) -> Any: ... def cache_info(self) -> _CacheInfo: ... # Use the actual CacheInfo type def cache_clear(self) -> None: ... # This will hold the reference to the actual cached function _cached_parse_function: LRUCachedFunction | None = None _cache_maxsize: int = 1024 _cache_enabled: bool = True def _get_cache_info(): """Helper to get cache info if the function is cached.""" if _cached_parse_function is not None: return _cached_parse_function.cache_info() return None def _clear_cache(): """Helper to clear the cache if the function is cached.""" if _cached_parse_function is not None: _cached_parse_function.cache_clear() logger.info("Validation cache cleared.") def enable_validation_cache(): """Enables the validation cache.""" global _cache_enabled _cache_enabled = True logger.info("Validation cache enabled.") def disable_validation_cache(): """Disables the validation cache.""" global _cache_enabled _cache_enabled = False logger.info("Validation cache disabled.") def clear_validation_cache(): """Clears all items from the validation cache.""" _clear_cache() def get_validation_cache_stats() -> dict[str, Any]: """Returns validation cache statistics.""" info = _get_cache_info() if info: return { "hits": info.hits, "misses": info.misses, "current_size": info.currsize, "max_size": info.maxsize, "enabled": _cache_enabled, } else: return { "hits": 0, "misses": 0, "current_size": 0, "max_size": _cache_maxsize, "enabled": _cache_enabled, } def _create_cache_key(data: dict[str, Any]) -> str: """Create a stable cache key from message data.""" try: # Create a reproducible key from the essential fields key_data = { "method": data.get("method", "unknown"), "jsonrpc": data.get("jsonrpc", "2.0"), } # Add params if present, but normalize them if "params" in data: params = data["params"] if isinstance(params, dict): # Sort keys to ensure consistent ordering key_data["params"] = json.dumps(params, sort_keys=True) else: key_data["params"] = str(params) # Create final key return json.dumps(key_data, sort_keys=True) except Exception: # Fallback to string representation return str(data) def apply_validation_cache( func: Callable[[dict[str, Any]], ValidationResult], ) -> Callable[[dict[str, Any]], ValidationResult]: """ Decorator to apply caching to validation functions. This creates an LRU cache that can be enabled/disabled and provides cache statistics for performance monitoring. """ global _cached_parse_function # Create the cached version of the function @lru_cache(maxsize=_cache_maxsize) def _cached_validation(cache_key: str, original_data_str: str) -> ValidationResult: """Internal cached function that works on cache keys.""" # Reconstruct the original data and call the function try: data = json.loads(original_data_str) return func(data) except Exception as e: logger.error(f"Cache key reconstruction failed: {e}") # Return error result return ValidationResult(error=e, raw_data={}) @wraps(func) def wrapper(data: dict[str, Any]) -> ValidationResult: """Wrapper that handles caching logic.""" if not _cache_enabled: # Cache disabled, call function directly return func(data) try: # Create cache key and serialized data cache_key = _create_cache_key(data) # Ensure data is always serializable for the cache key serialized_data = json.dumps(data, sort_keys=True) # Call cached function return _cached_validation(cache_key, serialized_data) except Exception as e: logger.warning(f"Caching failed, falling back to direct call: {e}") # Fallback to direct function call return func(data) # Store reference to cached function for stats # The lru_cache decorator returns an object that implements the LRUCachedFunction protocol. _cached_parse_function = _cached_validation # type: ignore[assignment] return wrapper # Performance monitoring utilities class PerformanceTimer: """Simple performance timer for benchmarking.""" def __init__(self, name: str): self.name = name self.start_time: float | None = None self.end_time: float | None = None def __enter__(self): import time self.start_time = time.perf_counter() return self def __exit__(self, exc_type, exc_val, exc_tb): import time self.end_time = time.perf_counter() # Ensure start_time and end_time are not None before subtraction if self.start_time is not None and self.end_time is not None: duration = self.end_time - self.start_time logger.info(f"Performance [{self.name}]: {duration:.6f} seconds") else: logger.warning( f"PerformanceTimer '{self.name}' exited without proper start/end times." ) @property def duration(self) -> float: """Get the duration in seconds.""" if self.start_time is None or self.end_time is None: return 0.0 # Type checker should now correctly infer float types here return self.end_time - self.start_time def measure_performance(name: str): """Decorator to measure function performance.""" def decorator(func: Callable[..., Any]) -> Callable[..., Any]: @wraps(func) def wrapper(*args: Any, **kwargs: Any) -> Any: with PerformanceTimer(name): return func(*args, **kwargs) return wrapper return decorator # Memory optimization utilities def optimize_message_validation(data: dict[str, Any]) -> ValidationResult: """ Optimized message validation function. This function applies various optimizations: - Fast path for common message types - Minimal object creation - Efficient error handling - Performance monitoring for production """ start = time.perf_counter() try: # Fast path for common cancelled notifications if data.get("method") == "notifications/cancelled": # Basic structure validation if "params" in data and isinstance(data["params"], dict): params = data["params"] if "requestId" in params: # This is likely a valid cancelled notification try: from .models.notifications import CancelledNotification model = CancelledNotification.model_validate(data) return ValidationResult(model=model, raw_data=data) except Exception as e: logger.debug(f"Fast path validation failed: {e}") # Fall back to full validation from .models.validation import safe_parse_notification return safe_parse_notification(data) finally: duration = time.perf_counter() - start message_perf_monitor.record(duration)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MementoRC/mcp-git'

If you have feedback or need assistance with the MCP directory API, please join our Discord server