# sessions.py (35.6 kB)
"""
Session manager for debug sessions.
Handles session lifecycle: creation, state tracking, and cleanup.
"""
import asyncio
import contextlib
import json
import os
import subprocess
import sys
import uuid
from datetime import datetime
from pathlib import Path
from .schemas import (
BreakpointRequest,
BreakpointResponse,
BreakpointTarget,
EndSessionResponse,
ExecutionError,
SessionStateResponse,
SessionStatus,
SessionTimings,
StartSessionRequest,
StartSessionResponse,
)
from .utils import DEFAULT_TIMEOUT_SECONDS, Timer, resolve_workspace_path, validate_file_and_line
class DebugSession:
    """
    State container for a single debug session.
    Keeps the launch configuration, the current status, the most recent
    breakpoint, timing counters, and the backend handles (subprocess or
    DAP wrapper) that drive the session.
    """

    def __init__(
        self,
        session_id: str,
        entry: Path,
        python_path: str,
        workspace_root: Path,
        args: list[str] | None = None,
        env: dict[str, str] | None = None,
        use_dap: bool = True,
    ):
        """
        Store the launch configuration and reset all runtime state.
        Args:
            session_id: Identifier of this session
            entry: Path of the script to debug
            python_path: Interpreter used to run the script
            workspace_root: Root directory associated with this session
            args: Arguments for the script (a falsy value becomes an empty list)
            env: Environment variables for the script (a falsy value becomes an empty dict)
            use_dap: True selects the DAP backend, False the legacy bdb backend
        """
        # Launch configuration
        self.id = session_id
        self.entry = entry
        self.workspace_root = workspace_root
        self.python_path = python_path
        self.use_dap = use_dap
        self.args = args if args else []
        self.env = env if env else {}

        # Runtime state
        self.status = SessionStatus.IDLE
        self.created_at = datetime.now()
        self.updated_at = datetime.now()
        self.last_breakpoint: BreakpointTarget | None = None
        self.timings = SessionTimings()

        # Backend handles, both unset until a backend is started.
        # bdb mode (legacy): runner subprocess
        self.process: subprocess.Popen | None = None
        # Placeholder for a background reader of subprocess output; nothing
        # in this class assigns it (NOTE(review): confirm it is still used).
        self.reader_task = None
        # DAP mode (default): wrapper object, attached once the backend launches
        self.dap_wrapper = None

    def update_status(self, status: SessionStatus) -> None:
        """Set a new status and refresh the modification timestamp."""
        self.status = status
        self.updated_at = datetime.now()

    def update_breakpoint(self, file: str, line: int) -> None:
        """
        Record a stop at file:line.
        A repeated stop at the same location as the stored breakpoint
        increments its hit count; any other location replaces the stored
        breakpoint with a fresh one whose hit count is 1.
        """
        previous = self.last_breakpoint
        if not (previous and previous.file == file and previous.line == line):
            self.last_breakpoint = BreakpointTarget(file=file, line=line, hitCount=1)
        else:
            previous.hitCount += 1
        self.updated_at = datetime.now()

    def update_timings(self, last_run_ms: float) -> None:
        """Store the latest run duration and add it to the running total."""
        counters = self.timings
        counters.lastRunMs = last_run_ms
        counters.totalCpuTimeMs += last_run_ms
        self.updated_at = datetime.now()

    def to_state_response(self) -> SessionStateResponse:
        """Build the state response model from status, last breakpoint and timings."""
        fields = {
            "status": self.status,
            "lastBreakpoint": self.last_breakpoint,
            "timings": self.timings,
        }
        return SessionStateResponse(**fields)
