"""Debugger manager for handling dbgsrv.exe and CDB connections."""
import asyncio
import os
import re
import subprocess
import threading
import queue
import time
from datetime import datetime
from typing import Optional, Dict, Any, List
from dataclasses import dataclass, field
from enum import Enum
class DebuggerState(Enum):
    """Lifecycle states recorded on a debug session."""

    # No CDB client is associated with the session (set on disconnect).
    DISCONNECTED = "disconnected"

    # Declared for completeness; not assigned anywhere in this module.
    CONNECTING = "connecting"

    # CDB is up but no target is recorded (e.g. after a detach).
    CONNECTED = "connected"

    # CDB is attached to a target process.
    ATTACHED = "attached"

    # Declared for completeness; not assigned anywhere in this module.
    BROKEN = "broken"

    # The target was resumed with the 'g' command.
    RUNNING = "running"
def _build_prompt_pattern() -> re.Pattern:
    """Compile the regex that recognises a line holding only a CDB prompt."""
    return re.compile(r'^\d+:\d+>\s*$', re.MULTILINE)


def _build_exception_pattern() -> re.Pattern:
    """Compile the regex used to spot exception reports in CDB output.

    Group 1 is "first"/"second", group 2 the text captured after
    "exception", group 3 the address captured from an "Access violation"
    line.
    """
    return re.compile(
        r'(?:'
        r'(first|second)\s+chance.*?exception.*?([\da-fA-Fx]+)|'
        r'Access violation.*?at ([\da-fA-Fx]+)'
        r')',
        re.IGNORECASE
    )


@dataclass
class DebugSession:
    """Mutable state for one CDB debugging session."""

    # --- Processes -------------------------------------------------------
    # start_debug_server stores the same Popen object in both slots.
    server_process: Optional[subprocess.Popen] = None
    client_process: Optional[subprocess.Popen] = None
    # Key under which the manager stores this session.
    port: int = 5005
    state: DebuggerState = DebuggerState.DISCONNECTED
    attached_pid: Optional[int] = None
    attached_name: Optional[str] = None

    # --- Output plumbing -------------------------------------------------
    # Lines read from CDB stdout by the reader thread; None marks EOF.
    output_queue: queue.Queue = field(default_factory=queue.Queue)
    # The single thread that reads CDB stdout.
    reader_thread: Optional[threading.Thread] = None

    # --- Command execution -----------------------------------------------
    # Serialises senders so one command's output is collected at a time.
    command_lock: threading.Lock = field(default_factory=threading.Lock)
    # The next two fields are not referenced elsewhere in this module.
    current_command_id: Optional[str] = None
    command_responses: Dict[str, List[str]] = field(default_factory=dict)

    # --- Prompt detection ------------------------------------------------
    prompt_pattern: re.Pattern = field(default_factory=_build_prompt_pattern)

    # --- Exception monitoring --------------------------------------------
    # When True the reader thread scans every output line for exceptions.
    monitoring_enabled: bool = False
    # Not referenced elsewhere in this module.
    monitoring_thread: Optional[threading.Thread] = None
    # Guards last_exception.
    exception_lock: threading.Lock = field(default_factory=threading.Lock)
    # Set once a capture has stored its data in last_exception.
    exception_captured_event: threading.Event = field(default_factory=threading.Event)
    last_exception: Optional[Dict[str, Any]] = None
    auto_freeze_on_exception: bool = True
    capture_full_state: bool = True
    generate_minidump: bool = True
    minidump_path: Optional[str] = None
    exception_pattern: re.Pattern = field(default_factory=_build_exception_pattern)

    # --- Hardware breakpoints --------------------------------------------
    # Maps the CDB breakpoint number to the bookkeeping dict for it.
    hardware_breakpoints: Dict[int, Dict[str, Any]] = field(default_factory=dict)
class DebuggerManager:
    """Owns the CDB debugger processes and the per-port session registry."""

    def __init__(self, dbgsrv_path: str = "dbgsrv.exe", cdb_path: str = "cdb.exe"):
        """
        Record the tool locations and start with an empty session registry.

        Args:
            dbgsrv_path: Path to dbgsrv.exe (a bare name is resolved via PATH)
            cdb_path: Path to cdb.exe (a bare name is resolved via PATH)
        """
        # Sessions are keyed by the "port" handed to start_debug_server.
        self.sessions: Dict[int, DebugSession] = {}
        # Stored for callers; nothing in this module reads it.
        self._command_timeout = 30.0
        self.cdb_path = cdb_path
        self.dbgsrv_path = dbgsrv_path
