# dap_wrapper.py (~35 kB)
"""
DAP Synchronous Wrapper
Bridges the gap between MCP's synchronous API and DAP's asynchronous event model.
Converts async DAP events to synchronous responses that MCP tools can return.
Key responsibilities:
- Initialize DAP connection with debugpy
- Set breakpoints and manage execution
- Wait for DAP events (stopped, continued, terminated) with timeout
- Fetch variables and convert to MCP response format
"""
import logging
import queue
import subprocess
import threading
import time
from pathlib import Path
from typing import Any
from .dap_client import DAPClient
from .schemas import BreakpointResponse, ExecutionError, FrameInfo
logger = logging.getLogger(__name__)
class DAPSyncWrapper:
    """
    Synchronous wrapper around async DAP operations.

    Owns the debugpy server process and the DAP connection, and converts the
    adapter's asynchronous events into synchronous return values suitable for
    MCP tool responses.
    """

    def __init__(self, debugpy_host: str = 'localhost', debugpy_port: int | None = None):
        """
        Create the wrapper and pre-register the DAP event handler.

        Args:
            debugpy_host: Host the debugpy process will connect back to.
            debugpy_port: Port for that connection; a free port is picked
                later if this is None.
        """
        self.debugpy_host = debugpy_host
        self.debugpy_port = debugpy_port
        self.debugpy_process: subprocess.Popen | None = None

        # Each known DAP event type gets a dedicated queue so waiters can
        # block directly on the queue they care about.
        known_events = (
            'initialized', 'stopped', 'continued', 'terminated',
            'exited', 'output', 'breakpoint', 'thread',
        )
        self.event_queues: dict[str, queue.Queue[dict[str, Any]]] = {
            name: queue.Queue() for name in known_events
        }
        # Unknown/unexpected event types land here.
        self.general_event_queue: queue.Queue[dict[str, Any]] = queue.Queue()

        self._initialized = False
        self._configuration_done = False  # has configurationDone been sent?
        self._thread_id: int | None = None

        # Create the client and hook up the event callback immediately so no
        # early event can be missed (queues above already exist at this point).
        self.client = DAPClient()
        self.client.add_event_handler(self._on_dap_event)
    def initialize_and_launch(
        self,
        python_path: str,
        script_path: Path,
        args: list[str] | None = None,
        env: dict[str, str] | None = None,
        cwd: Path | None = None,
    ) -> None:
        """
        Initialize DAP connection and launch the target script (reverse connection).

        Sequence:
        1. Start listening on a port (our DAP client acts as the server)
        2. Launch a debugpy wrapper script that calls debugpy.connect()
        3. debugpy connects back to us
        4. Send initialize request
        5. Send attach request (not launch - the process is already running)
        6. Wait for the initialized event
        configurationDone is deliberately deferred to run_to_breakpoint() so
        breakpoints can be installed before the program starts.

        Args:
            python_path: Path to Python interpreter to use
            script_path: Path to Python script to debug
            args: Command-line arguments for script
            env: Environment variables
            cwd: Working directory

        Raises:
            ConnectionError: If DAP connection fails
            RuntimeError: If initialization fails
        """
        try:
            # Step 1: Start listening FIRST (before launching debugpy) so we
            # are ready to accept the reverse connection.
            if not self.debugpy_port:
                self.debugpy_port = self._find_free_port()
            # Listen in a background thread so debugpy can be launched in
            # parallel; the thread exits once the connection is accepted.
            listen_thread = threading.Thread(
                target=self._connect_client,
                daemon=True,
                name="DAPListen"
            )
            listen_thread.start()
            # Give the listen socket a moment to bind.
            # NOTE(review): a fixed 0.1s sleep is race-prone - presumably
            # DAPClient.listen() binds quickly, but an explicit "bound"
            # signal would be more robust. Confirm against DAPClient.
            time.sleep(0.1)
            # Step 2: Launch the debugpy wrapper (it will connect to us).
            # (The event handler was already registered in __init__.)
            self._launch_debugpy_server(python_path, script_path, args, env, cwd)
            # Step 3: Wait for the listen thread to accept the connection.
            listen_thread.join(timeout=30.0)
            if listen_thread.is_alive():
                raise ConnectionError("Timeout waiting for debugpy to connect")
            # Step 4: DAP handshake - initialize request.
            self._send_initialize()
            # Step 5: Attach (not launch) - the target process already runs.
            self._send_launch(script_path, args, env, cwd)
            # Step 6: Wait for the adapter's 'initialized' event.
            self._wait_for_initialized_event()
            # configurationDone is intentionally NOT sent here; it is sent in
            # run_to_breakpoint() after breakpoints are set, so the program
            # cannot start running before breakpoints exist.
            self._initialized = True
            logger.info("DAP session initialized and ready (reverse connection, waiting for breakpoints)")
        except Exception as e:
            logger.error(f"Failed to initialize DAP session: {e}")
            # Best-effort teardown before propagating the failure.
            self.cleanup()
            raise