class SessionManager:
"""Manages multiple debug sessions."""
def __init__(self, workspace_root: Path):
self.workspace_root = workspace_root
self.sessions: dict[str, DebugSession] = {}
def create_session(self, request: StartSessionRequest) -> StartSessionResponse:
    """
    Validate a start request and register a new debug session.
    No process is launched here; the session is only created and stored.
    Args:
        request: Session creation request (entry, pythonPath, args, env, useDap)
    Returns:
        Response carrying the newly generated session ID
    Raises:
        FileNotFoundError: If the entry script or the Python interpreter doesn't exist
        ValueError: If the entry is not a file, is not a .py file, or the
            interpreter path is not a file
    """
    # Entry script checks, in order: exists, is a file, has a .py suffix
    script = resolve_workspace_path(self.workspace_root, request.entry)
    if not script.exists():
        raise FileNotFoundError(f"Entry script not found: {request.entry}")
    if not script.is_file():
        raise ValueError(f"Entry path is not a file: {request.entry}")
    if script.suffix != ".py":
        raise ValueError(f"Entry must be a Python file (.py): {request.entry}")

    # Each session gets its own workspace root, discovered from the entry location
    workspace_for_session = self._find_workspace_root(script)

    # Interpreter checks (pythonPath is required)
    interpreter = Path(request.pythonPath)
    if not interpreter.exists():
        raise FileNotFoundError(f"Python interpreter not found: {request.pythonPath}")
    if not interpreter.is_file():
        raise ValueError(f"Python path is not a file: {request.pythonPath}")

    new_id = str(uuid.uuid4())

    # DAP is the default backend unless the request explicitly sets useDap
    dap_enabled = True if request.useDap is None else request.useDap

    self.sessions[new_id] = DebugSession(
        session_id=new_id,
        entry=script,
        python_path=request.pythonPath,
        workspace_root=workspace_for_session,
        args=request.args or [],
        env=request.env or {},
        use_dap=dap_enabled,
    )
    return StartSessionResponse(sessionId=new_id)
def _find_workspace_root(self, entry_path: Path) -> Path:
"""
Find the workspace root for a given entry path.
Searches upwards from the entry path for common project markers:
- .git directory
- pyproject.toml
- setup.py
- requirements.txt
Args:
entry_path: Absolute path to the entry script
Returns:
Workspace root path (defaults to entry's parent if no markers found)
"""
current = entry_path.parent
markers = ['.git', 'pyproject.toml', 'setup.py', 'requirements.txt', '.venv', 'venv']
# Search up to 10 levels
for _ in range(10):
for marker in markers:
if (current / marker).exists():
return current
parent = current.parent
if parent == current: # Reached root
break
current = parent
# Default to entry file's directory
return entry_path.parent
def get_session(self, session_id: str) -> DebugSession:
"""
Get session by ID.
Args:
session_id: Session ID
Returns:
Debug session
Raises:
KeyError: If session not found
"""
if session_id not in self.sessions:
raise KeyError(f"Session not found: {session_id}")
return self.sessions[session_id]
def get_state(self, session_id: str) -> SessionStateResponse:
"""
Get current session state.
Args:
session_id: Session ID
Returns:
Current state response
"""
session = self.get_session(session_id)
return session.to_state_response()
def end_session(self, session_id: str) -> EndSessionResponse:
    """
    End a debug session and clean up resources.
    Terminates the DAP wrapper (if any), shuts down the bdb runner
    subprocess (if any), closes the subprocess pipes, marks the session
    COMPLETED and removes it from the active sessions.
    Args:
        session_id: Session ID
    Returns:
        End confirmation response
    Raises:
        KeyError: If session not found
    """
    session = self.get_session(session_id)

    # Clean up DAP wrapper if using DAP. Best effort: a failing backend
    # must not prevent the session from being removed.
    if session.dap_wrapper:
        with contextlib.suppress(Exception):
            session.dap_wrapper.terminate()

    # Clean up subprocess (bdb mode)
    process = session.process
    if process is not None:
        if process.poll() is None:
            # Try graceful termination first: ask the runner to stop itself.
            # Any failure (broken pipe, timeout, ...) falls through to the
            # forced termination below.
            with contextlib.suppress(Exception):
                terminate_cmd = json.dumps({"command": "terminate"}) + '\n'
                process.stdin.write(terminate_cmd)
                process.stdin.flush()
                process.wait(timeout=5)
            # Force terminate if still running
            if process.poll() is None:
                process.terminate()
                try:
                    process.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    process.kill()
                    process.wait()
        # Close the pipes explicitly so their file descriptors are released
        # now rather than whenever the Popen object is garbage collected.
        for stream in (process.stdin, process.stdout, process.stderr):
            if stream is not None:
                # Closing stdin may flush into a dead pipe; ignore that.
                with contextlib.suppress(Exception):
                    stream.close()

    # Update status
    session.update_status(SessionStatus.COMPLETED)
    # Remove from active sessions
    del self.sessions[session_id]
    return EndSessionResponse(ended=True)
