server.py•26.5 kB
"""
MCP SDK-based Server Implementation (v2)
This is the new implementation using the official MCP Python SDK.
It replaces the custom JSON-RPC implementation in server.py while maintaining
all functionality and tool definitions.
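Tools Implemented:
- sessions_create: Create a new debug session
- sessions_breakpoint: Run to breakpoint and capture locals
- sessions_continue: Continue execution to next breakpoint
- sessions_state: Get current session state
- sessions_end: End a debug session
- sessions_step_in: Step into the next function call
- sessions_step_over: Step over the current line
- sessions_step_out: Step out of the current function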
"""
import asyncio
import json
import logging
import sys
from pathlib import Path
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool
from .schemas import (
BreakpointRequest,
ContinueRequest,
StartSessionRequest,
)
# Internal imports
from .sessions import SessionManager
logger = logging.getLogger(__name__)
class MCPServerV2:
    """MCP SDK-based server for Python debugging.

    Wraps an MCP ``Server`` named "python-debug" together with a
    ``SessionManager`` rooted at the workspace directory.

    Every tool call returns a single ``TextContent`` item whose text is a JSON
    document. Failures are reported in-band as
    ``{"error": {"type": ..., "message": ...}}`` instead of being raised.
    """

    # Tool name -> name of the handler method. The method is looked up with
    # getattr at call time so that a handler replaced on the instance (for
    # example in tests) is still honoured.
    _TOOL_HANDLERS = {
        "sessions_create": "_handle_sessions_create",
        "sessions_breakpoint": "_handle_sessions_breakpoint",
        "sessions_continue": "_handle_sessions_continue",
        "sessions_state": "_handle_sessions_state",
        "sessions_end": "_handle_sessions_end",
        "sessions_step_in": "_handle_sessions_step_in",
        "sessions_step_over": "_handle_sessions_step_over",
        "sessions_step_out": "_handle_sessions_step_out",
    }

    # Arguments (besides sessionId) that location-based tools must supply.
    _LOCATION_ARGUMENTS = ("file", "line")

    def __init__(self, workspace_root: Path | None = None):
        """
        Initialize the SDK-based server.

        Args:
            workspace_root: Path to workspace root directory. Falls back to the
                current working directory when omitted or falsy.
        """
        self.workspace_root = workspace_root or Path.cwd()
        self.server = Server("python-debug")
        self.session_manager = SessionManager(self.workspace_root)
        self._setup_handlers()

    # ------------------------------------------------------------------
    # Response helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _json_response(payload: dict) -> list[TextContent]:
        """Wrap *payload* as a one-item TextContent list holding its JSON text."""
        return [TextContent(type="text", text=json.dumps(payload))]

    @classmethod
    def _error_response(cls, error_type: str, message: str) -> list[TextContent]:
        """Build the standard ``{"error": {"type", "message"}}`` response."""
        return cls._json_response(
            {"error": {"type": error_type, "message": message}}
        )

    # ------------------------------------------------------------------
    # Tool schemas
    # ------------------------------------------------------------------

    @staticmethod
    def _session_id_property() -> dict:
        """Return a fresh JSON-schema fragment for the ``sessionId`` property."""
        return {
            "type": "string",
            "description": "The debug session ID",
        }

    @classmethod
    def _session_only_schema(cls) -> dict:
        """Return the input schema for tools that take only ``sessionId``."""
        return {
            "type": "object",
            "properties": {
                "sessionId": cls._session_id_property(),
            },
            "required": ["sessionId"],
        }

    @classmethod
    def _location_schema(cls) -> dict:
        """Return the input schema for tools taking ``sessionId``, ``file``, ``line``."""
        return {
            "type": "object",
            "properties": {
                "sessionId": cls._session_id_property(),
                "file": {
                    "type": "string",
                    "description": "Project-relative file path",
                },
                "line": {
                    "type": "integer",
                    "description": "Line number (1-based)",
                    "minimum": 1,
                },
            },
            "required": ["sessionId", "file", "line"],
        }

    def _tool_definitions(self) -> list[Tool]:
        """Return the Tool descriptors advertised through ``list_tools``."""
        return [
            Tool(
                name="sessions_create",
                description="Create a new debug session for a Python script",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "entry": {
                            "type": "string",
                            "description": "Project-relative path to Python script",
                        },
                        "pythonPath": {
                            "type": "string",
                            "description": "Absolute path to Python interpreter (must have debugpy installed)",
                        },
                        "args": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "Command-line arguments for the script",
                        },
                        "env": {
                            "type": "object",
                            "additionalProperties": {"type": "string"},
                            "description": "Environment variables",
                        },
                    },
                    "required": ["entry", "pythonPath"],
                },
            ),
            Tool(
                name="sessions_breakpoint",
                description="Run to a breakpoint and capture local variables",
                inputSchema=self._location_schema(),
            ),
            Tool(
                name="sessions_continue",
                description="Continue execution to the next breakpoint",
                inputSchema=self._location_schema(),
            ),
            Tool(
                name="sessions_state",
                description="Get the current state of a debug session",
                inputSchema=self._session_only_schema(),
            ),
            Tool(
                name="sessions_end",
                description="End a debug session and clean up resources",
                inputSchema=self._session_only_schema(),
            ),
            Tool(
                name="sessions_step_in",
                description="Step into the next function call (requires active breakpoint)",
                inputSchema=self._session_only_schema(),
            ),
            Tool(
                name="sessions_step_over",
                description="Step over the current line (requires active breakpoint)",
                inputSchema=self._session_only_schema(),
            ),
            Tool(
                name="sessions_step_out",
                description="Step out of the current function (requires active breakpoint)",
                inputSchema=self._session_only_schema(),
            ),
        ]

    # ------------------------------------------------------------------
    # SDK registration
    # ------------------------------------------------------------------

    def _setup_handlers(self):
        """Register the list_tools and call_tool handlers with the SDK server."""

        @self.server.list_tools()
        async def list_tools() -> list[Tool]:
            """List all available debugging tools."""
            return self._tool_definitions()

        @self.server.call_tool()
        async def call_tool(name: str, arguments: dict) -> list[TextContent]:
            """Main tool dispatcher - routes to the appropriate handler."""
            try:
                handler_name = self._TOOL_HANDLERS.get(name)
                if handler_name is None:
                    return self._error_response(
                        "UnknownTool", f"Unknown tool: {name}"
                    )
                handler = getattr(self, handler_name)
                return await handler(arguments)
            except Exception as e:
                # Last-resort boundary: handlers already convert their own
                # failures, so anything arriving here is unexpected.
                logger.exception("Unexpected error in call_tool for %s", name)
                return self._error_response(type(e).__name__, str(e))

    # ------------------------------------------------------------------
    # Shared handler machinery
    # ------------------------------------------------------------------

    @staticmethod
    def _build_breakpoint_request(arguments: dict) -> BreakpointRequest:
        """Build a BreakpointRequest from the ``file`` and ``line`` arguments."""
        return BreakpointRequest(
            file=arguments["file"],
            line=arguments["line"],
        )

    @staticmethod
    def _build_continue_request(arguments: dict) -> BreakpointRequest:
        """Build a ContinueRequest, then convert it to a BreakpointRequest.

        The arguments go through ContinueRequest first, and its ``file`` and
        ``line`` are copied into the BreakpointRequest that is handed to the
        session manager.
        """
        request = ContinueRequest(
            file=arguments["file"],
            line=arguments["line"],
        )
        return BreakpointRequest(
            file=request.file,
            line=request.line,
        )

    async def _run_session_tool(
        self,
        arguments: dict,
        method_name: str,
        failure_log: str,
        build_request=None,
    ) -> list[TextContent]:
        """Run one session-manager coroutine and convert the outcome to JSON.

        Args:
            arguments: Raw tool arguments received from the client.
            method_name: Name of the coroutine method on ``self.session_manager``
                to await. It is called with the session ID and, when
                ``build_request`` is given, the request it returns.
            failure_log: Message logged (with traceback) on unexpected errors.
            build_request: Optional callable turning ``arguments`` into a
                request object. When given, ``file`` and ``line`` must be
                present in ``arguments``.

        Returns:
            The JSON of ``response.model_dump()`` on success, otherwise an
            error response:

            * ``ValueError`` when ``sessionId`` is missing/empty, or when
              ``file`` or ``line`` is absent for a location-based tool.
            * ``SessionNotFound`` when a KeyError escapes the call
              (presumably raised by the session manager for unknown session
              IDs - confirm against SessionManager).
            * The exception's class name for anything else.
        """
        try:
            session_id = arguments.get("sessionId")
            if not session_id:
                return self._error_response("ValueError", "sessionId is required")

            call_args = [session_id]
            if build_request is not None:
                # Check presence up front: a bare arguments["file"] lookup
                # would raise KeyError and be misreported as SessionNotFound
                # by the handler below.
                for key in self._LOCATION_ARGUMENTS:
                    if key not in arguments:
                        return self._error_response(
                            "ValueError", f"{key} is required"
                        )
                call_args.append(build_request(arguments))

            operation = getattr(self.session_manager, method_name)
            response = await operation(*call_args)
            return self._json_response(response.model_dump())
        except KeyError as e:
            return self._error_response("SessionNotFound", str(e))
        except Exception as e:
            logger.exception(failure_log)
            return self._error_response(type(e).__name__, str(e))

    # ------------------------------------------------------------------
    # Tool handlers
    # ------------------------------------------------------------------

    async def _handle_sessions_create(self, arguments: dict) -> list[TextContent]:
        """
        Handler for sessions_create tool.

        Builds a StartSessionRequest from the arguments, asks the session
        manager to create the session, and returns ``{"sessionId": ...}``.
        FileNotFoundError and ValueError are reported under those exact type
        names without logging; any other exception is logged and reported
        under its own class name.
        """
        try:
            request = StartSessionRequest(**arguments)
            response = await self.session_manager.create_session_async(request)
            return self._json_response({"sessionId": response.sessionId})
        except FileNotFoundError as e:
            return self._error_response("FileNotFoundError", str(e))
        except ValueError as e:
            # Literal type name on purpose: ValueError subclasses are reported
            # as plain "ValueError".
            return self._error_response("ValueError", str(e))
        except Exception as e:
            logger.exception("Error creating session")
            return self._error_response(type(e).__name__, str(e))

    async def _handle_sessions_breakpoint(self, arguments: dict) -> list[TextContent]:
        """
        Handler for sessions_breakpoint tool.

        Awaits ``run_to_breakpoint_async`` with the session ID and a
        BreakpointRequest built from ``file`` and ``line``.
        """
        return await self._run_session_tool(
            arguments,
            "run_to_breakpoint_async",
            "Error at breakpoint",
            build_request=self._build_breakpoint_request,
        )

    async def _handle_sessions_continue(self, arguments: dict) -> list[TextContent]:
        """
        Handler for sessions_continue tool.

        Awaits ``continue_execution_async`` with the session ID and a
        BreakpointRequest derived from a ContinueRequest.
        """
        return await self._run_session_tool(
            arguments,
            "continue_execution_async",
            "Error continuing execution",
            build_request=self._build_continue_request,
        )

    async def _handle_sessions_state(self, arguments: dict) -> list[TextContent]:
        """
        Handler for sessions_state tool.

        Awaits ``get_state_async`` for the given session ID.
        """
        return await self._run_session_tool(
            arguments, "get_state_async", "Error getting session state"
        )

    async def _handle_sessions_end(self, arguments: dict) -> list[TextContent]:
        """
        Handler for sessions_end tool.

        Awaits ``end_session_async`` for the given session ID.
        """
        return await self._run_session_tool(
            arguments, "end_session_async", "Error ending session"
        )

    async def _handle_sessions_step_in(self, arguments: dict) -> list[TextContent]:
        """
        Handler for sessions_step_in tool.

        Awaits ``step_in_async`` for the given session ID.
        """
        return await self._run_session_tool(
            arguments, "step_in_async", "Error in step_in"
        )

    async def _handle_sessions_step_over(self, arguments: dict) -> list[TextContent]:
        """
        Handler for sessions_step_over tool.

        Awaits ``step_over_async`` for the given session ID.
        """
        return await self._run_session_tool(
            arguments, "step_over_async", "Error in step_over"
        )

    async def _handle_sessions_step_out(self, arguments: dict) -> list[TextContent]:
        """
        Handler for sessions_step_out tool.

        Awaits ``step_out_async`` for the given session ID.
        """
        return await self._run_session_tool(
            arguments, "step_out_async", "Error in step_out"
        )

    # ------------------------------------------------------------------
    # Transport
    # ------------------------------------------------------------------

    async def run(self):
        """Run the server with stdio transport"""
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                self.server.create_initialization_options(),
            )
def parse_args():
    """Build the command-line parser and parse the process arguments.

    Returns:
        The parsed namespace with ``workspace`` (a Path, defaulting to the
        current working directory) and ``log_level`` (one of DEBUG, INFO,
        WARNING, ERROR; defaulting to INFO).
    """
    from argparse import ArgumentParser

    cli = ArgumentParser(
        description="MCP Debug Tool - Python debugging via Model Context Protocol"
    )

    # Each entry is (flag, keyword arguments for add_argument).
    option_specs = (
        (
            "--workspace",
            {
                "type": Path,
                "default": Path.cwd(),
                "help": "Workspace root directory (default: current directory)",
            },
        ),
        (
            "--log-level",
            {
                "choices": ["DEBUG", "INFO", "WARNING", "ERROR"],
                "default": "INFO",
                "help": "Logging level (default: INFO)",
            },
        ),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)

    return cli.parse_args()
async def async_main():
    """Async entry point: parse arguments, set up logging, run the server.

    Logging is configured at the level chosen with ``--log-level`` and sent
    to stderr; the server is then created for the ``--workspace`` directory
    and run until it returns.
    """
    cli = parse_args()

    # stdout is reserved for the MCP protocol stream, so logs go to stderr.
    chosen_level = getattr(logging, cli.log_level)
    logging.basicConfig(
        stream=sys.stderr,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        level=chosen_level,
    )

    logger.info("Starting MCP Debug Server v2 (SDK-based)")
    logger.info(f"Workspace: {cli.workspace}")

    debug_server = MCPServerV2(workspace_root=cli.workspace)
    await debug_server.run()
def main():
    """Synchronous entry point: run ``async_main`` to completion with asyncio."""
    entry_coroutine = async_main()
    asyncio.run(entry_coroutine)


# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()