async def start_debug_server(
    self,
    port: int = 5005,
    pid: Optional[int] = None,
    process_name: Optional[str] = None
) -> Dict[str, Any]:
    """
    Launch CDB attached to a process and register it as the session for ``port``.

    Despite the method name, CDB is not started in server mode: it is spawned
    as a child process attached directly to the target, and ``port`` only
    serves as the key under which the session is stored.

    Args:
        port: Session identifier (named "port" for API compatibility)
        pid: Process ID to attach to; takes precedence over process_name
        process_name: Process name to attach to (e.g., 'notepad.exe')

    Returns:
        Status dict whose "status" is "success", "already_running" or
        "error". On success it also carries the CDB pid, the target that was
        requested and the first part of CDB's startup output.
    """
    existing = self.sessions.get(port)
    if existing and existing.server_process and existing.server_process.poll() is None:
        return {
            "status": "already_running",
            "port": port,
            "message": f"CDB server already running on port {port}"
        }

    # A target is mandatory; pid wins when both are supplied.
    if pid:
        attach_args = ["-p", str(pid)]
        attach_info = f"PID {pid}"
    elif process_name:
        attach_args = ["-pn", process_name]
        attach_info = f"process '{process_name}'"
    else:
        return {
            "status": "error",
            "message": "Either pid or process_name must be provided"
        }

    try:
        # The startup command makes CDB print a marker once it is responsive.
        cmd = [self.cdb_path, *attach_args, "-c", ".echo CDB-READY; .echo ---"]
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # stderr is merged into stdout
            stdin=subprocess.PIPE,
            creationflags=subprocess.CREATE_NEW_PROCESS_GROUP,
            text=True,
            bufsize=1,  # Line buffered
        )

        # Give it a moment to start and attach
        await asyncio.sleep(2)

        if process.poll() is not None:
            # stderr was merged into stdout, so stdout holds everything CDB printed.
            output = process.stdout.read() if process.stdout else ""
            return {
                "status": "error",
                "message": f"Failed to start CDB server: {output or 'Unknown error'}"
            }

        # Direct attach: one process plays both the "server" and "client" role.
        session = DebugSession(
            server_process=process,
            client_process=process,
            port=port,
            state=DebuggerState.ATTACHED
        )
        session.attached_pid = pid
        session.attached_name = process_name

        def output_reader_loop():
            """
            Drain CDB stdout line by line into the session queue.

            Keeping the pipe drained stops CDB from blocking on a full pipe.
            The same loop feeds command execution and exception monitoring.
            """
            try:
                for line in iter(process.stdout.readline, ''):
                    if not line:
                        break
                    line = line.rstrip('\r\n')
                    session.output_queue.put(line)
                    if session.monitoring_enabled:
                        self._check_for_exception(session, line)
            except Exception as e:
                print(f"[Output Reader] Error: {e}")
            finally:
                session.output_queue.put(None)  # EOF marker

        reader_thread = threading.Thread(
            target=output_reader_loop, daemon=True, name="CDB-Output-Reader"
        )
        reader_thread.start()
        session.reader_thread = reader_thread
        self.sessions[port] = session

        # Collect the startup banner for up to 5 seconds. The queue is polled
        # without blocking so the event loop stays responsive meanwhile.
        banner_lines = []
        stream_closed = False
        deadline = time.monotonic() + 5.0
        while time.monotonic() < deadline:
            try:
                line = session.output_queue.get_nowait()
            except queue.Empty:
                await asyncio.sleep(0.05)
                continue
            if line is None:  # EOF marker: CDB closed its output
                stream_closed = True
                break
            banner_lines.append(line)
            if session.prompt_pattern.match(line):
                break

        banner = '\n'.join(banner_lines[:20])  # First 20 lines

        if stream_closed:
            # CDB went away while starting up; do not report a usable session.
            self.sessions.pop(port, None)
            return {
                "status": "error",
                "message": f"Failed to start CDB server: {banner or 'CDB exited during startup'}"
            }

        return {
            "status": "success",
            "port": port,
            "pid": process.pid,
            "message": f"CDB attached to {attach_info}",
            "attached": True,
            "target_pid": pid,
            "target_name": process_name,
            "initial_banner": banner[:500] if banner else "(no output received)"
        }
    except FileNotFoundError:
        return {
            "status": "error",
            "message": f"Could not find cdb.exe at path: {self.cdb_path}"
        }
    except Exception as e:
        return {
            "status": "error",
            "message": f"Failed to start CDB server: {str(e)}"
        }
async def stop_debug_server(self, port: int) -> Dict[str, Any]:
    """
    Shut down the CDB process registered under ``port`` and drop the session.

    Args:
        port: Session identifier passed to start_debug_server

    Returns:
        Status dict ("success", or "error" when no session exists)
    """
    try:
        session = self.sessions[port]
    except KeyError:
        return {
            "status": "error",
            "message": f"No debug server found on port {port}"
        }

    # Let the client side quit first.
    if session.client_process:
        await self._disconnect_client(session)

    # Then make sure the server process is gone, escalating to kill after 5s.
    server = session.server_process
    if server and server.poll() is None:
        server.terminate()
        try:
            server.wait(timeout=5)
        except subprocess.TimeoutExpired:
            server.kill()
            server.wait()

    self.sessions.pop(port)
    return {
        "status": "success",
        "message": f"Debug server on port {port} stopped"
    }
async def connect_to_server(self, port: int, server_address: str = "localhost") -> Dict[str, Any]:
    """
    Report whether the session for ``port`` is usable.

    In direct-attach mode the connection already exists once
    start_debug_server has returned, so this only checks that the CDB
    process is still alive.

    Args:
        port: Session port identifier
        server_address: Echoed back in the result; otherwise unused

    Returns:
        Connection status information
    """
    if port not in self.sessions:
        return {
            "status": "error",
            "message": "No debug session found. Call start_debug_server first."
        }

    session = self.sessions[port]
    client = session.client_process
    if client and client.poll() is None:
        # Nothing to establish; just confirm the live session.
        return {
            "status": "success",
            "port": port,
            "server": server_address,
            "message": "Already connected (direct attach mode)",
            "cdb_path": self.cdb_path,
            "state": session.state.value
        }

    return {
        "status": "error",
        "message": "Debug session exists but CDB process is not running"
    }
