Skip to main content
Glama
codex.py (16.4 kB)
"""Codex CLI 事件解析器。 cli-agent-mcp shared/parsers v0.1.0 同步日期: 2025-12-16 将 Codex CLI 的 JSON 流事件解析为统一格式。 Codex CLI 事件类型: - thread.started: 会话开始 - turn.started: 轮次开始 - turn.completed: 轮次结束 - turn.failed: 轮次失败 - error: 错误事件 - item.started/item.updated/item.completed: 包含嵌套 item 对象 - item.type: agent_message, reasoning, command_execution, file_change, function_call, function_call_output, mcp_tool_call, web_search, todo_list """ from __future__ import annotations import json from typing import Any from .base import CLISource, ContentType, OperationType, Status from .unified import ( LifecycleEvent, MessageEvent, OperationEvent, SystemEvent, UnifiedEvent, make_event_id, make_fallback_event, ) __all__ = [ "parse_codex_event", "CodexParser", ] class CodexParser: """Codex CLI 事件解析器。 维护解析状态,支持 item 的 started/updated/completed 状态跟踪。 Example: parser = CodexParser() for line in stream: event = parser.parse(json.loads(line)) if event: gui.push_event(event) """ def __init__(self) -> None: self.session_id: str | None = None self._item_states: dict[str, dict[str, Any]] = {} # item_id -> item data self._function_names: dict[str, str] = {} # call_id -> function name def parse(self, data: dict[str, Any]) -> UnifiedEvent: """解析单个 Codex 事件。 Args: data: 原始事件字典 Returns: 统一事件实例 """ event_type = data.get("type", "") base_kwargs = { "source": CLISource.CODEX, "raw": data, } # 顶层事件分发 if event_type == "thread.started": return self._parse_thread_started(data, base_kwargs) elif event_type == "turn.started": return self._parse_turn_started(data, base_kwargs) elif event_type == "turn.completed": return self._parse_turn_completed(data, base_kwargs) elif event_type == "turn.failed": return self._parse_turn_failed(data, base_kwargs) elif event_type == "error": return self._parse_error(data, base_kwargs) elif event_type in ("item.started", "item.updated", "item.completed"): return self._parse_item(data, event_type, base_kwargs) else: # Fallback: 未识别的事件类型 return make_fallback_event(CLISource.CODEX, 
data) def _parse_thread_started( self, data: dict[str, Any], base: dict[str, Any] ) -> LifecycleEvent: """解析 thread.started 事件。""" self.session_id = data.get("thread_id") return LifecycleEvent( event_id=make_event_id("codex", "thread"), lifecycle_type="session_start", session_id=self.session_id, status=Status.SUCCESS, **base, ) def _parse_turn_started( self, data: dict[str, Any], base: dict[str, Any] ) -> LifecycleEvent: """解析 turn.started 事件。""" return LifecycleEvent( event_id=make_event_id("codex", "turn_start"), lifecycle_type="turn_start", session_id=self.session_id, status=Status.RUNNING, **base, ) def _parse_turn_completed( self, data: dict[str, Any], base: dict[str, Any] ) -> LifecycleEvent: """解析 turn.completed 事件。""" usage = data.get("usage", {}) stats = {} if usage: stats = { "input_tokens": usage.get("input_tokens"), "cached_input_tokens": usage.get("cached_input_tokens"), "output_tokens": usage.get("output_tokens"), } return LifecycleEvent( event_id=make_event_id("codex", "turn_end"), lifecycle_type="turn_end", session_id=self.session_id, status=Status.SUCCESS, stats=stats, **base, ) def _parse_turn_failed( self, data: dict[str, Any], base: dict[str, Any] ) -> LifecycleEvent: """解析 turn.failed 事件。""" error = data.get("error", {}) error_msg = "" if isinstance(error, dict): error_msg = error.get("message", "Turn failed") elif isinstance(error, str): error_msg = error return LifecycleEvent( event_id=make_event_id("codex", "turn_fail"), lifecycle_type="turn_end", session_id=self.session_id, status=Status.FAILED, stats={"error": error_msg}, **base, ) def _parse_error( self, data: dict[str, Any], base: dict[str, Any] ) -> SystemEvent: """解析 error 事件。""" message = data.get("message", "Unknown error") return SystemEvent( event_id=make_event_id("codex", "error"), severity="error", message=message, **base, ) def _parse_item( self, data: dict[str, Any], event_type: str, base: dict[str, Any] ) -> UnifiedEvent: """解析 item.* 事件。 将嵌套的 item 结构扁平化为统一事件。 """ item = 
data.get("item", {}) if not isinstance(item, dict): return make_fallback_event(CLISource.CODEX, data) item_type = item.get("type", "") item_id = item.get("id", "") item_status = item.get("status", "") is_completed = event_type == "item.completed" # 缓存 item 状态 if item_id: self._item_states[item_id] = item # 根据 item.type 分发 if item_type == "agent_message": return self._parse_agent_message(item, is_completed, base) elif item_type == "reasoning": return self._parse_reasoning(item, is_completed, base) elif item_type == "command_execution": return self._parse_command(item, item_status, is_completed, base) elif item_type == "file_change": return self._parse_file_change(item, item_status, is_completed, base) elif item_type == "function_call": return self._parse_function_call(item, is_completed, base) elif item_type == "function_call_output": return self._parse_function_output(item, is_completed, base) elif item_type == "mcp_tool_call": return self._parse_mcp_call(item, is_completed, base) elif item_type == "web_search": return self._parse_web_search(item, is_completed, base) elif item_type == "todo_list": return self._parse_todo_list(item, is_completed, base) else: # Fallback: 未识别的 item 类型 return make_fallback_event( CLISource.CODEX, data, f"Unknown item type: {item_type}" ) def _parse_agent_message( self, item: dict[str, Any], is_completed: bool, base: dict[str, Any] ) -> MessageEvent: """解析 agent_message。""" text = item.get("text", "") return MessageEvent( event_id=make_event_id("codex", "msg"), content_type=ContentType.TEXT, role="assistant", text=text, is_delta=not is_completed, session_id=self.session_id, **base, ) def _parse_reasoning( self, item: dict[str, Any], is_completed: bool, base: dict[str, Any] ) -> MessageEvent: """解析 reasoning。""" text = item.get("text", "") return MessageEvent( event_id=make_event_id("codex", "reasoning"), content_type=ContentType.REASONING, role="assistant", text=text, is_delta=not is_completed, session_id=self.session_id, **base, ) def 
_parse_command( self, item: dict[str, Any], item_status: str, is_completed: bool, base: dict[str, Any], ) -> OperationEvent: """解析 command_execution。""" command = item.get("command", "") output = item.get("aggregated_output", "") exit_code = item.get("exit_code") # 确定状态 if is_completed: if exit_code is not None and exit_code != 0: status = Status.FAILED else: status = Status.SUCCESS elif item_status == "in_progress": status = Status.RUNNING else: status = Status.PENDING return OperationEvent( event_id=make_event_id("codex", "cmd"), operation_type=OperationType.COMMAND, name=command[:50] if command else "shell", input=command, output=output if output else None, status=status, session_id=self.session_id, metadata={ "exit_code": exit_code, "item_id": item.get("id"), }, **base, ) def _parse_file_change( self, item: dict[str, Any], item_status: str, is_completed: bool, base: dict[str, Any], ) -> OperationEvent: """解析 file_change。""" changes = item.get("changes", []) # 构建变更摘要 change_summary = [] for c in changes[:10]: # 最多显示 10 个 if isinstance(c, dict): kind = c.get("kind", "") path = c.get("path", "") change_summary.append(f"{kind}: {path}") status = Status.SUCCESS if is_completed else Status.RUNNING return OperationEvent( event_id=make_event_id("codex", "file"), operation_type=OperationType.FILE, name=f"{len(changes)} files", output="\n".join(change_summary) if change_summary else None, status=status, session_id=self.session_id, metadata={ "changes": changes, "count": len(changes), }, **base, ) def _parse_function_call( self, item: dict[str, Any], is_completed: bool, base: dict[str, Any] ) -> OperationEvent: """解析 function_call。""" name = item.get("name", "unknown") call_id = item.get("call_id", "") arguments = item.get("arguments", "{}") # 缓存 call_id -> name 映射 if call_id: self._function_names[call_id] = name # 解析参数 try: if isinstance(arguments, str): args = json.loads(arguments) else: args = arguments input_str = json.dumps(args, ensure_ascii=False, indent=2) except 
(json.JSONDecodeError, TypeError): input_str = str(arguments) return OperationEvent( event_id=make_event_id("codex", f"call_{name}"), operation_type=OperationType.TOOL, name=name, operation_id=call_id, input=input_str, status=Status.RUNNING, session_id=self.session_id, metadata={"arguments": arguments}, **base, ) def _parse_function_output( self, item: dict[str, Any], is_completed: bool, base: dict[str, Any] ) -> OperationEvent: """解析 function_call_output。""" call_id = item.get("call_id", "") output = item.get("output", "") # 从缓存获取函数名 name = self._function_names.get(call_id, "unknown") # 判断是否有错误 has_error = isinstance(output, str) and "error" in output.lower() status = Status.FAILED if has_error else Status.SUCCESS output_str = output if isinstance(output, str) else str(output) return OperationEvent( event_id=make_event_id("codex", f"output_{name}"), operation_type=OperationType.TOOL, name=name, operation_id=call_id, output=output_str, status=status, session_id=self.session_id, **base, ) def _parse_mcp_call( self, item: dict[str, Any], is_completed: bool, base: dict[str, Any] ) -> OperationEvent: """解析 mcp_tool_call。""" server = item.get("server", "") tool = item.get("tool", "") arguments = item.get("arguments", {}) result = item.get("result", {}) error = item.get("error") name = f"{server}/{tool}" if server else tool # 确定状态 if error: status = Status.FAILED elif is_completed and result: status = Status.SUCCESS else: status = Status.RUNNING # 解析输入 try: input_str = json.dumps(arguments, ensure_ascii=False, indent=2) except (TypeError, ValueError): input_str = str(arguments) # 解析输出 output_str = None if error and isinstance(error, dict): output_str = error.get("message", str(error)) elif result: # MCP 结果通常是 { content: [{ type: "text", text: "..." 
}] } content = result.get("content", []) texts = [ b.get("text", "") for b in content[:5] if isinstance(b, dict) and b.get("type") == "text" ] output_str = "\n".join(texts) if texts else str(result) return OperationEvent( event_id=make_event_id("codex", f"mcp_{tool}"), operation_type=OperationType.MCP, name=name, input=input_str, output=output_str, status=status, session_id=self.session_id, metadata={ "server": server, "tool": tool, "arguments": arguments, }, **base, ) def _parse_web_search( self, item: dict[str, Any], is_completed: bool, base: dict[str, Any] ) -> OperationEvent: """解析 web_search。""" query = item.get("query", "") return OperationEvent( event_id=make_event_id("codex", "search"), operation_type=OperationType.SEARCH, name="web_search", input=query, status=Status.SUCCESS if is_completed else Status.RUNNING, session_id=self.session_id, **base, ) def _parse_todo_list( self, item: dict[str, Any], is_completed: bool, base: dict[str, Any] ) -> OperationEvent: """解析 todo_list。""" items = item.get("items", []) # 构建待办摘要 done_count = sum( 1 for t in items if isinstance(t, dict) and t.get("completed", False) ) total_count = len(items) # 构建输出文本 todo_lines = [] for t in items[:30]: # 最多显示 30 个 if isinstance(t, dict): marker = "✓" if t.get("completed", False) else "○" text = t.get("text", "") todo_lines.append(f"{marker} {text}") return OperationEvent( event_id=make_event_id("codex", "todo"), operation_type=OperationType.TODO, name=f"TODO {done_count}/{total_count}", output="\n".join(todo_lines) if todo_lines else None, status=Status.SUCCESS if is_completed else Status.RUNNING, session_id=self.session_id, metadata={ "items": items, "done": done_count, "total": total_count, }, **base, ) def parse_codex_event(data: dict[str, Any]) -> UnifiedEvent: """无状态解析单个 Codex 事件。 注意: 此函数不维护状态,无法关联 function_call 和 function_call_output。 如需完整功能,请使用 CodexParser 类。 Args: data: 原始事件字典 Returns: 统一事件实例 """ parser = CodexParser() return parser.parse(data)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/shiharuharu/cli-agent-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.