def run_to_breakpoint(
    self, session_id: str, request: BreakpointRequest, timeout: float | None = None
) -> BreakpointResponse:
    """
    Run a session until it reaches the requested breakpoint.
    The breakpoint file is resolved against the session's own workspace
    root and validated before the backend is invoked.
    Args:
        session_id: Session ID
        request: Breakpoint request with file and line
        timeout: Optional timeout in seconds (defaults to DEFAULT_TIMEOUT_SECONDS)
    Returns:
        Breakpoint response produced by the session's backend
    Raises:
        KeyError: If session not found
        ValueError: If file/line is invalid
    """
    session = self.get_session(session_id)

    target = resolve_workspace_path(session.workspace_root, request.file)
    validate_file_and_line(target, request.line)

    effective_timeout = DEFAULT_TIMEOUT_SECONDS if timeout is None else timeout

    # DAP is the default backend; bdb is the legacy fallback
    backend = self._run_to_breakpoint_dap if session.use_dap else self._run_to_breakpoint_bdb
    return backend(session, target, request.line, effective_timeout)
def _run_to_breakpoint_dap(
    self, session: DebugSession, breakpoint_path: Path, line: int, timeout: float
) -> BreakpointResponse:
    """
    Run to breakpoint using DAP (debugpy).
    On first use the DAP wrapper is created and launched. The wrapper is
    attached to the session only after the launch succeeded, so a failed
    launch does not leave a half-initialized wrapper behind and a later
    call can retry the initialization.
    Args:
        session: Session to run
        breakpoint_path: Resolved path of the breakpoint file
        line: Breakpoint line number
        timeout: Timeout in seconds passed to the wrapper
    Returns:
        The wrapper's response, or an error response (hit=False,
        completed=False) when initialization or execution raised
    """
    from .dap_wrapper import DAPSyncWrapper

    # Initialize DAP wrapper if not already done
    if not session.dap_wrapper:
        wrapper = None
        try:
            # python_path is guaranteed to be set (required field)
            wrapper = DAPSyncWrapper()
            wrapper.initialize_and_launch(
                python_path=session.python_path,
                script_path=session.entry,
                args=session.args,
                env=session.env,
                cwd=session.workspace_root,
            )
        except Exception as e:
            # Discard the failed wrapper (best effort) instead of keeping it
            # on the session, where it would look like a working backend.
            if wrapper is not None:
                with contextlib.suppress(Exception):
                    wrapper.terminate()
            session.update_status(SessionStatus.ERROR)
            return BreakpointResponse(
                hit=False,
                completed=False,
                error=ExecutionError(
                    type=type(e).__name__,
                    message=f"Failed to initialize DAP: {e}",
                ),
            )
        session.dap_wrapper = wrapper

    # Run to breakpoint
    session.update_status(SessionStatus.RUNNING)
    with Timer() as timer:
        try:
            response = session.dap_wrapper.run_to_breakpoint(
                str(breakpoint_path),
                line,
                timeout=timeout,
            )
            # Update session state
            if response.hit:
                session.update_breakpoint(str(breakpoint_path), line)
                session.update_status(SessionStatus.PAUSED)
            elif response.completed:
                session.update_status(SessionStatus.COMPLETED)
            elif response.error:
                session.update_status(SessionStatus.ERROR)
            session.update_timings(timer.elapsed_ms)
            return response
        except Exception as e:
            session.update_status(SessionStatus.ERROR)
            return BreakpointResponse(
                hit=False,
                completed=False,
                error=ExecutionError(
                    type=type(e).__name__,
                    message=str(e),
                ),
            )