async def attach_to_process(self, port: int, pid: Optional[int] = None,
                            process_name: Optional[str] = None) -> Dict[str, Any]:
    """
    Attach the debugger to a process by PID or name.

    If the session was started already attached (-p or -pn), the existing
    attachment is reported instead of attaching again.

    Args:
        port: Port of the debug session
        pid: Process ID to attach to (takes precedence over process_name)
        process_name: Process name to attach to

    Returns:
        Attachment status information
    """
    if port not in self.sessions:
        return {
            "status": "error",
            "message": "Not connected to debug server. Use connect_to_server first."
        }

    session = self.sessions[port]

    if session.state == DebuggerState.ATTACHED:
        return {
            "status": "already_attached",
            "message": "Already attached to process during server start",
            "pid": session.attached_pid,
            "process_name": session.attached_name
        }

    if not session.client_process or session.client_process.poll() is not None:
        return {
            "status": "error",
            "message": "Debugger client not running"
        }

    try:
        if pid:
            attach_cmd = f".attach 0x{pid:x}"
        elif process_name:
            attach_cmd = f".attach -pn {process_name}"
        else:
            return {
                "status": "error",
                "message": "Must specify either pid or process_name"
            }

        result = await self._execute_command(session, attach_cmd)

        if "Attach will occur" not in result and "Attached to" not in result:
            # Leave the session's target fields untouched on failure.
            return {
                "status": "error",
                "message": f"Failed to attach: {result}"
            }

        # Record the target only once CDB has confirmed the attach.
        if pid:
            session.attached_pid = pid
        else:
            session.attached_name = process_name
        session.state = DebuggerState.ATTACHED
        return {
            "status": "success",
            "pid": pid,
            "process_name": process_name,
            "output": result
        }
    except Exception as e:
        return {
            "status": "error",
            "message": f"Attachment failed: {str(e)}"
        }
async def detach_from_process(self, port: int) -> Dict[str, Any]:
    """
    Issue ``.detach`` and clear the session's record of its target.

    Args:
        port: Port of the debug session

    Returns:
        Detachment status
    """
    session = self.sessions[port] if port in self.sessions else None
    if session is None:
        return {
            "status": "error",
            "message": "No debug session found"
        }

    try:
        detach_output = await self._execute_command(session, ".detach")
        # CDB stays up after a detach, so the session falls back to CONNECTED.
        session.attached_name = None
        session.attached_pid = None
        session.state = DebuggerState.CONNECTED
        return {
            "status": "success",
            "output": detach_output
        }
    except Exception as e:
        return {
            "status": "error",
            "message": f"Detachment failed: {str(e)}"
        }
async def execute_command(self, port: int, command: str) -> Dict[str, Any]:
    """
    Run a raw WinDbg command in the session for ``port``.

    The literal command ``::CTRL_BREAK::`` is not sent to CDB; it delivers a
    Ctrl+Break event to the CDB process instead.

    Args:
        port: Port of the debug session
        command: WinDbg command to execute

    Returns:
        Command output
    """
    if port not in self.sessions:
        return {
            "status": "error",
            "message": "No debug session found"
        }

    session = self.sessions[port]
    client = session.client_process
    if not client or client.poll() is not None:
        return {
            "status": "error",
            "message": "Debugger client not running"
        }

    try:
        if command != "::CTRL_BREAK::":
            # Ordinary command: forward it and hand back whatever CDB printed.
            output = await self._execute_command(session, command)
            return {
                "status": "success",
                "command": command,
                "output": output
            }

        # Special case: interrupt the debugger via its process group.
        import signal
        os.kill(session.client_process.pid, signal.CTRL_BREAK_EVENT)
        await asyncio.sleep(1)  # Wait for break to take effect
        return {
            "status": "success",
            "command": "CTRL+BREAK",
            "output": "Break signal sent to debugger"
        }
    except Exception as e:
        return {
            "status": "error",
            "command": command,
            "message": f"Command execution failed: {str(e)}"
        }
async def _execute_command(self, session: DebugSession, command: str, timeout: float = 5.0) -> str:
    """
    Send one command to CDB and collect its response from the output queue.

    Steps:
        1. Take the session's command lock so commands run one at a time.
        2. Drop any stale lines left in the output queue.
        3. Write the command to CDB's stdin.
        4. Collect lines until a bare prompt line, end of output, one second
           of silence after some output arrived, or ``timeout`` elapses.

    The lock is a ``threading.Lock`` shared with worker threads. It is taken
    with non-blocking attempts so that a second coroutine waiting for it
    cannot freeze the event loop the lock holder needs to make progress.

    Args:
        session: Debug session
        command: Command to execute
        timeout: Maximum time to wait for the response, in seconds

    Returns:
        Command output as a string (lines joined with newlines)

    Raises:
        Exception: If the session has no client process with a stdin pipe
    """
    if not session.client_process or not session.client_process.stdin:
        raise Exception("No active debugger client")

    while not session.command_lock.acquire(blocking=False):
        await asyncio.sleep(0.01)
    try:
        # Clear any stale output, remembering whether the EOF marker was in it.
        cleared = 0
        eof_seen = False
        while True:
            try:
                stale = session.output_queue.get_nowait()
            except queue.Empty:
                break
            if stale is None:
                eof_seen = True
            else:
                cleared += 1
        if cleared > 0:
            print(f"[Command] Cleared {cleared} stale lines from queue")
        if eof_seen:
            # Keep end-of-output visible to this and later commands.
            session.output_queue.put(None)

        session.client_process.stdin.write(f"{command}\n")
        session.client_process.stdin.flush()

        expected_echo = command.strip()
        response_lines = []
        seen_command_echo = False
        started = time.monotonic()
        last_output = started

        while (time.monotonic() - started) < timeout:
            try:
                line = session.output_queue.get_nowait()
            except queue.Empty:
                # Output has started and then stayed quiet for a second:
                # treat the response as complete.
                if response_lines and (time.monotonic() - last_output) > 1.0:
                    break
                await asyncio.sleep(0.02)  # yield instead of blocking the loop
                continue

            if line is None:  # EOF marker
                session.output_queue.put(None)  # leave it for later callers
                break

            last_output = time.monotonic()

            # An echo is a line that is exactly the command, optionally after
            # a prompt. A substring test would discard real output for short
            # commands such as "r" or "k".
            if not seen_command_echo:
                if re.sub(r'^\d+:\d+>\s*', '', line).strip() == expected_echo:
                    seen_command_echo = True
                    continue

            # A bare prompt line means the command finished.
            if session.prompt_pattern.match(line):
                break

            response_lines.append(line)

        return '\n'.join(response_lines)
    finally:
        session.command_lock.release()
