# get_queue_metrics
Monitor queue depth and job status counts for Hatchet workflows to track performance and identify bottlenecks in job processing.
## Instructions
Get queue depth and job counts by status.

**Args:**
- `workflow_name`: Optional workflow name to filter metrics.

Returns counts of jobs in each status (queued, running, completed, failed).
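For illustration, a successful call might return a dict shaped like the one below. The field names come from the handler shown later on this page; the numbers are made up:

```python
# Hypothetical example of the tool's return value (counts are illustrative only).
example_response = {
    "workflow_name": "all",     # "all" when no workflow_name filter was given
    "time_range_hours": 24,     # the handler always looks back 24 hours
    "counts": {
        "queued": 3,
        "running": 5,
        "completed": 120,
        "failed": 2,
        "cancelled": 1,
        "total": 131,           # every run in the window, regardless of status
    },
}
```

On failure the tool instead returns `{"error": "<message>"}` rather than raising.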
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| workflow_name | No | Optional workflow name to filter metrics; when omitted, metrics cover all workflows | None |
## Implementation Reference
- `src/hatchet_mcp/server.py:163-216` (handler)

The `get_queue_metrics` tool handler is defined with the `@mcp.tool()` decorator in `src/hatchet_mcp/server.py`. It fetches workflow runs from the last 24 hours, optionally filtered by workflow name, and tallies the runs by status.
```python
@mcp.tool()
async def get_queue_metrics(workflow_name: str | None = None) -> dict:
    """
    Get queue depth and job counts by status.

    Args:
        workflow_name: Optional workflow name to filter metrics

    Returns counts of jobs in each status (queued, running, completed, failed).
    """
    try:
        hatchet = get_hatchet_client()

        # Get runs from the last 24 hours and count by status
        params: dict[str, Any] = {
            "since": datetime.now(tz=timezone.utc) - timedelta(hours=24),
            "limit": 1000,
        }

        if workflow_name:
            workflows = await hatchet.workflows.aio_list()
            workflow_ids = [
                w.metadata.id
                for w in (workflows.rows or [])
                if hasattr(w, "name") and w.name == workflow_name
            ]
            if workflow_ids:
                params["workflow_ids"] = workflow_ids

        runs = await hatchet.runs.aio_list(**params)

        # Count by status
        counts = {
            "queued": 0,
            "running": 0,
            "completed": 0,
            "failed": 0,
            "cancelled": 0,
            "total": 0,
        }
        for run in (runs.rows or []):
            counts["total"] += 1
            if hasattr(run, "status"):
                status_name = (
                    run.status.value.lower()
                    if hasattr(run.status, "value")
                    else str(run.status).lower()
                )
                if status_name in counts:
                    counts[status_name] += 1

        return {
            "workflow_name": workflow_name or "all",
            "time_range_hours": 24,
            "counts": counts,
        }
    except Exception as e:
        return {"error": str(e)}
```
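The status-counting loop can be exercised in isolation. The sketch below extracts that logic and drives it with stand-in run objects; `SimpleNamespace` and the `RunStatus` enum here are test doubles, not the real Hatchet SDK types (whose names may differ):

```python
from enum import Enum
from types import SimpleNamespace


class RunStatus(Enum):
    """Stand-in for the SDK's status enum (real member names may differ)."""
    QUEUED = "QUEUED"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"


def count_by_status(runs: list) -> dict:
    """The same counting logic as the handler, extracted for testing."""
    counts = {"queued": 0, "running": 0, "completed": 0,
              "failed": 0, "cancelled": 0, "total": 0}
    for run in runs:
        counts["total"] += 1
        if hasattr(run, "status"):
            # Enum-like statuses expose .value; plain strings fall back to str().
            status_name = (
                run.status.value.lower()
                if hasattr(run.status, "value")
                else str(run.status).lower()
            )
            if status_name in counts:
                counts[status_name] += 1
    return counts


runs = [
    SimpleNamespace(status=RunStatus.COMPLETED),
    SimpleNamespace(status=RunStatus.COMPLETED),
    SimpleNamespace(status=RunStatus.FAILED),
    SimpleNamespace(status="running"),  # a plain-string status also works
]
print(count_by_status(runs))
# {'queued': 0, 'running': 1, 'completed': 2, 'failed': 1, 'cancelled': 0, 'total': 4}
```

Note that `total` counts every run in the window, so it can exceed the sum of the named buckets if the SDK ever reports a status outside the five keys.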