def _run_to_breakpoint_bdb(
    self, session: DebugSession, breakpoint_path: Path, line: int, timeout: float
) -> BreakpointResponse:
    """
    Run to breakpoint using bdb (legacy).
    Starts the runner subprocess on first use, sends it a
    "run_to_breakpoint" command as one JSON line on stdin and waits up to
    `timeout` seconds for one JSON line on stdout.
    Args:
        session: Session to run
        breakpoint_path: Resolved path of the breakpoint file
        line: Breakpoint line number
        timeout: Seconds to wait for the runner's response
    Returns:
        The parsed runner response, or an error response (hit=False,
        completed=False) when the runner could not be started, terminated,
        reported an error, timed out, or an exception was raised
    """
    # Start subprocess if not already running
    if not session.process:
        # Use the specified Python interpreter (required field)
        python_executable = session.python_path
        # Path to runner script
        runner_script = Path(__file__).parent / "runner_main.py"
        # Prepare environment with PYTHONPATH
        env = os.environ.copy()
        # Add Debug-MCP's src directory to PYTHONPATH so runner can import mcp_debug_tool
        debug_mcp_src = str(Path(__file__).parent.parent.resolve())
        if 'PYTHONPATH' in env:
            env['PYTHONPATH'] = f"{debug_mcp_src}{os.pathsep}{env['PYTHONPATH']}"
        else:
            env['PYTHONPATH'] = debug_mcp_src
        # Start subprocess with specified Python interpreter
        # Use session's workspace root instead of manager's workspace root
        try:
            process = subprocess.Popen(
                [python_executable, str(runner_script), str(session.workspace_root)],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                bufsize=1,  # Line buffered
                env=env,  # Use modified environment
            )
        except (OSError, subprocess.SubprocessError) as e:
            # Report a start-up failure as an error response, the same way
            # the DAP backend reports a failed initialization.
            session.update_status(SessionStatus.ERROR)
            return BreakpointResponse(
                hit=False,
                completed=False,
                error=ExecutionError(
                    type=type(e).__name__,
                    message=f"Failed to start runner process: {e}",
                ),
            )
        session.process = process

    # Update status
    session.update_status(SessionStatus.RUNNING)

    # Send run_to_breakpoint command
    with Timer() as timer:
        try:
            command = {
                "command": "run_to_breakpoint",
                "script_path": str(session.entry),
                "file": str(breakpoint_path),
                "line": line,
                "args": session.args,
                "env": session.env,
            }
            # Send command as JSON line
            command_json = json.dumps(command) + '\n'
            session.process.stdin.write(command_json)
            session.process.stdin.flush()

            # Read response with timeout
            # NOTE(review): select() on pipe objects is documented as
            # unsupported on Windows; confirm the supported platforms.
            import select

            # Wait for output with timeout
            ready, _, _ = select.select([session.process.stdout], [], [], timeout)
            if ready:
                response_line = session.process.stdout.readline()
                if not response_line:
                    # Empty read means EOF: the process terminated
                    session.update_status(SessionStatus.ERROR)
                    stderr_output = session.process.stderr.read()
                    return BreakpointResponse(
                        hit=False,
                        completed=False,
                        error=ExecutionError(
                            type="ProcessError",
                            message=f"Runner process terminated unexpectedly. stderr: {stderr_output[:500]}",
                        ),
                    )
                response_data = json.loads(response_line)
                if response_data["status"] == "error":
                    session.update_status(SessionStatus.ERROR)
                    return BreakpointResponse(
                        hit=False,
                        completed=False,
                        error=ExecutionError(
                            type="ExecutionError",
                            message=response_data["message"],
                        ),
                    )
                # Parse response
                data = response_data["data"]
                response = BreakpointResponse(**data)
                # Update session state
                if response.hit:
                    session.update_breakpoint(str(breakpoint_path), line)
                    session.update_status(SessionStatus.PAUSED)
                elif response.completed:
                    session.update_status(SessionStatus.COMPLETED)
                elif response.error:
                    session.update_status(SessionStatus.ERROR)
                session.update_timings(timer.elapsed_ms)
                return response
            else:
                # Timeout
                session.update_status(SessionStatus.ERROR)
                debug_msg = f"DEBUG: timeout waiting for runner response. poll={session.process.poll() if session.process else 'N/A'}"
                print(debug_msg, file=sys.stderr)
                return BreakpointResponse(
                    hit=False,
                    completed=False,
                    error=ExecutionError(
                        type="TimeoutError",
                        message=f"Execution timed out after {timeout}s",
                    ),
                )
        except Exception as e:
            # Any failure while talking to the runner (broken pipe, invalid
            # JSON, missing keys, ...) is reported as an error response.
            session.update_status(SessionStatus.ERROR)
            return BreakpointResponse(
                hit=False,
                completed=False,
                error=ExecutionError(
                    type=type(e).__name__,
                    message=str(e),
                ),
            )