async def _disconnect_client(self, session: DebugSession):
    """
    Stop the session's client process and mark the session disconnected.

    A graceful ``q`` is tried first; if the process is still alive after
    two seconds it is terminated, and killed if terminate does not take
    effect within another two seconds.

    Args:
        session: Debug session whose client process should be stopped
    """
    client = session.client_process
    if client and client.poll() is None:
        try:
            # NOTE(review): CDB's "q" may also end the debuggee; "qd" detaches
            # first. Confirm which behaviour is wanted here.
            if client.stdin:
                client.stdin.write("q\n")
                client.stdin.flush()
            client.wait(timeout=2)
        except (OSError, ValueError, subprocess.TimeoutExpired):
            # Best effort only: a closed or broken pipe, or a slow exit, falls
            # through to the forced shutdown below. Cancellation and
            # KeyboardInterrupt are deliberately not swallowed.
            pass

        # Force terminate if still running
        if client.poll() is None:
            client.terminate()
            try:
                client.wait(timeout=2)
            except subprocess.TimeoutExpired:
                client.kill()

    session.client_process = None
    session.state = DebuggerState.DISCONNECTED
def _check_for_exception(self, session: DebugSession, line: str):
    """
    Inspect one CDB output line for an exception report and react to it.

    Runs on the output reader thread. On a match it optionally sends a break
    to CDB and optionally starts a background state capture.

    Args:
        session: Debug session
        line: Output line to check
    """
    found = session.exception_pattern.search(line)
    if not found:
        return

    # Work out which alternative of the pattern matched.
    groups = found.groups()
    chance_text = found.group(1)
    if chance_text:
        # "first/second chance ... exception" form
        chance = chance_text.lower()
        code = found.group(2) or "unknown"
    else:
        # "Access violation ... at <address>" form
        chance = "first"
        third = found.group(3) if len(groups) >= 3 else None
        code = third or "c0000005"

    print(f"[Exception Monitor] Detected {chance} chance exception: {code}")

    if session.auto_freeze_on_exception:
        try:
            # Interrupt execution so the target stops where it is.
            import signal
            if os.name != 'nt':
                session.client_process.send_signal(signal.SIGINT)
            else:
                os.kill(session.client_process.pid, signal.CTRL_BREAK_EVENT)
            time.sleep(0.2)  # Let break take effect
        except Exception as e:
            print(f"[Exception Monitor] Failed to freeze threads: {e}")

    if session.capture_full_state:
        # Capture on a separate thread so this reader keeps draining output.
        threading.Thread(
            target=self._capture_exception_state_sync,
            args=(session, code, chance),
            daemon=True,
            name="Exception-Capture"
        ).start()
async def enable_exception_monitoring(
    self,
    port: int,
    auto_freeze_on_exception: bool = True,
    capture_full_state: bool = True,
    generate_minidump: bool = True,
    minidump_path: Optional[str] = None
) -> Dict[str, Any]:
    """
    Enable background exception monitoring with auto-capture.

    Once enabled, the output reader thread scans every CDB output line for
    exception reports. CDB is also asked to break on access violations and
    on all other exceptions.

    Args:
        port: Port of the debug session
        auto_freeze_on_exception: Send a break to CDB when an exception is seen
        capture_full_state: Capture registers, callstack, etc.
        generate_minidump: Generate minidump file on exception
        minidump_path: Directory for minidump files (default: C:\\dumps)

    Returns:
        Status information. "status" is "error" when the session is missing
        or the dump directory cannot be created; in that case the session is
        left unchanged.
    """
    if port not in self.sessions:
        return {
            "status": "error",
            "message": "No debug session found"
        }

    session = self.sessions[port]

    if session.monitoring_enabled:
        return {
            "status": "already_enabled",
            "message": "Exception monitoring already enabled"
        }

    # Prepare the dump directory before touching the session, so a failure
    # is reported as an error result and leaves no half-applied settings.
    dump_dir = minidump_path or "C:\\dumps"
    if generate_minidump:
        try:
            os.makedirs(dump_dir, exist_ok=True)
        except OSError as e:
            return {
                "status": "error",
                "message": f"Failed to create minidump directory '{dump_dir}': {e}"
            }

    session.auto_freeze_on_exception = auto_freeze_on_exception
    session.capture_full_state = capture_full_state
    session.generate_minidump = generate_minidump
    session.minidump_path = dump_dir

    # From here on the output reader thread checks each line for exceptions.
    session.monitoring_enabled = True

    # Ask CDB to break on exceptions; a failure is reported but not fatal.
    try:
        await self._execute_command(session, "sxe av", timeout=2.0)  # Access violations
        await self._execute_command(session, "sxe *", timeout=2.0)   # All exceptions
    except Exception as e:
        print(f"[Exception Monitor] Warning: Failed to set exception breaks: {e}")

    return {
        "status": "success",
        "message": "Exception monitoring enabled",
        "output_reader_active": session.reader_thread.is_alive() if session.reader_thread else False,
        "auto_freeze": auto_freeze_on_exception,
        "capture_state": capture_full_state,
        "minidump": generate_minidump,
        "minidump_path": session.minidump_path
    }
