search_runs
Search workflow runs by metadata key-value pairs like audit_id, patient_id, or rule_id to monitor and debug Hatchet workflows.
Instructions
Search runs by metadata key-value pairs.
Common metadata keys:
- audit_id: The audit being processed
- audit_type: Type of audit (e.g., 'standard', 'express')
- patient_id: Patient being processed
- application_id: Application ID
- rule_id: Rule being processed

Args:
- metadata_key: The metadata key to search (e.g., 'audit_id')
- metadata_value: The value to match
- status: Optional status filter
- since_hours: How many hours back to search (default: 24)
- limit: Maximum runs to return (default: 50)
Returns matching runs with their full metadata.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
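| metadata_key | Yes | The metadata key to search (e.g., 'audit_id') | |
| metadata_value | Yes | The value to match | |
| status | No | Optional status filter | |
| since_hours | No | How many hours back to search | 24 |
| limit | No | Maximum runs to return | 50 |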
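
As a rough illustration of the input schema, the arguments could be modeled as a small dataclass. `SearchRunsArgs` and `to_arguments` are hypothetical names used only for this sketch; the field names and the defaults (24 hours, 50 runs) come from the tool's documented arguments, and the example values are made up.

```python
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class SearchRunsArgs:
    """Hypothetical container mirroring the search_runs input schema."""

    metadata_key: str              # required, e.g. "audit_id"
    metadata_value: str            # required, the value to match
    status: Optional[str] = None   # optional status filter
    since_hours: int = 24          # documented default
    limit: int = 50                # documented default

    def to_arguments(self) -> dict:
        """Return a plain dict of arguments, omitting an unset status."""
        args = asdict(self)
        if args["status"] is None:
            del args["status"]
        return args


# Example with made-up values: only the two required fields are supplied.
example = SearchRunsArgs(metadata_key="audit_id", metadata_value="audit-123")
print(example.to_arguments())
```

Only `metadata_key` and `metadata_value` have to be supplied; the remaining fields fall back to the defaults listed in the table.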
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | Matching runs with their full metadata | |
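
The handler shown under Implementation Reference reports failures as a one-item list containing an `error` key instead of raising, so a caller may want to check for that shape before treating the result as runs. A minimal sketch, assuming the result has already been decoded into a Python list of dicts; `split_result` is a hypothetical helper name, not part of the server:

```python
from typing import Any, Optional


def split_result(
    result: list[dict[str, Any]],
) -> tuple[list[dict[str, Any]], Optional[str]]:
    """Separate serialized runs from the handler's error entry, if present.

    Returns (runs, error_message). Exactly one of the two carries data:
    an error yields ([], message), a normal result yields (runs, None).
    """
    # The error shape is a single dict whose only key is "error".
    if len(result) == 1 and set(result[0]) == {"error"}:
        return [], str(result[0]["error"])
    return result, None
```

An empty list is treated as a successful search with no matches, which is distinct from the error case.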
Implementation Reference
- src/hatchet_mcp/server.py:219-259 (handler) The `search_runs` tool is defined here, decorated with `@mcp.tool()`. It takes a metadata key-value pair plus optional status, time-window, and limit filters to search for Hatchet workflow runs.

    async def search_runs(
        metadata_key: str,
        metadata_value: str,
        status: str | None = None,
        since_hours: int = 24,
        limit: int = 50,
    ) -> list[dict]:
        """
        Search runs by metadata key-value pairs.

        Common metadata keys:
        - audit_id: The audit being processed
        - audit_type: Type of audit (e.g., 'standard', 'express')
        - patient_id: Patient being processed
        - application_id: Application ID
        - rule_id: Rule being processed

        Args:
            metadata_key: The metadata key to search (e.g., 'audit_id')
            metadata_value: The value to match
            status: Optional status filter
            since_hours: How many hours back to search (default: 24)
            limit: Maximum runs to return (default: 50)

        Returns matching runs with their full metadata.
        """
        try:
            hatchet = get_hatchet_client()

            params: dict[str, Any] = {
                "since": datetime.now(tz=timezone.utc) - timedelta(hours=since_hours),
                "limit": limit,
                "additional_metadata": {metadata_key: metadata_value},
            }

            if status and status.lower() in STATUS_MAP:
                params["statuses"] = [STATUS_MAP[status.lower()]]

            runs = await hatchet.runs.aio_list(**params)
            return [_serialize_run(r) for r in (runs.rows or [])]
        except Exception as e:
            return [{"error": str(e)}]
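
The filter-building step of the handler can be tried in isolation, without a Hatchet client. The sketch below mirrors that logic under stated assumptions: `build_search_params` and its `now` parameter are hypothetical additions for testability, and the `STATUS_MAP` contents are placeholders because the real mapping is not shown in the excerpt above.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Optional

# Placeholder mapping: the real STATUS_MAP is not shown in the excerpt.
STATUS_MAP: dict[str, str] = {"failed": "FAILED", "completed": "COMPLETED"}


def build_search_params(
    metadata_key: str,
    metadata_value: str,
    status: Optional[str] = None,
    since_hours: int = 24,
    limit: int = 50,
    now: Optional[datetime] = None,
) -> dict[str, Any]:
    """Build the keyword arguments that the handler passes to the list call."""
    # Use a timezone-aware UTC timestamp, as the handler does.
    now = now or datetime.now(tz=timezone.utc)
    params: dict[str, Any] = {
        "since": now - timedelta(hours=since_hours),
        "limit": limit,
        "additional_metadata": {metadata_key: metadata_value},
    }
    # Status is matched case-insensitively; unknown values add no filter.
    if status and status.lower() in STATUS_MAP:
        params["statuses"] = [STATUS_MAP[status.lower()]]
    return params
```

One consequence of this logic: a status string that is not a key of `STATUS_MAP` is silently dropped, so the search then covers runs in every status rather than failing.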