def continue_execution(
    self, session_id: str, request: BreakpointRequest
) -> BreakpointResponse:
    """
    Resume a paused session and run it to the next breakpoint.
    The breakpoint location is validated first; a session that is not
    PAUSED then yields an InvalidStateError response instead of running.
    Args:
        session_id: Session ID
        request: Breakpoint request with file and line
    Returns:
        Breakpoint response from the session's backend, or an error
        response when the session is not paused
    Raises:
        KeyError: If session not found
        ValueError: If file/line is invalid
    """
    session = self.get_session(session_id)

    # Resolve against the session's workspace root, not the manager's
    target = resolve_workspace_path(session.workspace_root, request.file)
    validate_file_and_line(target, request.line)

    if session.status == SessionStatus.PAUSED:
        # DAP is the default backend; bdb is the legacy fallback
        backend = self._continue_execution_dap if session.use_dap else self._continue_execution_bdb
        return backend(session, target, request.line)

    # Only a session stopped at a previous breakpoint can be continued
    state_error = ExecutionError(
        type="InvalidStateError",
        message=f"Cannot continue from state: {session.status}",
    )
    return BreakpointResponse(hit=False, completed=False, error=state_error)
def _continue_execution_dap(
    self, session: DebugSession, breakpoint_path: Path, line: int
) -> BreakpointResponse:
    """
    Continue a DAP session to the next breakpoint.
    Delegates to the wrapper's run_to_breakpoint with
    DEFAULT_TIMEOUT_SECONDS and mirrors the outcome onto the session.
    Args:
        session: Session to continue
        breakpoint_path: Resolved path of the next breakpoint file
        line: Next breakpoint line number
    Returns:
        The wrapper's response, or an error response when the wrapper is
        missing or the call raised
    """
    def _failure(kind: str, text: str) -> BreakpointResponse:
        # Uniform shape for every error returned from this method
        return BreakpointResponse(
            hit=False,
            completed=False,
            error=ExecutionError(type=kind, message=text),
        )

    if not session.dap_wrapper:
        return _failure("SessionError", "DAP session not initialized")

    session.update_status(SessionStatus.RUNNING)
    with Timer() as stopwatch:
        try:
            # For DAP, continuing is expressed as another run_to_breakpoint
            # call on the wrapper.
            target = str(breakpoint_path)
            outcome = session.dap_wrapper.run_to_breakpoint(
                target,
                line,
                timeout=DEFAULT_TIMEOUT_SECONDS,
            )
            # Mirror the outcome onto the session
            if outcome.hit:
                session.update_breakpoint(target, line)
                session.update_status(SessionStatus.PAUSED)
            elif outcome.completed:
                session.update_status(SessionStatus.COMPLETED)
            elif outcome.error:
                session.update_status(SessionStatus.ERROR)
            session.update_timings(stopwatch.elapsed_ms)
            return outcome
        except Exception as exc:
            session.update_status(SessionStatus.ERROR)
            return _failure(type(exc).__name__, str(exc))