def _capture_exception_state_sync(self, session: DebugSession, code: str, chance: str):
    """
    Capture registers, callstack and related state after an exception.

    Runs on its own thread, so it talks to CDB synchronously. Each command
    is sent while holding the session's command lock, so it cannot interleave
    with a command issued through _execute_command. The result is stored in
    ``session.last_exception`` and ``session.exception_captured_event`` is
    set, even when the capture only partly succeeded.

    Args:
        session: Debug session
        code: Exception code (e.g., "c0000005")
        chance: "first" or "second"
    """
    print(f"[Exception Capture] Starting state capture for {code}")

    exception_data = {
        "code": code if code.startswith("0x") else f"0x{code}",
        "is_first_chance": chance == "first",
        "timestamp": datetime.now().isoformat(),
        "captured_at": time.strftime("%Y-%m-%d %H:%M:%S"),
        "registers": {},
        "callstack": [],
        "disassembly": "",
        "analyze": "",
        "lastevent": ""
    }

    if not session.client_process or not session.client_process.stdin:
        print("[Exception Capture] No active CDB process")
        return

    prompt_prefix = re.compile(r'^\d+:\d+>\s*')
    register_pair = re.compile(r'\b(\w+)=([0-9a-fA-F`]+)')

    def send_and_collect(command: str, cmd_timeout: float = 3.0) -> str:
        """Send one command and return its output lines joined by newlines."""
        expected_echo = command.strip()
        with session.command_lock:
            # Clear pending output (bounded, in case output keeps streaming).
            cleared = 0
            while not session.output_queue.empty():
                try:
                    session.output_queue.get_nowait()
                    cleared += 1
                    if cleared > 100:  # Safety limit
                        break
                except queue.Empty:
                    break

            session.client_process.stdin.write(f"{command}\n")
            session.client_process.stdin.flush()

            response_lines = []
            seen_echo = False
            started = time.monotonic()
            last_output = started

            while (time.monotonic() - started) < cmd_timeout:
                try:
                    line = session.output_queue.get(timeout=0.1)
                except queue.Empty:
                    # One second of silence after some output: assume done.
                    if response_lines and (time.monotonic() - last_output) > 1.0:
                        break
                    continue

                if line is None:  # EOF
                    break

                last_output = time.monotonic()

                # Only a line that is exactly the command (optionally after a
                # prompt) counts as an echo. A substring test would drop the
                # first register line for "r".
                if not seen_echo and prompt_prefix.sub('', line).strip() == expected_echo:
                    seen_echo = True
                    continue

                if session.prompt_pattern.match(line):
                    break

                response_lines.append(line)

            return '\n'.join(response_lines)

    try:
        # Capture registers
        print("[Exception Capture] Getting registers...")
        reg_output = send_and_collect("r", 2.0)
        exception_data["registers_raw"] = reg_output

        # A register dump puts several name=value pairs on each line.
        for line in reg_output.split('\n'):
            for pair in register_pair.finditer(line):
                reg_name = pair.group(1).lower()
                reg_value = pair.group(2).replace('`', '')
                exception_data["registers"][reg_name] = f"0x{reg_value}"

        # Capture callstack
        print("[Exception Capture] Getting callstack...")
        stack_output = send_and_collect("k", 2.0)
        exception_data["callstack_raw"] = stack_output

        # Frame lines start with a two-digit hex frame number.
        exception_data["callstack"].extend(
            line.strip()
            for line in stack_output.split('\n')
            if re.match(r'^\s*[0-9a-f]{2}\s+', line, re.IGNORECASE)
        )

        # Capture last event
        print("[Exception Capture] Getting last event...")
        event_output = send_and_collect(".lastevent", 2.0)
        exception_data["lastevent"] = event_output.strip()

        # Capture disassembly
        if exception_data["registers"].get("rip"):
            print("[Exception Capture] Disassembling at RIP...")
            rip = exception_data["registers"]["rip"]
            disasm_output = send_and_collect(f"u {rip} L10", 2.0)
            exception_data["disassembly"] = disasm_output.strip()

        # Capture analysis (can be slow)
        print("[Exception Capture] Running !analyze -v...")
        analyze_output = send_and_collect("!analyze -v", 5.0)
        exception_data["analyze"] = analyze_output.strip()

        # Generate minidump if enabled
        if session.generate_minidump and session.minidump_path:
            print("[Exception Capture] Generating minidump...")
            timestamp = int(time.time())
            dump_file = os.path.join(
                session.minidump_path,
                f"crash_{code}_{timestamp}.dmp"
            )
            try:
                dump_output = send_and_collect(f".dump /ma {dump_file}", 10.0)
                exception_data["minidump_path"] = dump_file
                exception_data["minidump_output"] = dump_output
                print(f"[Exception Capture] Minidump saved to: {dump_file}")
            except Exception as e:
                exception_data["minidump_error"] = str(e)
                print(f"[Exception Capture] Minidump failed: {e}")

        # Store exception data
        with session.exception_lock:
            session.last_exception = exception_data
        session.exception_captured_event.set()

        print("[Exception Capture] State capture completed successfully")

    except Exception as e:
        exception_data["capture_error"] = str(e)
        print(f"[Exception Capture] Error during capture: {e}")
        import traceback
        traceback.print_exc()

        # Still store partial data so waiters are released.
        with session.exception_lock:
            session.last_exception = exception_data
        session.exception_captured_event.set()
