Skip to main content
Glama
dap_client.py (14.7 kB)
""" Low-level DAP (Debug Adapter Protocol) client implementation. Handles socket communication, message encoding/decoding, and request/response correlation for the Debug Adapter Protocol. References: - DAP Specification: https://microsoft.github.io/debug-adapter-protocol/ - Message Format: Content-Length header + JSON body """ import json import logging import socket import threading from collections.abc import Callable from typing import Any logger = logging.getLogger(__name__) class DAPClient: """ Low-level DAP protocol communication client. Manages socket connection, message encoding/decoding, and request/response correlation. Messages follow the DAP format: Content-Length header followed by JSON body. """ def __init__(self): """Initialize DAP client.""" self.socket: socket.socket | None = None self.seq_counter = 1 self.pending_requests: dict[int, threading.Event] = {} self.responses: dict[int, dict[str, Any]] = {} self.event_handlers: list[Callable[[dict[str, Any]], None]] = [] self._receiver_thread: threading.Thread | None = None self._stop_receiver = threading.Event() self._lock = threading.Lock() self._read_buffer = b'' # Buffer for partial messages def connect(self, host: str, port: int, timeout: float = 10.0) -> None: """ Establish socket connection to DAP server (client mode - NOT WORKING with debugpy). WARNING: This mode does not work with debugpy due to adapter message loop issue. Use listen() instead for reverse connection. 
Args: host: Hostname or IP address port: Port number timeout: Connection timeout in seconds Raises: ConnectionError: If connection fails """ try: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Disable Nagle's algorithm for low-latency communication self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) self.socket.settimeout(timeout) self.socket.connect((host, port)) logger.info(f"Connected to DAP server at {host}:{port}") # Start receiver thread self._stop_receiver.clear() self._receiver_thread = threading.Thread( target=self._receive_loop, daemon=True, name="DAPClientReceiver" ) self._receiver_thread.start() except Exception as e: logger.error(f"Failed to connect to DAP server: {e}") raise ConnectionError(f"Failed to connect to {host}:{port}: {e}") from e def listen(self, host: str, port: int, timeout: float = 30.0) -> None: """ Listen for incoming DAP connection (server mode - WORKING with debugpy.connect()). This is the reverse connection pattern: 1. Our DAP client listens on a port 2. debugpy connects to us via debugpy.connect() 3. Message exchange works perfectly This approach solves the debugpy adapter message loop issue. 
Args: host: Hostname to bind to (usually '127.0.0.1') port: Port to listen on timeout: Connection accept timeout in seconds Raises: ConnectionError: If listen/accept fails """ server_socket = None try: # Create server socket server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind((host, port)) server_socket.listen(1) server_socket.settimeout(timeout) logger.info(f"Listening for DAP connection on {host}:{port}") # Accept connection from debugpy client_socket, addr = server_socket.accept() logger.info(f"Accepted DAP connection from {addr}") # Configure client socket client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # Store as our communication socket self.socket = client_socket # Start receiver thread self._stop_receiver.clear() self._receiver_thread = threading.Thread( target=self._receive_loop, daemon=True, name="DAPClientReceiver" ) self._receiver_thread.start() except TimeoutError: raise ConnectionError(f"Timeout waiting for DAP connection on {host}:{port}") except Exception as e: logger.error(f"Failed to listen for DAP connection: {e}") raise ConnectionError(f"Failed to listen on {host}:{port}: {e}") from e finally: # Close server socket (we only need the client socket now) if server_socket: try: server_socket.close() except Exception: pass def disconnect(self) -> None: """Close the socket connection and stop receiver thread.""" if self._receiver_thread and self._receiver_thread.is_alive(): self._stop_receiver.set() if self._receiver_thread != threading.current_thread(): self._receiver_thread.join(timeout=2.0) if self.socket: try: self.socket.close() except Exception as e: logger.warning(f"Error closing socket: {e}") finally: self.socket = None logger.info("Disconnected from DAP server") def send_request( self, command: str, arguments: dict[str, Any] | None = None, timeout: float = 10.0 ) -> dict[str, Any]: """ Send a DAP request and wait for 
response. Args: command: DAP command name (e.g., 'initialize', 'launch', 'setBreakpoints') arguments: Command arguments (optional) timeout: Response timeout in seconds Returns: Response message as dictionary Raises: ConnectionError: If not connected or send fails TimeoutError: If response not received within timeout """ if not self.socket: raise ConnectionError("Not connected to DAP server") with self._lock: seq = self.seq_counter self.seq_counter += 1 # Build request message message: dict[str, Any] = { 'seq': seq, 'type': 'request', 'command': command, } if arguments: message['arguments'] = arguments # Encode message with Content-Length header body = json.dumps(message) body_bytes = body.encode('utf-8') header = f"Content-Length: {len(body_bytes)}\r\n\r\n" full_message = header.encode('utf-8') + body_bytes logger.debug(f"Sending message: {header.strip()} | Body: {body[:100]}...") # Prepare event for response response_event = threading.Event() self.pending_requests[seq] = response_event try: # Send message self.socket.sendall(full_message) logger.debug(f"Sent request: {command} (seq={seq})") # Wait for response if not response_event.wait(timeout=timeout): raise TimeoutError(f"No response for {command} within {timeout}s") # Get response response = self.responses.pop(seq, None) if not response: raise RuntimeError(f"Response for seq={seq} not found") # Check for error response if not response.get('success', True): error_msg = response.get('message', 'Unknown error') logger.warning(f"DAP request failed: {command} - {error_msg}") return response except Exception as e: # Clean up on error self.pending_requests.pop(seq, None) self.responses.pop(seq, None) logger.error(f"Error sending request {command}: {e}") raise finally: # Clean up pending request self.pending_requests.pop(seq, None) def add_event_handler(self, handler: Callable[[dict[str, Any]], None]) -> None: """ Add a handler for DAP events. 
Args: handler: Callback function that receives event messages """ self.event_handlers.append(handler) def _receive_loop(self) -> None: """ Background thread that receives and processes DAP messages. Reads messages from socket, parses them, and dispatches to appropriate handlers. """ logger.debug("DAP receiver thread started") try: while not self._stop_receiver.is_set(): if not self.socket: logger.debug("Socket is None, stopping receiver") break try: # Read message logger.debug("Waiting for message...") message = self._read_message() if not message: logger.debug("Received None message, connection closed") break logger.debug(f"Received message: {message}") # Process based on message type msg_type = message.get('type') if msg_type == 'response': self._handle_response(message) elif msg_type == 'event': self._handle_event(message) else: logger.warning(f"Unknown message type: {msg_type}") except TimeoutError: logger.debug("Socket timeout, continuing...") continue except Exception as e: if not self._stop_receiver.is_set(): logger.error(f"Error in receive loop: {e}", exc_info=True) break finally: logger.debug("DAP receiver thread stopped") def _read_message(self) -> dict[str, Any] | None: """ Read a single DAP message from socket using buffered reading. 
Returns: Parsed message dictionary, or None if connection closed Raises: socket.timeout: If read times out ValueError: If message format is invalid """ if not self.socket: return None # Use longer timeout for message reads - debugpy can be slow self.socket.settimeout(2.0) try: # Read into buffer until we have at least one complete message while b'\r\n\r\n' not in self._read_buffer: chunk = self.socket.recv(4096) if not chunk: logger.debug("Received empty chunk, connection closed") return None self._read_buffer += chunk # Find header end header_end = self._read_buffer.index(b'\r\n\r\n') header_part = self._read_buffer[:header_end] body_start_pos = header_end + 4 # Parse headers headers = {} header_lines = header_part.decode('utf-8').split('\r\n') for line in header_lines: if ':' in line: key, value = line.split(':', 1) headers[key.strip()] = value.strip() # Get content length content_length_str = headers.get('Content-Length') if not content_length_str: raise ValueError("Missing Content-Length header") content_length = int(content_length_str) message_end_pos = body_start_pos + content_length # Read more data if needed to get complete message while len(self._read_buffer) < message_end_pos: remaining = message_end_pos - len(self._read_buffer) chunk = self.socket.recv(remaining) if not chunk: logger.debug("Received empty chunk while reading body") return None self._read_buffer += chunk # Extract message body message_body = self._read_buffer[body_start_pos:message_end_pos] # Remove this message from buffer self._read_buffer = self._read_buffer[message_end_pos:] # Parse JSON body = message_body.decode('utf-8') logger.debug(f"Body: {body[:200]}...") # First 200 chars message = json.loads(body) logger.debug(f"Received message: type={message.get('type')}, command={message.get('command', message.get('event', 'N/A'))}") return message except TimeoutError as e: logger.debug(f"Socket timeout while reading message: {e}") raise except Exception as e: logger.error(f"Error reading 
message: {e}", exc_info=True) raise def _handle_response(self, message: dict[str, Any]) -> None: """ Handle a DAP response message. Args: message: Response message dictionary """ seq = message.get('request_seq') if seq is None: logger.warning("Response missing request_seq") return # Store response self.responses[seq] = message # Signal waiting thread event = self.pending_requests.get(seq) if event: event.set() else: logger.warning(f"No pending request for seq={seq}") def _handle_event(self, message: dict[str, Any]) -> None: """ Handle a DAP event message. Args: message: Event message dictionary """ event_type = message.get('event') logger.debug(f"Received event: {event_type}") # Dispatch to all event handlers for handler in self.event_handlers: try: handler(message) except Exception as e: logger.error(f"Error in event handler: {e}") def __enter__(self): """Context manager entry.""" return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit - ensures disconnect.""" self.disconnect() return False

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Kaina3/Debug-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.