Skip to main content
Glama

mcp_job_status

Retrieve status and results for long-running MCP jobs like ETCH and NEXUS operations within the WorkFlowy integration server.

Instructions

Get status/result for long-running MCP jobs (ETCH, NEXUS, etc.).

Input Schema

Table | JSON Schema
Name | Required | Description | Default
job_id | No | |

Implementation Reference

  • Handler and registration for the 'mcp_job_status' tool. Retrieves status of in-memory asyncio jobs and detached WEAVE workers by scanning PIDs and persistent journals in nexus_runs directory. Supports querying all jobs or a specific job_id.
    @mcp.tool(
        name="mcp_job_status",
        description="Get status/result for long-running MCP jobs (ETCH, NEXUS, etc.).",
    )
    async def mcp_job_status(job_id: str | None = None) -> dict:
        """Get status for background jobs (in-memory + detached WEAVE workers).

        Sources consulted:
        - In-memory asyncio jobs (the module-level ``_jobs`` registry)
        - Detached WEAVE workers via:
          • ``scan_active_weaves`` (active PIDs under nexus_runs/)
          • Persistent journals (``*.weave_journal.json`` in each run directory)

        Reading the journals allows status/error inspection even after the
        worker PID has exited.

        Args:
            job_id: Identifier of the job to look up. When ``None``, every
                known job (in-memory and detached) is listed.

        Returns:
            - ``job_id is None``: ``{"success": True, "in_memory_jobs": [...],
              "detached_jobs": [...], "total": <int>}``.
            - Known in-memory job: the job record without its ``payload`` and
              ``_task`` entries, plus ``"success": True``.
            - Known detached job: journal fields overlaid with active-worker
              fields, plus ``"success": True``.
            - Otherwise: ``{"success": False, "error": "Unknown job_id: ..."}``.
        """
        from .client.api_client import scan_active_weaves
        from pathlib import Path
        import json as json_module

        # Determine nexus_runs directory
        # TODO: make this configurable or derive from client
        nexus_runs_base = r"E:\__daniel347x\__Obsidian\__Inking into Mind\--TypingMind\Projects - All\Projects - Individual\TODO\temp\nexus_runs"

        def _scan_weave_journals(base_dir_str: str) -> list[dict[str, Any]]:
            """Scan nexus_runs/ for WEAVE journal files and summarize their status.

            Returns one entry per readable journal with:
            - job_id: weave-enchanted-<nexus_tag> or weave-direct-<stem>
            - nexus_tag: tag derived from the run directory name
            - mode:  'enchanted' | 'direct'
            - status: 'completed' | 'failed' | 'unknown'
            - detached: True
            - journal: path to *.weave_journal.json
            - log_file: path to the run directory's .weave.log, or None if absent
            - last_run_* and phase fields copied from the journal

            Journals that cannot be read, are not valid JSON, or whose top-level
            value is not a JSON object are skipped.
            """
            base_dir = Path(base_dir_str)
            results: list[dict[str, Any]] = []
            # is_dir() (rather than exists()) so that a stray file at this path
            # yields an empty result instead of an error from iterdir().
            if not base_dir.is_dir():
                return results

            for run_dir in base_dir.iterdir():
                if not run_dir.is_dir():
                    continue
                name = run_dir.name
                # Directory name is either <tag> or <TIMESTAMP>__<tag>.
                _, sep, tail = name.partition("__")
                nexus_tag = tail if sep else name

                log_file = run_dir / ".weave.log"
                log_file_str = str(log_file) if log_file.exists() else None

                for journal_path in run_dir.glob("*.weave_journal.json"):
                    try:
                        with open(journal_path, "r", encoding="utf-8") as jf:
                            journal = json_module.load(jf)
                    except Exception:
                        # Best-effort scan: one unreadable/corrupt journal must
                        # not prevent reporting on the others.
                        continue
                    if not isinstance(journal, dict):
                        # Valid JSON but not an object (e.g. [] or null): there
                        # are no fields to report, so treat it like a corrupt file.
                        continue

                    json_file = journal.get("json_file")
                    # Normalize once so a non-string value cannot break the
                    # suffix test or the Path() construction below.
                    json_file_str = str(json_file) if json_file else ""
                    if json_file_str.endswith("enchanted_terrain.json"):
                        mode = "enchanted"
                        job_id_val = f"weave-enchanted-{nexus_tag}"
                    else:
                        # Direct-mode weave (json_file is some other JSON path)
                        stem = Path(json_file_str).stem if json_file_str else name
                        mode = "direct"
                        job_id_val = f"weave-direct-{stem}"

                    last_completed = bool(journal.get("last_run_completed"))
                    last_error = journal.get("last_run_error")
                    # A recorded error wins over the completed flag.
                    if last_error:
                        status_val = "failed"
                    elif last_completed:
                        status_val = "completed"
                    else:
                        status_val = "unknown"

                    results.append(
                        {
                            "job_id": job_id_val,
                            "nexus_tag": nexus_tag,
                            "mode": mode,
                            "status": status_val,
                            "detached": True,
                            "journal": str(journal_path),
                            "log_file": log_file_str,
                            "last_run_started_at": journal.get("last_run_started_at"),
                            "last_run_completed": journal.get("last_run_completed"),
                            "last_run_failed_at": journal.get("last_run_failed_at"),
                            "last_run_error": last_error,
                            "phase": journal.get("phase"),
                        }
                    )

            return results

        def _collect_detached_jobs() -> dict[str, dict[str, Any]]:
            """Merge active-worker info with journal info, keyed by job_id.

            Journal entries come first; for a job that is also active, the
            active entry's fields override the journal's (journal-only fields
            such as last_run_error and phase are kept). Active jobs without a
            journal are appended afterwards.
            """
            active = {j["job_id"]: j for j in scan_active_weaves(nexus_runs_base)}
            journals = {j["job_id"]: j for j in _scan_weave_journals(nexus_runs_base)}

            merged: dict[str, dict[str, Any]] = {
                jid: {**info, **active.get(jid, {})} for jid, info in journals.items()
            }
            # Any active jobs without journals (edge case)
            for jid, info in active.items():
                merged.setdefault(jid, info)
            return merged

        if job_id is None:
            # List ALL jobs (in-memory + detached)
            in_memory_jobs: list[dict[str, Any]] = [
                {
                    "job_id": job.get("job_id"),
                    "kind": job.get("kind"),
                    "status": job.get("status"),
                    "started_at": job.get("started_at"),
                    "finished_at": job.get("finished_at"),
                    "detached": False,
                }
                for job in _jobs.values()
            ]
            detached_jobs = list(_collect_detached_jobs().values())

            return {
                "success": True,
                "in_memory_jobs": in_memory_jobs,
                "detached_jobs": detached_jobs,
                "total": len(in_memory_jobs) + len(detached_jobs),
            }

        # Check in-memory first
        job = _jobs.get(job_id)
        if job is not None:
            # Do not expose the payload or the internal task handle
            view = {k: v for k, v in job.items() if k not in ("payload", "_task")}
            return {"success": True, **view}

        # Check detached WEAVE jobs (active or completed) using journals + PIDs
        detached = _collect_detached_jobs()
        if job_id in detached:
            return {**detached[job_id], "success": True}

        # Unknown job_id
        return {"success": False, "error": f"Unknown job_id: {job_id}"}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/daniel347x/workflowy-mcp-fixed'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.