debugger.py•12.4 kB
"""
Debugger controller using Python's bdb module.
Provides breakpoint management and execution control.
"""
import bdb
from pathlib import Path
from typing import Any
from .schemas import BreakpointResponse, ContinueResponse, ExecutionError, FrameInfo
from .utils import MAX_CONTAINER_ITEMS, MAX_DEPTH, format_traceback, safe_repr
class DebugController(bdb.Bdb):
"""
Custom debugger based on bdb.
Manages breakpoints and captures local variables when hit.
Supports true step execution by pausing and waiting for commands.
"""
def __init__(self):
super().__init__()
self.target_breakpoint: tuple[str, int] | None = None
self.breakpoint_hit = False
self.current_frame = None
self.current_locals: dict[str, Any] = {}
self.execution_completed = False
self.execution_error: dict[str, Any] | None = None
self.globals_dict: dict[str, Any] = {} # Persistent globals for continue
self.script_code = None # Compiled code for continue
self.script_path: Path | None = None # Script path for reference
def _should_skip_frame(self, filename: str) -> bool:
"""Check if we should skip tracing this frame."""
# Skip frozen/internal modules
if filename.startswith("<frozen ") or filename.startswith("<string"):
return True
# Skip standard library modules (but NOT site-packages)
import sysconfig
stdlib_path = sysconfig.get_path("stdlib")
if stdlib_path and filename.startswith(stdlib_path):
# Don't skip site-packages - users may want to debug installed libraries
if "site-packages" not in filename:
return True
return False
def _reset_breakpoint_state(self):
"""
Reset only breakpoint-related state for next breakpoint.
This keeps the execution context (globals, code) intact while
preparing for the next breakpoint hit.
"""
self.breakpoint_hit = False
self.current_frame = None
self.current_locals = {}
self.execution_error = None
# Ensure no stale breakpoints remain
try:
self.clear_all_breaks()
except Exception:
pass
# Keep: globals_dict, script_code, script_path, target_breakpoint
def user_line(self, frame):
"""Called when debugger stops at a line."""
if not self.target_breakpoint:
return
# Skip internal/standard library modules
filename = frame.f_code.co_filename
if self._should_skip_frame(filename):
return
# Normalize paths for comparison
try:
frame_file = Path(filename).resolve()
except (ValueError, OSError):
# Skip if path is invalid
return
target_file = Path(self.target_breakpoint[0]).resolve()
target_line = self.target_breakpoint[1]
if frame_file == target_file and frame.f_lineno == target_line:
self.breakpoint_hit = True
self.current_frame = frame
self.current_locals = self._capture_locals(frame)
# Stop execution immediately by quitting debugger
raise bdb.BdbQuit
def user_return(self, frame, return_value):
"""Called when a function returns."""
pass
def user_exception(self, frame, exc_info):
"""Called when an exception occurs."""
pass
def _capture_locals(self, frame) -> dict[str, Any]:
"""
Capture local variables from frame with safety limits.
Args:
frame: Python frame object
Returns:
Dictionary of variable name to safe representation
"""
locals_dict = {}
frame_locals = frame.f_locals
for name, value in frame_locals.items():
# Skip private/internal variables
if name.startswith("__"):
continue
try:
repr_str, is_truncated = safe_repr(value, depth=0, max_depth=MAX_DEPTH)
var_type = type(value).__name__
locals_dict[name] = {
"type": var_type,
"repr": repr_str,
"isTruncated": is_truncated,
}
# Add enhanced collection information
if hasattr(value, "__len__"):
try:
size = len(value)
locals_dict[name]["length"] = size
# Determine if indexed (list, tuple, etc.) or named (dict, etc.)
if isinstance(value, (list, tuple, set, frozenset, range)):
locals_dict[name]["isIndexed"] = True
elif isinstance(value, dict):
locals_dict[name]["hasNamedProperties"] = True
locals_dict[name]["namedCount"] = size
if size > MAX_CONTAINER_ITEMS:
locals_dict[name]["isTruncated"] = True
# Mark as expandable if it has items
if size > 0:
locals_dict[name]["isExpandable"] = True
except Exception:
pass
except Exception:
locals_dict[name] = {
"type": "unknown",
"repr": "<capture failed>",
"isTruncated": True,
}
return locals_dict
def run_to_breakpoint(
self, script_path: Path, file: str, line: int
) -> BreakpointResponse:
"""
Run script to a specific breakpoint.
This starts a new execution or can be called on already-running script.
For true step execution, use continue_execution() after first breakpoint.
Args:
script_path: Path to Python script to execute
file: File path for breakpoint (absolute)
line: Line number for breakpoint (1-based)
Returns:
Response with breakpoint hit status and locals
"""
# Reset state for new breakpoint
self._reset_breakpoint_state()
self.execution_completed = False
# Set target breakpoint
self.target_breakpoint = (str(file), line)
# Only load and compile if this is the first run
if not self.globals_dict:
# Load script for the first time
try:
with open(script_path) as f:
code = f.read()
# Compile code
compiled = compile(code, str(script_path), "exec")
self.script_code = compiled
self.script_path = script_path
# Initialize globals
self.globals_dict = {
"__name__": "__main__",
"__file__": str(script_path),
}
except Exception as e:
import sys
traceback_str = format_traceback(type(e), e, sys.exc_info()[2], max_frames=10, max_length=2048)
self.execution_error = ExecutionError(
type=type(e).__name__,
message=str(e),
traceback=traceback_str,
)
return self._build_breakpoint_response()
# Run the script under debugger
try:
self.reset()
self.clear_all_breaks()
self.set_break(str(file), line)
self.run(self.script_code, self.globals_dict)
if not self.breakpoint_hit:
self.execution_completed = True
except bdb.BdbQuit:
# Expected when breakpoint is hit and we stop execution
pass
except Exception as e:
import sys
traceback_str = format_traceback(type(e), e, sys.exc_info()[2], max_frames=50, max_length=4096)
self.execution_error = ExecutionError(
type=type(e).__name__,
message=str(e),
traceback=traceback_str,
)
finally:
try:
self.clear_all_breaks()
except Exception:
pass
# Build response
return self._build_breakpoint_response()
def continue_execution(self, file: str, line: int) -> tuple[ContinueResponse, 'DebugController']:
"""
Continue execution to next breakpoint within the same session.
NOTE: This uses a REPLAY approach - creates a new debugger instance
and re-executes from the beginning, but preserves global state.
This is safer than trying to reuse bdb state which can corrupt sys.modules.
IMPORTANT: Returns a tuple of (response, new_debugger_instance).
The caller MUST replace the old debugger with the new one.
Args:
file: File path for next breakpoint (absolute)
line: Line number for next breakpoint (1-based)
Returns:
Tuple of (response, new_debugger_instance)
"""
if not self.globals_dict:
return (
ContinueResponse(
hit=False,
completed=True,
error=ExecutionError(
type="SessionError",
message="No active execution context. Run to breakpoint first.",
),
),
self # Return self if no execution context
)
# Create a new debugger instance to avoid bdb state corruption
new_debugger = DebugController()
# Transfer state from current debugger
new_debugger.globals_dict = self.globals_dict.copy() # Copy globals
new_debugger.script_code = self.script_code
new_debugger.script_path = self.script_path
new_debugger.target_breakpoint = (str(file), line)
# Run the new debugger
try:
new_debugger.reset()
new_debugger.clear_all_breaks()
new_debugger.set_break(str(file), line)
new_debugger.run(new_debugger.script_code, new_debugger.globals_dict)
if not new_debugger.breakpoint_hit:
new_debugger.execution_completed = True
except bdb.BdbQuit:
pass
except Exception as e:
import sys
traceback_str = format_traceback(type(e), e, sys.exc_info()[2], max_frames=10, max_length=2048)
new_debugger.execution_error = ExecutionError(
type=type(e).__name__,
message=str(e),
traceback=traceback_str,
)
finally:
try:
new_debugger.clear_all_breaks()
except Exception:
pass
# Build response from new debugger
response = new_debugger._build_continue_response()
# Return both response and new debugger instance
return (response, new_debugger)
def _build_breakpoint_response(self) -> BreakpointResponse:
"""Build breakpoint response from current state."""
frame_info = None
if self.current_frame:
frame_info = FrameInfo(
file=str(self.current_frame.f_code.co_filename),
line=self.current_frame.f_lineno,
)
return BreakpointResponse(
hit=self.breakpoint_hit,
frameInfo=frame_info,
locals=self.current_locals if self.breakpoint_hit else None,
completed=self.execution_completed,
error=self.execution_error,
)
def _build_continue_response(self) -> ContinueResponse:
"""Build continue response from current state."""
frame_info = None
if self.current_frame:
frame_info = FrameInfo(
file=str(self.current_frame.f_code.co_filename),
line=self.current_frame.f_lineno,
)
return ContinueResponse(
hit=self.breakpoint_hit,
completed=self.execution_completed,
frameInfo=frame_info,
locals=self.current_locals if self.breakpoint_hit else None,
error=self.execution_error,
)