def _continue_execution_bdb(
    self, session: DebugSession, breakpoint_path: Path, line: int
) -> BreakpointResponse:
    """
    Continue execution using bdb (legacy).
    Sends a "continue" command as one JSON line to the running runner
    subprocess and waits up to DEFAULT_TIMEOUT_SECONDS for one JSON line
    in response.
    Args:
        session: Session to continue
        breakpoint_path: Resolved path of the next breakpoint file
        line: Next breakpoint line number
    Returns:
        The parsed runner response, or an error response (hit=False,
        completed=False) when the runner is not running, terminated,
        reported an error, timed out, or an exception was raised
    """
    # Process must be running
    if not session.process or session.process.poll() is not None:
        return BreakpointResponse(
            hit=False,
            completed=False,
            error=ExecutionError(
                type="SessionError",
                message="Debug process is not running",
            ),
        )

    # Update status (set once; the previous code repeated this call)
    session.update_status(SessionStatus.RUNNING)

    # Send continue command with next breakpoint
    with Timer() as timer:
        try:
            command = {
                "command": "continue",
                "file": str(breakpoint_path),
                "line": line,
            }
            # Send command as JSON line
            command_json = json.dumps(command) + '\n'
            session.process.stdin.write(command_json)
            session.process.stdin.flush()

            # Read response with timeout
            import select

            ready, _, _ = select.select([session.process.stdout], [], [], DEFAULT_TIMEOUT_SECONDS)
            if ready:
                response_line = session.process.stdout.readline()
                if not response_line:
                    # Empty read means EOF: the runner terminated
                    session.update_status(SessionStatus.ERROR)
                    stderr_output = session.process.stderr.read()
                    return BreakpointResponse(
                        hit=False,
                        completed=False,
                        error=ExecutionError(
                            type="ProcessError",
                            message=f"Runner process terminated. stderr: {stderr_output[:500]}",
                        ),
                    )
                response_data = json.loads(response_line)
                if response_data["status"] == "error":
                    session.update_status(SessionStatus.ERROR)
                    return BreakpointResponse(
                        hit=False,
                        completed=False,
                        error=ExecutionError(
                            type="ExecutionError",
                            message=response_data["message"],
                        ),
                    )
                # Parse response
                data = response_data["data"]
                response = BreakpointResponse(**data)
                # Update session state
                if response.hit:
                    session.update_breakpoint(str(breakpoint_path), line)
                    session.update_status(SessionStatus.PAUSED)
                elif response.completed:
                    session.update_status(SessionStatus.COMPLETED)
                elif response.error:
                    session.update_status(SessionStatus.ERROR)
                session.update_timings(timer.elapsed_ms)
                return response
            else:
                # Timeout
                session.update_status(SessionStatus.ERROR)
                return BreakpointResponse(
                    hit=False,
                    completed=False,
                    error=ExecutionError(
                        type="TimeoutError",
                        message=f"Execution timed out after {DEFAULT_TIMEOUT_SECONDS}s",
                    ),
                )
        except Exception as e:
            # Any failure while talking to the runner is reported as an
            # error response rather than raised.
            session.update_status(SessionStatus.ERROR)
            return BreakpointResponse(
                hit=False,
                completed=False,
                error=ExecutionError(
                    type=type(e).__name__,
                    message=str(e),
                ),
            )
