Skip to main content
Glama
gdb_manager.py11.4 kB
#!/usr/bin/env python3 """GDB process management for MCP server.""" import asyncio import logging import uuid from datetime import datetime, timedelta from typing import Dict, Optional, Any logger = logging.getLogger(__name__) class GDBSession: """Represents a single GDB debugging session.""" def __init__(self, session_id: str, timeout: int = 300): self.id = session_id self.timeout = timeout # Session idle timeout in seconds self.created = datetime.now() self.last_activity = datetime.now() self.process: Optional[asyncio.subprocess.Process] = None self.reader_task: Optional[asyncio.Task] = None self.timeout_task: Optional[asyncio.Task] = None self.output_queue: asyncio.Queue = asyncio.Queue() self._lock = asyncio.Lock() async def start(self): """Start the GDB process in MI mode.""" cmd = ["gdb", "--interpreter=mi2"] logger.info(f"Starting GDB session {self.id} with command: {' '.join(cmd)}") self.process = await asyncio.create_subprocess_exec( *cmd, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) # Start output reader task self.reader_task = asyncio.create_task(self._read_output()) # Start timeout monitoring task self.timeout_task = asyncio.create_task(self._monitor_timeout()) # Wait for GDB to initialize await asyncio.sleep(0.1) async def _read_output(self): """Read output from GDB process.""" if not self.process or not self.process.stdout: return try: while True: line = await self.process.stdout.readline() if not line: break decoded = line.decode('utf-8', errors='replace').strip() if decoded: await self.output_queue.put(decoded) logger.debug(f"GDB output: {decoded}") except asyncio.CancelledError: pass except Exception as e: logger.error(f"Error reading GDB output: {e}") async def _monitor_timeout(self): """Monitor session timeout and close if idle.""" try: while True: await asyncio.sleep(10) # Check every 10 seconds idle_time = (datetime.now() - self.last_activity).total_seconds() if idle_time > self.timeout: logger.info(f"Session {self.id} timed out after {idle_time:.1f} seconds") # Session will be cleaned up by the manager break except asyncio.CancelledError: pass except Exception as e: logger.error(f"Error in timeout monitor: {e}") async def send_command(self, command: str) -> Dict[str, Any]: """Send a command to GDB and collect response.""" if not self.process or self.process.returncode is not None: raise RuntimeError("GDB process is not running") # Update last activity time self.last_activity = datetime.now() async with self._lock: # Clear output queue while not self.output_queue.empty(): try: self.output_queue.get_nowait() except asyncio.QueueEmpty: break # Send command logger.info(f"Sending command to GDB: {command}") if self.process.stdin: self.process.stdin.write(f"{command}\n".encode()) await self.process.stdin.drain() else: raise RuntimeError("GDB stdin is not available") # Collect output output_lines = [] result = None try: # Wait for response with timeout timeout_task = asyncio.create_task(asyncio.sleep(5)) while True: get_task = asyncio.create_task(self.output_queue.get()) done, pending = await asyncio.wait( {get_task, timeout_task}, return_when=asyncio.FIRST_COMPLETED ) if timeout_task in done: # Timeout for task in pending: task.cancel() break line = await get_task output_lines.append(line) # Check for result record if line.startswith('^'): result = self._parse_mi_result(line) # Continue reading to get (gdb) prompt # Check for prompt if line == '(gdb)': for task in pending: task.cancel() break except Exception as e: logger.error(f"Error reading GDB response: {e}") return { "result": result, "output": '\n'.join(output_lines) } def _parse_mi_result(self, line: str) -> Dict[str, Any]: """Parse MI result record.""" if line.startswith('^done'): return {"status": "done", "data": line[5:].strip()} elif line.startswith('^running'): return {"status": "running"} elif line.startswith('^connected'): return {"status": "connected"} elif line.startswith('^error'): # Extract error message msg_start = line.find('msg="') if msg_start >= 0: msg_start += 5 msg_end = line.find('"', msg_start) if msg_end >= 0: return {"status": "error", "message": line[msg_start:msg_end]} return {"status": "error", "message": line[6:].strip()} elif line.startswith('^exit'): return {"status": "exit"} else: return {"status": "unknown", "data": line} async def close(self): """Close the GDB session.""" # Cancel timeout monitoring if self.timeout_task: self.timeout_task.cancel() try: await self.timeout_task except asyncio.CancelledError: pass # Cancel output reader if self.reader_task: self.reader_task.cancel() try: await self.reader_task except asyncio.CancelledError: pass if self.process: if self.process.returncode is None: # Send quit command try: if self.process.stdin: self.process.stdin.write(b"quit\n") await self.process.stdin.drain() await asyncio.wait_for(self.process.wait(), timeout=2) except Exception: # Force terminate if quit doesn't work self.process.terminate() await asyncio.wait_for(self.process.wait(), timeout=2) self.process = None class GDBManager: """Manages multiple GDB sessions.""" def __init__(self): self.sessions: Dict[str, GDBSession] = {} self._lock = asyncio.Lock() self._cleanup_task: Optional[asyncio.Task] = None async def start(self): """Start the GDB manager.""" self._cleanup_task = asyncio.create_task(self._cleanup_timed_out_sessions()) async def _cleanup_timed_out_sessions(self): """Periodically check and clean up timed out sessions.""" try: while True: await asyncio.sleep(30) # Check every 30 seconds async with self._lock: timed_out = [] for session_id, session in self.sessions.items(): idle_time = (datetime.now() - session.last_activity).total_seconds() if idle_time > session.timeout: timed_out.append(session_id) # Close timed out sessions outside the lock for session_id in timed_out: try: logger.info(f"Cleaning up timed out session: {session_id}") await self.close_session(session_id) except Exception as e: logger.error(f"Error cleaning up session {session_id}: {e}") except asyncio.CancelledError: pass async def create_session(self, timeout: int = 300) -> str: """Create a new GDB session.""" session_id = str(uuid.uuid4()) async with self._lock: session = GDBSession(session_id, timeout) await session.start() self.sessions[session_id] = session logger.info(f"Created GDB session: {session_id}") return session_id async def send_command(self, session_id: str, command: str) -> Dict[str, Any]: """Send command to a specific session.""" async with self._lock: session = self.sessions.get(session_id) if not session: raise ValueError(f"Session not found: {session_id}") return await session.send_command(command) async def close_session(self, session_id: str): """Close a specific session.""" async with self._lock: session = self.sessions.pop(session_id, None) if not session: raise ValueError(f"Session not found: {session_id}") await session.close() logger.info(f"Closed GDB session: {session_id}") async def list_sessions(self) -> list: """List all active sessions.""" async with self._lock: return [ { "id": session.id, "created": session.created.isoformat() } for session in self.sessions.values() ] async def cleanup(self): """Clean up all sessions.""" # Cancel cleanup task if self._cleanup_task: self._cleanup_task.cancel() try: await self._cleanup_task except asyncio.CancelledError: pass async with self._lock: session_ids = list(self.sessions.keys()) for session_id in session_ids: try: await self.close_session(session_id) except Exception as e: logger.error(f"Error closing session {session_id}: {e}")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/hnmr293/gdb-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server