def run_to_breakpoint(
self,
file: str,
line: int,
timeout: float = 20.0
) -> BreakpointResponse:
"""
Run to a specific breakpoint and capture variables.
This is the main synchronous API for MCP tools.
Args:
file: Absolute path to file
line: Line number (1-based)
timeout: Maximum time to wait for breakpoint
Returns:
BreakpointResponse with hit status and variables
"""
if not self._initialized:
return BreakpointResponse(
hit=False,
completed=False,
error=ExecutionError(
type="InitializationError",
message="DAP session not initialized"
)
)
try:
# Step 1: Set breakpoint
self._set_breakpoints(file, [line])
# Step 2: Send configurationDone on first run (starts execution)
if not self._configuration_done:
self._send_configuration_done()
self._configuration_done = True
logger.info("Sent configurationDone - program execution will begin")
# Step 3: Handle first-time execution flow
if self._thread_id is None:
# First time: execution stopped at debugpy.breakpoint() in wrapper
# We need to:
# 1. Wait for the entry breakpoint stop event
# 2. Get thread ID
# 3. Continue past the entry breakpoint to reach user breakpoints
logger.debug("First breakpoint call - waiting for entry breakpoint stop")
# Wait for stopped event at entry breakpoint (debugpy.breakpoint())
entry_stop_event = self._wait_for_event('stopped', timeout=5.0)
if not entry_stop_event:
raise RuntimeError("Did not receive entry breakpoint stop event")
# Get thread ID from the stop event
self._thread_id = entry_stop_event['body'].get('threadId')
logger.debug(f"Stopped at entry breakpoint, thread ID: {self._thread_id}")
# Continue past the entry breakpoint
self._continue_execution()
logger.debug("Continued past entry breakpoint, now waiting for user breakpoint")
else:
# Not first time - continue from current position
self._continue_execution()
# Step 4: Wait for stopped event at user breakpoint
stopped_event = self._wait_for_event('stopped', timeout)
if not stopped_event:
# Timeout or execution completed
return BreakpointResponse(
hit=False,
completed=True,
error=ExecutionError(
type="TimeoutError",
message=f"Breakpoint not hit within {timeout}s"
)
)
# Step 5: Extract frame info
reason = stopped_event['body'].get('reason')
if reason != 'breakpoint':
logger.warning(f"Stopped for reason: {reason}")
self._thread_id = stopped_event['body'].get('threadId')
# Step 6: Get stack frames
frames = self._get_stack_frames(self._thread_id)
if not frames:
return BreakpointResponse(
hit=True,
completed=False,
error=ExecutionError(
type="StackError",
message="No stack frames available"
)
)
top_frame = frames[0]
frame_info = FrameInfo(
file=top_frame.get('source', {}).get('path', file),
line=top_frame.get('line', line)
)
# Step 7: Get variables
locals_dict = self._get_frame_variables(top_frame['id'])
return BreakpointResponse(
hit=True,
frameInfo=frame_info,
locals=locals_dict,
completed=False
)
except TimeoutError as e:
return BreakpointResponse(
hit=False,
completed=False,
error=ExecutionError(
type="TimeoutError",
message=str(e)
)
)
except Exception as e:
logger.exception("Error in run_to_breakpoint")
return BreakpointResponse(
hit=False,
completed=False,
error=ExecutionError(
type=type(e).__name__,
message=str(e)
)
)
def step_in(self, timeout: float = 10.0) -> BreakpointResponse:
"""
Step into the next function call.
Executes the current line and stops at the first statement of any called function.
If the current line doesn't call a function, behaves like step_over.
Args:
timeout: Maximum time to wait for step completion
Returns:
BreakpointResponse with new execution location and variables
"""
if not self._initialized:
return BreakpointResponse(
hit=False,
completed=False,
error=ExecutionError(
type="InitializationError",
message="DAP session not initialized"
)
)
if self._thread_id is None:
return BreakpointResponse(
hit=False,
completed=False,
error=ExecutionError(
type="StateError",
message="No active thread (must hit breakpoint first)"
)
)
return self._execute_step('stepIn', timeout)
def step_over(self, timeout: float = 10.0) -> BreakpointResponse:
"""
Step over the current line.
Executes the current line completely, including any function calls,
and stops at the next line in the current function.
Args:
timeout: Maximum time to wait for step completion
Returns:
BreakpointResponse with new execution location and variables
"""
if not self._initialized:
return BreakpointResponse(
hit=False,
completed=False,
error=ExecutionError(
type="InitializationError",
message="DAP session not initialized"
)
)
if self._thread_id is None:
return BreakpointResponse(
hit=False,
completed=False,
error=ExecutionError(
type="StateError",
message="No active thread (must hit breakpoint first)"
)
)
return self._execute_step('next', timeout)
def step_out(self, timeout: float = 10.0) -> BreakpointResponse:
"""
Step out of the current function.
Continues execution until the current function returns,
then stops at the next line in the calling function.
Args:
timeout: Maximum time to wait for step completion
Returns:
BreakpointResponse with new execution location and variables
"""
if not self._initialized:
return BreakpointResponse(
hit=False,
completed=False,
error=ExecutionError(
type="InitializationError",
message="DAP session not initialized"
)
)
if self._thread_id is None:
return BreakpointResponse(
hit=False,
completed=False,
error=ExecutionError(
type="StateError",
message="No active thread (must hit breakpoint first)"
)
)
return self._execute_step('stepOut', timeout)
def _execute_step(self, step_type: str, timeout: float) -> BreakpointResponse:
"""
Common logic for executing step operations.
Args:
step_type: DAP step command ('stepIn', 'next', or 'stepOut')
timeout: Maximum time to wait
Returns:
BreakpointResponse with execution results
"""
try:
# Send step request
# Note: DAP may not send response, only events
try:
response = self.client.send_request(step_type, {
'threadId': self._thread_id,
'singleThread': False,
}, timeout=1.0)
if not response.get('success', True):
logger.warning(f"{step_type} request returned unsuccessfully: {response.get('message')}")
except TimeoutError:
# This is OK - debugpy often doesn't send response, only sends stopped event
logger.debug(f"{step_type} request timed out (waiting for stopped event)")
# Wait for stopped event
stopped_event = self._wait_for_event('stopped', timeout)
if not stopped_event:
# Check if execution completed
return BreakpointResponse(
hit=False,
completed=True,
error=ExecutionError(
type="TimeoutError",
message=f"Step operation did not complete within {timeout}s"
)
)
# Extract stop reason
reason = stopped_event['body'].get('reason')
logger.debug(f"Stopped after {step_type}: reason={reason}")
# Update thread ID (should be same, but be safe)
self._thread_id = stopped_event['body'].get('threadId')
# Get stack frames
frames = self._get_stack_frames(self._thread_id)
if not frames:
return BreakpointResponse(
hit=True,
completed=False,
error=ExecutionError(
type="StackError",
message="No stack frames available after step"
)
)
top_frame = frames[0]
frame_info = FrameInfo(
file=top_frame.get('source', {}).get('path', ''),
line=top_frame.get('line', 0)
)
# Get variables at new location
locals_dict = self._get_frame_variables(top_frame['id'])
return BreakpointResponse(
hit=True,
frameInfo=frame_info,
locals=locals_dict,
completed=False
)
except Exception as e:
logger.exception(f"Error in {step_type}")
return BreakpointResponse(
hit=False,
completed=False,
error=ExecutionError(
type=type(e).__name__,
message=str(e)
)
)
def cleanup(self) -> None:
"""Clean up resources: disconnect client, terminate debugpy process."""
try:
# Disconnect DAP client
if self.client:
try:
self.client.send_request('disconnect', timeout=2.0)
except Exception:
pass
self.client.disconnect()
# Terminate debugpy process
if self.debugpy_process:
try:
self.debugpy_process.terminate()
self.debugpy_process.wait(timeout=5.0)
except subprocess.TimeoutExpired:
self.debugpy_process.kill()
self.debugpy_process.wait()
except Exception as e:
logger.warning(f"Error terminating debugpy process: {e}")
logger.info("DAP session cleaned up")
except Exception as e:
logger.error(f"Error during cleanup: {e}")
# ========================================
# Internal: debugpy server management
# ========================================
def _launch_debugpy_server(
self,
python_path: str,
script_path: Path,
args: list[str] | None,
env: dict[str, str] | None,
cwd: Path | None,
) -> None:
"""
Launch debugpy with reverse connection pattern.
Instead of debugpy listening (--listen mode, which has a message loop bug),
we use reverse connection:
1. Create a wrapper script that calls debugpy.connect()
2. Launch the wrapper (which will connect to our listening socket)
3. debugpy connects to us and DAP communication works perfectly
"""
import os
import tempfile
# Find free port if not specified
if not self.debugpy_port:
self.debugpy_port = self._find_free_port()
# Create wrapper script that uses debugpy.connect()
# This makes debugpy connect TO our listening socket
#
# KEY INSIGHT: debugpy.breakpoint() creates an initial breakpoint
# that prevents execution until DAP client sends configurationDone.
# This ensures breakpoints can be set before the script runs.
wrapper_code = f"""
import sys
import debugpy
# Connect to DAP client (reverse connection)
debugpy.connect(('{ self.debugpy_host}', {self.debugpy_port}))
# Wait for client to complete initialization
debugpy.wait_for_client()
# Set up sys.argv for the target script
sys.argv = {[str(script_path)] + (args or [])}
# Create an entry breakpoint - execution pauses here until configurationDone
# This allows the DAP client to set breakpoints before script execution begins
debugpy.breakpoint()
# Load and execute the target script
with open({repr(str(script_path))}, 'r') as f:
code = compile(f.read(), {repr(str(script_path))}, 'exec')
exec(code, {{'__name__': '__main__', '__file__': {repr(str(script_path))}}})
"""
# Write wrapper to temp file
wrapper_file = tempfile.NamedTemporaryFile(
mode='w',
suffix='.py',
delete=False,
prefix='debugpy_wrapper_'
)
wrapper_file.write(wrapper_code)
wrapper_file.close()
wrapper_path = wrapper_file.name
logger.debug(f"Created wrapper script: {wrapper_path}")
# Build command - run wrapper script with target Python
cmd = [python_path, wrapper_path]
# Prepare environment - inherit current environment if not specified
process_env = os.environ.copy()
if env:
process_env.update(env)
logger.debug(f"Launching debugpy wrapper with command: {' '.join(cmd)}")
# Launch process
self.debugpy_process = subprocess.Popen(
cmd,
env=process_env,
cwd=cwd or script_path.parent,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1, # Line buffered
)
logger.info(f"Launched debugpy wrapper (will connect to {self.debugpy_host}:{self.debugpy_port}, pid={self.debugpy_process.pid})")
# Note: We don't wait for connection here - that happens in _connect_client()
# which now uses listen() instead of connect()
def _find_free_port(self) -> int:
"""Find an available port."""
import socket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('', 0))
s.listen(1)
port = s.getsockname()[1]
return port
def _connect_client(self) -> None:
"""
Wait for debugpy to connect to our listening socket (reverse connection).
The wrapper script will call debugpy.connect() which connects to us.
"""
try:
# Listen for incoming connection from debugpy
# Timeout is longer (30s) to allow debugpy to start and connect
self.client.listen(self.debugpy_host, self.debugpy_port, timeout=30.0)
logger.info("debugpy connected successfully (reverse connection)")
except ConnectionError as e:
raise ConnectionError("debugpy failed to connect within timeout") from e
# ========================================
# Internal: DAP initialization sequence
# ========================================
def _send_initialize(self) -> None:
"""Send initialize request."""
response = self.client.send_request('initialize', {
'clientID': 'mcp-debug-tool',
'clientName': 'MCP Debug Tool',
'adapterID': 'python',
'pathFormat': 'path',
'linesStartAt1': True,
'columnsStartAt1': True,
'supportsVariableType': True,
'supportsVariablePaging': False,
'supportsRunInTerminalRequest': False,
})
if not response.get('success', True):
raise RuntimeError(f"Initialize failed: {response.get('message')}")
logger.debug("Sent initialize request")
def _send_launch(
self,
script_path: Path,
args: list[str] | None,
env: dict[str, str] | None,
cwd: Path | None,
) -> None:
"""
Send attach request to debugpy.
For reverse connection (debugpy.connect()), we need to send attach request
to tell debugpy that we're ready to start debugging.
The attach request requires minimal arguments for our use case.
"""
# Send attach request
# For debugpy.connect() mode, we need to provide __sessionId
attach_args: dict[str, Any] = {
'__sessionId': 'mcp-debug-tool', # Required by debugpy
}
response = self.client.send_request('attach', attach_args, timeout=10.0)
if not response.get('success', True):
raise RuntimeError(f"Attach failed: {response.get('message')}")
logger.debug("Sent attach request")
def _wait_for_initialized_event(self, timeout: float = 10.0) -> None:
"""Wait for initialized event."""
event = self._wait_for_event('initialized', timeout)
if not event:
raise RuntimeError("Did not receive initialized event")
logger.debug("Received initialized event")
def _send_configuration_done(self) -> None:
"""Send configurationDone request."""
response = self.client.send_request('configurationDone')
if not response.get('success', True):
raise RuntimeError(f"configurationDone failed: {response.get('message')}")
logger.debug("Sent configurationDone")
# ========================================
# Internal: Breakpoint and execution control
# ========================================
def _set_breakpoints(self, file: str, lines: list[int]) -> None:
"""Set breakpoints in a file."""
breakpoints = [{'line': line} for line in lines]
response = self.client.send_request('setBreakpoints', {
'source': {'path': file},
'breakpoints': breakpoints,
})
if not response.get('success', True):
raise RuntimeError(f"setBreakpoints failed: {response.get('message')}")
logger.debug(f"Set {len(lines)} breakpoints in {file}")
def _continue_execution(self) -> None:
"""Continue execution from current position."""
if self._thread_id is None:
raise RuntimeError("Cannot continue: thread_id not set")
# Send continue request - NOTE: debugpy may not send response, only events
# So we use a short timeout and ignore timeout errors
try:
response = self.client.send_request('continue', {
'threadId': self._thread_id,
}, timeout=1.0)
if not response.get('success', True):
logger.warning(f"Continue request returned unsuccessfully: {response.get('message')}")
except TimeoutError:
# This is OK - debugpy often doesn't send response, only sends stopped/continued events
logger.debug("Continue request timed out (this is normal - waiting for events)")
logger.debug(f"Continue request sent for thread {self._thread_id}")
def _get_stack_frames(self, thread_id: int) -> list[dict[str, Any]]:
"""Get stack frames for a thread."""
response = self.client.send_request('stackTrace', {
'threadId': thread_id,
})
if not response.get('success', True):
raise RuntimeError(f"stackTrace failed: {response.get('message')}")
frames = response.get('body', {}).get('stackFrames', [])
return frames
def _get_frame_variables(self, frame_id: int) -> dict[str, Any]:
"""
Get all variables for a frame (locals, globals, etc.).
Returns dictionary in MCP format (compatible with current bdb format).
"""
# Get scopes for frame
response = self.client.send_request('scopes', {
'frameId': frame_id,
})
if not response.get('success', True):
return {}
scopes = response.get('body', {}).get('scopes', [])
# Collect variables from all scopes
all_vars = {}
for scope in scopes:
scope_name = scope.get('name', 'unknown')
variables_ref = scope.get('variablesReference', 0)
if variables_ref == 0:
continue
# Get variables for this scope
vars_response = self.client.send_request('variables', {
'variablesReference': variables_ref,
})
if not vars_response.get('success', True):
continue
variables = vars_response.get('body', {}).get('variables', [])
# Convert to MCP format with enhanced information
for var in variables:
name = var.get('name', '')
value = var.get('value', '')
var_type = var.get('type', 'unknown')
variables_ref = var.get('variablesReference', 0)
# Skip private variables (starting with __)
if name.startswith('__'):
continue
var_info: dict[str, Any] = {
'type': var_type,
'repr': value,
'isTruncated': False,
}
# For collection types, use evaluate to get accurate length
collection_types = ('list', 'tuple', 'dict', 'set', 'str', 'frozenset', 'bytes', 'bytearray')
if var_type in collection_types:
try:
eval_response = self.client.send_request('evaluate', {
'expression': f'len({name})',
'frameId': frame_id,
'context': 'watch',
})
if eval_response.get('success'):
length_str = eval_response.get('body', {}).get('result', '')
try:
length = int(length_str)
var_info['length'] = length
# Mark as indexed for list-like types
if var_type in ('list', 'tuple', 'str', 'bytes', 'bytearray'):
var_info['isIndexed'] = True
# Mark as having named properties for dict-like types
if var_type in ('dict',):
var_info['hasNamedProperties'] = True
var_info['namedCount'] = length
# Mark large collections as truncated
if length > 100:
var_info['isTruncated'] = True
except (ValueError, TypeError):
# If we can't parse the length, just skip it
pass
except Exception:
# If evaluate fails, fall back to checking variablesReference
pass
# If variable has children (is expandable), note it
# Empty collections should not be marked as expandable
if variables_ref > 0:
# Only mark as expandable if length is not explicitly 0
if 'length' not in var_info or var_info['length'] > 0:
var_info['isExpandable'] = True
var_info['variablesReference'] = variables_ref
all_vars[name] = var_info
return all_vars
# ========================================
# Internal: Event handling
# ========================================
def _on_dap_event(self, event: dict[str, Any]) -> None:
"""
Callback for DAP events.
Routes events to type-specific queues for efficient synchronous waiting.
This pattern eliminates the need for event re-queuing and improves performance.
"""
event_type = event.get('event', 'unknown')
logger.debug(f"Received DAP event: {event_type}")
# Route to specific queue if available
if event_type in self.event_queues:
self.event_queues[event_type].put(event)
logger.debug(f"Routed '{event_type}' event to dedicated queue")
else:
# Route to general queue for unexpected events
self.general_event_queue.put(event)
logger.debug(f"Routed '{event_type}' event to general queue")
def _wait_for_event(
self,
event_type: str,
timeout: float
) -> dict[str, Any] | None:
"""
Wait for a specific DAP event type using dedicated queues.
This implementation uses type-specific queues to avoid the inefficient
pattern of repeatedly getting and re-queuing events. Each event type
has its own queue, so we can directly block on the queue we need.
Args:
event_type: Event type to wait for (e.g., 'stopped', 'terminated')
timeout: Maximum time to wait in seconds
Returns:
Event dictionary if received, None if timeout
Raises:
TimeoutError: If event not received within timeout period
"""
try:
# If we have a dedicated queue for this event type, use it directly
if event_type in self.event_queues:
event = self.event_queues[event_type].get(timeout=timeout)
logger.debug(f"Received '{event_type}' event from dedicated queue")
return event
else:
# Fall back to searching general queue for unknown event types
logger.debug(f"Searching general queue for '{event_type}' event")
start_time = time.time()
# Temporarily store events we don't want
temp_events = []
try:
while time.time() - start_time < timeout:
remaining_timeout = timeout - (time.time() - start_time)
if remaining_timeout <= 0:
break
try:
event = self.general_event_queue.get(timeout=min(0.1, remaining_timeout))
if event.get('event') == event_type:
# Found it! Re-queue the others and return
for temp_event in temp_events:
self.general_event_queue.put(temp_event)
logger.debug(f"Found '{event_type}' event in general queue")
return event
else:
# Not the one we want, save for later
temp_events.append(event)
except queue.Empty:
continue
# Timeout - re-queue everything we took out
for temp_event in temp_events:
self.general_event_queue.put(temp_event)
finally:
# Ensure we always re-queue in case of exception
pass
except queue.Empty:
logger.warning(f"Timeout waiting for event: {event_type} (timeout={timeout}s)")
return None
logger.warning(f"Timeout waiting for event: {event_type} (timeout={timeout}s)")
return None
def __enter__(self):
"""Context manager entry."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.cleanup()
return False