mcp_job_status
Retrieve status and results for long-running MCP jobs, such as ETCH and NEXUS operations, within the WorkFlowy integration server.
Instructions
Get status/result for long-running MCP jobs (ETCH, NEXUS, etc.).
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| job_id | No | ID of the job to query. When omitted, status is returned for all known jobs (in-memory and detached). | None |
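
Because the handler is an ordinary async function, it can be exercised directly (for example in tests), which mirrors what an MCP client call does. A minimal sketch, assuming the module is importable as `workflowy_mcp.server` and that the `@mcp.tool` decorator leaves the underlying coroutine function callable; the job ID shown is illustrative:

```python
import asyncio

from workflowy_mcp.server import mcp_job_status  # import path assumed from the reference below


async def main() -> None:
    # Omit job_id to list every known job (in-memory and detached).
    all_jobs = await mcp_job_status()
    print(all_jobs["total"], "jobs known")

    # Pass a specific job_id to get that job's merged status record;
    # an unknown ID yields {"success": False, "error": "Unknown job_id: ..."}.
    one_job = await mcp_job_status(job_id="weave-enchanted-example-tag")
    print(one_job.get("status"))


asyncio.run(main())
```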
Implementation Reference
- src/workflowy_mcp/server.py:670-834 (handler): Handler and registration for the `mcp_job_status` tool. Retrieves status of in-memory asyncio jobs and detached WEAVE workers by scanning PIDs and persistent journals in the nexus_runs directory. Supports querying all jobs or a specific job_id.

```python
@mcp.tool(
    name="mcp_job_status",
    description="Get status/result for long-running MCP jobs (ETCH, NEXUS, etc.).",
)
async def mcp_job_status(job_id: str | None = None) -> dict:
    """Get status for background jobs (in-memory + detached WEAVE workers).

    Scans:
    - In-memory asyncio jobs (_jobs registry)
    - Detached WEAVE workers via:
      • Active PIDs (.weave.pid under nexus_runs/)
      • Persistent journals (enchanted_terrain.weave_journal.json / *.weave_journal.json)

    This allows status/error inspection even after the worker PID has exited.
    """
    from .client.api_client import scan_active_weaves
    from pathlib import Path
    import json as json_module

    # Determine nexus_runs directory
    # TODO: make this configurable or derive from client
    nexus_runs_base = r"E:\__daniel347x\__Obsidian\__Inking into Mind\--TypingMind\Projects - All\Projects - Individual\TODO\temp\nexus_runs"

    def _scan_weave_journals(base_dir_str: str) -> list[dict[str, Any]]:
        """Scan nexus_runs/ for WEAVE journal files and summarize their status.

        Returns one entry per journal with:
        - job_id: weave-enchanted-<nexus_tag> or weave-direct-<stem>
        - mode: 'enchanted' | 'direct'
        - status: 'completed' | 'failed' | 'unknown'
        - detached: True
        - journal: path to *.weave_journal.json
        - log_file: associated .weave.log if present
        - last_run_* and phase fields from journal
        """
        base_dir = Path(base_dir_str)
        results: list[dict[str, Any]] = []
        if not base_dir.exists():
            return results

        for run_dir in base_dir.iterdir():
            if not run_dir.is_dir():
                continue
            name = run_dir.name

            # Extract nexus_tag from directory name: either <tag> or <TIMESTAMP>__<tag>
            if "__" in name:
                nexus_tag = name.split("__", 1)[1]
            else:
                nexus_tag = name

            # Look for any *.weave_journal.json in this run directory
            for journal_path in run_dir.glob("*.weave_journal.json"):
                try:
                    with open(journal_path, "r", encoding="utf-8") as jf:
                        journal = json_module.load(jf)
                except Exception:
                    continue

                json_file = journal.get("json_file")
                if json_file and str(json_file).endswith("enchanted_terrain.json"):
                    mode = "enchanted"
                    job_id_val = f"weave-enchanted-{nexus_tag}"
                else:
                    # Direct-mode weave (json_file is some other JSON path)
                    stem = Path(json_file).stem if json_file else name
                    mode = "direct"
                    job_id_val = f"weave-direct-{stem}"

                last_completed = bool(journal.get("last_run_completed"))
                last_error = journal.get("last_run_error")
                if last_completed and not last_error:
                    status_val = "completed"
                elif last_error:
                    status_val = "failed"
                else:
                    status_val = "unknown"

                log_file = run_dir / ".weave.log"
                results.append(
                    {
                        "job_id": job_id_val,
                        "nexus_tag": nexus_tag,
                        "mode": mode,
                        "status": status_val,
                        "detached": True,
                        "journal": str(journal_path),
                        "log_file": str(log_file) if log_file.exists() else None,
                        "last_run_started_at": journal.get("last_run_started_at"),
                        "last_run_completed": journal.get("last_run_completed"),
                        "last_run_failed_at": journal.get("last_run_failed_at"),
                        "last_run_error": last_error,
                        "phase": journal.get("phase"),
                    }
                )
        return results

    # Return status for one job (if job_id given) or all jobs
    if job_id is None:
        # List ALL jobs (in-memory + detached)
        in_memory_jobs: list[dict[str, Any]] = []
        for job in _jobs.values():
            in_memory_jobs.append(
                {
                    "job_id": job.get("job_id"),
                    "kind": job.get("kind"),
                    "status": job.get("status"),
                    "started_at": job.get("started_at"),
                    "finished_at": job.get("finished_at"),
                    "detached": False,
                }
            )

        # Active detached jobs (PIDs still running)
        active_weaves = {j["job_id"]: j for j in scan_active_weaves(nexus_runs_base)}
        # All WEAVE journals (completed/failed/unknown)
        journal_jobs = {j["job_id"]: j for j in _scan_weave_journals(nexus_runs_base)}

        # Merge active + journal info per job_id
        detached_jobs_dict: dict[str, dict[str, Any]] = {}
        for jid, jj in journal_jobs.items():
            merged = dict(jj)
            if jid in active_weaves:
                # Active info (PID, running status) overrides journal's status,
                # but we keep journal fields like last_run_error and phase.
                merged.update(active_weaves[jid])
            detached_jobs_dict[jid] = merged
        # Any active jobs without journals (edge case)
        for jid, aj in active_weaves.items():
            if jid not in detached_jobs_dict:
                detached_jobs_dict[jid] = aj

        detached_jobs = list(detached_jobs_dict.values())

        return {
            "success": True,
            "in_memory_jobs": in_memory_jobs,
            "detached_jobs": detached_jobs,
            "total": len(in_memory_jobs) + len(detached_jobs),
        }

    # Check in-memory first
    job = _jobs.get(job_id)
    if job:
        # Do not expose internal task handle
        view = {k: v for k, v in job.items() if k not in ("payload", "_task")}
        return {"success": True, **view}

    # Check detached WEAVE jobs (active or completed) using journals + PIDs
    active_weaves = {j["job_id"]: j for j in scan_active_weaves(nexus_runs_base)}
    journal_jobs = {j["job_id"]: j for j in _scan_weave_journals(nexus_runs_base)}

    if job_id in journal_jobs or job_id in active_weaves:
        merged: dict[str, Any] = {}
        if job_id in journal_jobs:
            merged.update(journal_jobs[job_id])
        if job_id in active_weaves:
            merged.update(active_weaves[job_id])
        merged["success"] = True
        return merged

    # Unknown job_id
    return {"success": False, "error": f"Unknown job_id: {job_id}"}
```