Skip to main content
Glama

mcp-optimizer

resource_monitor.pyโ€ข8.45 kB
"""Resource monitoring and concurrency control for MCP Optimizer.""" import asyncio import logging import time from collections.abc import Callable from functools import wraps from typing import Any import psutil from mcp_optimizer.config import settings logger = logging.getLogger(__name__) class ResourceMonitor: """Monitor system resources and control concurrency.""" def __init__(self) -> None: """Initialize resource monitor.""" self.semaphore = asyncio.Semaphore(settings.max_concurrent_requests) self.active_requests = 0 self.total_requests = 0 self.failed_requests = 0 self.memory_warnings = 0 # Cache process for better performance self._process: psutil.Process | None = None self._last_memory_check: float = 0.0 self._cached_memory_mb: float = 0.0 # Cache interval: check memory max once per second self._memory_cache_interval = 1.0 def get_memory_usage_mb(self) -> float: """Get current memory usage in MB with caching.""" current_time = time.time() # Return cached value if recent enough if current_time - self._last_memory_check < self._memory_cache_interval: return self._cached_memory_mb try: if self._process is None: self._process = psutil.Process() # Get memory info only once and cache it memory_info = self._process.memory_info() self._cached_memory_mb = memory_info.rss / 1024 / 1024 self._last_memory_check = current_time return self._cached_memory_mb except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): # Process might have changed, reset cache self._process = None return 0.0 def is_memory_available(self, required_mb: float) -> bool: """Check if enough memory is available for the operation.""" try: current_memory = self.get_memory_usage_mb() available = current_memory + required_mb <= settings.max_memory_mb if not available: self.memory_warnings += 1 logger.warning( f"Memory limit would be exceeded: current={current_memory:.1f}MB, " f"required={required_mb:.1f}MB, limit={settings.max_memory_mb}MB" ) return available except Exception as e: logger.error(f"Error checking memory availability: {e}") return True # Allow operation if we can't check async def acquire_semaphore(self, timeout_seconds: float = 30.0) -> bool: """Acquire semaphore with timeout.""" try: await asyncio.wait_for(self.semaphore.acquire(), timeout=timeout_seconds) self.active_requests += 1 return True except TimeoutError: logger.warning(f"Failed to acquire semaphore within {timeout_seconds}s") return False def release_semaphore(self) -> None: """Release semaphore.""" self.semaphore.release() self.active_requests = max(0, self.active_requests - 1) def get_stats(self) -> dict[str, Any]: """Get resource monitoring statistics.""" return { "active_requests": self.active_requests, "total_requests": self.total_requests, "failed_requests": self.failed_requests, "memory_warnings": self.memory_warnings, "current_memory_mb": self.get_memory_usage_mb(), "max_memory_mb": settings.max_memory_mb, "max_concurrent_requests": settings.max_concurrent_requests, } # Global instance resource_monitor = ResourceMonitor() class MemoryExceededError(Exception): """Raised when memory usage would exceed limits.""" pass class ConcurrencyLimitError(Exception): """Raised when concurrent request limit is exceeded.""" pass def with_resource_limits( timeout_seconds: float = 60.0, estimated_memory_mb: float = 100.0 ) -> Callable[[Callable[..., Any]], Callable[..., Any]]: """Decorator to enforce resource limits on functions.""" def decorator(func: Callable[..., Any]) -> Callable[..., Any]: if asyncio.iscoroutinefunction(func): @wraps(func) async def async_wrapper(*args: Any, **kwargs: Any) -> Any: resource_monitor.total_requests += 1 # Check memory before starting if not resource_monitor.is_memory_available(estimated_memory_mb): resource_monitor.failed_requests += 1 raise MemoryExceededError( f"Insufficient memory: need {estimated_memory_mb}MB, " f"limit {settings.max_memory_mb}MB" ) # Acquire semaphore if not await resource_monitor.acquire_semaphore(timeout_seconds): resource_monitor.failed_requests += 1 raise ConcurrencyLimitError( f"Too many concurrent requests. Max: {settings.max_concurrent_requests}" ) start_time = time.time() try: result = await func(*args, **kwargs) execution_time = time.time() - start_time logger.debug( f"Function {func.__name__} completed in {execution_time:.2f}s, " f"memory: {resource_monitor.get_memory_usage_mb():.1f}MB" ) return result except Exception as e: resource_monitor.failed_requests += 1 execution_time = time.time() - start_time logger.error( f"Function {func.__name__} failed after {execution_time:.2f}s: {e}" ) raise finally: resource_monitor.release_semaphore() return async_wrapper else: @wraps(func) def sync_wrapper(*args: Any, **kwargs: Any) -> Any: resource_monitor.total_requests += 1 # Check memory before starting if not resource_monitor.is_memory_available(estimated_memory_mb): resource_monitor.failed_requests += 1 raise MemoryExceededError( f"Insufficient memory: need {estimated_memory_mb}MB, " f"limit {settings.max_memory_mb}MB" ) # For sync functions, we can't use async semaphore # Just check current count if resource_monitor.active_requests >= settings.max_concurrent_requests: resource_monitor.failed_requests += 1 raise ConcurrencyLimitError( f"Too many concurrent requests. Max: {settings.max_concurrent_requests}" ) resource_monitor.active_requests += 1 start_time = time.time() try: result = func(*args, **kwargs) execution_time = time.time() - start_time logger.debug( f"Function {func.__name__} completed in {execution_time:.2f}s, " f"memory: {resource_monitor.get_memory_usage_mb():.1f}MB" ) return result except Exception as e: resource_monitor.failed_requests += 1 execution_time = time.time() - start_time logger.error( f"Function {func.__name__} failed after {execution_time:.2f}s: {e}" ) raise finally: resource_monitor.active_requests = max(0, resource_monitor.active_requests - 1) return sync_wrapper return decorator def get_resource_status() -> dict[str, Any]: """Get current resource status for health checks.""" return resource_monitor.get_stats() def reset_resource_stats() -> None: """Reset resource statistics (for testing).""" resource_monitor.total_requests = 0 resource_monitor.failed_requests = 0 resource_monitor.memory_warnings = 0

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dmitryanchikov/mcp-optimizer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server