Skip to main content
Glama
port_manager.py2.75 kB
""" Port manager for assigning unique ports to Joern server instances """ import logging import threading from typing import Dict, Optional, Set logger = logging.getLogger(__name__) class PortManager: """Manages port allocation for Joern server instances (2000-2999)""" PORT_MIN = 2000 PORT_MAX = 2999 def __init__(self): self._session_to_port: Dict[str, int] = {} # session_id -> port self._port_to_session: Dict[int, str] = {} # port -> session_id self._available_ports: Set[int] = set(range(self.PORT_MIN, self.PORT_MAX + 1)) self._lock = threading.Lock() def allocate_port(self, session_id: str) -> int: """Allocate a port for a session""" with self._lock: # Check if session already has a port if session_id in self._session_to_port: port = self._session_to_port[session_id] logger.info(f"Session {session_id} already has port {port}") return port # Allocate a new port if not self._available_ports: raise RuntimeError("No available ports in range 2000-2999") port = min(self._available_ports) self._available_ports.remove(port) self._session_to_port[session_id] = port self._port_to_session[port] = session_id logger.info(f"Allocated port {port} for session {session_id}") return port def get_port(self, session_id: str) -> Optional[int]: """Get the port assigned to a session""" with self._lock: return self._session_to_port.get(session_id) def release_port(self, session_id: str) -> bool: """Release the port assigned to a session""" with self._lock: if session_id not in self._session_to_port: logger.warning(f"Session {session_id} has no allocated port") return False port = self._session_to_port[session_id] del self._session_to_port[session_id] del self._port_to_session[port] self._available_ports.add(port) logger.info(f"Released port {port} from session {session_id}") return True def get_session_by_port(self, port: int) -> Optional[str]: """Get the session ID for a given port""" with self._lock: return self._port_to_session.get(port) def get_all_allocations(self) -> Dict[str, int]: """Get all current port allocations""" with self._lock: return self._session_to_port.copy() def available_count(self) -> int: """Get the count of available ports""" with self._lock: return len(self._available_ports)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Lekssays/codebadger'

If you have feedback or need assistance with the MCP directory API, please join our Discord server