Skip to main content
Glama
server.py (26.5 kB)
""" MCP SDK-based Server Implementation (v2) This is the new implementation using the official MCP Python SDK. It replaces the custom JSON-RPC implementation in server.py while maintaining all functionality and tool definitions. Tools Implemented: - sessions_create: Create a new debug session - sessions_breakpoint: Run to breakpoint and capture locals - sessions_continue: Continue execution to next breakpoint - sessions_state: Get current session state - sessions_end: End a debug session """ import asyncio import json import logging import sys from pathlib import Path from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import TextContent, Tool from .schemas import ( BreakpointRequest, ContinueRequest, StartSessionRequest, ) # Internal imports from .sessions import SessionManager logger = logging.getLogger(__name__) class MCPServerV2: """MCP SDK-based server for Python debugging""" def __init__(self, workspace_root: Path | None = None): """ Initialize the SDK-based server. 
Args: workspace_root: Path to workspace root directory """ self.workspace_root = workspace_root or Path.cwd() self.server = Server("python-debug") self.session_manager = SessionManager(self.workspace_root) self._setup_handlers() def _setup_handlers(self): """Register all tool handlers with the SDK server""" @self.server.list_tools() async def list_tools() -> list[Tool]: """List all available debugging tools""" return [ Tool( name="sessions_create", description="Create a new debug session for a Python script", inputSchema={ "type": "object", "properties": { "entry": { "type": "string", "description": "Project-relative path to Python script", }, "pythonPath": { "type": "string", "description": "Absolute path to Python interpreter (must have debugpy installed)", }, "args": { "type": "array", "items": {"type": "string"}, "description": "Command-line arguments for the script", }, "env": { "type": "object", "additionalProperties": {"type": "string"}, "description": "Environment variables", }, }, "required": ["entry", "pythonPath"], }, ), Tool( name="sessions_breakpoint", description="Run to a breakpoint and capture local variables", inputSchema={ "type": "object", "properties": { "sessionId": { "type": "string", "description": "The debug session ID", }, "file": { "type": "string", "description": "Project-relative file path", }, "line": { "type": "integer", "description": "Line number (1-based)", "minimum": 1, }, }, "required": ["sessionId", "file", "line"], }, ), Tool( name="sessions_continue", description="Continue execution to the next breakpoint", inputSchema={ "type": "object", "properties": { "sessionId": { "type": "string", "description": "The debug session ID", }, "file": { "type": "string", "description": "Project-relative file path", }, "line": { "type": "integer", "description": "Line number (1-based)", "minimum": 1, }, }, "required": ["sessionId", "file", "line"], }, ), Tool( name="sessions_state", description="Get the current state of a debug session", 
inputSchema={ "type": "object", "properties": { "sessionId": { "type": "string", "description": "The debug session ID", }, }, "required": ["sessionId"], }, ), Tool( name="sessions_end", description="End a debug session and clean up resources", inputSchema={ "type": "object", "properties": { "sessionId": { "type": "string", "description": "The debug session ID", }, }, "required": ["sessionId"], }, ), Tool( name="sessions_step_in", description="Step into the next function call (requires active breakpoint)", inputSchema={ "type": "object", "properties": { "sessionId": { "type": "string", "description": "The debug session ID", }, }, "required": ["sessionId"], }, ), Tool( name="sessions_step_over", description="Step over the current line (requires active breakpoint)", inputSchema={ "type": "object", "properties": { "sessionId": { "type": "string", "description": "The debug session ID", }, }, "required": ["sessionId"], }, ), Tool( name="sessions_step_out", description="Step out of the current function (requires active breakpoint)", inputSchema={ "type": "object", "properties": { "sessionId": { "type": "string", "description": "The debug session ID", }, }, "required": ["sessionId"], }, ), ] @self.server.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: """Main tool dispatcher - routes to appropriate handler""" try: if name == "sessions_create": return await self._handle_sessions_create(arguments) elif name == "sessions_breakpoint": return await self._handle_sessions_breakpoint(arguments) elif name == "sessions_continue": return await self._handle_sessions_continue(arguments) elif name == "sessions_state": return await self._handle_sessions_state(arguments) elif name == "sessions_end": return await self._handle_sessions_end(arguments) elif name == "sessions_step_in": return await self._handle_sessions_step_in(arguments) elif name == "sessions_step_over": return await self._handle_sessions_step_over(arguments) elif name == 
"sessions_step_out": return await self._handle_sessions_step_out(arguments) else: return [ TextContent( type="text", text=json.dumps({ "error": { "type": "UnknownTool", "message": f"Unknown tool: {name}", } }), ) ] except Exception as e: logger.exception(f"Unexpected error in call_tool for {name}") return [ TextContent( type="text", text=json.dumps({ "error": { "type": type(e).__name__, "message": str(e), } }), ) ] async def _handle_sessions_create(self, arguments: dict) -> list[TextContent]: """ Handler for sessions_create tool. Creates a new debug session for a Python script. """ try: request = StartSessionRequest(**arguments) response = await self.session_manager.create_session_async(request) return [ TextContent( type="text", text=json.dumps({ "sessionId": response.sessionId, }), ) ] except FileNotFoundError as e: return [ TextContent( type="text", text=json.dumps({ "error": { "type": "FileNotFoundError", "message": str(e), } }), ) ] except ValueError as e: return [ TextContent( type="text", text=json.dumps({ "error": { "type": "ValueError", "message": str(e), } }), ) ] except Exception as e: logger.exception("Error creating session") return [ TextContent( type="text", text=json.dumps({ "error": { "type": type(e).__name__, "message": str(e), } }), ) ] async def _handle_sessions_breakpoint(self, arguments: dict) -> list[TextContent]: """ Handler for sessions_breakpoint tool. Runs to a breakpoint and captures local variables. 
""" try: session_id = arguments.get("sessionId") if not session_id: return [ TextContent( type="text", text=json.dumps({ "error": { "type": "ValueError", "message": "sessionId is required", } }), ) ] request = BreakpointRequest( file=arguments["file"], line=arguments["line"], ) response = await self.session_manager.run_to_breakpoint_async( session_id, request ) # Convert response to dict result = response.model_dump() return [ TextContent( type="text", text=json.dumps(result), ) ] except KeyError as e: return [ TextContent( type="text", text=json.dumps({ "error": { "type": "SessionNotFound", "message": str(e), } }), ) ] except Exception as e: logger.exception("Error at breakpoint") return [ TextContent( type="text", text=json.dumps({ "error": { "type": type(e).__name__, "message": str(e), } }), ) ] async def _handle_sessions_continue(self, arguments: dict) -> list[TextContent]: """ Handler for sessions_continue tool. Continues execution to the next breakpoint. """ try: session_id = arguments.get("sessionId") if not session_id: return [ TextContent( type="text", text=json.dumps({ "error": { "type": "ValueError", "message": "sessionId is required", } }), ) ] request = ContinueRequest( file=arguments["file"], line=arguments["line"], ) # Convert to BreakpointRequest (they have same structure) bp_request = BreakpointRequest( file=request.file, line=request.line, ) response = await self.session_manager.continue_execution_async( session_id, bp_request ) result = response.model_dump() return [ TextContent( type="text", text=json.dumps(result), ) ] except KeyError as e: return [ TextContent( type="text", text=json.dumps({ "error": { "type": "SessionNotFound", "message": str(e), } }), ) ] except Exception as e: logger.exception("Error continuing execution") return [ TextContent( type="text", text=json.dumps({ "error": { "type": type(e).__name__, "message": str(e), } }), ) ] async def _handle_sessions_state(self, arguments: dict) -> list[TextContent]: """ Handler for 
sessions_state tool. Gets the current state of a debug session. """ try: session_id = arguments.get("sessionId") if not session_id: return [ TextContent( type="text", text=json.dumps({ "error": { "type": "ValueError", "message": "sessionId is required", } }), ) ] state = await self.session_manager.get_state_async(session_id) result = state.model_dump() return [ TextContent( type="text", text=json.dumps(result), ) ] except KeyError as e: return [ TextContent( type="text", text=json.dumps({ "error": { "type": "SessionNotFound", "message": str(e), } }), ) ] except Exception as e: logger.exception("Error getting session state") return [ TextContent( type="text", text=json.dumps({ "error": { "type": type(e).__name__, "message": str(e), } }), ) ] async def _handle_sessions_end(self, arguments: dict) -> list[TextContent]: """ Handler for sessions_end tool. Ends a debug session and cleans up resources. """ try: session_id = arguments.get("sessionId") if not session_id: return [ TextContent( type="text", text=json.dumps({ "error": { "type": "ValueError", "message": "sessionId is required", } }), ) ] response = await self.session_manager.end_session_async(session_id) result = response.model_dump() return [ TextContent( type="text", text=json.dumps(result), ) ] except KeyError as e: return [ TextContent( type="text", text=json.dumps({ "error": { "type": "SessionNotFound", "message": str(e), } }), ) ] except Exception as e: logger.exception("Error ending session") return [ TextContent( type="text", text=json.dumps({ "error": { "type": type(e).__name__, "message": str(e), } }), ) ] async def _handle_sessions_step_in(self, arguments: dict) -> list[TextContent]: """ Handler for sessions_step_in tool. Steps into the next function call. 
""" try: session_id = arguments.get("sessionId") if not session_id: return [ TextContent( type="text", text=json.dumps({ "error": { "type": "ValueError", "message": "sessionId is required", } }), ) ] response = await self.session_manager.step_in_async(session_id) result = response.model_dump() return [ TextContent( type="text", text=json.dumps(result), ) ] except KeyError as e: return [ TextContent( type="text", text=json.dumps({ "error": { "type": "SessionNotFound", "message": str(e), } }), ) ] except Exception as e: logger.exception("Error in step_in") return [ TextContent( type="text", text=json.dumps({ "error": { "type": type(e).__name__, "message": str(e), } }), ) ] async def _handle_sessions_step_over(self, arguments: dict) -> list[TextContent]: """ Handler for sessions_step_over tool. Steps over the current line. """ try: session_id = arguments.get("sessionId") if not session_id: return [ TextContent( type="text", text=json.dumps({ "error": { "type": "ValueError", "message": "sessionId is required", } }), ) ] response = await self.session_manager.step_over_async(session_id) result = response.model_dump() return [ TextContent( type="text", text=json.dumps(result), ) ] except KeyError as e: return [ TextContent( type="text", text=json.dumps({ "error": { "type": "SessionNotFound", "message": str(e), } }), ) ] except Exception as e: logger.exception("Error in step_over") return [ TextContent( type="text", text=json.dumps({ "error": { "type": type(e).__name__, "message": str(e), } }), ) ] async def _handle_sessions_step_out(self, arguments: dict) -> list[TextContent]: """ Handler for sessions_step_out tool. Steps out of the current function. 
""" try: session_id = arguments.get("sessionId") if not session_id: return [ TextContent( type="text", text=json.dumps({ "error": { "type": "ValueError", "message": "sessionId is required", } }), ) ] response = await self.session_manager.step_out_async(session_id) result = response.model_dump() return [ TextContent( type="text", text=json.dumps(result), ) ] except KeyError as e: return [ TextContent( type="text", text=json.dumps({ "error": { "type": "SessionNotFound", "message": str(e), } }), ) ] except Exception as e: logger.exception("Error in step_out") return [ TextContent( type="text", text=json.dumps({ "error": { "type": type(e).__name__, "message": str(e), } }), ) ] async def run(self): """Run the server with stdio transport""" async with stdio_server() as (read_stream, write_stream): await self.server.run( read_stream, write_stream, self.server.create_initialization_options() ) def parse_args(): """Parse command-line arguments""" import argparse parser = argparse.ArgumentParser( description="MCP Debug Tool - Python debugging via Model Context Protocol" ) parser.add_argument( "--workspace", type=Path, default=Path.cwd(), help="Workspace root directory (default: current directory)", ) parser.add_argument( "--log-level", choices=["DEBUG", "INFO", "WARNING", "ERROR"], default="INFO", help="Logging level (default: INFO)", ) return parser.parse_args() async def async_main(): """Async entry point for the SDK-based server""" args = parse_args() # Configure logging logging.basicConfig( level=getattr(logging, args.log_level), format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", stream=sys.stderr, # Log to stderr to keep stdout clean for MCP protocol ) logger.info("Starting MCP Debug Server v2 (SDK-based)") logger.info(f"Workspace: {args.workspace}") server = MCPServerV2(workspace_root=args.workspace) await server.run() def main(): """Synchronous entry point that runs the async main""" asyncio.run(async_main()) if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Kaina3/Debug-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.