def step_in(self, session_id: str) -> BreakpointResponse:
    """
    Step into the next function call (DAP only).
    Preconditions are reported as error responses rather than exceptions:
    the session must use DAP, have an initialized wrapper, and be PAUSED.
    Args:
        session_id: Session ID
    Returns:
        BreakpointResponse from the wrapper, or an error response when a
        precondition fails or the step raised
    Raises:
        KeyError: If session not found
    """
    def _refuse(kind: str, text: str) -> BreakpointResponse:
        # Uniform shape for every error returned from this method
        return BreakpointResponse(
            hit=False,
            completed=False,
            error=ExecutionError(type=kind, message=text),
        )

    session = self.get_session(session_id)
    if not session.use_dap:
        return _refuse(
            "NotImplementedError",
            "Step operations require DAP integration (set useDap=true when creating session)",
        )
    if not session.dap_wrapper:
        return _refuse("SessionError", "DAP session not initialized")
    if session.status != SessionStatus.PAUSED:
        return _refuse(
            "InvalidStateError",
            f"Cannot step from state: {session.status}. Must be PAUSED.",
        )

    session.update_status(SessionStatus.RUNNING)
    with Timer() as stopwatch:
        try:
            outcome = session.dap_wrapper.step_in(timeout=DEFAULT_TIMEOUT_SECONDS)
            # Mirror the outcome onto the session
            if outcome.hit:
                session.update_status(SessionStatus.PAUSED)
                frame = outcome.frameInfo
                if frame:
                    # Record where the step landed
                    session.update_breakpoint(frame.file, frame.line)
            elif outcome.completed:
                session.update_status(SessionStatus.COMPLETED)
            elif outcome.error:
                session.update_status(SessionStatus.ERROR)
            session.update_timings(stopwatch.elapsed_ms)
            return outcome
        except Exception as exc:
            session.update_status(SessionStatus.ERROR)
            return _refuse(type(exc).__name__, str(exc))
def step_over(self, session_id: str) -> BreakpointResponse:
    """
    Step over the current line (DAP only).
    Preconditions are reported as error responses rather than exceptions:
    the session must use DAP, have an initialized wrapper, and be PAUSED.
    Args:
        session_id: Session ID
    Returns:
        BreakpointResponse from the wrapper, or an error response when a
        precondition fails or the step raised
    Raises:
        KeyError: If session not found
    """
    def _refuse(kind: str, text: str) -> BreakpointResponse:
        # Uniform shape for every error returned from this method
        return BreakpointResponse(
            hit=False,
            completed=False,
            error=ExecutionError(type=kind, message=text),
        )

    session = self.get_session(session_id)
    if not session.use_dap:
        return _refuse(
            "NotImplementedError",
            "Step operations require DAP integration (set useDap=true when creating session)",
        )
    if not session.dap_wrapper:
        return _refuse("SessionError", "DAP session not initialized")
    if session.status != SessionStatus.PAUSED:
        return _refuse(
            "InvalidStateError",
            f"Cannot step from state: {session.status}. Must be PAUSED.",
        )

    session.update_status(SessionStatus.RUNNING)
    with Timer() as stopwatch:
        try:
            outcome = session.dap_wrapper.step_over(timeout=DEFAULT_TIMEOUT_SECONDS)
            # Mirror the outcome onto the session
            if outcome.hit:
                session.update_status(SessionStatus.PAUSED)
                frame = outcome.frameInfo
                if frame:
                    # Record where the step landed
                    session.update_breakpoint(frame.file, frame.line)
            elif outcome.completed:
                session.update_status(SessionStatus.COMPLETED)
            elif outcome.error:
                session.update_status(SessionStatus.ERROR)
            session.update_timings(stopwatch.elapsed_ms)
            return outcome
        except Exception as exc:
            session.update_status(SessionStatus.ERROR)
            return _refuse(type(exc).__name__, str(exc))
