Skip to main content
Glama
messages.py9.53 kB
"""WebSocket 消息类型与解析工具。""" from __future__ import annotations import json import time from dataclasses import dataclass, field from enum import Enum from typing import Any, Dict, Optional, Tuple from magicapi_tools.logging_config import get_logger _logger = get_logger("ws.messages") class MessageType(str, Enum): """Magic-API WebSocket 消息类型枚举。""" LOG = "LOG" LOGS = "LOGS" BREAKPOINT = "BREAKPOINT" EXCEPTION = "EXCEPTION" LOGIN_RESPONSE = "LOGIN_RESPONSE" REFRESH_TOKEN = "REFRESH_TOKEN" USER_LOGIN = "USER_LOGIN" USER_LOGOUT = "USER_LOGOUT" ONLINE_USERS = "ONLINE_USERS" INTO_FILE_ID = "INTO_FILE_ID" SET_FILE_ID = "SET_FILE_ID" PING = "PING" PONG = "PONG" LOGIN = "LOGIN" SET_BREAKPOINT = "SET_BREAKPOINT" RESUME_BREAKPOINT = "RESUME_BREAKPOINT" SEND_ONLINE = "SEND_ONLINE" UNKNOWN = "UNKNOWN" @classmethod def from_raw(cls, value: str) -> "MessageType": """将原始字符串映射到 `MessageType`。""" normalized = value.strip().upper() return cls.__members__.get(normalized, cls.UNKNOWN) @dataclass(slots=True) class WSMessage: """结构化的 WebSocket 消息。""" type: MessageType raw: str payload: Any = None text: str = "" timestamp: float = field(default_factory=lambda: time.time()) data: Dict[str, Any] = field(default_factory=dict) def __post_init__(self) -> None: """确保 `timestamp` 与 `text` 有默认值。""" if not self.text and isinstance(self.payload, str): self.text = self.payload def parse_ws_message(raw: str) -> WSMessage: """解析原始 WebSocket 文本消息为 `WSMessage`。""" ts = time.time() stripped = raw.strip() if not stripped: return WSMessage(type=MessageType.UNKNOWN, raw=raw, text="", timestamp=ts) msg_type, remainder = _split_once(stripped) message_type = MessageType.from_raw(msg_type) payload: Any = None text = remainder data: Dict[str, Any] = {} if message_type == MessageType.LOG: payload = remainder text = remainder data["logs"] = [remainder] if remainder else [] elif message_type == MessageType.LOGS: payload = _parse_json_safely(remainder, fallback=remainder) if isinstance(payload, list): data["logs"] = payload text = "\n".join(str(item) for item in payload) else: text = remainder data["logs"] = [payload] elif message_type == MessageType.LOGIN_RESPONSE: status_raw, extra = _split_once(remainder) data["status_raw"] = status_raw try: data["status"] = int(status_raw) except (TypeError, ValueError): pass payload = _parse_json_safely(extra) if extra else None data["user"] = payload if isinstance(payload, dict): client_id = payload.get("clientId") or payload.get("client_id") if client_id: data["client_id"] = str(client_id) if payload: text = f"登录状态: {data.get('status', status_raw)}" else: text = f"登录状态: {status_raw}" elif message_type in {MessageType.USER_LOGIN, MessageType.USER_LOGOUT, MessageType.ONLINE_USERS, MessageType.REFRESH_TOKEN, MessageType.SEND_ONLINE}: payload = _parse_json_safely(remainder, fallback=remainder) data["payload"] = payload if isinstance(payload, dict): client_id = payload.get("clientId") or payload.get("client_id") if client_id: data["client_id"] = str(client_id) text = _format_json_summary(payload) if isinstance(payload, (dict, list)) else remainder elif message_type in {MessageType.SET_FILE_ID, MessageType.INTO_FILE_ID}: file_id, extra = _split_once(remainder) data["file_id"] = file_id payload = _parse_json_safely(extra) if extra else None if payload: data["payload"] = payload client_id = _extract_client_from_payload(payload) if client_id: data["client_id"] = client_id text = f"文件切换 -> {file_id}" else: text = file_id elif message_type == MessageType.BREAKPOINT: # 消息格式: BREAKPOINT,script_id,{json_data} if ',' in remainder: script_id, json_str = remainder.split(',', 1) else: script_id = '未知' json_str = remainder data["script_id"] = script_id payload = _parse_json_safely(json_str, fallback=json_str) data["payload"] = payload # 提取断点信息 line_info = None if isinstance(payload, dict): variables = payload.get('variables', []) range_info = payload.get('range', []) data["variables"] = variables data["range"] = range_info headers = _extract_header_from_variables(variables) if headers: data["headers"] = headers client_from_header = headers.get("magic-request-client-id") if client_from_header: data["client_id"] = str(client_from_header) if not data.get("client_id"): client_id = payload.get("clientId") or payload.get("client_id") if client_id: data["client_id"] = str(client_id) # 从range信息提取行号 [start_line, start_col, end_line, end_col] if isinstance(range_info, (list, tuple)) and len(range_info) >= 1: line_info = range_info[0] text = f"断点命中 @ {script_id}" if line_info is not None: text = f"断点命中 @ {script_id}: 行 {line_info}" elif message_type == MessageType.EXCEPTION: payload = _parse_json_safely(remainder, fallback=remainder) data["payload"] = payload if isinstance(payload, dict): headers = payload.get("headers") if headers: parsed_headers = _parse_object_like(headers) if parsed_headers: data["headers"] = parsed_headers client_from_header = parsed_headers.get("magic-request-client-id") if client_from_header: data["client_id"] = str(client_from_header) message = payload.get("message") or payload.get("msg") text = f"异常: {message}" if message else "异常" else: text = str(payload) elif message_type in {MessageType.PING, MessageType.PONG}: text = message_type.value payload = None else: payload = remainder or None text = remainder return WSMessage( type=message_type, raw=raw, payload=payload, text=text, timestamp=ts, data=data, ) def _split_once(value: str) -> Tuple[str, str]: """按首个逗号拆分字符串。""" if not value: return "", "" if "," not in value: return value, "" head, tail = value.split(",", 1) return head, tail def _parse_json_safely(raw: str, fallback: Any = None) -> Any: """尝试解析 JSON,失败时返回 `fallback`。""" if raw is None: return fallback text = raw.strip() if not text: return fallback try: return json.loads(text) except json.JSONDecodeError: return fallback def _format_json_summary(payload: Any) -> str: """为 JSON 数据生成简要说明,避免输出过长。""" if payload is None: return "" if isinstance(payload, dict): keys = ", ".join(list(payload.keys())[:5]) return f"{len(payload)} 字段: {keys}" if keys else "{}" if isinstance(payload, list): return f"列表,共 {len(payload)} 项" return str(payload) def _extract_client_from_payload(payload: Any) -> Optional[str]: if isinstance(payload, dict): for key in ("clientId", "client_id", "id"): value = payload.get(key) if value: return str(value) return None def _extract_header_from_variables(variables: Any) -> Optional[Dict[str, Any]]: if not isinstance(variables, list): return None for item in variables: if not isinstance(item, dict): continue name = item.get("name") if name and name.lower() == "header": value = item.get("value") headers = _parse_object_like(value) if headers: return headers return None def _parse_object_like(value: Any) -> Optional[Dict[str, Any]]: if isinstance(value, dict): return value if not isinstance(value, str): return None text = value.strip() if not text: return None # 去除外层引号 if text.startswith('"') and text.endswith('"'): try: text = json.loads(text) except json.JSONDecodeError: text = text[1:-1] text = text.encode('utf-8').decode('unicode_escape') if text.startswith('{') and text.endswith('}'): try: return json.loads(text) except json.JSONDecodeError: _logger.debug("解析 header JSON 失败", exc_info=True) return None return None __all__ = ["MessageType", "WSMessage", "parse_ws_message"]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Dwsy/magic-api-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server