dag-health-rollup
Aggregates DAG health metrics over the most recent runs (success rate, run counts by state, average duration, last failed run ID, and optionally the failing task instances) so you can quickly tell whether a DAG is healthy.
Instructions
Aggregated DAG health: success-rate over the last N runs + count breakdown (succeeded/failed/queued) + average duration + last-failed-run id + (optional) failing task instances. Replaces the airflow-list-runs + airflow-get-task-instances combo for 'is this DAG healthy right now?'.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| dagId | Yes | Airflow DAG id | |
| recentRuns | No | Number of most recent runs to scan (integer, 1-100) | 10 |
| includeFailingTasks | No | If true, fetch task instances for the most recent failed run | true |
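
As a rough illustration of how omitted arguments resolve, the sketch below applies the defaults stated in the Zod schema (`recentRuns` 10, `includeFailingTasks` true) using plain TypeScript. `resolveArgs` and `RollupArgs` are hypothetical names introduced here, and the coercion only approximates `z.coerce.number().int().min(1).max(100).default(10)`; it is not the server's validation code.

```typescript
// Hypothetical helper: approximates the schema's defaults and range check without zod.
interface RollupArgs {
  dagId: string;
  recentRuns: number;
  includeFailingTasks: boolean;
}

function resolveArgs(raw: {
  dagId: string;
  recentRuns?: number | string;
  includeFailingTasks?: boolean;
}): RollupArgs {
  // A missing recentRuns falls back to 10; strings such as "25" are coerced to numbers.
  const recentRuns = raw.recentRuns === undefined ? 10 : Number(raw.recentRuns);
  if (!Number.isInteger(recentRuns) || recentRuns < 1 || recentRuns > 100) {
    throw new RangeError("recentRuns must be an integer between 1 and 100");
  }
  return {
    dagId: raw.dagId,
    recentRuns,
    // A missing includeFailingTasks falls back to true.
    includeFailingTasks: raw.includeFailingTasks ?? true,
  };
}

// "example_dag" is a placeholder DAG id.
const minimal = resolveArgs({ dagId: "example_dag" });
const explicit = resolveArgs({ dagId: "example_dag", recentRuns: "25", includeFailingTasks: false });
```

With only `dagId` supplied, the call scans the last 10 runs and also fetches task instances for the most recent failed run, so callers who want the cheaper request should set `includeFailingTasks` to false explicitly.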
Implementation Reference
- src/tools/aggregations.ts:24-80 (handler) Main handler for dag-health-rollup: fetches recent DAG runs via airflowListRuns, computes success rate, counts, average duration, and optionally fetches failing task instances from the most recent failed run.

```typescript
export async function dagHealthRollup(args: z.infer<typeof dagHealthRollupSchema>): Promise<unknown> {
  const runs = (await airflowListRuns({
    dagId: args.dagId,
    limit: args.recentRuns,
  })) as { runs: Array<{ dagRunId: string; state: string; startDate?: string | null; endDate?: string | null }> };

  const enriched: DagRunOut[] = runs.runs.map((r) => {
    let durationSec: number | null = null;
    if (r.startDate && r.endDate) {
      durationSec = (new Date(r.endDate).getTime() - new Date(r.startDate).getTime()) / 1000;
    }
    return { ...r, durationSec };
  });

  const total = enriched.length;
  const succeeded = enriched.filter((r) => r.state === "success").length;
  const failed = enriched.filter((r) => r.state === "failed").length;
  const running = enriched.filter((r) => r.state === "running" || r.state === "queued").length;
  const successRatePct = total === 0 ? null : Math.round((succeeded / total) * 1000) / 10;

  const completed = enriched.filter((r) => r.durationSec != null);
  const avgDurationSec =
    completed.length === 0
      ? null
      : Math.round(
          completed.reduce((acc, r) => acc + (r.durationSec ?? 0), 0) / completed.length,
        );

  let lastFailureTasks: TaskInstance[] | null = null;
  const lastFailedRun = enriched.find((r) => r.state === "failed");
  if (args.includeFailingTasks && lastFailedRun) {
    try {
      const ti = (await airflowGetTaskInstances({
        dagId: args.dagId,
        dagRunId: lastFailedRun.dagRunId,
      })) as { taskInstances: Array<{ taskId: string; state: string | null; duration?: number | null }> };
      lastFailureTasks = ti.taskInstances
        .filter((t) => t.state !== "success")
        .map((t) => ({ taskId: t.taskId, state: t.state, duration: t.duration }));
    } catch {
      lastFailureTasks = null;
    }
  }

  return {
    dagId: args.dagId,
    window: { recentRuns: args.recentRuns, runsScanned: total },
    successRatePct,
    counts: { succeeded, failed, runningOrQueued: running, total },
    avgDurationSec,
    lastFailedRunId: lastFailedRun?.dagRunId ?? null,
    lastFailureTasks,
    runs: enriched,
  };
}
```

- src/tools/aggregations.ts:4-8 (schema) Zod schema for dag-health-rollup: dagId (string), recentRuns (number 1-100, default 10), includeFailingTasks (boolean, default true).

```typescript
export const dagHealthRollupSchema = z.object({
  dagId: z.string().describe("Airflow DAG id"),
  recentRuns: z.coerce.number().int().min(1).max(100).default(10),
  includeFailingTasks: z.boolean().default(true).describe("If true, fetch task instances for the most recent failed run"),
});
```

- src/index.ts:53-55 (registration) Registration of dag-health-rollup as a tool with description and schema, using wrapToolHandler to wrap the handler function.

```typescript
tool(
  "dag-health-rollup",
  "Aggregated DAG health: success-rate over the last N runs + count breakdown (succeeded/failed/queued) + average duration + last-failed-run id + (optional) failing task instances. Replaces the airflow-list-runs + airflow-get-task-instances combo for 'is this DAG healthy right now?'.",
  dagHealthRollupSchema.shape,
  wrapToolHandler(dagHealthRollup),
);
```

- src/index.ts:19-19 (registration) Import of dagHealthRollupSchema and dagHealthRollup from ./tools/aggregations.js.

```typescript
import { dagHealthRollupSchema, dagHealthRollup } from "./tools/aggregations.js";
```
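
To make the arithmetic in the handler easier to follow, here is a network-free sketch of the aggregation step applied to in-memory data. `summarizeRuns`, `RunIn`, `Summary`, and the sample runs are hypothetical and exist only for illustration; the sketch mirrors the handler's formulas but leaves out the Airflow calls and the task-instance lookup. It also assumes the runs arrive newest first, which is what the handler's use of `find` for the last failed run appears to rely on.

```typescript
// Hypothetical, network-free sketch of the aggregation performed by the handler.
interface RunIn {
  dagRunId: string;
  state: string;
  startDate?: string | null;
  endDate?: string | null;
}

interface Summary {
  successRatePct: number | null;
  counts: { succeeded: number; failed: number; runningOrQueued: number; total: number };
  avgDurationSec: number | null;
  lastFailedRunId: string | null;
}

function summarizeRuns(runs: RunIn[]): Summary {
  // Duration in seconds, only for runs that have both timestamps.
  const durations = runs
    .filter((r) => r.startDate && r.endDate)
    .map((r) => (new Date(r.endDate as string).getTime() - new Date(r.startDate as string).getTime()) / 1000);

  const total = runs.length;
  const succeeded = runs.filter((r) => r.state === "success").length;
  const failed = runs.filter((r) => r.state === "failed").length;
  const runningOrQueued = runs.filter((r) => r.state === "running" || r.state === "queued").length;

  return {
    // Percentage rounded to one decimal place; null when no runs were scanned.
    successRatePct: total === 0 ? null : Math.round((succeeded / total) * 1000) / 10,
    counts: { succeeded, failed, runningOrQueued, total },
    // Mean duration rounded to whole seconds; null when no run has both timestamps.
    avgDurationSec:
      durations.length === 0 ? null : Math.round(durations.reduce((acc, d) => acc + d, 0) / durations.length),
    // Assumption: runs are ordered newest first, so the first failed run is the latest failure.
    lastFailedRunId: runs.find((r) => r.state === "failed")?.dagRunId ?? null,
  };
}

// Made-up sample data: one in-flight run, one failure, two successes.
const sample: RunIn[] = [
  { dagRunId: "run_4", state: "running", startDate: "2024-01-04T00:00:00Z", endDate: null },
  { dagRunId: "run_3", state: "failed", startDate: "2024-01-03T00:00:00Z", endDate: "2024-01-03T00:01:00Z" },
  { dagRunId: "run_2", state: "success", startDate: "2024-01-02T00:00:00Z", endDate: "2024-01-02T00:02:00Z" },
  { dagRunId: "run_1", state: "success", startDate: "2024-01-01T00:00:00Z", endDate: "2024-01-01T00:03:00Z" },
];
const summary = summarizeRuns(sample);
```

For this sample the success rate is 50 (2 of 4 runs) and the average duration is 120 seconds across the three finished runs. As in the handler, in-flight runs count toward the denominator, so a window that contains running or queued runs reports a lower success rate than one computed over finished runs only.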