airflow-get-task-logs
Retrieve the last N kilobytes of an Airflow task instance log for a given try number, enabling quick inspection of recent logs without loading the entire file.
Instructions
Fetch the tail (last N kB) of an Airflow task instance log for a specific try_number
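
As a rough illustration of what a call translates to, the sketch below builds the request path that the handler shown under Implementation Reference sends to the Airflow REST API. `buildTaskLogPath` and the example IDs are hypothetical and exist only for this illustration; the path template itself is taken from the handler.

```typescript
// Hypothetical helper (not part of the source) mirroring the handler's path template.
// Each ID is passed through encodeURIComponent so that characters such as ":" and "+"
// in a run ID cannot break the URL.
function buildTaskLogPath(dagId: string, dagRunId: string, taskId: string, tryNumber: number): string {
  const seg = encodeURIComponent;
  return `/dags/${seg(dagId)}/dagRuns/${seg(dagRunId)}/taskInstances/${seg(taskId)}/logs/${tryNumber}?full_content=true`;
}

// Example values are made up for illustration.
const examplePath = buildTaskLogPath("etl_daily", "manual__2024-05-01T00:00:00+00:00", "load", 2);
console.log(examplePath);
// → /dags/etl_daily/dagRuns/manual__2024-05-01T00%3A00%3A00%2B00%3A00/taskInstances/load/logs/2?full_content=true
```
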
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
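| dagId | Yes | DAG identifier | |
| dagRunId | Yes | DAG run identifier | |
| taskId | Yes | Task identifier within the DAG | |
| tryNumber | No | Try number of the task instance (integer, minimum 1) | 1 |
| tailKb | No | Return only the last N kilobytes of log (integer, 1 to 64) | 16 |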
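
The input rules can be approximated without any dependencies, which may help when reasoning about what the tool accepts. This is a minimal sketch, assuming the defaults and bounds from the Zod schema listed under Implementation Reference (tryNumber defaults to 1, tailKb is 1 to 64 and defaults to 16). `normalizeTaskLogArgs`, `coerceInt`, and `requireString` are hypothetical names, and edge cases may differ from Zod's own coercion.

```typescript
// Hypothetical, dependency-free approximation of the tool's input rules.
// The real tool validates with a Zod schema; every name here is illustrative.
interface TaskLogArgs {
  dagId: string;
  dagRunId: string;
  taskId: string;
  tryNumber: number; // integer >= 1, default 1
  tailKb: number;    // integer in 1..64, default 16
}

// Coerce a value to an integer in [min, max]; use the fallback when it is undefined.
function coerceInt(name: string, value: unknown, fallback: number, min: number, max: number): number {
  if (value === undefined) return fallback;
  const n = Number(value);
  if (!Number.isInteger(n) || n < min || n > max) {
    throw new RangeError(`${name} must be an integer between ${min} and ${max}`);
  }
  return n;
}

// Required parameters must be strings.
function requireString(name: string, value: unknown): string {
  if (typeof value !== "string") throw new TypeError(`${name} must be a string`);
  return value;
}

function normalizeTaskLogArgs(raw: Record<string, unknown>): TaskLogArgs {
  return {
    dagId: requireString("dagId", raw.dagId),
    dagRunId: requireString("dagRunId", raw.dagRunId),
    taskId: requireString("taskId", raw.taskId),
    tryNumber: coerceInt("tryNumber", raw.tryNumber, 1, 1, Number.MAX_SAFE_INTEGER),
    tailKb: coerceInt("tailKb", raw.tailKb, 16, 1, 64),
  };
}
```

Under these assumptions, omitting both optional parameters yields `tryNumber: 1` and `tailKb: 16`, numeric strings such as `"3"` are coerced to numbers, and a `tailKb` of 65 is rejected.
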
Implementation Reference
- src/tools/dags.ts:121-139 (handler): Handler function that fetches Airflow task instance logs via the REST API, trims to tailKb, and returns the content along with metadata.

```typescript
export async function airflowGetTaskLogs(args: z.infer<typeof airflowGetTaskLogsSchema>): Promise<unknown> {
  const path = `/dags/${encodeURIComponent(args.dagId)}/dagRuns/${encodeURIComponent(args.dagRunId)}/taskInstances/${encodeURIComponent(args.taskId)}/logs/${args.tryNumber}?full_content=true`;
  const data = await airflowFetch<{ content?: string; continuation_token?: unknown }>(path);
  let content = data.content ?? "";
  const limit = args.tailKb * 1024;
  let truncated = false;
  if (content.length > limit) {
    truncated = true;
    content = content.slice(content.length - limit);
  }
  return {
    dagId: args.dagId,
    dagRunId: args.dagRunId,
    taskId: args.taskId,
    tryNumber: args.tryNumber,
    truncated,
    content,
  };
}
```

- src/tools/dags.ts:113-119 (schema): Zod schema defining the input parameters for the tool: dagId, dagRunId, taskId, tryNumber (default 1), and tailKb (1-64, default 16).

```typescript
export const airflowGetTaskLogsSchema = z.object({
  dagId: z.string(),
  dagRunId: z.string(),
  taskId: z.string(),
  tryNumber: z.coerce.number().int().min(1).default(1),
  tailKb: z.coerce.number().int().min(1).max(64).default(16).describe("Return only the last N kilobytes of log"),
});
```

- src/index.ts:49-49 (registration): Registration of the tool with the MCP server, wiring the schema and handler together.

```typescript
tool(
  "airflow-get-task-logs",
  "Fetch the tail (last N kB) of an Airflow task instance log for a specific try_number",
  airflowGetTaskLogsSchema.shape,
  wrapToolHandler(airflowGetTaskLogs),
);
```

- src/clients/airflow.ts:65-94 (helper): HTTP client helper used by airflowGetTaskLogs to make authenticated requests to the Airflow REST API v2.

```typescript
export async function airflowFetch<T = unknown>(
  path: string,
  init: RequestInit & { method?: string; body?: string } = {},
): Promise<T> {
  if (!config.apiBase) {
    throw new Error("AIRFLOW_API_URL is not configured");
  }
  const token = await getValidToken();
  const url = `${config.apiBase}/api/v2${path.startsWith("/") ? "" : "/"}${path}`;
  const headers = new Headers(init.headers);
  headers.set("Authorization", `Bearer ${token}`);
  if (init.body && !headers.has("Content-Type")) headers.set("Content-Type", "application/json");
  headers.set("Accept", "application/json");
  const res = await fetch(url, { ...init, headers });
  const text = await res.text();
  let body: unknown;
  try {
    body = text ? JSON.parse(text) : null;
  } catch {
    body = text;
  }
  if (!res.ok) {
    if (res.status === 401) {
      // Token may have rotated; invalidate the cache and let the caller retry once
      tokenCache = null;
    }
    throw new AirflowApiError(
      res.status,
      body,
      `Airflow API ${init.method ?? "GET"} ${path} failed: ${res.status} ${res.statusText}`,
    );
  }
  return body as T;
}
```
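
One detail of the handler is worth noting: it compares `content.length` against `tailKb * 1024`, and a JavaScript string's `length` counts UTF-16 code units, not bytes. For plain ASCII logs the two coincide, but logs containing non-ASCII text can exceed N kB when measured in bytes. The sketch below shows one possible byte-accurate variant. `tailByBytes` is a hypothetical helper, not part of the source, and it assumes the global `TextEncoder` and `TextDecoder` available in modern Node.js and browsers.

```typescript
// Sketch: keep only the last `tailKb` kilobytes of a string, measured in UTF-8 bytes.
// `tailByBytes` is a hypothetical helper, not part of the source repository.
function tailByBytes(content: string, tailKb: number): { content: string; truncated: boolean } {
  const limit = tailKb * 1024;
  const bytes = new TextEncoder().encode(content);
  if (bytes.length <= limit) return { content, truncated: false };

  // Start at the offset that leaves `limit` bytes, then skip forward past any
  // UTF-8 continuation bytes (bit pattern 10xxxxxx) so no character is cut in half.
  let start = bytes.length - limit;
  while (start < bytes.length && (bytes[start] & 0xc0) === 0x80) start++;

  return { content: new TextDecoder().decode(bytes.subarray(start)), truncated: true };
}
```

With this approach the returned tail may be up to three bytes shorter than the limit when the cut point falls inside a multi-byte character. Whether the extra encode and decode pass is worthwhile depends on how strictly the size limit needs to hold.
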