Skip to main content
Glama
sessions.py35.6 kB
""" Session manager for debug sessions. Handles session lifecycle: creation, state tracking, and cleanup. """ import asyncio import contextlib import json import os import subprocess import sys import uuid from datetime import datetime from pathlib import Path from .schemas import ( BreakpointRequest, BreakpointResponse, BreakpointTarget, EndSessionResponse, ExecutionError, SessionStateResponse, SessionStatus, SessionTimings, StartSessionRequest, StartSessionResponse, ) from .utils import DEFAULT_TIMEOUT_SECONDS, Timer, resolve_workspace_path, validate_file_and_line class DebugSession: """Represents an active debug session.""" def __init__( self, session_id: str, entry: Path, python_path: str, workspace_root: Path, args: list[str] | None = None, env: dict[str, str] | None = None, use_dap: bool = True, ): self.id = session_id self.entry = entry self.workspace_root = workspace_root self.args = args or [] self.env = env or {} self.python_path = python_path self.use_dap = use_dap self.status = SessionStatus.IDLE self.created_at = datetime.now() self.updated_at = datetime.now() self.last_breakpoint: BreakpointTarget | None = None self.timings = SessionTimings() # bdb mode (legacy) self.process: subprocess.Popen | None = None self.reader_task = None # Background task for reading subprocess output # DAP mode (default) self.dap_wrapper = None # Will be set if use_dap=True def update_status(self, status: SessionStatus) -> None: """Update session status and timestamp.""" self.status = status self.updated_at = datetime.now() def update_breakpoint(self, file: str, line: int) -> None: """Update last breakpoint information.""" if self.last_breakpoint and self.last_breakpoint.file == file and self.last_breakpoint.line == line: self.last_breakpoint.hitCount += 1 else: self.last_breakpoint = BreakpointTarget(file=file, line=line, hitCount=1) self.updated_at = datetime.now() def update_timings(self, last_run_ms: float) -> None: """Update timing information.""" self.timings.lastRunMs = last_run_ms self.timings.totalCpuTimeMs += last_run_ms self.updated_at = datetime.now() def to_state_response(self) -> SessionStateResponse: """Convert to state response model.""" return SessionStateResponse( status=self.status, lastBreakpoint=self.last_breakpoint, timings=self.timings, ) class SessionManager: """Manages multiple debug sessions.""" def __init__(self, workspace_root: Path): self.workspace_root = workspace_root self.sessions: dict[str, DebugSession] = {} def create_session(self, request: StartSessionRequest) -> StartSessionResponse: """ Create a new debug session. Args: request: Session creation request Returns: Response with new session ID Raises: ValueError: If entry path is invalid FileNotFoundError: If entry script doesn't exist """ # Validate and resolve entry path entry_path = resolve_workspace_path(self.workspace_root, request.entry) if not entry_path.exists(): raise FileNotFoundError(f"Entry script not found: {request.entry}") if not entry_path.is_file(): raise ValueError(f"Entry path is not a file: {request.entry}") if entry_path.suffix != ".py": raise ValueError(f"Entry must be a Python file (.py): {request.entry}") # Determine workspace root for this session # If entry is absolute path, find the workspace root from the entry path session_workspace = self._find_workspace_root(entry_path) # Validate Python path (required) python_path = Path(request.pythonPath) if not python_path.exists(): raise FileNotFoundError(f"Python interpreter not found: {request.pythonPath}") if not python_path.is_file(): raise ValueError(f"Python path is not a file: {request.pythonPath}") # Generate session ID session_id = str(uuid.uuid4()) # Create session with DAP enabled by default use_dap = request.useDap if request.useDap is not None else True session = DebugSession( session_id=session_id, entry=entry_path, python_path=request.pythonPath, workspace_root=session_workspace, args=request.args or [], env=request.env or {}, use_dap=use_dap, ) self.sessions[session_id] = session return StartSessionResponse(sessionId=session_id) def _find_workspace_root(self, entry_path: Path) -> Path: """ Find the workspace root for a given entry path. Searches upwards from the entry path for common project markers: - .git directory - pyproject.toml - setup.py - requirements.txt Args: entry_path: Absolute path to the entry script Returns: Workspace root path (defaults to entry's parent if no markers found) """ current = entry_path.parent markers = ['.git', 'pyproject.toml', 'setup.py', 'requirements.txt', '.venv', 'venv'] # Search up to 10 levels for _ in range(10): for marker in markers: if (current / marker).exists(): return current parent = current.parent if parent == current: # Reached root break current = parent # Default to entry file's directory return entry_path.parent def get_session(self, session_id: str) -> DebugSession: """ Get session by ID. Args: session_id: Session ID Returns: Debug session Raises: KeyError: If session not found """ if session_id not in self.sessions: raise KeyError(f"Session not found: {session_id}") return self.sessions[session_id] def get_state(self, session_id: str) -> SessionStateResponse: """ Get current session state. Args: session_id: Session ID Returns: Current state response """ session = self.get_session(session_id) return session.to_state_response() def end_session(self, session_id: str) -> EndSessionResponse: """ End a debug session and clean up resources. Args: session_id: Session ID Returns: End confirmation response """ session = self.get_session(session_id) # Clean up DAP wrapper if using DAP if session.dap_wrapper: try: session.dap_wrapper.terminate() except Exception: pass # Clean up subprocess if running (bdb mode) if session.process and session.process.poll() is None: # Try graceful termination first try: terminate_cmd = json.dumps({"command": "terminate"}) + '\n' session.process.stdin.write(terminate_cmd) session.process.stdin.flush() session.process.wait(timeout=5) except Exception: pass # Force terminate if still running if session.process.poll() is None: session.process.terminate() try: session.process.wait(timeout=5) except subprocess.TimeoutExpired: session.process.kill() session.process.wait() # Update status session.update_status(SessionStatus.COMPLETED) # Remove from active sessions del self.sessions[session_id] return EndSessionResponse(ended=True) def run_to_breakpoint( self, session_id: str, request: BreakpointRequest, timeout: float | None = None ) -> BreakpointResponse: """ Run session to a breakpoint and capture locals. Args: session_id: Session ID request: Breakpoint request with file and line timeout: Optional timeout in seconds (defaults to DEFAULT_TIMEOUT_SECONDS) Returns: Breakpoint response with locals Raises: KeyError: If session not found ValueError: If file/line is invalid """ session = self.get_session(session_id) # Validate breakpoint location using session's workspace root breakpoint_path = resolve_workspace_path(session.workspace_root, request.file) validate_file_and_line(breakpoint_path, request.line) # Use default timeout if not specified if timeout is None: timeout = DEFAULT_TIMEOUT_SECONDS # Use DAP if enabled (default) if session.use_dap: return self._run_to_breakpoint_dap(session, breakpoint_path, request.line, timeout) else: return self._run_to_breakpoint_bdb(session, breakpoint_path, request.line, timeout) def _run_to_breakpoint_dap( self, session: DebugSession, breakpoint_path: Path, line: int, timeout: float ) -> BreakpointResponse: """Run to breakpoint using DAP (debugpy).""" from .dap_wrapper import DAPSyncWrapper # Initialize DAP wrapper if not already done if not session.dap_wrapper: try: # Create and initialize DAP wrapper # python_path is guaranteed to be set (required field) session.dap_wrapper = DAPSyncWrapper() session.dap_wrapper.initialize_and_launch( python_path=session.python_path, script_path=session.entry, args=session.args, env=session.env, cwd=session.workspace_root, ) except Exception as e: session.update_status(SessionStatus.ERROR) return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type=type(e).__name__, message=f"Failed to initialize DAP: {e}", ), ) # Run to breakpoint session.update_status(SessionStatus.RUNNING) with Timer() as timer: try: response = session.dap_wrapper.run_to_breakpoint( str(breakpoint_path), line, timeout=timeout, ) # Update session state if response.hit: session.update_breakpoint(str(breakpoint_path), line) session.update_status(SessionStatus.PAUSED) elif response.completed: session.update_status(SessionStatus.COMPLETED) elif response.error: session.update_status(SessionStatus.ERROR) session.update_timings(timer.elapsed_ms) return response except Exception as e: session.update_status(SessionStatus.ERROR) return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type=type(e).__name__, message=str(e), ), ) def _run_to_breakpoint_bdb( self, session: DebugSession, breakpoint_path: Path, line: int, timeout: float ) -> BreakpointResponse: """Run to breakpoint using bdb (legacy).""" # Start subprocess if not already running if not session.process: # Use the specified Python interpreter (required field) python_executable = session.python_path # Path to runner script runner_script = Path(__file__).parent / "runner_main.py" # Prepare environment with PYTHONPATH env = os.environ.copy() # Add Debug-MCP's src directory to PYTHONPATH so runner can import mcp_debug_tool debug_mcp_src = str(Path(__file__).parent.parent.resolve()) if 'PYTHONPATH' in env: env['PYTHONPATH'] = f"{debug_mcp_src}{os.pathsep}{env['PYTHONPATH']}" else: env['PYTHONPATH'] = debug_mcp_src # Start subprocess with specified Python interpreter # Use session's workspace root instead of manager's workspace root process = subprocess.Popen( [python_executable, str(runner_script), str(session.workspace_root)], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, # Line buffered env=env, # Use modified environment ) session.process = process # Update status session.update_status(SessionStatus.RUNNING) # Send run_to_breakpoint command with Timer() as timer: try: command = { "command": "run_to_breakpoint", "script_path": str(session.entry), "file": str(breakpoint_path), "line": line, "args": session.args, "env": session.env, } # Send command as JSON line command_json = json.dumps(command) + '\n' session.process.stdin.write(command_json) session.process.stdin.flush() # Read response with timeout import select # Wait for output with timeout ready, _, _ = select.select([session.process.stdout], [], [], timeout) if ready: response_line = session.process.stdout.readline() if not response_line: # Process terminated session.update_status(SessionStatus.ERROR) stderr_output = session.process.stderr.read() return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="ProcessError", message=f"Runner process terminated unexpectedly. stderr: {stderr_output[:500]}", ), ) response_data = json.loads(response_line) if response_data["status"] == "error": session.update_status(SessionStatus.ERROR) return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="ExecutionError", message=response_data["message"], ), ) # Parse response data = response_data["data"] response = BreakpointResponse(**data) # Update session state if response.hit: session.update_breakpoint(str(breakpoint_path), line) session.update_status(SessionStatus.PAUSED) elif response.completed: session.update_status(SessionStatus.COMPLETED) elif response.error: session.update_status(SessionStatus.ERROR) session.update_timings(timer.elapsed_ms) return response else: # Timeout session.update_status(SessionStatus.ERROR) debug_msg = f"DEBUG: timeout waiting for runner response. poll={session.process.poll() if session.process else 'N/A'}" print(debug_msg, file=sys.stderr) return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="TimeoutError", message=f"Execution timed out after {timeout}s", ), ) except Exception as e: session.update_status(SessionStatus.ERROR) return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type=type(e).__name__, message=str(e), ), ) def continue_execution( self, session_id: str, request: BreakpointRequest ) -> BreakpointResponse: """ Continue execution to next breakpoint within same session. Args: session_id: Session ID request: Breakpoint request with file and line Returns: Breakpoint response with locals at next breakpoint Raises: KeyError: If session not found ValueError: If file/line is invalid """ session = self.get_session(session_id) # Validate breakpoint location using session's workspace root breakpoint_path = resolve_workspace_path(session.workspace_root, request.file) validate_file_and_line(breakpoint_path, request.line) # Session must be paused (from previous breakpoint) if session.status != SessionStatus.PAUSED: return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="InvalidStateError", message=f"Cannot continue from state: {session.status}", ), ) # Use DAP if enabled (default) if session.use_dap: return self._continue_execution_dap(session, breakpoint_path, request.line) else: return self._continue_execution_bdb(session, breakpoint_path, request.line) def _continue_execution_dap( self, session: DebugSession, breakpoint_path: Path, line: int ) -> BreakpointResponse: """Continue execution using DAP.""" if not session.dap_wrapper: return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="SessionError", message="DAP session not initialized", ), ) session.update_status(SessionStatus.RUNNING) with Timer() as timer: try: # For DAP, continue to next breakpoint uses run_to_breakpoint # (DAP manages the execution state internally) response = session.dap_wrapper.run_to_breakpoint( str(breakpoint_path), line, timeout=DEFAULT_TIMEOUT_SECONDS, ) # Update session state if response.hit: session.update_breakpoint(str(breakpoint_path), line) session.update_status(SessionStatus.PAUSED) elif response.completed: session.update_status(SessionStatus.COMPLETED) elif response.error: session.update_status(SessionStatus.ERROR) session.update_timings(timer.elapsed_ms) return response except Exception as e: session.update_status(SessionStatus.ERROR) return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type=type(e).__name__, message=str(e), ), ) def _continue_execution_bdb( self, session: DebugSession, breakpoint_path: Path, line: int ) -> BreakpointResponse: """Continue execution using bdb (legacy).""" # Process must be running if not session.process or session.process.poll() is not None: return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="SessionError", message="Debug process is not running", ), ) # Update status session.update_status(SessionStatus.RUNNING) # Update status session.update_status(SessionStatus.RUNNING) # Send continue command with next breakpoint with Timer() as timer: try: command = { "command": "continue", "file": str(breakpoint_path), "line": line, } # Send command as JSON line command_json = json.dumps(command) + '\n' session.process.stdin.write(command_json) session.process.stdin.flush() # Read response with timeout import select ready, _, _ = select.select([session.process.stdout], [], [], DEFAULT_TIMEOUT_SECONDS) if ready: response_line = session.process.stdout.readline() if not response_line: session.update_status(SessionStatus.ERROR) stderr_output = session.process.stderr.read() return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="ProcessError", message=f"Runner process terminated. stderr: {stderr_output[:500]}", ), ) response_data = json.loads(response_line) if response_data["status"] == "error": session.update_status(SessionStatus.ERROR) return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="ExecutionError", message=response_data["message"], ), ) # Parse response data = response_data["data"] response = BreakpointResponse(**data) # Update session state if response.hit: session.update_breakpoint(str(breakpoint_path), line) session.update_status(SessionStatus.PAUSED) elif response.completed: session.update_status(SessionStatus.COMPLETED) elif response.error: session.update_status(SessionStatus.ERROR) session.update_timings(timer.elapsed_ms) return response else: # Timeout session.update_status(SessionStatus.ERROR) return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="TimeoutError", message=f"Execution timed out after {DEFAULT_TIMEOUT_SECONDS}s", ), ) except Exception as e: session.update_status(SessionStatus.ERROR) return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type=type(e).__name__, message=str(e), ), ) def step_in(self, session_id: str) -> BreakpointResponse: """ Step into the next function call (DAP only). Args: session_id: Session ID Returns: BreakpointResponse with new location and variables Raises: KeyError: If session not found ValueError: If session is not using DAP """ session = self.get_session(session_id) if not session.use_dap: return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="NotImplementedError", message="Step operations require DAP integration (set useDap=true when creating session)", ), ) if not session.dap_wrapper: return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="SessionError", message="DAP session not initialized", ), ) if session.status != SessionStatus.PAUSED: return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="InvalidStateError", message=f"Cannot step from state: {session.status}. Must be PAUSED.", ), ) session.update_status(SessionStatus.RUNNING) with Timer() as timer: try: response = session.dap_wrapper.step_in(timeout=DEFAULT_TIMEOUT_SECONDS) # Update session state if response.hit: session.update_status(SessionStatus.PAUSED) if response.frameInfo: session.update_breakpoint(response.frameInfo.file, response.frameInfo.line) elif response.completed: session.update_status(SessionStatus.COMPLETED) elif response.error: session.update_status(SessionStatus.ERROR) session.update_timings(timer.elapsed_ms) return response except Exception as e: session.update_status(SessionStatus.ERROR) return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type=type(e).__name__, message=str(e), ), ) def step_over(self, session_id: str) -> BreakpointResponse: """ Step over the current line (DAP only). Args: session_id: Session ID Returns: BreakpointResponse with new location and variables Raises: KeyError: If session not found ValueError: If session is not using DAP """ session = self.get_session(session_id) if not session.use_dap: return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="NotImplementedError", message="Step operations require DAP integration (set useDap=true when creating session)", ), ) if not session.dap_wrapper: return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="SessionError", message="DAP session not initialized", ), ) if session.status != SessionStatus.PAUSED: return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="InvalidStateError", message=f"Cannot step from state: {session.status}. Must be PAUSED.", ), ) session.update_status(SessionStatus.RUNNING) with Timer() as timer: try: response = session.dap_wrapper.step_over(timeout=DEFAULT_TIMEOUT_SECONDS) # Update session state if response.hit: session.update_status(SessionStatus.PAUSED) if response.frameInfo: session.update_breakpoint(response.frameInfo.file, response.frameInfo.line) elif response.completed: session.update_status(SessionStatus.COMPLETED) elif response.error: session.update_status(SessionStatus.ERROR) session.update_timings(timer.elapsed_ms) return response except Exception as e: session.update_status(SessionStatus.ERROR) return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type=type(e).__name__, message=str(e), ), ) def step_out(self, session_id: str) -> BreakpointResponse: """ Step out of the current function (DAP only). Args: session_id: Session ID Returns: BreakpointResponse with new location and variables Raises: KeyError: If session not found ValueError: If session is not using DAP """ session = self.get_session(session_id) if not session.use_dap: return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="NotImplementedError", message="Step operations require DAP integration (set useDap=true when creating session)", ), ) if not session.dap_wrapper: return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="SessionError", message="DAP session not initialized", ), ) if session.status != SessionStatus.PAUSED: return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="InvalidStateError", message=f"Cannot step from state: {session.status}. Must be PAUSED.", ), ) session.update_status(SessionStatus.RUNNING) with Timer() as timer: try: response = session.dap_wrapper.step_out(timeout=DEFAULT_TIMEOUT_SECONDS) # Update session state if response.hit: session.update_status(SessionStatus.PAUSED) if response.frameInfo: session.update_breakpoint(response.frameInfo.file, response.frameInfo.line) elif response.completed: session.update_status(SessionStatus.COMPLETED) elif response.error: session.update_status(SessionStatus.ERROR) session.update_timings(timer.elapsed_ms) return response except Exception as e: session.update_status(SessionStatus.ERROR) return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type=type(e).__name__, message=str(e), ), ) def cleanup_all(self) -> None: """Clean up all active sessions.""" session_ids = list(self.sessions.keys()) for session_id in session_ids: with contextlib.suppress(Exception): self.end_session(session_id) # ======================================== # Async Wrappers for MCP SDK Integration # ======================================== async def create_session_async(self, request: StartSessionRequest) -> StartSessionResponse: """ Async wrapper for create_session. Runs the synchronous create_session in a thread pool to avoid blocking. """ return await asyncio.to_thread(self.create_session, request) async def get_state_async(self, session_id: str) -> SessionStateResponse: """ Async wrapper for get_state. Runs the synchronous get_state in a thread pool to avoid blocking. """ return await asyncio.to_thread(self.get_state, session_id) async def end_session_async(self, session_id: str) -> EndSessionResponse: """ Async wrapper for end_session. Runs the synchronous end_session in a thread pool to avoid blocking. """ return await asyncio.to_thread(self.end_session, session_id) async def run_to_breakpoint_async( self, session_id: str, request: BreakpointRequest ) -> BreakpointResponse: """ Async wrapper for run_to_breakpoint. Runs the synchronous run_to_breakpoint in a thread pool to avoid blocking. """ return await asyncio.to_thread(self.run_to_breakpoint, session_id, request) async def continue_execution_async( self, session_id: str, request: BreakpointRequest ) -> BreakpointResponse: """ Async wrapper for continue_execution. Runs the synchronous continue_execution in a thread pool to avoid blocking. """ return await asyncio.to_thread(self.continue_execution, session_id, request) async def step_in_async(self, session_id: str) -> BreakpointResponse: """ Async wrapper for step_in. Runs the synchronous step_in in a thread pool to avoid blocking. """ return await asyncio.to_thread(self.step_in, session_id) async def step_over_async(self, session_id: str) -> BreakpointResponse: """ Async wrapper for step_over. Runs the synchronous step_over in a thread pool to avoid blocking. """ return await asyncio.to_thread(self.step_over, session_id) async def step_out_async(self, session_id: str) -> BreakpointResponse: """ Async wrapper for step_out. Runs the synchronous step_out in a thread pool to avoid blocking. """ return await asyncio.to_thread(self.step_out, session_id)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Kaina3/Debug-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server