Skip to main content
Glama
XRDS76354

SUMO-MCP Server

by XRDS76354
connection.py2.89 kB
import traci import logging from typing import Optional from utils.sumo import find_sumo_binary logger = logging.getLogger(__name__) class SUMOConnection: """ Singleton class to manage the connection to the SUMO server via TraCI. """ _instance: Optional['SUMOConnection'] = None _connected: bool def __new__(cls) -> "SUMOConnection": if cls._instance is None: cls._instance = super(SUMOConnection, cls).__new__(cls) cls._instance._connected = False return cls._instance def connect( self, config_file: Optional[str] = None, gui: bool = False, port: int = 8813, host: str = "localhost", ) -> None: """ Start SUMO and connect, or connect to an existing instance. If config_file is provided, starts a new instance. If config_file is None, attempts to connect to existing server at host:port. """ if self._connected: logger.info("Already connected to SUMO.") return try: if config_file: binary_name = "sumo-gui" if gui else "sumo" binary = find_sumo_binary(binary_name) if not binary: raise RuntimeError( "Could not locate SUMO executable. " "Please ensure SUMO is installed and either the binary is in PATH or SUMO_HOME is set." ) # Add --no-step-log to prevent stdout pollution which breaks JSON-RPC cmd = [binary, "-c", config_file, "--no-step-log", "true"] logger.info(f"Starting SUMO with command: {cmd}") traci.start(cmd, port=port) # Note: traci.start usually handles port selection or defaults else: logger.info(f"Connecting to existing SUMO at {host}:{port}") traci.init(host=host, port=port) self._connected = True logger.info("Successfully connected to SUMO.") except Exception as e: logger.error(f"Failed to connect to SUMO: {e}") raise def disconnect(self) -> None: """Disconnect from SUMO server.""" if not self._connected: return try: traci.close() logger.info("Disconnected from SUMO.") except Exception as e: logger.error(f"Error during disconnect: {e}") finally: self._connected = False def is_connected(self) -> bool: return self._connected def simulation_step(self, step: float = 0) -> None: """Advance the simulation.""" if not self._connected: raise RuntimeError("Not connected to SUMO.") traci.simulationStep(step) # Global instance connection_manager = SUMOConnection()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/XRDS76354/SUMO-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server