async def get_last_exception_captured(self, port: int) -> Dict[str, Any]:
    """
    Return the most recent exception capture stored on the session, if any.

    Args:
        port: Port of the debug session

    Returns:
        Exception data or status
    """
    if port not in self.sessions:
        return {
            "status": "error",
            "message": "No debug session found"
        }

    captured = self.sessions[port].last_exception
    if not captured:
        # Nothing has been captured (or it was cleared by a resume).
        return {
            "status": "success",
            "exception_found": False,
            "message": "No exception captured yet"
        }

    return {
        "status": "success",
        "exception_found": True,
        "exception": captured
    }
async def create_minidump(
    self,
    port: int,
    output_path: str,
    dump_type: str = "full"
) -> Dict[str, Any]:
    """
    Write a dump of the debuggee on demand via CDB's ``.dump`` command.

    Args:
        port: Port of the debug session
        output_path: Path for the dump file
        dump_type: "mini", "heap", "full" (default); any other value is
            treated like "full"

    Returns:
        Status information
    """
    if port not in self.sessions:
        return {
            "status": "error",
            "message": "No debug session found"
        }
    session = self.sessions[port]

    # Translate the requested flavour into the matching .dump switch.
    dump_cmd = {
        "mini": ".dump /m",
        "heap": ".dump /mh",
        "full": ".dump /ma"
    }.get(dump_type, ".dump /ma")

    try:
        # Make sure the target directory is there before CDB writes to it.
        parent_dir, _ = os.path.split(output_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        result = await self._execute_command(session, f"{dump_cmd} {output_path}")

        success_markers = ("Creating", "Dump successfully written")
        if not any(marker in result for marker in success_markers):
            return {
                "status": "error",
                "message": f"Failed to create dump: {result}"
            }

        return {
            "status": "success",
            "dump_path": output_path,
            "dump_type": dump_type,
            "output": result
        }
    except Exception as e:
        return {
            "status": "error",
            "message": f"Dump creation failed: {str(e)}"
        }
async def set_exception_break(
    self,
    port: int,
    exception_code: str,
    break_type: str = "first",
    auto_capture_state: bool = True
) -> Dict[str, Any]:
    """
    Configure how CDB breaks on a given exception.

    Args:
        port: Port of the debug session
        exception_code: Exception code ("av", "c0000005", "*" for all)
        break_type: "first" (sends ``sxe``) or "second" (sends ``sxd``)
        auto_capture_state: Echoed back in the result as "auto_capture"

    Returns:
        Status information
    """
    if port not in self.sessions:
        return {
            "status": "error",
            "message": "No debug session found"
        }
    session = self.sessions[port]

    try:
        # Pick the sx* verb for the requested break type.
        verb = "sxe" if break_type == "first" else "sxd" if break_type == "second" else None
        if verb is None:
            return {
                "status": "error",
                "message": "break_type must be 'first' or 'second'"
            }

        output = await self._execute_command(session, f"{verb} {exception_code}")
        return {
            "status": "success",
            "exception_code": exception_code,
            "break_type": break_type,
            "auto_capture": auto_capture_state,
            "output": output
        }
    except Exception as e:
        return {
            "status": "error",
            "message": f"Failed to set exception break: {str(e)}"
        }
async def resume_with_monitoring(
    self,
    port: int,
    timeout: Optional[int] = None,
    stop_on_exception: bool = True
) -> Dict[str, Any]:
    """
    Resume execution with exception monitoring.

    Clears any previously captured exception, sends ``g`` and then, when a
    timeout is given, waits for a capture to be published. If none arrives
    in time a break is sent to CDB so the target stops again.

    Args:
        port: Port of the debug session
        timeout: Seconds to wait for an exception (None or 0 = return
            immediately after resuming)
        stop_on_exception: Accepted for API compatibility; not used here

    Returns:
        Status information with any captured exception
    """
    if port not in self.sessions:
        return {
            "status": "error",
            "message": "No debug session found"
        }

    session = self.sessions[port]

    if not session.monitoring_enabled:
        return {
            "status": "error",
            "message": "Exception monitoring not enabled. Call enable_exception_monitoring first."
        }

    try:
        # Clear previous exception and reset event
        with session.exception_lock:
            session.last_exception = None
        session.exception_captured_event.clear()

        print("[Resume] Sending 'g' command to resume execution")
        await self._execute_command(session, "g", timeout=1.0)
        session.state = DebuggerState.RUNNING

        if not timeout:
            # No timeout - return immediately
            return {
                "status": "running",
                "message": "Execution resumed with monitoring active"
            }

        print(f"[Resume] Waiting up to {timeout} seconds for exception...")

        # Poll the event from the coroutine so the event loop keeps running;
        # the capture thread sets it once the exception data is stored.
        exception_occurred = False
        start_time = time.time()
        while (time.time() - start_time) < timeout:
            if session.exception_captured_event.is_set():
                exception_occurred = True
                break
            await asyncio.sleep(0.1)

        if exception_occurred:
            print("[Resume] Exception captured!")
            with session.exception_lock:
                return {
                    "status": "exception_captured",
                    "exception": session.last_exception,
                    "message": "Exception occurred during execution"
                }

        print(f"[Resume] Timeout after {timeout} seconds, no exception")
        # Send break to stop execution. This is best effort, but failures are
        # reported and cancellation is no longer swallowed.
        try:
            import signal
            if os.name == 'nt':
                os.kill(session.client_process.pid, signal.CTRL_BREAK_EVENT)
            else:
                session.client_process.send_signal(signal.SIGINT)
        except Exception as e:
            print(f"[Resume] Failed to send break: {e}")

        return {
            "status": "timeout",
            "message": f"No exception occurred within {timeout} seconds",
            "exception_found": False
        }

    except Exception as e:
        return {
            "status": "error",
            "message": f"Failed to resume: {str(e)}"
        }
async def set_hardware_breakpoint(
    self,
    port: int,
    address: str,
    hw_type: str = "execute",
    size: int = 1,
    condition: Optional[str] = None,
    thread_id: Optional[int] = None
) -> Dict[str, Any]:
    """
    Set a hardware breakpoint with CDB's ``ba`` command and track it.

    At most 4 breakpoints are tracked per session (one per DR0-DR3 slot).

    Args:
        port: Port of the debug session
        address: Memory address in hex (e.g., "0x140001234")
        hw_type: Breakpoint type:
            - "execute" - Break on instruction execution (default)
            - "write" - Break on memory write
            - "readwrite" - Break on memory read or write
            - "io" - Break on I/O port access
        size: Size of the watched region in bytes (1, 2, 4, or 8).
            Execute breakpoints always use size 1.
        condition: Optional string appended to the command in double quotes
        thread_id: Apply only to a specific thread (None = all threads)

    Returns:
        Status dict with breakpoint info or error
    """
    if port not in self.sessions:
        return {
            "status": "error",
            "message": "No debug session found"
        }

    session = self.sessions[port]

    # Validate hw_type
    type_map = {
        "execute": "e",
        "write": "w",
        "readwrite": "r",
        "io": "i"
    }
    if hw_type not in type_map:
        return {
            "status": "error",
            "error": "InvalidTypeError",
            "message": f"Invalid type '{hw_type}'. Must be one of: {', '.join(type_map)}",
            "provided_type": hw_type
        }

    # Validate size
    if size not in (1, 2, 4, 8):
        return {
            "status": "error",
            "error": "InvalidSizeError",
            "message": "Size must be 1, 2, 4, or 8 bytes",
            "provided_size": size
        }

    # Execute breakpoints always use size 1
    if hw_type == "execute":
        size = 1

    # The address must parse as hex and be aligned to the watched size.
    try:
        addr_int = int(address, 16) if isinstance(address, str) else address
        misaligned = addr_int % size != 0
    except (ValueError, TypeError):
        return {
            "status": "error",
            "error": "InvalidAddressError",
            "message": f"Invalid address format: {address}",
            "address": address
        }
    if misaligned:
        return {
            "status": "error",
            "error": "InvalidAddressError",
            "message": f"Address {address} not aligned for size {size} (must be {size}-byte aligned)",
            "address": address,
            "size": size
        }

    # Check if we have available hardware breakpoint slots (max 4)
    if len(session.hardware_breakpoints) >= 4:
        return {
            "status": "error",
            "error": "NoAvailableSlotError",
            "message": "All 4 hardware breakpoint slots are in use",
            "available_slots": 0,
            "in_use": list(session.hardware_breakpoints.keys())
        }

    try:
        # Build the ba (break on access) command
        ba_cmd = f"ba {type_map[hw_type]}{size} {address}"

        if condition:
            ba_cmd += f" \"{condition}\""

        # A thread qualifier goes in front of the whole command.
        if thread_id is not None:
            ba_cmd = f"~{thread_id} {ba_cmd}"

        result = await self._execute_command(session, ba_cmd, timeout=2.0)

        # The breakpoint number is taken from output like "Breakpoint 0 created".
        bp_num_match = re.search(r'Breakpoint (\d+)', result, re.IGNORECASE)
        if not bp_num_match:
            return {
                "status": "error",
                "message": f"Failed to create hardware breakpoint: {result}",
                "command": ba_cmd,
                "output": result
            }

        bp_num = int(bp_num_match.group(1))

        # Pick the lowest DR label not already tracked. Counting the tracked
        # breakpoints instead would hand out a duplicate label after an
        # earlier breakpoint was removed. The label is bookkeeping only.
        used_labels = {
            info.get("dr_register")
            for info in session.hardware_breakpoints.values()
        }
        dr_register = next(
            (f"DR{slot}" for slot in range(4) if f"DR{slot}" not in used_labels),
            f"DR{len(session.hardware_breakpoints)}"
        )

        session.hardware_breakpoints[bp_num] = {
            "cdb_bp_number": bp_num,
            "address": address,
            "type": hw_type,
            "size": size,
            "enabled": True,
            "thread_id": thread_id,
            "dr_register": dr_register,
            "condition": condition,
            "hit_count": 0
        }

        return {
            "status": "success",
            "breakpoint_id": bp_num,
            "address": address,
            "type": hw_type,
            "size": size,
            "enabled": True,
            "thread_id": thread_id,
            "dr_register": dr_register,
            "command": ba_cmd,
            "output": result
        }

    except Exception as e:
        return {
            "status": "error",
            "message": f"Failed to set hardware breakpoint: {str(e)}"
        }
async def remove_hardware_breakpoint(
    self,
    port: int,
    breakpoint_id: int
) -> Dict[str, Any]:
    """
    Clear a tracked hardware breakpoint with ``bc`` and stop tracking it.

    Args:
        port: Port of the debug session
        breakpoint_id: Hardware breakpoint ID (CDB breakpoint number)

    Returns:
        Status dict
    """
    if port not in self.sessions:
        return {
            "status": "error",
            "message": "No debug session found"
        }

    tracked = self.sessions[port].hardware_breakpoints
    session = self.sessions[port]

    if breakpoint_id not in tracked:
        return {
            "status": "error",
            "message": f"Hardware breakpoint {breakpoint_id} not found",
            "available_breakpoints": list(tracked.keys())
        }

    try:
        output = await self._execute_command(session, f"bc {breakpoint_id}", timeout=2.0)
        # Forget the breakpoint only after the clear command was sent.
        removed = session.hardware_breakpoints.pop(breakpoint_id)
        return {
            "status": "success",
            "breakpoint_id": breakpoint_id,
            "message": f"Hardware breakpoint {breakpoint_id} removed",
            "was": removed,
            "output": output
        }
    except Exception as e:
        return {
            "status": "error",
            "message": f"Failed to remove hardware breakpoint: {str(e)}"
        }
async def list_hardware_breakpoints(self, port: int) -> Dict[str, Any]:
    """
    Report the tracked hardware breakpoints, refreshed from ``bl`` output.

    Args:
        port: Port of the debug session

    Returns:
        Dict with list of breakpoints and available slots
    """
    if port not in self.sessions:
        return {
            "status": "error",
            "message": "No debug session found"
        }
    session = self.sessions[port]

    try:
        listing = await self._execute_command(session, "bl", timeout=2.0)

        # Refresh counters from lines such as:
        # " 0 e Disable Clear 00007ff6`c2b60000 0001 (0001) 0:**** "
        # The parenthesised number is stored as the hit count.
        row_pattern = re.compile(r'\s*(\d+)\s+e.*?(\d+)\s+\((\d+)\)')
        tracked = session.hardware_breakpoints
        for row in listing.split('\n'):
            parsed = row_pattern.match(row)
            if parsed is None:
                continue
            number = int(parsed.group(1))
            count = int(parsed.group(3))
            if number in tracked:
                tracked[number]["hit_count"] = count

        # Build response
        breakpoints = [
            {
                "id": number,
                "address": info["address"],
                "type": info["type"],
                "size": info["size"],
                "enabled": info["enabled"],
                "hit_count": info["hit_count"],
                "dr_register": info["dr_register"],
                "thread_id": info["thread_id"],
                "condition": info["condition"]
            }
            for number, info in session.hardware_breakpoints.items()
        ]

        return {
            "status": "success",
            "breakpoints": breakpoints,
            "available_slots": 4 - len(session.hardware_breakpoints),
            "total_slots": 4,
            "output": listing
        }
    except Exception as e:
        return {
            "status": "error",
            "message": f"Failed to list hardware breakpoints: {str(e)}"
        }
async def toggle_hardware_breakpoint(
    self,
    port: int,
    breakpoint_id: int,
    enabled: bool
) -> Dict[str, Any]:
    """
    Switch a tracked hardware breakpoint on (``be``) or off (``bd``).

    Args:
        port: Port of the debug session
        breakpoint_id: Hardware breakpoint ID
        enabled: True to enable, False to disable

    Returns:
        Status dict
    """
    if port not in self.sessions:
        return {
            "status": "error",
            "message": "No debug session found"
        }
    session = self.sessions[port]

    if breakpoint_id not in session.hardware_breakpoints:
        return {
            "status": "error",
            "message": f"Hardware breakpoint {breakpoint_id} not found"
        }

    if enabled:
        verb, state_word = "be", "enabled"
    else:
        verb, state_word = "bd", "disabled"

    try:
        output = await self._execute_command(session, f"{verb} {breakpoint_id}", timeout=2.0)
        # Mirror the new state in the local bookkeeping.
        session.hardware_breakpoints[breakpoint_id]["enabled"] = enabled
        return {
            "status": "success",
            "breakpoint_id": breakpoint_id,
            "enabled": enabled,
            "message": f"Hardware breakpoint {breakpoint_id} {state_word}",
            "output": output
        }
    except Exception as e:
        return {
            "status": "error",
            "message": f"Failed to toggle hardware breakpoint: {str(e)}"
        }
async def cleanup(self):
    """Turn off monitoring on every session, then stop every debug server."""
    # Reader threads are not joined; each one ends when its CDB process
    # exits and stdout reaches EOF.
    for active in self.sessions.values():
        active.monitoring_enabled = False

    # Snapshot the keys first: stopping a server removes its entry.
    for port in list(self.sessions):
        await self.stop_debug_server(port)