Skip to main content
Glama

Carla MCP Server

by agrathwohl
event_monitor.py•2.21 kB
#!/usr/bin/env python3 """ Event Monitor for Carla MCP Server """ import logging import asyncio from datetime import datetime from typing import Dict, Any, List, Callable logger = logging.getLogger(__name__) class EventMonitor: """Real-time event monitoring for Carla""" def __init__(self, carla_controller): """Initialize event monitor Args: carla_controller: CarlaController instance """ self.carla = carla_controller self.event_handlers = {} self.event_history = [] self.max_history = 1000 logger.info("EventMonitor initialized") async def handle_event(self, event: dict): """Handle incoming Carla event Args: event: Event data from Carla callback """ # Add timestamp if not present if 'timestamp' not in event: event['timestamp'] = datetime.now().isoformat() # Store in history self.event_history.append(event) if len(self.event_history) > self.max_history: self.event_history.pop(0) # Call registered handlers action = event.get('action') if action in self.event_handlers: for handler in self.event_handlers[action]: try: await handler(event) except Exception as e: logger.error(f"Event handler error: {str(e)}") def subscribe(self, event_type: str, handler: Callable): """Subscribe to specific event type Args: event_type: Type of event to subscribe to handler: Callback function """ if event_type not in self.event_handlers: self.event_handlers[event_type] = [] self.event_handlers[event_type].append(handler) logger.info(f"Subscribed to event type: {event_type}") def get_recent_events(self, count: int = 10) -> List[dict]: """Get recent events Args: count: Number of events to return Returns: List of recent events """ return self.event_history[-count:]

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/agrathwohl/carla-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server