def step_out(self, session_id: str) -> BreakpointResponse:
    """
    Step out of the current function (DAP only).
    Preconditions are reported as error responses rather than exceptions:
    the session must use DAP, have an initialized wrapper, and be PAUSED.
    Args:
        session_id: Session ID
    Returns:
        BreakpointResponse from the wrapper, or an error response when a
        precondition fails or the step raised
    Raises:
        KeyError: If session not found
    """
    def _refuse(kind: str, text: str) -> BreakpointResponse:
        # Uniform shape for every error returned from this method
        return BreakpointResponse(
            hit=False,
            completed=False,
            error=ExecutionError(type=kind, message=text),
        )

    session = self.get_session(session_id)
    if not session.use_dap:
        return _refuse(
            "NotImplementedError",
            "Step operations require DAP integration (set useDap=true when creating session)",
        )
    if not session.dap_wrapper:
        return _refuse("SessionError", "DAP session not initialized")
    if session.status != SessionStatus.PAUSED:
        return _refuse(
            "InvalidStateError",
            f"Cannot step from state: {session.status}. Must be PAUSED.",
        )

    session.update_status(SessionStatus.RUNNING)
    with Timer() as stopwatch:
        try:
            outcome = session.dap_wrapper.step_out(timeout=DEFAULT_TIMEOUT_SECONDS)
            # Mirror the outcome onto the session
            if outcome.hit:
                session.update_status(SessionStatus.PAUSED)
                frame = outcome.frameInfo
                if frame:
                    # Record where the step landed
                    session.update_breakpoint(frame.file, frame.line)
            elif outcome.completed:
                session.update_status(SessionStatus.COMPLETED)
            elif outcome.error:
                session.update_status(SessionStatus.ERROR)
            session.update_timings(stopwatch.elapsed_ms)
            return outcome
        except Exception as exc:
            session.update_status(SessionStatus.ERROR)
            return _refuse(type(exc).__name__, str(exc))
def cleanup_all(self) -> None:
"""Clean up all active sessions."""
session_ids = list(self.sessions.keys())
for session_id in session_ids:
with contextlib.suppress(Exception):
self.end_session(session_id)
# ========================================
# Async Wrappers for MCP SDK Integration
# ========================================
async def create_session_async(self, request: StartSessionRequest) -> StartSessionResponse:
"""
Async wrapper for create_session.
Runs the synchronous create_session in a thread pool to avoid blocking.
"""
return await asyncio.to_thread(self.create_session, request)
async def get_state_async(self, session_id: str) -> SessionStateResponse:
"""
Async wrapper for get_state.
Runs the synchronous get_state in a thread pool to avoid blocking.
"""
return await asyncio.to_thread(self.get_state, session_id)
async def end_session_async(self, session_id: str) -> EndSessionResponse:
"""
Async wrapper for end_session.
Runs the synchronous end_session in a thread pool to avoid blocking.
"""
return await asyncio.to_thread(self.end_session, session_id)
async def run_to_breakpoint_async(
self, session_id: str, request: BreakpointRequest
) -> BreakpointResponse:
"""
Async wrapper for run_to_breakpoint.
Runs the synchronous run_to_breakpoint in a thread pool to avoid blocking.
"""
return await asyncio.to_thread(self.run_to_breakpoint, session_id, request)
async def continue_execution_async(
self, session_id: str, request: BreakpointRequest
) -> BreakpointResponse:
"""
Async wrapper for continue_execution.
Runs the synchronous continue_execution in a thread pool to avoid blocking.
"""
return await asyncio.to_thread(self.continue_execution, session_id, request)
async def step_in_async(self, session_id: str) -> BreakpointResponse:
"""
Async wrapper for step_in.
Runs the synchronous step_in in a thread pool to avoid blocking.
"""
return await asyncio.to_thread(self.step_in, session_id)
async def step_over_async(self, session_id: str) -> BreakpointResponse:
"""
Async wrapper for step_over.
Runs the synchronous step_over in a thread pool to avoid blocking.
"""
return await asyncio.to_thread(self.step_over, session_id)
async def step_out_async(self, session_id: str) -> BreakpointResponse:
"""
Async wrapper for step_out.
Runs the synchronous step_out in a thread pool to avoid blocking.
"""
return await asyncio.to_thread(self.step_out, session_id)