Skip to main content
Glama
debug_api.py29 kB
"""Magic-API 调试相关 MCP 工具。 此模块提供强大的调试功能,支持: - 断点设置和管理 - 单步执行控制 - 变量检查和状态监控 - 调试会话管理 - WebSocket连接状态监控 - 异步断点调试和超时处理 - 断点状态轮询 - 会话ID管理 主要工具: - call_magic_api_with_debug: 异步调用API并监听断点,返回会话ID - get_latest_breakpoint_status: 获取最新断点状态 - resume_from_breakpoint: 恢复断点执行 - step_over_breakpoint: 单步执行,越过当前断点 - step_into_breakpoint: 步入当前断点 - step_out_breakpoint: 步出当前函数 - set_breakpoint: 在指定行号设置断点 - remove_breakpoint: 移除指定断点 - list_breakpoints: 列出所有断点 - execute_debug_session: 执行完整的调试会话 - get_debug_status: 获取当前调试状态 - inspect_ws_environments: 检查WebSocket环境 - get_websocket_status: 获取WebSocket连接状态 """ from __future__ import annotations import asyncio import json import time import uuid from typing import TYPE_CHECKING, Annotated, Any, Dict, List, Optional, Union from pydantic import Field from magicapi_tools.logging_config import get_logger from magicapi_tools.utils import error_response from magicapi_tools.ws import IDEEnvironment, MessageType, OpenFileContext from magicapi_tools.ws.debug_service import WebSocketDebugService from magicapi_tools.ws.observers import MCPObserver try: # pragma: no cover - 运行环境缺失 fastmcp 时回退 Any from fastmcp import Context, FastMCP except ImportError: # pragma: no cover Context = Any # type: ignore[assignment] FastMCP = Any # type: ignore[assignment] if TYPE_CHECKING: from fastmcp import FastMCP from magicapi_mcp.tool_registry import ToolContext logger = get_logger('tools.debug_api') class DebugAPITools: """统一的调试工具模块,整合基础调试和高级断点控制功能。""" def __init__(self): self.timeout_duration = 10.0 # 默认10秒超时 self.debug_sessions = {} # 存储调试会话信息 def register_tools(self, mcp_app: "FastMCP", context: "ToolContext") -> None: # pragma: no cover - 装饰器环境 """注册断点调试相关工具。""" @mcp_app.tool( name="call_magic_api_with_debug", description="异步调用Magic-API接口并监听断点,返回会话ID用于后续操作。支持10秒超时监听,遇到断点时返回断点信息和操作提示。", tags={"api", "call", "debug", "async", "session"}, ) async def call_with_debug( path: Annotated[ str, Field(description="API请求路径,如'/api/users'或'GET /api/users'") ] = '/algorithms/narcissistic/narcissistic-algorithm-v2', method: Annotated[ str, Field(description="HTTP请求方法,如'GET'、'POST'、'PUT'、'DELETE'等") ] = "GET", data: Annotated[ Optional[Union[Any, str]], Field(description="请求体数据,适用于POST/PUT等方法") ] = None, params: Annotated[ Optional[Union[Any, str]], Field(description="URL查询参数") ] = None, breakpoints: Annotated[ Optional[Union[List[int], str]], Field(description="断点行号列表,用于调试,如'[5,10,15]'") ] = [5,6,7], timeout: Annotated[ float, Field(description="超时时间(秒),默认为10秒") ] = 10.0, ctx: "Context" = None, ) -> Dict[str, Any]: """异步调用API并监听断点,返回会话ID用于后续操作。""" # 生成4位会话ID session_id = str(uuid.uuid4())[:4] # 参数清理:将空字符串转换为 None if isinstance(data, str) and data.strip() == "": data = None if isinstance(params, str) and params.strip() == "": params = None if isinstance(breakpoints, str) and breakpoints.strip() == "": breakpoints = None # 初始化会话信息 self.debug_sessions[session_id] = { "status": "starting", "path": path, "method": method, "start_time": time.time(), "timeout": timeout, "breakpoints_hit": [], "current_breakpoint": None, "api_completed": False } observer = MCPObserver(ctx) if ctx else None if observer: context.ws_manager.add_observer(observer) try: if ctx: await ctx.info("🧪 启动异步调试会话", extra={"session_id": session_id, "path": path, "method": method}) await ctx.report_progress(progress=0, total=100) # 异步调用API并监听断点 result = await self._async_debug_call( context, session_id, path, method, data, params, breakpoints, timeout, ctx ) if ctx: await ctx.report_progress(progress=100, total=100) return result except Exception as e: logger.error(f"异步调试调用失败: {e}") self.debug_sessions[session_id]["status"] = "error" self.debug_sessions[session_id]["error"] = str(e) return error_response("async_debug_error", f"异步调试调用失败: {str(e)}", {"session_id": session_id}) finally: if observer: await asyncio.sleep(context.settings.ws_log_capture_window) context.ws_manager.remove_observer(observer) @mcp_app.tool( name="get_latest_breakpoint_status", description="获取最新的断点调试状态,用于轮询断点执行情况。需要传入会话ID。", tags={"debug", "breakpoint", "status", "polling"}, ) def get_breakpoint_status( session_id: Annotated[ str, Field(description="调试会话ID,由call_magic_api_with_debug返回") ] ) -> Dict[str, Any]: """获取指定会话的最新断点调试状态。""" try: # 检查会话是否存在 if session_id not in self.debug_sessions: return error_response("session_not_found", f"调试会话 {session_id} 不存在") session = self.debug_sessions[session_id] context.ws_manager.ensure_running_sync() # 获取WebSocket调试服务 debug_service: WebSocketDebugService = context.ws_debug_service # 获取调试状态 status = debug_service.get_debug_status_tool() if status.get("success"): # 增加会话信息 status["session_id"] = session_id status["session_info"] = session status["is_breakpoint_status"] = True status["timestamp"] = time.time() # 检查是否有断点 breakpoints = status.get("status", {}).get("breakpoints", []) if breakpoints: session["current_breakpoint"] = breakpoints[0] if breakpoints else None session["status"] = "breakpoint_hit" status["available_actions"] = [ "resume_from_breakpoint", "step_over_breakpoint", "step_into_breakpoint", "step_out_breakpoint" ] status["message"] = "遇到断点,可以选择恢复执行或单步调试" elif session["api_completed"]: session["status"] = "completed" status["message"] = "断点调试结束,API返回完成" else: session["status"] = "running" status["message"] = "API正在执行中,请继续轮询" return status else: return error_response("status_check_failed", "获取断点状态失败", status.get("error")) except Exception as e: logger.error(f"获取断点状态时出错: {e}") return error_response("status_check_error", f"获取断点状态时出错: {str(e)}") @mcp_app.tool( name="resume_from_breakpoint", description="从当前断点恢复执行,继续10秒超时监听。需要传入会话ID。", tags={"debug", "breakpoint", "resume"}, ) async def resume_breakpoint( session_id: Annotated[ str, Field(description="调试会话ID") ] ) -> Dict[str, Any]: """从当前断点恢复执行,继续监听。""" try: # 检查会话是否存在 if session_id not in self.debug_sessions: return error_response("session_not_found", f"调试会话 {session_id} 不存在") session = self.debug_sessions[session_id] await context.ws_manager.ensure_running() # 获取WebSocket调试服务 debug_service: WebSocketDebugService = context.ws_debug_service # 执行恢复操作 result = await debug_service.resume_breakpoint_tool() if result.get("success"): session["status"] = "resumed" # 继续监听10秒 monitor_result = await self._monitor_breakpoint_with_timeout(context, session_id, 10.0) result.update(monitor_result) return result except Exception as e: logger.error(f"恢复断点执行时出错: {e}") return error_response("resume_error", f"恢复断点执行时出错: {str(e)}") @mcp_app.tool( name="step_over_breakpoint", description="单步执行,跳过当前断点,继续10秒超时监听。需要传入会话ID。", tags={"debug", "breakpoint", "step", "over"}, ) async def step_over( session_id: Annotated[ str, Field(description="调试会话ID") ] ) -> Dict[str, Any]: """单步执行,跳过当前断点,继续监听。""" try: # 检查会话是否存在 if session_id not in self.debug_sessions: return error_response("session_not_found", f"调试会话 {session_id} 不存在") session = self.debug_sessions[session_id] await context.ws_manager.ensure_running() # 获取WebSocket调试服务 debug_service: WebSocketDebugService = context.ws_debug_service # 执行单步跳过操作 result = await debug_service.step_over_tool() if result.get("success"): session["status"] = "stepped_over" # 继续监听10秒 monitor_result = await self._monitor_breakpoint_with_timeout(context, session_id, 10.0) result.update(monitor_result) return result except Exception as e: logger.error(f"单步跳过断点时出错: {e}") return error_response("step_over_error", f"单步跳过断点时出错: {str(e)}") @mcp_app.tool( name="step_into_breakpoint", description="步入当前断点(进入函数/方法内部),继续10秒超时监听。需要传入会话ID。", tags={"debug", "breakpoint", "step", "into"}, ) async def step_into( session_id: Annotated[ str, Field(description="调试会话ID") ] ) -> Dict[str, Any]: """步入当前断点(进入函数/方法内部),继续监听。""" try: # 检查会话是否存在 if session_id not in self.debug_sessions: return error_response("session_not_found", f"调试会话 {session_id} 不存在") session = self.debug_sessions[session_id] await context.ws_manager.ensure_running() # 获取WebSocket调试服务 debug_service: WebSocketDebugService = context.ws_debug_service # 发送步入指令 (step type 2) script_id = debug_service._current_script_id() if not script_id: return error_response("script_id_missing", "无法确定当前调试脚本") await context.ws_manager.send_step_into(script_id, sorted(debug_service.breakpoints)) session["status"] = "stepped_into" # 继续监听10秒 monitor_result = await self._monitor_breakpoint_with_timeout(context, session_id, 10.0) result = {"success": True, "script_id": script_id, "step_type": "into", "session_id": session_id} result.update(monitor_result) return result except Exception as e: logger.error(f"步入断点时出错: {e}") return error_response("step_into_error", f"步入断点时出错: {str(e)}") @mcp_app.tool( name="step_out_breakpoint", description="步出当前函数/方法(执行到当前函数结束),继续10秒超时监听。需要传入会话ID。", tags={"debug", "breakpoint", "step", "out"}, ) async def step_out( session_id: Annotated[ str, Field(description="调试会话ID") ] ) -> Dict[str, Any]: """步出当前函数/方法(执行到当前函数结束),继续监听。""" try: # 检查会话是否存在 if session_id not in self.debug_sessions: return error_response("session_not_found", f"调试会话 {session_id} 不存在") session = self.debug_sessions[session_id] await context.ws_manager.ensure_running() # 获取WebSocket调试服务 debug_service: WebSocketDebugService = context.ws_debug_service # 发送步出指令 (step type 3) script_id = debug_service._current_script_id() if not script_id: return error_response("script_id_missing", "无法确定当前调试脚本") await context.ws_manager.send_step_out(script_id, sorted(debug_service.breakpoints)) session["status"] = "stepped_out" # 继续监听10秒 monitor_result = await self._monitor_breakpoint_with_timeout(context, session_id, 10.0) result = {"success": True, "script_id": script_id, "step_type": "out", "session_id": session_id} result.update(monitor_result) return result except Exception as e: logger.error(f"步出断点时出错: {e}") return error_response("step_out_error", f"步出断点时出错: {str(e)}") @mcp_app.tool( name="set_breakpoint", description="在指定行号设置断点。", tags={"debug", "breakpoint", "set"}, ) def set_breakpoint( line_number: Annotated[ int, Field(description="要设置断点的行号") ], ) -> Dict[str, Any]: """在指定行号设置断点。""" try: context.ws_manager.ensure_running_sync() # 获取WebSocket调试服务 debug_service: WebSocketDebugService = context.ws_debug_service # 设置断点 result = debug_service.set_breakpoint_tool(line_number=line_number) return result except Exception as e: logger.error(f"设置断点时出错: {e}") return error_response("set_breakpoint_error", f"设置断点时出错: {str(e)}") @mcp_app.tool( name="remove_breakpoint", description="移除指定行号的断点。", tags={"debug", "breakpoint", "remove"}, ) def remove_breakpoint( line_number: Annotated[ int, Field(description="要移除断点的行号") ], ) -> Dict[str, Any]: """移除指定行号的断点。""" try: context.ws_manager.ensure_running_sync() # 获取WebSocket调试服务 debug_service: WebSocketDebugService = context.ws_debug_service # 移除断点 result = debug_service.remove_breakpoint_tool(line_number=line_number) return result except Exception as e: logger.error(f"移除断点时出错: {e}") return error_response("remove_breakpoint_error", f"移除断点时出错: {str(e)}") @mcp_app.tool( name="list_breakpoints", description="列出当前所有断点。", tags={"debug", "breakpoint", "list"}, ) def list_breakpoints() -> Dict[str, Any]: """列出当前所有断点。""" try: context.ws_manager.ensure_running_sync() # 获取WebSocket调试服务 debug_service: WebSocketDebugService = context.ws_debug_service # 列出断点 result = debug_service.list_breakpoints_tool() return result except Exception as e: logger.error(f"列出断点时出错: {e}") return error_response("list_breakpoints_error", f"列出断点时出错: {str(e)}") # 从 debug.py 合并过来的工具 @mcp_app.tool( name="execute_debug_session", description="执行完整的调试会话,包括断点设置和状态监控。", tags={"debug", "session", "execution"}, ) def execute_debug_session( script_id: Annotated[ str, Field(description="要调试的脚本文件ID,如'1234567890'") ], breakpoints: Annotated[ str, Field(description="断点配置,JSON数组格式如'[5,10,15]',指定在哪些行设置断点") ] = "[]" ) -> Dict[str, Any]: try: breakpoints_list = json.loads(breakpoints) except json.JSONDecodeError: return error_response("invalid_json", f"breakpoints 格式错误: {breakpoints}") result = context.ws_debug_service.execute_debug_session_tool(script_id, breakpoints_list) return result if "success" in result else error_response(result["error"]["code"], result["error"]["message"]) @mcp_app.tool( name="get_debug_status", description="获取当前调试状态,包括断点信息和连接状态。", tags={"debug", "status", "monitoring"}, ) def get_debug_status() -> Dict[str, Any]: result = context.ws_debug_service.get_debug_status_tool() return result if "success" in result else error_response(result["error"]["code"], result["error"]["message"]) @mcp_app.tool( name="inspect_ws_environments", description="列出当前MCP会话感知到的IDE环境、客户端与打开的文件上下文。", tags={"debug", "status", "websocket"}, ) def inspect_ws_environments() -> Dict[str, Any]: environments = [ _serialize_environment(env) for env in context.ws_manager.state.list_environments() ] return {"success": True, "environments": environments} @mcp_app.tool( name="get_websocket_status", description="检查WebSocket连接状态和配置信息。", tags={"websocket", "status", "connection"}, ) def websocket_status() -> Dict[str, Any]: return { "success": True, "status": "ready", "ws_url": context.settings.ws_url, "base_url": context.settings.base_url, "auth_enabled": context.settings.auth_enabled, "note": "WebSocket连接在需要时自动建立,可通过调试工具进行实时操作", } async def _async_debug_call( self, context, session_id: str, path: str, method: str, data: Any, params: Any, breakpoints: Any, timeout: float, ctx: "Context" = None ) -> Dict[str, Any]: """异步调用API并监听断点。""" try: # 调用API并设置断点 result = await context.ws_debug_service.call_api_with_debug_tool( path=path, method=method, data=data, params=params, breakpoints=breakpoints, ) # 更新会话状态 session = self.debug_sessions[session_id] session["api_call_result"] = result if "success" in result: session["status"] = "api_called" # 监听断点 monitor_result = await self._monitor_breakpoint_with_timeout(context, session_id, timeout) return { "success": True, "session_id": session_id, "message": "异步调试会话已启动", "api_result": result, "monitor_result": monitor_result, "timeout": timeout } else: session["status"] = "api_failed" session["error"] = result.get("error", {}) return error_response( result["error"]["code"], result["error"]["message"], {"session_id": session_id, "api_result": result} ) except Exception as e: logger.error(f"异步调试调用失败: {e}") session = self.debug_sessions[session_id] session["status"] = "error" session["error"] = str(e) return error_response("async_debug_call_error", str(e), {"session_id": session_id}) async def _monitor_breakpoint_with_timeout( self, context, session_id: str, timeout: float ) -> Dict[str, Any]: """在指定超时时间内监听断点。""" start_time = time.time() session = self.debug_sessions[session_id] try: while time.time() - start_time < timeout: # 检查断点状态 debug_service: WebSocketDebugService = context.ws_debug_service status = debug_service.get_debug_status_tool() if status.get("success"): breakpoints = status.get("status", {}).get("breakpoints", []) if breakpoints: # 遇到断点 session["current_breakpoint"] = breakpoints[0] session["status"] = "breakpoint_hit" session["breakpoints_hit"].append({ "breakpoint": breakpoints[0], "timestamp": time.time() }) return { "status": "breakpoint_hit", "breakpoint": breakpoints[0], "message": f"遇到断点在第 {breakpoints[0]} 行,可以选择恢复执行或单步调试", "available_actions": [ "resume_from_breakpoint", "step_over_breakpoint", "step_into_breakpoint", "step_out_breakpoint" ], "session_id": session_id, "elapsed_time": time.time() - start_time } # 检查API是否完成 if self._is_api_completed(status): session["api_completed"] = True session["status"] = "completed" return { "status": "completed", "message": "断点调试结束,API返回完成", "session_id": session_id, "elapsed_time": time.time() - start_time } # 等待一段时间再检查 await asyncio.sleep(0.5) # 超时 session["status"] = "timeout" return { "status": "timeout", "message": f"监听超时 ({timeout}秒),请使用 get_latest_breakpoint_status 查询最新状态", "session_id": session_id, "timeout": timeout, "expected_next_action": "get_latest_breakpoint_status" } except Exception as e: logger.error(f"监听断点时出错: {e}") session["status"] = "monitor_error" session["error"] = str(e) return { "status": "error", "message": f"监听断点时出错: {str(e)}", "session_id": session_id } def _is_api_completed(self, status: Dict[str, Any]) -> bool: """检查API是否已完成。""" # 这里需要根据实际的状态结构来判断 # 可能需要检查是否没有正在执行的请求或者其他标志 return False # 暂时返回false,需要根据实际情况调整 # 从 debug.py 合并过来的辅助函数 async def _emit_ws_notifications(ctx: "Context", logs: List[Dict[str, Any]]) -> None: for entry in logs or []: msg_type = (entry.get("type") or "log").upper() text = entry.get("text") or entry.get("payload") or "" extra = {k: v for k, v in entry.items() if k not in {"text", "payload"}} try: level = MessageType(msg_type) except ValueError: level = MessageType.LOG if level == MessageType.BREAKPOINT: await ctx.warning(text, extra=extra) elif level == MessageType.EXCEPTION: await ctx.error(text, extra=extra) elif level in {MessageType.LOG, MessageType.LOGS}: await ctx.debug(text, extra=extra) else: await ctx.info(text, extra=extra) def _serialize_environment(env: IDEEnvironment) -> Dict[str, Any]: opened = {} for client_id, ctx in env.opened_files.items(): opened[client_id] = _serialize_open_file_context(ctx) return { "ide_key": env.ide_key, "primary_ip": env.primary_ip, "client_ids": sorted(env.client_ids), "latest_user": env.latest_user, "opened_files": opened, "last_active_at": env.last_active_at, } def _serialize_open_file_context(ctx: OpenFileContext) -> Dict[str, Any]: return { "file_id": ctx.file_id, "resolved_at": ctx.resolved_at, "method": ctx.method, "path": ctx.path, "name": ctx.name, "group_chain": ctx.group_chain, "headers": ctx.headers, "last_breakpoint_range": ctx.last_breakpoint_range, "detail": ctx.detail, }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Dwsy/magic-api-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server