codex_poll
Poll an asynchronous codex job to check its status and retrieve output. Use the job ID from codex_start to get real-time updates and trailing lines of output.
Instructions
Poll the status and output of a running (or finished) codex job.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| job_id | Yes | The job_id returned by codex_start. | |
| tail_lines | No | How many trailing lines of output to return. | 100 |
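A client typically calls this tool repeatedly until the job leaves the `running` state. The sketch below shows one way to do that. The `poll` callable is a hypothetical stand-in for however your MCP client invokes `codex_poll`, and `wait_for_job`, `interval_s`, and `timeout_s` are illustrative names that are not part of the tool. The status values and the `{"error": ...}` shape for an unknown job are taken from the implementation reference further down.

```python
import time
from typing import Callable

# Statuses after which the job will not change again (per the handler docstring).
TERMINAL_STATUSES = {"done", "error", "cancelled"}


def wait_for_job(
    poll: Callable[[str, int], dict],
    job_id: str,
    tail_lines: int = 100,
    interval_s: float = 2.0,
    timeout_s: float = 600.0,
) -> dict:
    """Call poll(job_id, tail_lines) until the job reaches a terminal status.

    `poll` is a hypothetical wrapper around the codex_poll tool call.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        result = poll(job_id, tail_lines)

        # An unknown job_id yields {"error": "..."} with no "status" key.
        if "status" not in result:
            raise LookupError(result.get("error", "unknown polling error"))

        if result["status"] in TERMINAL_STATUSES:
            return result

        if time.monotonic() >= deadline:
            raise TimeoutError(f"job {job_id} still running after {timeout_s}s")

        time.sleep(interval_s)
```

A fixed interval keeps the sketch short; a backoff schedule may suit long-running jobs better.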
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
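Although no output schema is declared, the handler docstring and the return statement of `poll_job` in the Implementation Reference indicate the keys a caller can expect. The sketch below writes that shape down as a `TypedDict` and adds a small helper that also handles the not-found case. The field types, the `PollResult` name, and the `summarize` helper are assumptions made for illustration; only the key names and status values come from the source.

```python
from typing import Optional, TypedDict


class PollResult(TypedDict):
    """Keys returned by poll_job for a known job (types are assumed)."""

    job_id: str
    status: str  # 'running' | 'done' | 'error' | 'cancelled'
    exit_code: Optional[int]
    output: str  # trailing lines of the job's output
    started_at: Optional[str]
    finished_at: Optional[str]
    prompt: Optional[str]
    cwd: Optional[str]


def summarize(result: dict) -> str:
    """Build a one-line summary from a poll result (hypothetical helper)."""
    # An unknown job_id produces {"error": "..."} instead of a PollResult.
    if "status" not in result:
        return f"lookup failed: {result.get('error', 'unknown error')}"

    lines = result.get("output", "").splitlines()
    last_line = lines[-1] if lines else ""
    summary = (
        f"{result['job_id']}: {result['status']} "
        f"(exit_code={result.get('exit_code')}) {last_line}"
    )
    return summary.rstrip()
```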
Implementation Reference
- src/codex_async_mcp/server.py:25-36 (handler): The MCP tool handler for 'codex_poll'. Registered with @mcp.tool(), it delegates to poll_job() in job_manager.py.
```python
def codex_poll(job_id: str, tail_lines: int = 100) -> dict:
    """
    Poll the status and output of a running (or finished) codex job.

    Args:
        job_id: The job_id returned by codex_start.
        tail_lines: How many trailing lines of output to return. Default: 100.

    Returns a dict with status ('running' | 'done' | 'error' | 'cancelled'),
    exit_code, and the latest output.
    """
    return poll_job(job_id, tail_lines)
```
- src/codex_async_mcp/server.py:24-24 (registration): Registration of 'codex_poll' as an MCP tool via the @mcp.tool() decorator.
```python
@mcp.tool()
```
- The core polling logic: reads job metadata, checks PID liveness as a fallback, reads the output tail, and returns the status.
```python
def poll_job(job_id: str, tail_lines: int = JOB_TAIL_LINES) -> dict:
    meta = _read_meta(job_id)
    if meta is None:
        return {"error": f"Job '{job_id}' not found"}

    # If meta still says running but monitor thread hasn't updated yet,
    # fall back to PID liveness check (covers server-restart case too).
    if meta["status"] == "running":
        with _lock:
            proc = _active_procs.get(job_id)
        if proc is None:
            pid = meta.get("pid")
            if pid and not _is_pid_alive(pid):
                with _lock:
                    meta = _read_meta(job_id)
                    if meta and meta["status"] == "running":
                        meta["status"] = "done"
                        meta["finished_at"] = datetime.now(timezone.utc).isoformat()
                        _write_meta(job_id, meta)
                meta = _read_meta(job_id)

    output = _read_tail(job_id, tail_lines)
    return {
        "job_id": job_id,
        "status": meta["status"],
        "exit_code": meta.get("exit_code"),
        "output": output,
        "started_at": meta.get("started_at"),
        "finished_at": meta.get("finished_at"),
        "prompt": meta.get("prompt"),
        "cwd": meta.get("cwd"),
    }
```
- Helper used by poll_job to read the trailing lines of output from a job's output file.
```python
def _read_tail(job_id: str, tail_lines: int = JOB_TAIL_LINES) -> str:
    output_path = _job_dir(job_id) / "output.txt"
    if not output_path.exists():
        return ""
    text = output_path.read_text(errors="replace")
    lines = text.splitlines()
    tail = "\n".join(lines[-tail_lines:])
    if len(tail) > MAX_OUTPUT_CHARS:
        tail = "...(truncated)...\n" + tail[-MAX_OUTPUT_CHARS:]
    return tail
```
- Helper used by poll_job to read the job's metadata JSON file.
```python
def _read_meta(job_id: str) -> Optional[dict]:
    path = _job_dir(job_id) / "meta.json"
    if not path.exists():
        return None
    with open(path) as f:
        return json.load(f)
```
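The listing above calls `_is_pid_alive` but does not show it. The following is a minimal sketch of how such a PID liveness check is commonly written on POSIX systems, using signal 0 to probe a process without affecting it. It is an assumption about the general technique, not the project's actual helper, and it should not be used on Windows, where `os.kill` with an arbitrary signal number terminates the target process.

```python
import os


def is_pid_alive(pid: int) -> bool:
    """Return True if a process with this PID appears to exist (POSIX only).

    Illustrative stand-in for the _is_pid_alive helper, which the
    reference above does not show.
    """
    if pid <= 0:
        # 0 and negative values address process groups, not a single process.
        return False
    try:
        # Signal 0 performs existence and permission checks only;
        # no signal is delivered to the target.
        os.kill(pid, 0)
    except ProcessLookupError:
        return False  # no such process
    except PermissionError:
        return True  # process exists but belongs to another user
    return True
```

A PID can be reused by an unrelated process after the original exits, so a check like this is a fallback heuristic, not proof that the original job is still running.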