airflow-mcp-server

by us-all

dag-health-rollup

Aggregates DAG health metrics over recent runs, including success rate, counts, average duration, last failed run ID, and failing task instances, to quickly assess if a DAG is running healthily.

Instructions

Aggregated DAG health: success-rate over the last N runs + count breakdown (succeeded/failed/queued) + average duration + last-failed-run id + (optional) failing task instances. Replaces the airflow-list-runs + airflow-get-task-instances combo for 'is this DAG healthy right now?'.

Input Schema

Name | Required | Description | Default
dagId | Yes | Airflow DAG id | (none)
recentRuns | No | (no description in schema; integer from 1 to 100) | 10
includeFailingTasks | No | If true, fetch task instances for the most recent failed run | true
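
As a minimal sketch of how these parameters behave, the hand-written validator below mirrors the constraints shown in the Zod schema in the Implementation Reference: recentRuns is coerced to a number and must be an integer from 1 to 100 (default 10), and includeFailingTasks defaults to true. The function `normalizeRollupArgs` and its two interfaces are hypothetical names used for illustration only; the server itself relies on Zod, not on this code.

```typescript
// Hypothetical helper, not part of airflow-mcp-server. It mirrors the
// documented constraints without depending on Zod.
interface RollupArgsInput {
  dagId: string;
  recentRuns?: number | string;
  includeFailingTasks?: boolean;
}

interface RollupArgs {
  dagId: string;
  recentRuns: number;
  includeFailingTasks: boolean;
}

function normalizeRollupArgs(input: RollupArgsInput): RollupArgs {
  // A missing recentRuns falls back to 10; otherwise the value is coerced
  // to a number and must be an integer in [1, 100].
  const recentRuns =
    input.recentRuns === undefined ? 10 : Number(input.recentRuns);
  if (!Number.isInteger(recentRuns) || recentRuns < 1 || recentRuns > 100) {
    throw new RangeError("recentRuns must be an integer between 1 and 100");
  }
  return {
    dagId: input.dagId,
    recentRuns,
    // A missing includeFailingTasks falls back to true.
    includeFailingTasks: input.includeFailingTasks ?? true,
  };
}

// Only dagId is supplied here, so both defaults apply.
// "example_dag" is a made-up DAG id.
const args = normalizeRollupArgs({ dagId: "example_dag" });
```

Because includeFailingTasks defaults to true, a caller that only wants the run-level numbers may prefer to pass `includeFailingTasks: false` and avoid the extra task-instance lookup.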

Implementation Reference

  • Main handler for dag-health-rollup: fetches recent DAG runs via airflowListRuns, computes success rate, counts, average duration, and optionally fetches failing task instances from the most recent failed run.
    export async function dagHealthRollup(args: z.infer<typeof dagHealthRollupSchema>): Promise<unknown> {
      const runs = (await airflowListRuns({
        dagId: args.dagId,
        limit: args.recentRuns,
      })) as { runs: Array<{ dagRunId: string; state: string; startDate?: string | null; endDate?: string | null }> };
    
      const enriched: DagRunOut[] = runs.runs.map((r) => {
        let durationSec: number | null = null;
        if (r.startDate && r.endDate) {
          durationSec = (new Date(r.endDate).getTime() - new Date(r.startDate).getTime()) / 1000;
        }
        return { ...r, durationSec };
      });
    
      const total = enriched.length;
      const succeeded = enriched.filter((r) => r.state === "success").length;
      const failed = enriched.filter((r) => r.state === "failed").length;
      const running = enriched.filter((r) => r.state === "running" || r.state === "queued").length;
      const successRatePct =
        total === 0 ? null : Math.round((succeeded / total) * 1000) / 10;
    
      const completed = enriched.filter((r) => r.durationSec != null);
      const avgDurationSec =
        completed.length === 0
          ? null
          : Math.round(
              completed.reduce((acc, r) => acc + (r.durationSec ?? 0), 0) /
                completed.length,
            );
    
      let lastFailureTasks: TaskInstance[] | null = null;
      const lastFailedRun = enriched.find((r) => r.state === "failed");
      if (args.includeFailingTasks && lastFailedRun) {
        try {
          const ti = (await airflowGetTaskInstances({
            dagId: args.dagId,
            dagRunId: lastFailedRun.dagRunId,
          })) as { taskInstances: Array<{ taskId: string; state: string | null; duration?: number | null }> };
          lastFailureTasks = ti.taskInstances
            .filter((t) => t.state !== "success")
            .map((t) => ({ taskId: t.taskId, state: t.state, duration: t.duration }));
        } catch {
          lastFailureTasks = null;
        }
      }
    
      return {
        dagId: args.dagId,
        window: { recentRuns: args.recentRuns, runsScanned: total },
        successRatePct,
        counts: { succeeded, failed, runningOrQueued: running, total },
        avgDurationSec,
        lastFailedRunId: lastFailedRun?.dagRunId ?? null,
        lastFailureTasks,
        runs: enriched,
      };
    }
  • Zod schema for dag-health-rollup: dagId (string), recentRuns (integer 1-100, coerced to a number, default 10), includeFailingTasks (boolean, default true).
    export const dagHealthRollupSchema = z.object({
      dagId: z.string().describe("Airflow DAG id"),
      recentRuns: z.coerce.number().int().min(1).max(100).default(10),
      includeFailingTasks: z.boolean().default(true).describe("If true, fetch task instances for the most recent failed run"),
    });
  • src/index.ts:53-55 (registration)
    Registration of dag-health-rollup as a tool with description and schema, using wrapToolHandler to wrap the handler function.
    tool("dag-health-rollup",
      "Aggregated DAG health: success-rate over the last N runs + count breakdown (succeeded/failed/queued) + average duration + last-failed-run id + (optional) failing task instances. Replaces the airflow-list-runs + airflow-get-task-instances combo for 'is this DAG healthy right now?'.",
      dagHealthRollupSchema.shape, wrapToolHandler(dagHealthRollup));
  • src/index.ts:19 (import)
    Import of dagHealthRollupSchema and dagHealthRollup from ./tools/aggregations.js.
    import { dagHealthRollupSchema, dagHealthRollup } from "./tools/aggregations.js";
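
To make the arithmetic in the handler above concrete, here is a small worked example on made-up data. It is a sketch that reuses the handler's formulas, not the server's code: `Run`, `sampleRuns`, and `summarize` are hypothetical names, and the run ids and timestamps are invented. The newest-first ordering of the list is also an assumption about what airflowListRuns returns.

```typescript
// Field names follow the handler; the data itself is invented.
interface Run {
  dagRunId: string;
  state: string;
  startDate?: string | null;
  endDate?: string | null;
}

// Assumed to be ordered newest first.
const sampleRuns: Run[] = [
  { dagRunId: "run_3", state: "running", startDate: "2024-01-03T00:00:00Z", endDate: null },
  { dagRunId: "run_2", state: "failed", startDate: "2024-01-02T00:00:00Z", endDate: "2024-01-02T00:01:00Z" },
  { dagRunId: "run_1", state: "success", startDate: "2024-01-01T00:00:00Z", endDate: "2024-01-01T00:02:00Z" },
];

function summarize(runs: Run[]) {
  // Only runs with both timestamps contribute a duration (in seconds).
  const durations = runs
    .filter((r) => r.startDate && r.endDate)
    .map(
      (r) =>
        (new Date(r.endDate as string).getTime() -
          new Date(r.startDate as string).getTime()) / 1000,
    );
  const succeeded = runs.filter((r) => r.state === "success").length;
  return {
    // One decimal place: multiply by 1000, round, then divide by 10.
    // The denominator is every scanned run, including running and queued ones.
    successRatePct:
      runs.length === 0 ? null : Math.round((succeeded / runs.length) * 1000) / 10,
    // Failed runs that have both timestamps are included in the average.
    avgDurationSec:
      durations.length === 0
        ? null
        : Math.round(durations.reduce((a, b) => a + b, 0) / durations.length),
    // find() returns the first match, so this is the most recent failure
    // only when the list is ordered newest first.
    lastFailedRunId: runs.find((r) => r.state === "failed")?.dagRunId ?? null,
  };
}

const summary = summarize(sampleRuns);
// summary.successRatePct  -> 33.3  (1 success out of 3 scanned runs)
// summary.avgDurationSec  -> 90    (mean of 60 s and 120 s)
// summary.lastFailedRunId -> "run_2"
```

Two details are easy to miss when reading the numbers: in-flight runs lower the success rate because they count toward the total, and the average duration mixes successful and failed runs.
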
Behavior: 4/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description carries the full burden. It discloses that the tool aggregates multiple runs, optionally fetches failing tasks, and returns the key metrics. It does not mention side effects or limitations, but for a read-only aggregation tool this level of disclosure is adequate.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

Two short sentences: the first front-loads the most critical information (aggregated DAG health) and follows with specifics, and the second names the tool combination it replaces. No wasted words.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 5/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given three parameters, no output schema, and no annotations, the description provides a comprehensive overview of what the tool returns: success rate, count breakdown, average duration, last failed run id, and optional failing tasks. This fully addresses the agent's need to decide when to use it.
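
Since the tool publishes no output schema, a consumer has to infer the result shape. The sketch below writes down the shape implied by the handler's return statement in the Implementation Reference (the `runs` array is left out for brevity) and shows one way to interpret `lastFailureTasks`, which is null in three different situations: the task list was not requested, no run in the window failed, or the task-instance lookup threw. The interface `RollupResult` and the helper `describeFailingTasks` are hypothetical names, and the sample values are invented.

```typescript
// Shape inferred from the handler's return statement; not an official schema.
interface RollupResult {
  dagId: string;
  window: { recentRuns: number; runsScanned: number };
  successRatePct: number | null;
  counts: { succeeded: number; failed: number; runningOrQueued: number; total: number };
  avgDurationSec: number | null;
  lastFailedRunId: string | null;
  lastFailureTasks: Array<{ taskId: string; state: string | null; duration?: number | null }> | null;
}

// Hypothetical helper: `requested` is the includeFailingTasks value the caller sent.
function describeFailingTasks(result: RollupResult, requested: boolean): string {
  if (!requested) return "not requested";
  if (result.lastFailedRunId === null) return "no failed run in window";
  // A failed run exists and tasks were requested, so null means the lookup failed.
  if (result.lastFailureTasks === null) return "task lookup failed";
  if (result.lastFailureTasks.length === 0) return "no non-success tasks";
  // The handler keeps every task whose state is not "success", so states such
  // as "upstream_failed" or a null state can appear here, not only "failed".
  return result.lastFailureTasks
    .map((t) => `${t.taskId}:${t.state ?? "none"}`)
    .join(", ");
}

// Invented sample result for illustration.
const example: RollupResult = {
  dagId: "example_dag",
  window: { recentRuns: 10, runsScanned: 3 },
  successRatePct: 33.3,
  counts: { succeeded: 1, failed: 1, runningOrQueued: 1, total: 3 },
  avgDurationSec: 90,
  lastFailedRunId: "run_2",
  lastFailureTasks: [
    { taskId: "load", state: "failed", duration: 12.5 },
    { taskId: "notify", state: "upstream_failed", duration: null },
  ],
};

const message = describeFailingTasks(example, true);
// message -> "load:failed, notify:upstream_failed"
```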

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 4/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema coverage is 67%, with two of three parameters described. The description adds meaning beyond schema: it explains that 'includeFailingTasks' fetches task instances for the most recent failed run, and implicitly links 'recentRuns' to 'over the last N runs'.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 5/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states it provides aggregated DAG health metrics (success rate, count breakdown, average duration, last failed run, optional failing tasks). It also distinguishes itself from sibling tools by explicitly stating it replaces the combo of airflow-list-runs and airflow-get-task-instances for health checks.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 5/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description explicitly says when to use: 'for "is this DAG healthy right now?"' and contrasts with alternatives (airflow-list-runs + airflow-get-task-instances), providing clear usage guidance.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/us-all/airflow-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.