# dap_client.py (14.7 kB)
"""
Low-level DAP (Debug Adapter Protocol) client implementation.
Handles socket communication, message encoding/decoding, and request/response correlation
for the Debug Adapter Protocol.
References:
- DAP Specification: https://microsoft.github.io/debug-adapter-protocol/
- Message Format: Content-Length header + JSON body
"""
import json
import logging
import socket
import threading
from collections.abc import Callable
from typing import Any
logger = logging.getLogger(__name__)
class DAPClient:
"""
Low-level DAP protocol communication client.
Manages socket connection, message encoding/decoding, and request/response correlation.
Messages follow the DAP format: Content-Length header followed by JSON body.
"""
def __init__(self):
"""Initialize DAP client."""
self.socket: socket.socket | None = None
self.seq_counter = 1
self.pending_requests: dict[int, threading.Event] = {}
self.responses: dict[int, dict[str, Any]] = {}
self.event_handlers: list[Callable[[dict[str, Any]], None]] = []
self._receiver_thread: threading.Thread | None = None
self._stop_receiver = threading.Event()
self._lock = threading.Lock()
self._read_buffer = b'' # Buffer for partial messages
def connect(self, host: str, port: int, timeout: float = 10.0) -> None:
"""
Establish socket connection to DAP server (client mode - NOT WORKING with debugpy).
WARNING: This mode does not work with debugpy due to adapter message loop issue.
Use listen() instead for reverse connection.
Args:
host: Hostname or IP address
port: Port number
timeout: Connection timeout in seconds
Raises:
ConnectionError: If connection fails
"""
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Disable Nagle's algorithm for low-latency communication
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.socket.settimeout(timeout)
self.socket.connect((host, port))
logger.info(f"Connected to DAP server at {host}:{port}")
# Start receiver thread
self._stop_receiver.clear()
self._receiver_thread = threading.Thread(
target=self._receive_loop,
daemon=True,
name="DAPClientReceiver"
)
self._receiver_thread.start()
except Exception as e:
logger.error(f"Failed to connect to DAP server: {e}")
raise ConnectionError(f"Failed to connect to {host}:{port}: {e}") from e
def listen(self, host: str, port: int, timeout: float = 30.0) -> None:
"""
Listen for incoming DAP connection (server mode - WORKING with debugpy.connect()).
This is the reverse connection pattern:
1. Our DAP client listens on a port
2. debugpy connects to us via debugpy.connect()
3. Message exchange works perfectly
This approach solves the debugpy adapter message loop issue.
Args:
host: Hostname to bind to (usually '127.0.0.1')
port: Port to listen on
timeout: Connection accept timeout in seconds
Raises:
ConnectionError: If listen/accept fails
"""
server_socket = None
try:
# Create server socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((host, port))
server_socket.listen(1)
server_socket.settimeout(timeout)
logger.info(f"Listening for DAP connection on {host}:{port}")
# Accept connection from debugpy
client_socket, addr = server_socket.accept()
logger.info(f"Accepted DAP connection from {addr}")
# Configure client socket
client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# Store as our communication socket
self.socket = client_socket
# Start receiver thread
self._stop_receiver.clear()
self._receiver_thread = threading.Thread(
target=self._receive_loop,
daemon=True,
name="DAPClientReceiver"
)
self._receiver_thread.start()
except TimeoutError:
raise ConnectionError(f"Timeout waiting for DAP connection on {host}:{port}")
except Exception as e:
logger.error(f"Failed to listen for DAP connection: {e}")
raise ConnectionError(f"Failed to listen on {host}:{port}: {e}") from e
finally:
# Close server socket (we only need the client socket now)
if server_socket:
try:
server_socket.close()
except Exception:
pass
def disconnect(self) -> None:
"""Close the socket connection and stop receiver thread."""
if self._receiver_thread and self._receiver_thread.is_alive():
self._stop_receiver.set()
if self._receiver_thread != threading.current_thread():
self._receiver_thread.join(timeout=2.0)
if self.socket:
try:
self.socket.close()
except Exception as e:
logger.warning(f"Error closing socket: {e}")
finally:
self.socket = None
logger.info("Disconnected from DAP server")
def send_request(
self,
command: str,
arguments: dict[str, Any] | None = None,
timeout: float = 10.0
) -> dict[str, Any]:
"""
Send a DAP request and wait for response.
Args:
command: DAP command name (e.g., 'initialize', 'launch', 'setBreakpoints')
arguments: Command arguments (optional)
timeout: Response timeout in seconds
Returns:
Response message as dictionary
Raises:
ConnectionError: If not connected or send fails
TimeoutError: If response not received within timeout
"""
if not self.socket:
raise ConnectionError("Not connected to DAP server")
with self._lock:
seq = self.seq_counter
self.seq_counter += 1
# Build request message
message: dict[str, Any] = {
'seq': seq,
'type': 'request',
'command': command,
}
if arguments:
message['arguments'] = arguments
# Encode message with Content-Length header
body = json.dumps(message)
body_bytes = body.encode('utf-8')
header = f"Content-Length: {len(body_bytes)}\r\n\r\n"
full_message = header.encode('utf-8') + body_bytes
logger.debug(f"Sending message: {header.strip()} | Body: {body[:100]}...")
# Prepare event for response
response_event = threading.Event()
self.pending_requests[seq] = response_event
try:
# Send message
self.socket.sendall(full_message)
logger.debug(f"Sent request: {command} (seq={seq})")
# Wait for response
if not response_event.wait(timeout=timeout):
raise TimeoutError(f"No response for {command} within {timeout}s")
# Get response
response = self.responses.pop(seq, None)
if not response:
raise RuntimeError(f"Response for seq={seq} not found")
# Check for error response
if not response.get('success', True):
error_msg = response.get('message', 'Unknown error')
logger.warning(f"DAP request failed: {command} - {error_msg}")
return response
except Exception as e:
# Clean up on error
self.pending_requests.pop(seq, None)
self.responses.pop(seq, None)
logger.error(f"Error sending request {command}: {e}")
raise
finally:
# Clean up pending request
self.pending_requests.pop(seq, None)
def add_event_handler(self, handler: Callable[[dict[str, Any]], None]) -> None:
"""
Add a handler for DAP events.
Args:
handler: Callback function that receives event messages
"""
self.event_handlers.append(handler)
def _receive_loop(self) -> None:
"""
Background thread that receives and processes DAP messages.
Reads messages from socket, parses them, and dispatches to appropriate handlers.
"""
logger.debug("DAP receiver thread started")
try:
while not self._stop_receiver.is_set():
if not self.socket:
logger.debug("Socket is None, stopping receiver")
break
try:
# Read message
logger.debug("Waiting for message...")
message = self._read_message()
if not message:
logger.debug("Received None message, connection closed")
break
logger.debug(f"Received message: {message}")
# Process based on message type
msg_type = message.get('type')
if msg_type == 'response':
self._handle_response(message)
elif msg_type == 'event':
self._handle_event(message)
else:
logger.warning(f"Unknown message type: {msg_type}")
except TimeoutError:
logger.debug("Socket timeout, continuing...")
continue
except Exception as e:
if not self._stop_receiver.is_set():
logger.error(f"Error in receive loop: {e}", exc_info=True)
break
finally:
logger.debug("DAP receiver thread stopped")
def _read_message(self) -> dict[str, Any] | None:
"""
Read a single DAP message from socket using buffered reading.
Returns:
Parsed message dictionary, or None if connection closed
Raises:
socket.timeout: If read times out
ValueError: If message format is invalid
"""
if not self.socket:
return None
# Use longer timeout for message reads - debugpy can be slow
self.socket.settimeout(2.0)
try:
# Read into buffer until we have at least one complete message
while b'\r\n\r\n' not in self._read_buffer:
chunk = self.socket.recv(4096)
if not chunk:
logger.debug("Received empty chunk, connection closed")
return None
self._read_buffer += chunk
# Find header end
header_end = self._read_buffer.index(b'\r\n\r\n')
header_part = self._read_buffer[:header_end]
body_start_pos = header_end + 4
# Parse headers
headers = {}
header_lines = header_part.decode('utf-8').split('\r\n')
for line in header_lines:
if ':' in line:
key, value = line.split(':', 1)
headers[key.strip()] = value.strip()
# Get content length
content_length_str = headers.get('Content-Length')
if not content_length_str:
raise ValueError("Missing Content-Length header")
content_length = int(content_length_str)
message_end_pos = body_start_pos + content_length
# Read more data if needed to get complete message
while len(self._read_buffer) < message_end_pos:
remaining = message_end_pos - len(self._read_buffer)
chunk = self.socket.recv(remaining)
if not chunk:
logger.debug("Received empty chunk while reading body")
return None
self._read_buffer += chunk
# Extract message body
message_body = self._read_buffer[body_start_pos:message_end_pos]
# Remove this message from buffer
self._read_buffer = self._read_buffer[message_end_pos:]
# Parse JSON
body = message_body.decode('utf-8')
logger.debug(f"Body: {body[:200]}...") # First 200 chars
message = json.loads(body)
logger.debug(f"Received message: type={message.get('type')}, command={message.get('command', message.get('event', 'N/A'))}")
return message
except TimeoutError as e:
logger.debug(f"Socket timeout while reading message: {e}")
raise
except Exception as e:
logger.error(f"Error reading message: {e}", exc_info=True)
raise
def _handle_response(self, message: dict[str, Any]) -> None:
"""
Handle a DAP response message.
Args:
message: Response message dictionary
"""
seq = message.get('request_seq')
if seq is None:
logger.warning("Response missing request_seq")
return
# Store response
self.responses[seq] = message
# Signal waiting thread
event = self.pending_requests.get(seq)
if event:
event.set()
else:
logger.warning(f"No pending request for seq={seq}")
def _handle_event(self, message: dict[str, Any]) -> None:
"""
Handle a DAP event message.
Args:
message: Event message dictionary
"""
event_type = message.get('event')
logger.debug(f"Received event: {event_type}")
# Dispatch to all event handlers
for handler in self.event_handlers:
try:
handler(message)
except Exception as e:
logger.error(f"Error in event handler: {e}")
def __enter__(self):
"""Context manager entry."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit - ensures disconnect."""
self.disconnect()
return False