Skip to main content
Glama
claude.py10.9 kB
"""Claude Code CLI 事件解析器。 cli-agent-mcp shared/parsers v0.1.0 同步日期: 2025-12-16 将 Claude Code CLI 的 JSON 流事件解析为统一格式。 Claude Code CLI 事件类型: - system/init: 会话初始化(包含 session_id, model, tools, mcp_servers 等) - assistant: 助手消息,content[] 可包含 thinking/text/tool_use - user: 用户消息或工具结果,content[] 可包含 tool_result - result: 会话结束,包含统计信息 """ from __future__ import annotations import json from typing import Any from .base import CLISource, ContentType, OperationType, Status from .unified import ( LifecycleEvent, MessageEvent, OperationEvent, SystemEvent, UnifiedEvent, make_event_id, make_fallback_event, ) __all__ = [ "parse_claude_event", "ClaudeParser", ] class ClaudeParser: """Claude Code CLI 事件解析器。 Claude Code 的 stream-json 格式特点: - 顶层 type: system/assistant/user/result - assistant/user 消息的 content 是数组,可包含多种内容类型 - tool_use 和 tool_result 嵌套在消息的 content 数组中 Example: parser = ClaudeParser() for line in stream: events = parser.parse(json.loads(line)) for event in events: gui.push_event(event) """ def __init__(self) -> None: self.session_id: str | None = None self.model: str | None = None self._tool_names: dict[str, str] = {} # tool_use_id -> tool_name def parse(self, data: dict[str, Any]) -> list[UnifiedEvent]: """解析单个 Claude 事件。 注意:一个 Claude 事件可能产生多个统一事件 (例如一个 assistant 消息包含 thinking + tool_use) Args: data: 原始事件字典 Returns: 统一事件列表 """ event_type = data.get("type", "") subtype = data.get("subtype", "") base_kwargs = { "source": CLISource.CLAUDE, "raw": data, } if event_type == "system" and subtype == "init": return [self._parse_init(data, base_kwargs)] elif event_type == "assistant": return self._parse_assistant(data, base_kwargs) elif event_type == "user": return self._parse_user(data, base_kwargs) elif event_type == "result": return [self._parse_result(data, base_kwargs)] else: # Fallback: 未识别的事件类型 return [make_fallback_event(CLISource.CLAUDE, data)] def _parse_init( self, data: dict[str, Any], base: dict[str, Any] ) -> LifecycleEvent: """解析 system/init 事件。""" self.session_id = data.get("session_id") self.model = data.get("model") return LifecycleEvent( event_id=make_event_id("claude", "init"), lifecycle_type="session_start", session_id=self.session_id, model=self.model, status=Status.SUCCESS, stats={ "cwd": data.get("cwd"), "tools_count": len(data.get("tools", [])), "mcp_servers": [ s.get("name") for s in data.get("mcp_servers", []) if s.get("status") == "connected" ], "claude_code_version": data.get("claude_code_version"), }, **base, ) def _parse_assistant( self, data: dict[str, Any], base: dict[str, Any] ) -> list[UnifiedEvent]: """解析 assistant 消息。 一个 assistant 消息的 content[] 可能包含多个内容块: - thinking: 思考过程 - text: 文本输出 - tool_use: 工具调用 """ events: list[UnifiedEvent] = [] message = data.get("message", {}) content_list = message.get("content", []) session_id = data.get("session_id") or self.session_id for content in content_list: content_type = content.get("type", "") if content_type == "thinking": # 思考过程 thinking_text = content.get("thinking", "") if thinking_text: events.append(MessageEvent( event_id=make_event_id("claude", "thinking"), content_type=ContentType.REASONING, role="assistant", text=thinking_text, is_delta=False, session_id=session_id, # 传递 session_id **base, )) elif content_type == "text": # 文本输出 text = content.get("text", "") if text: events.append(MessageEvent( event_id=make_event_id("claude", "text"), content_type=ContentType.TEXT, role="assistant", text=text, is_delta=False, session_id=session_id, # 传递 session_id **base, )) elif content_type == "tool_use": # 工具调用 tool_name = content.get("name", "unknown") tool_id = content.get("id", "") tool_input = content.get("input", {}) # 缓存 tool_id -> tool_name if tool_id: self._tool_names[tool_id] = tool_name # 序列化输入 try: input_str = json.dumps(tool_input, ensure_ascii=False, indent=2) except (TypeError, ValueError): input_str = str(tool_input) # 确定操作类型 op_type = self._get_operation_type(tool_name) events.append(OperationEvent( event_id=make_event_id("claude", f"call_{tool_name}"), operation_type=op_type, name=tool_name, operation_id=tool_id, input=input_str, status=Status.RUNNING, session_id=session_id, # 传递 session_id metadata={"input": tool_input}, **base, )) return events if events else [make_fallback_event(CLISource.CLAUDE, data)] def _parse_user( self, data: dict[str, Any], base: dict[str, Any] ) -> list[UnifiedEvent]: """解析 user 消息。 user 消息通常包含 tool_result(工具执行结果)。 """ events: list[UnifiedEvent] = [] message = data.get("message", {}) content_list = message.get("content", []) session_id = data.get("session_id") or self.session_id for content in content_list: content_type = content.get("type", "") if content_type == "tool_result": tool_id = content.get("tool_use_id", "") output = content.get("content", "") is_error = content.get("is_error", False) # 从缓存获取工具名 tool_name = self._tool_names.get(tool_id, "unknown") op_type = self._get_operation_type(tool_name) events.append(OperationEvent( event_id=make_event_id("claude", f"result_{tool_name}"), operation_type=op_type, name=tool_name, operation_id=tool_id, output=output if isinstance(output, str) else str(output), status=Status.FAILED if is_error else Status.SUCCESS, session_id=session_id, # 传递 session_id **base, )) elif content_type == "text": # 用户文本输入(较少见) text = content.get("text", "") if text: events.append(MessageEvent( event_id=make_event_id("claude", "user_text"), content_type=ContentType.TEXT, role="user", text=text, is_delta=False, session_id=session_id, # 传递 session_id **base, )) return events if events else [make_fallback_event(CLISource.CLAUDE, data)] def _parse_result( self, data: dict[str, Any], base: dict[str, Any] ) -> LifecycleEvent: """解析 result 事件(会话结束)。""" subtype = data.get("subtype", "") is_error = data.get("is_error", False) usage = data.get("usage", {}) status = Status.FAILED if is_error or subtype == "error" else Status.SUCCESS stats = { "duration_ms": data.get("duration_ms"), "duration_api_ms": data.get("duration_api_ms"), "num_turns": data.get("num_turns"), "total_cost_usd": data.get("total_cost_usd"), "input_tokens": usage.get("input_tokens"), "output_tokens": usage.get("output_tokens"), "cache_creation_input_tokens": usage.get("cache_creation_input_tokens"), "cache_read_input_tokens": usage.get("cache_read_input_tokens"), } return LifecycleEvent( event_id=make_event_id("claude", "result"), lifecycle_type="session_end", session_id=data.get("session_id") or self.session_id, model=self.model, status=status, stats=stats, **base, ) def _get_operation_type(self, tool_name: str) -> OperationType: """根据工具名确定操作类型。""" name_lower = tool_name.lower() if name_lower == "bash": return OperationType.COMMAND elif name_lower in ("edit", "write"): return OperationType.FILE elif name_lower == "websearch": return OperationType.SEARCH elif name_lower == "todowrite": return OperationType.TODO elif name_lower.startswith("mcp__"): return OperationType.MCP else: return OperationType.TOOL def parse_claude_event(data: dict[str, Any]) -> list[UnifiedEvent]: """无状态解析单个 Claude 事件。 注意: 1. 此函数不维护状态,无法关联 tool_use 和 tool_result 的工具名 2. 返回的是事件列表(一个 Claude 事件可能产生多个统一事件) 如需完整功能,请使用 ClaudeParser 类。 Args: data: 原始事件字典 Returns: 统一事件列表 """ parser = ClaudeParser() return parser.parse(data)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/shiharuharu/cli-agent-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server