list_runs
Retrieve workflow execution records with filters for name, status, time range, and result count to monitor and debug processes.
Instructions
List workflow runs with optional filters.
Args:
- workflow_name: Filter by workflow name (e.g., 'qa-workflow', 'embed-workflow')
- status: Filter by status ('queued', 'running', 'completed', 'failed', 'cancelled')
- since_hours: How many hours back to search (default: 24)
- limit: Maximum number of runs to return (default: 50)
Returns a list of runs with their status, metadata, and timing info.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
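| workflow_name | No | Filter by workflow name (e.g., 'qa-workflow', 'embed-workflow') | None |
| status | No | Filter by status ('queued', 'running', 'completed', 'failed', 'cancelled') | None |
| since_hours | No | How many hours back to search | 24 |
| limit | No | Maximum number of runs to return | 50 |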
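As an illustration of how these inputs might be passed, the sketch below builds a JSON-RPC 2.0 `tools/call` request, the message shape MCP clients use to invoke a tool. The helper name `build_list_runs_call` and the example argument values are hypothetical; only the four argument names come from the schema above.

```python
import json

# Argument names taken from the input schema; everything else is illustrative.
ALLOWED_ARGUMENTS = {"workflow_name", "status", "since_hours", "limit"}


def build_list_runs_call(request_id: int, **arguments) -> str:
    """Serialize a JSON-RPC 2.0 `tools/call` request for the list_runs tool.

    Hypothetical helper: rejects argument names the schema does not list.
    """
    unknown = set(arguments) - ALLOWED_ARGUMENTS
    if unknown:
        raise ValueError(f"unknown arguments: {sorted(unknown)}")
    payload = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": "list_runs", "arguments": arguments},
    }
    return json.dumps(payload)


# Ask for up to 10 failed runs of 'qa-workflow' from the last 6 hours.
request = build_list_runs_call(
    1, workflow_name="qa-workflow", status="failed", since_hours=6, limit=10
)
print(request)
```

All four arguments are optional, so an empty `arguments` object is also valid and falls back to the defaults (last 24 hours, at most 50 runs).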
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | List of runs with their status, metadata, and timing info | |
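The handler reports failures inside the result itself: on an exception it returns a single-element list of the form `[{"error": "..."}]` instead of raising. A caller may therefore want to separate that case from real run records. The sketch below is one way to do so; `split_result` is a hypothetical helper, and the `id` and `status` keys in the usage example are assumed field names, since the exact shape of a serialized run is not shown here.

```python
from typing import Optional


def split_result(result: list[dict]) -> tuple[list[dict], Optional[str]]:
    """Separate run records from the handler's error entry, if present.

    Returns (runs, error_message). Treats a one-element list whose only
    key is "error" as a failure, mirroring the handler's except branch.
    """
    if len(result) == 1 and set(result[0]) == {"error"}:
        return [], result[0]["error"]
    return result, None


# Hypothetical run records; real entries come from the server's serializer.
runs, error = split_result([{"id": "run-1", "status": "failed"}])
if error is not None:
    print(f"list_runs failed: {error}")
else:
    print(f"{len(runs)} run(s) returned")
```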
Implementation Reference
- src/hatchet_mcp/server.py:82-125 (handler): The `list_runs` tool handler implementation. It uses the Hatchet SDK to query workflow runs and applies filtering logic for workflow name, status, time range, and result count limits.
```python
@mcp.tool()
async def list_runs(
    workflow_name: str | None = None,
    status: str | None = None,
    since_hours: int = 24,
    limit: int = 50,
) -> list[dict]:
    """
    List workflow runs with optional filters.

    Args:
        workflow_name: Filter by workflow name (e.g., 'qa-workflow', 'embed-workflow')
        status: Filter by status ('queued', 'running', 'completed', 'failed', 'cancelled')
        since_hours: How many hours back to search (default: 24)
        limit: Maximum number of runs to return (default: 50)

    Returns a list of runs with their status, metadata, and timing info.
    """
    try:
        hatchet = get_hatchet_client()

        # Build filter parameters
        params: dict[str, Any] = {
            "since": datetime.now(tz=timezone.utc) - timedelta(hours=since_hours),
            "limit": limit,
        }

        if status and status.lower() in STATUS_MAP:
            params["statuses"] = [STATUS_MAP[status.lower()]]

        if workflow_name:
            # Need to get workflow ID from name
            workflows = await hatchet.workflows.aio_list()
            workflow_ids = [
                w.metadata.id
                for w in (workflows.rows or [])
                if hasattr(w, "name") and w.name == workflow_name
            ]
            if workflow_ids:
                params["workflow_ids"] = workflow_ids

        runs = await hatchet.runs.aio_list(**params)
        return [_serialize_run(r) for r in (runs.rows or [])]
    except Exception as e:
        return [{"error": str(e)}]
```
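To make the handler's filtering rules easier to check in isolation, the sketch below restates its parameter-building step as a pure function with no Hatchet dependency. `build_params` and its `now` argument are hypothetical, and the `STATUS_MAP` shown is a stand-in with plain strings; the real mapping lives elsewhere in `server.py` and presumably points at SDK status values.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Optional

# Hypothetical stand-in for the server's STATUS_MAP (real values not shown).
STATUS_MAP = {
    "queued": "QUEUED",
    "running": "RUNNING",
    "completed": "COMPLETED",
    "failed": "FAILED",
    "cancelled": "CANCELLED",
}


def build_params(
    status: Optional[str] = None,
    since_hours: int = 24,
    limit: int = 50,
    now: Optional[datetime] = None,
) -> dict[str, Any]:
    """Mirror the handler's filter construction, minus the workflow lookup."""
    # `now` is injectable only so the time window can be tested deterministically.
    now = now or datetime.now(tz=timezone.utc)
    params: dict[str, Any] = {
        "since": now - timedelta(hours=since_hours),
        "limit": limit,
    }
    # Status matching is case-insensitive; unrecognized values add no filter.
    if status and status.lower() in STATUS_MAP:
        params["statuses"] = [STATUS_MAP[status.lower()]]
    return params


print(build_params(status="Failed", since_hours=6, limit=10))
```

Two behaviors of the handler as written follow from this logic and are worth keeping in mind when debugging: a status outside the five known values is ignored instead of rejected, and a `workflow_name` that matches no workflow leaves `workflow_ids` unset, so the query returns runs from all workflows.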