Skip to main content
Glama

Carla MCP Server

by agrathwohl
cpu_monitor.py•3.76 kB
#!/usr/bin/env python3 """ CPU Monitor for Carla MCP Server """ import logging import psutil import time import threading from typing import Dict, Any, List logger = logging.getLogger(__name__) class CPUMonitor: """CPU usage monitoring for Carla""" def __init__(self, carla_controller): """Initialize CPU monitor Args: carla_controller: CarlaController instance """ self.carla = carla_controller self.monitoring = False self.monitor_thread = None self.cpu_history = [] self.plugin_loads = {} logger.info("CPUMonitor initialized") def start_monitoring(self, interval: float = 1.0): """Start CPU monitoring Args: interval: Update interval in seconds """ if self.monitoring: return self.monitoring = True self.monitor_thread = threading.Thread( target=self._monitor_loop, args=(interval,), daemon=True ) self.monitor_thread.start() logger.info(f"Started CPU monitoring with {interval}s interval") def stop_monitoring(self): """Stop CPU monitoring""" self.monitoring = False if self.monitor_thread: self.monitor_thread.join(timeout=2.0) logger.info("Stopped CPU monitoring") def _monitor_loop(self, interval: float): """Monitor loop running in thread Args: interval: Update interval in seconds """ while self.monitoring: try: # Get system CPU usage cpu_percent = psutil.cpu_percent(interval=0.1) # Get memory usage memory = psutil.virtual_memory() # Get plugin CPU loads total_plugin_load = 0 for plugin_id in self.carla.plugins: load = self.carla.get_cpu_load(plugin_id) self.plugin_loads[plugin_id] = load total_plugin_load += load # Store in history self.cpu_history.append({ 'timestamp': time.time(), 'system_cpu': cpu_percent, 'memory_percent': memory.percent, 'plugin_load': total_plugin_load }) # Limit history size if len(self.cpu_history) > 300: # 5 minutes at 1s interval self.cpu_history.pop(0) time.sleep(interval) except Exception as e: logger.error(f"CPU monitor error: {str(e)}") def get_current_usage(self) -> float: """Get current CPU usage Returns: Current CPU percentage """ if self.cpu_history: return self.cpu_history[-1]['system_cpu'] return psutil.cpu_percent(interval=0.1) def get_plugin_loads(self) -> Dict[int, float]: """Get CPU load for each plugin Returns: Dictionary of plugin loads """ return self.plugin_loads.copy() def get_history(self, duration: int = 60) -> List[dict]: """Get CPU history Args: duration: Duration in seconds Returns: List of CPU measurements """ if not self.cpu_history: return [] current_time = time.time() return [ entry for entry in self.cpu_history if current_time - entry['timestamp'] <= duration ]

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/agrathwohl/carla-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server