Skip to main content
Glama
dap_wrapper.py35 kB
""" DAP Synchronous Wrapper Bridges the gap between MCP's synchronous API and DAP's asynchronous event model. Converts async DAP events to synchronous responses that MCP tools can return. Key responsibilities: - Initialize DAP connection with debugpy - Set breakpoints and manage execution - Wait for DAP events (stopped, continued, terminated) with timeout - Fetch variables and convert to MCP response format """ import logging import queue import subprocess import threading import time from pathlib import Path from typing import Any from .dap_client import DAPClient from .schemas import BreakpointResponse, ExecutionError, FrameInfo logger = logging.getLogger(__name__) class DAPSyncWrapper: """ Synchronous wrapper around async DAP operations. Manages debugpy server lifecycle, DAP connection, and converts async events to synchronous responses suitable for MCP tool returns. """ def __init__(self, debugpy_host: str = 'localhost', debugpy_port: int | None = None): """ Initialize DAP sync wrapper. Args: debugpy_host: Host for debugpy server debugpy_port: Port for debugpy server (will be allocated if None) """ self.debugpy_host = debugpy_host self.debugpy_port = debugpy_port self.debugpy_process: subprocess.Popen | None = None self.client = DAPClient() # Register event handler immediately after client creation # This ensures we don't miss any events self.client.add_event_handler(self._on_dap_event) # Event-type specific queues for efficient event routing self.event_queues: dict[str, queue.Queue[dict[str, Any]]] = { 'initialized': queue.Queue(), 'stopped': queue.Queue(), 'continued': queue.Queue(), 'terminated': queue.Queue(), 'exited': queue.Queue(), 'output': queue.Queue(), 'breakpoint': queue.Queue(), 'thread': queue.Queue(), } # General queue for unknown/unexpected events self.general_event_queue: queue.Queue[dict[str, Any]] = queue.Queue() self._initialized = False self._configuration_done = False # Track if configurationDone has been sent self._thread_id: int | None = None def initialize_and_launch( self, python_path: str, script_path: Path, args: list[str] | None = None, env: dict[str, str] | None = None, cwd: Path | None = None, ) -> None: """ Initialize DAP connection and launch the target script (reverse connection). This performs the complete DAP initialization sequence with reverse connection: 1. Start listening on a port (our DAP client acts as server) 2. Launch debugpy wrapper that calls debugpy.connect() 3. debugpy connects to us 4. Send initialize request 5. Send attach request (not launch - process is already running) 6. Wait for initialized event 7. Send configurationDone Args: python_path: Path to Python interpreter to use script_path: Path to Python script to debug args: Command-line arguments for script env: Environment variables cwd: Working directory Raises: ConnectionError: If DAP connection fails RuntimeError: If initialization fails """ try: # Step 1: Start listening FIRST (before launching debugpy) # We need to be ready to accept the connection # Allocate port if not specified if not self.debugpy_port: self.debugpy_port = self._find_free_port() # Start listening in a separate thread so we can launch debugpy listen_thread = threading.Thread( target=self._connect_client, daemon=True, name="DAPListen" ) listen_thread.start() # Give the listen socket a moment to bind time.sleep(0.1) # Step 2: Launch debugpy wrapper (will connect to us) # (Event handler was already registered in __init__) self._launch_debugpy_server(python_path, script_path, args, env, cwd) # Step 3: Wait for listen thread to accept connection listen_thread.join(timeout=30.0) if listen_thread.is_alive(): raise ConnectionError("Timeout waiting for debugpy to connect") # Step 4: Send initialize request self._send_initialize() # Step 5: Send attach request (process is already running) self._send_launch(script_path, args, env, cwd) # Step 6: Wait for initialized event self._wait_for_initialized_event() # Note: configurationDone is NOT sent here # It will be sent in run_to_breakpoint() after breakpoints are set # This prevents the program from starting before breakpoints are configured self._initialized = True logger.info("DAP session initialized and ready (reverse connection, waiting for breakpoints)") except Exception as e: logger.error(f"Failed to initialize DAP session: {e}") self.cleanup() raise def run_to_breakpoint( self, file: str, line: int, timeout: float = 20.0 ) -> BreakpointResponse: """ Run to a specific breakpoint and capture variables. This is the main synchronous API for MCP tools. Args: file: Absolute path to file line: Line number (1-based) timeout: Maximum time to wait for breakpoint Returns: BreakpointResponse with hit status and variables """ if not self._initialized: return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="InitializationError", message="DAP session not initialized" ) ) try: # Step 1: Set breakpoint self._set_breakpoints(file, [line]) # Step 2: Send configurationDone on first run (starts execution) if not self._configuration_done: self._send_configuration_done() self._configuration_done = True logger.info("Sent configurationDone - program execution will begin") # Step 3: Handle first-time execution flow if self._thread_id is None: # First time: execution stopped at debugpy.breakpoint() in wrapper # We need to: # 1. Wait for the entry breakpoint stop event # 2. Get thread ID # 3. Continue past the entry breakpoint to reach user breakpoints logger.debug("First breakpoint call - waiting for entry breakpoint stop") # Wait for stopped event at entry breakpoint (debugpy.breakpoint()) entry_stop_event = self._wait_for_event('stopped', timeout=5.0) if not entry_stop_event: raise RuntimeError("Did not receive entry breakpoint stop event") # Get thread ID from the stop event self._thread_id = entry_stop_event['body'].get('threadId') logger.debug(f"Stopped at entry breakpoint, thread ID: {self._thread_id}") # Continue past the entry breakpoint self._continue_execution() logger.debug("Continued past entry breakpoint, now waiting for user breakpoint") else: # Not first time - continue from current position self._continue_execution() # Step 4: Wait for stopped event at user breakpoint stopped_event = self._wait_for_event('stopped', timeout) if not stopped_event: # Timeout or execution completed return BreakpointResponse( hit=False, completed=True, error=ExecutionError( type="TimeoutError", message=f"Breakpoint not hit within {timeout}s" ) ) # Step 5: Extract frame info reason = stopped_event['body'].get('reason') if reason != 'breakpoint': logger.warning(f"Stopped for reason: {reason}") self._thread_id = stopped_event['body'].get('threadId') # Step 6: Get stack frames frames = self._get_stack_frames(self._thread_id) if not frames: return BreakpointResponse( hit=True, completed=False, error=ExecutionError( type="StackError", message="No stack frames available" ) ) top_frame = frames[0] frame_info = FrameInfo( file=top_frame.get('source', {}).get('path', file), line=top_frame.get('line', line) ) # Step 7: Get variables locals_dict = self._get_frame_variables(top_frame['id']) return BreakpointResponse( hit=True, frameInfo=frame_info, locals=locals_dict, completed=False ) except TimeoutError as e: return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="TimeoutError", message=str(e) ) ) except Exception as e: logger.exception("Error in run_to_breakpoint") return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type=type(e).__name__, message=str(e) ) ) def step_in(self, timeout: float = 10.0) -> BreakpointResponse: """ Step into the next function call. Executes the current line and stops at the first statement of any called function. If the current line doesn't call a function, behaves like step_over. Args: timeout: Maximum time to wait for step completion Returns: BreakpointResponse with new execution location and variables """ if not self._initialized: return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="InitializationError", message="DAP session not initialized" ) ) if self._thread_id is None: return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="StateError", message="No active thread (must hit breakpoint first)" ) ) return self._execute_step('stepIn', timeout) def step_over(self, timeout: float = 10.0) -> BreakpointResponse: """ Step over the current line. Executes the current line completely, including any function calls, and stops at the next line in the current function. Args: timeout: Maximum time to wait for step completion Returns: BreakpointResponse with new execution location and variables """ if not self._initialized: return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="InitializationError", message="DAP session not initialized" ) ) if self._thread_id is None: return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="StateError", message="No active thread (must hit breakpoint first)" ) ) return self._execute_step('next', timeout) def step_out(self, timeout: float = 10.0) -> BreakpointResponse: """ Step out of the current function. Continues execution until the current function returns, then stops at the next line in the calling function. Args: timeout: Maximum time to wait for step completion Returns: BreakpointResponse with new execution location and variables """ if not self._initialized: return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="InitializationError", message="DAP session not initialized" ) ) if self._thread_id is None: return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type="StateError", message="No active thread (must hit breakpoint first)" ) ) return self._execute_step('stepOut', timeout) def _execute_step(self, step_type: str, timeout: float) -> BreakpointResponse: """ Common logic for executing step operations. Args: step_type: DAP step command ('stepIn', 'next', or 'stepOut') timeout: Maximum time to wait Returns: BreakpointResponse with execution results """ try: # Send step request # Note: DAP may not send response, only events try: response = self.client.send_request(step_type, { 'threadId': self._thread_id, 'singleThread': False, }, timeout=1.0) if not response.get('success', True): logger.warning(f"{step_type} request returned unsuccessfully: {response.get('message')}") except TimeoutError: # This is OK - debugpy often doesn't send response, only sends stopped event logger.debug(f"{step_type} request timed out (waiting for stopped event)") # Wait for stopped event stopped_event = self._wait_for_event('stopped', timeout) if not stopped_event: # Check if execution completed return BreakpointResponse( hit=False, completed=True, error=ExecutionError( type="TimeoutError", message=f"Step operation did not complete within {timeout}s" ) ) # Extract stop reason reason = stopped_event['body'].get('reason') logger.debug(f"Stopped after {step_type}: reason={reason}") # Update thread ID (should be same, but be safe) self._thread_id = stopped_event['body'].get('threadId') # Get stack frames frames = self._get_stack_frames(self._thread_id) if not frames: return BreakpointResponse( hit=True, completed=False, error=ExecutionError( type="StackError", message="No stack frames available after step" ) ) top_frame = frames[0] frame_info = FrameInfo( file=top_frame.get('source', {}).get('path', ''), line=top_frame.get('line', 0) ) # Get variables at new location locals_dict = self._get_frame_variables(top_frame['id']) return BreakpointResponse( hit=True, frameInfo=frame_info, locals=locals_dict, completed=False ) except Exception as e: logger.exception(f"Error in {step_type}") return BreakpointResponse( hit=False, completed=False, error=ExecutionError( type=type(e).__name__, message=str(e) ) ) def cleanup(self) -> None: """Clean up resources: disconnect client, terminate debugpy process.""" try: # Disconnect DAP client if self.client: try: self.client.send_request('disconnect', timeout=2.0) except Exception: pass self.client.disconnect() # Terminate debugpy process if self.debugpy_process: try: self.debugpy_process.terminate() self.debugpy_process.wait(timeout=5.0) except subprocess.TimeoutExpired: self.debugpy_process.kill() self.debugpy_process.wait() except Exception as e: logger.warning(f"Error terminating debugpy process: {e}") logger.info("DAP session cleaned up") except Exception as e: logger.error(f"Error during cleanup: {e}") # ======================================== # Internal: debugpy server management # ======================================== def _launch_debugpy_server( self, python_path: str, script_path: Path, args: list[str] | None, env: dict[str, str] | None, cwd: Path | None, ) -> None: """ Launch debugpy with reverse connection pattern. Instead of debugpy listening (--listen mode, which has a message loop bug), we use reverse connection: 1. Create a wrapper script that calls debugpy.connect() 2. Launch the wrapper (which will connect to our listening socket) 3. debugpy connects to us and DAP communication works perfectly """ import os import tempfile # Find free port if not specified if not self.debugpy_port: self.debugpy_port = self._find_free_port() # Create wrapper script that uses debugpy.connect() # This makes debugpy connect TO our listening socket # # KEY INSIGHT: debugpy.breakpoint() creates an initial breakpoint # that prevents execution until DAP client sends configurationDone. # This ensures breakpoints can be set before the script runs. wrapper_code = f""" import sys import debugpy # Connect to DAP client (reverse connection) debugpy.connect(('{ self.debugpy_host}', {self.debugpy_port})) # Wait for client to complete initialization debugpy.wait_for_client() # Set up sys.argv for the target script sys.argv = {[str(script_path)] + (args or [])} # Create an entry breakpoint - execution pauses here until configurationDone # This allows the DAP client to set breakpoints before script execution begins debugpy.breakpoint() # Load and execute the target script with open({repr(str(script_path))}, 'r') as f: code = compile(f.read(), {repr(str(script_path))}, 'exec') exec(code, {{'__name__': '__main__', '__file__': {repr(str(script_path))}}}) """ # Write wrapper to temp file wrapper_file = tempfile.NamedTemporaryFile( mode='w', suffix='.py', delete=False, prefix='debugpy_wrapper_' ) wrapper_file.write(wrapper_code) wrapper_file.close() wrapper_path = wrapper_file.name logger.debug(f"Created wrapper script: {wrapper_path}") # Build command - run wrapper script with target Python cmd = [python_path, wrapper_path] # Prepare environment - inherit current environment if not specified process_env = os.environ.copy() if env: process_env.update(env) logger.debug(f"Launching debugpy wrapper with command: {' '.join(cmd)}") # Launch process self.debugpy_process = subprocess.Popen( cmd, env=process_env, cwd=cwd or script_path.parent, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, # Line buffered ) logger.info(f"Launched debugpy wrapper (will connect to {self.debugpy_host}:{self.debugpy_port}, pid={self.debugpy_process.pid})") # Note: We don't wait for connection here - that happens in _connect_client() # which now uses listen() instead of connect() def _find_free_port(self) -> int: """Find an available port.""" import socket with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(('', 0)) s.listen(1) port = s.getsockname()[1] return port def _connect_client(self) -> None: """ Wait for debugpy to connect to our listening socket (reverse connection). The wrapper script will call debugpy.connect() which connects to us. """ try: # Listen for incoming connection from debugpy # Timeout is longer (30s) to allow debugpy to start and connect self.client.listen(self.debugpy_host, self.debugpy_port, timeout=30.0) logger.info("debugpy connected successfully (reverse connection)") except ConnectionError as e: raise ConnectionError("debugpy failed to connect within timeout") from e # ======================================== # Internal: DAP initialization sequence # ======================================== def _send_initialize(self) -> None: """Send initialize request.""" response = self.client.send_request('initialize', { 'clientID': 'mcp-debug-tool', 'clientName': 'MCP Debug Tool', 'adapterID': 'python', 'pathFormat': 'path', 'linesStartAt1': True, 'columnsStartAt1': True, 'supportsVariableType': True, 'supportsVariablePaging': False, 'supportsRunInTerminalRequest': False, }) if not response.get('success', True): raise RuntimeError(f"Initialize failed: {response.get('message')}") logger.debug("Sent initialize request") def _send_launch( self, script_path: Path, args: list[str] | None, env: dict[str, str] | None, cwd: Path | None, ) -> None: """ Send attach request to debugpy. For reverse connection (debugpy.connect()), we need to send attach request to tell debugpy that we're ready to start debugging. The attach request requires minimal arguments for our use case. """ # Send attach request # For debugpy.connect() mode, we need to provide __sessionId attach_args: dict[str, Any] = { '__sessionId': 'mcp-debug-tool', # Required by debugpy } response = self.client.send_request('attach', attach_args, timeout=10.0) if not response.get('success', True): raise RuntimeError(f"Attach failed: {response.get('message')}") logger.debug("Sent attach request") def _wait_for_initialized_event(self, timeout: float = 10.0) -> None: """Wait for initialized event.""" event = self._wait_for_event('initialized', timeout) if not event: raise RuntimeError("Did not receive initialized event") logger.debug("Received initialized event") def _send_configuration_done(self) -> None: """Send configurationDone request.""" response = self.client.send_request('configurationDone') if not response.get('success', True): raise RuntimeError(f"configurationDone failed: {response.get('message')}") logger.debug("Sent configurationDone") # ======================================== # Internal: Breakpoint and execution control # ======================================== def _set_breakpoints(self, file: str, lines: list[int]) -> None: """Set breakpoints in a file.""" breakpoints = [{'line': line} for line in lines] response = self.client.send_request('setBreakpoints', { 'source': {'path': file}, 'breakpoints': breakpoints, }) if not response.get('success', True): raise RuntimeError(f"setBreakpoints failed: {response.get('message')}") logger.debug(f"Set {len(lines)} breakpoints in {file}") def _continue_execution(self) -> None: """Continue execution from current position.""" if self._thread_id is None: raise RuntimeError("Cannot continue: thread_id not set") # Send continue request - NOTE: debugpy may not send response, only events # So we use a short timeout and ignore timeout errors try: response = self.client.send_request('continue', { 'threadId': self._thread_id, }, timeout=1.0) if not response.get('success', True): logger.warning(f"Continue request returned unsuccessfully: {response.get('message')}") except TimeoutError: # This is OK - debugpy often doesn't send response, only sends stopped/continued events logger.debug("Continue request timed out (this is normal - waiting for events)") logger.debug(f"Continue request sent for thread {self._thread_id}") def _get_stack_frames(self, thread_id: int) -> list[dict[str, Any]]: """Get stack frames for a thread.""" response = self.client.send_request('stackTrace', { 'threadId': thread_id, }) if not response.get('success', True): raise RuntimeError(f"stackTrace failed: {response.get('message')}") frames = response.get('body', {}).get('stackFrames', []) return frames def _get_frame_variables(self, frame_id: int) -> dict[str, Any]: """ Get all variables for a frame (locals, globals, etc.). Returns dictionary in MCP format (compatible with current bdb format). """ # Get scopes for frame response = self.client.send_request('scopes', { 'frameId': frame_id, }) if not response.get('success', True): return {} scopes = response.get('body', {}).get('scopes', []) # Collect variables from all scopes all_vars = {} for scope in scopes: scope_name = scope.get('name', 'unknown') variables_ref = scope.get('variablesReference', 0) if variables_ref == 0: continue # Get variables for this scope vars_response = self.client.send_request('variables', { 'variablesReference': variables_ref, }) if not vars_response.get('success', True): continue variables = vars_response.get('body', {}).get('variables', []) # Convert to MCP format with enhanced information for var in variables: name = var.get('name', '') value = var.get('value', '') var_type = var.get('type', 'unknown') variables_ref = var.get('variablesReference', 0) # Skip private variables (starting with __) if name.startswith('__'): continue var_info: dict[str, Any] = { 'type': var_type, 'repr': value, 'isTruncated': False, } # For collection types, use evaluate to get accurate length collection_types = ('list', 'tuple', 'dict', 'set', 'str', 'frozenset', 'bytes', 'bytearray') if var_type in collection_types: try: eval_response = self.client.send_request('evaluate', { 'expression': f'len({name})', 'frameId': frame_id, 'context': 'watch', }) if eval_response.get('success'): length_str = eval_response.get('body', {}).get('result', '') try: length = int(length_str) var_info['length'] = length # Mark as indexed for list-like types if var_type in ('list', 'tuple', 'str', 'bytes', 'bytearray'): var_info['isIndexed'] = True # Mark as having named properties for dict-like types if var_type in ('dict',): var_info['hasNamedProperties'] = True var_info['namedCount'] = length # Mark large collections as truncated if length > 100: var_info['isTruncated'] = True except (ValueError, TypeError): # If we can't parse the length, just skip it pass except Exception: # If evaluate fails, fall back to checking variablesReference pass # If variable has children (is expandable), note it # Empty collections should not be marked as expandable if variables_ref > 0: # Only mark as expandable if length is not explicitly 0 if 'length' not in var_info or var_info['length'] > 0: var_info['isExpandable'] = True var_info['variablesReference'] = variables_ref all_vars[name] = var_info return all_vars # ======================================== # Internal: Event handling # ======================================== def _on_dap_event(self, event: dict[str, Any]) -> None: """ Callback for DAP events. Routes events to type-specific queues for efficient synchronous waiting. This pattern eliminates the need for event re-queuing and improves performance. """ event_type = event.get('event', 'unknown') logger.debug(f"Received DAP event: {event_type}") # Route to specific queue if available if event_type in self.event_queues: self.event_queues[event_type].put(event) logger.debug(f"Routed '{event_type}' event to dedicated queue") else: # Route to general queue for unexpected events self.general_event_queue.put(event) logger.debug(f"Routed '{event_type}' event to general queue") def _wait_for_event( self, event_type: str, timeout: float ) -> dict[str, Any] | None: """ Wait for a specific DAP event type using dedicated queues. This implementation uses type-specific queues to avoid the inefficient pattern of repeatedly getting and re-queuing events. Each event type has its own queue, so we can directly block on the queue we need. Args: event_type: Event type to wait for (e.g., 'stopped', 'terminated') timeout: Maximum time to wait in seconds Returns: Event dictionary if received, None if timeout Raises: TimeoutError: If event not received within timeout period """ try: # If we have a dedicated queue for this event type, use it directly if event_type in self.event_queues: event = self.event_queues[event_type].get(timeout=timeout) logger.debug(f"Received '{event_type}' event from dedicated queue") return event else: # Fall back to searching general queue for unknown event types logger.debug(f"Searching general queue for '{event_type}' event") start_time = time.time() # Temporarily store events we don't want temp_events = [] try: while time.time() - start_time < timeout: remaining_timeout = timeout - (time.time() - start_time) if remaining_timeout <= 0: break try: event = self.general_event_queue.get(timeout=min(0.1, remaining_timeout)) if event.get('event') == event_type: # Found it! Re-queue the others and return for temp_event in temp_events: self.general_event_queue.put(temp_event) logger.debug(f"Found '{event_type}' event in general queue") return event else: # Not the one we want, save for later temp_events.append(event) except queue.Empty: continue # Timeout - re-queue everything we took out for temp_event in temp_events: self.general_event_queue.put(temp_event) finally: # Ensure we always re-queue in case of exception pass except queue.Empty: logger.warning(f"Timeout waiting for event: {event_type} (timeout={timeout}s)") return None logger.warning(f"Timeout waiting for event: {event_type} (timeout={timeout}s)") return None def __enter__(self): """Context manager entry.""" return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit.""" self.cleanup() return False

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Kaina3/Debug-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server