MCP Server for Apache Airflow

by tomnagengast
airflow-client.ts (14.7 kB)
import fetch from "node-fetch";

export interface AirflowConfig {
  baseUrl: string;
  username?: string;
  password?: string;
  token?: string;
}

// Thin wrapper around the Airflow REST API (v1). Each public method returns
// an MCP-style result: { content: [{ type: "text", text }] }.
export class AirflowClient {
  private config: AirflowConfig;

  // Configuration comes entirely from environment variables.
  constructor() {
    this.config = {
      baseUrl: process.env.AIRFLOW_BASE_URL || "http://localhost:8080",
      username: process.env.AIRFLOW_USERNAME,
      password: process.env.AIRFLOW_PASSWORD,
      token: process.env.AIRFLOW_TOKEN,
    };

    if (!this.config.token && (!this.config.username || !this.config.password)) {
      throw new Error(
        "Either AIRFLOW_TOKEN or both AIRFLOW_USERNAME and AIRFLOW_PASSWORD must be provided"
      );
    }
  }

  // Prefer a bearer token; fall back to HTTP basic auth.
  private getAuthHeaders(): Record<string, string> {
    if (this.config.token) {
      return {
        Authorization: `Bearer ${this.config.token}`,
      };
    } else if (this.config.username && this.config.password) {
      const credentials = Buffer.from(
        `${this.config.username}:${this.config.password}`
      ).toString("base64");
      return {
        Authorization: `Basic ${credentials}`,
      };
    }
    return {};
  }

  // Shared request helper: builds the URL, attaches auth headers, and throws
  // on any non-2xx response.
  private async makeRequest(endpoint: string, options: any = {}): Promise<any> {
    const url = `${this.config.baseUrl}/api/v1${endpoint}`;
    const headers = {
      "Content-Type": "application/json",
      ...this.getAuthHeaders(),
      ...options.headers,
    };

    const response = await fetch(url, {
      ...options,
      headers,
    });

    if (!response.ok) {
      const errorText = await response.text();
      throw new Error(
        `Airflow API error: ${response.status} ${response.statusText}: ${errorText}`
      );
    }

    return response.json();
  }

  async listDags(params: any = {}) {
    const queryParams = new URLSearchParams();
    if (params.limit) queryParams.append("limit", params.limit.toString());
    if (params.offset) queryParams.append("offset", params.offset.toString());
    if (params.order_by) queryParams.append("order_by", params.order_by);

    const data = await this.makeRequest(`/dags?${queryParams}`);

    return {
      content: [
        {
          type: "text" as const,
          text: `Found ${data.total_entries} DAGs:\n\n${data.dags
            .map(
              (dag: any) =>
                `• **${dag.dag_id}** - ${dag.description || "No description"}\n` +
                ` Status: ${dag.is_paused ? "Paused" : "Active"}\n` +
                ` Schedule: ${dag.schedule_interval || "None"}\n`
            )
            .join("\n")}`,
        },
      ],
    };
  }

  async getDag(dagId: string) {
    const data = await this.makeRequest(`/dags/${dagId}`);

    return {
      content: [
        {
          type: "text" as const,
          text:
            `**DAG: ${data.dag_id}**\n\n` +
            `Description: ${data.description || "No description"}\n` +
            `Status: ${data.is_paused ? "Paused" : "Active"}\n` +
            `Schedule: ${data.schedule_interval || "None"}\n` +
            `Start Date: ${data.start_date}\n` +
            `Catchup: ${data.catchup}\n` +
            `Max Active Runs: ${data.max_active_runs}\n` +
            `Tags: ${data.tags?.join(", ") || "None"}`,
        },
      ],
    };
  }

  async triggerDag(dagId: string, dagRunId?: string, conf?: any) {
    const payload: any = {};
    if (dagRunId) payload.dag_run_id = dagRunId;
    if (conf) payload.conf = conf;

    const data = await this.makeRequest(`/dags/${dagId}/dagRuns`, {
      method: "POST",
      body: JSON.stringify(payload),
    });

    return {
      content: [
        {
          type: "text" as const,
          text:
            `DAG run triggered successfully!\n\n` +
            `DAG ID: ${data.dag_id}\n` +
            `Run ID: ${data.dag_run_id}\n` +
            `State: ${data.state}\n` +
            `Execution Date: ${data.execution_date}\n` +
            `Start Date: ${data.start_date || "Not started"}`,
        },
      ],
    };
  }

  async listDagRuns(dagId: string, params: any = {}) {
    const queryParams = new URLSearchParams();
    if (params.limit) queryParams.append("limit", params.limit.toString());
    if (params.offset) queryParams.append("offset", params.offset.toString());

    const data = await this.makeRequest(`/dags/${dagId}/dagRuns?${queryParams}`);

    return {
      content: [
        {
          type: "text" as const,
          text: `DAG Runs for ${dagId} (${data.total_entries} total):\n\n${data.dag_runs
            .map(
              (run: any) =>
                `• **${run.dag_run_id}**\n` +
                ` State: ${run.state}\n` +
                ` Start: ${run.start_date || "Not started"}\n` +
                ` End: ${run.end_date || "Running"}\n` +
                ` Duration: ${
                  run.end_date && run.start_date
                    ? Math.round(
                        (new Date(run.end_date).getTime() -
                          new Date(run.start_date).getTime()) / 1000
                      ) + "s"
                    : "N/A"
                }\n`
            )
            .join("\n")}`,
        },
      ],
    };
  }

  async getDagRun(dagId: string, dagRunId: string) {
    const data = await this.makeRequest(`/dags/${dagId}/dagRuns/${dagRunId}`);

    return {
      content: [
        {
          type: "text" as const,
          text:
            `**DAG Run: ${data.dag_run_id}**\n\n` +
            `DAG ID: ${data.dag_id}\n` +
            `State: ${data.state}\n` +
            `Start Date: ${data.start_date || "Not started"}\n` +
            `End Date: ${data.end_date || "Running"}\n` +
            `Duration: ${
              data.end_date && data.start_date
                ? Math.round(
                    (new Date(data.end_date).getTime() -
                      new Date(data.start_date).getTime()) / 1000
                  ) + "s"
                : "N/A"
            }\n` +
            `External Trigger: ${data.external_trigger}\n` +
            `Configuration: ${data.conf ? JSON.stringify(data.conf, null, 2) : "None"}`,
        },
      ],
    };
  }

  async listTaskInstances(dagId: string, dagRunId: string) {
    const data = await this.makeRequest(
      `/dags/${dagId}/dagRuns/${dagRunId}/taskInstances`
    );

    return {
      content: [
        {
          type: "text" as const,
          text: `Task Instances for ${dagId}/${dagRunId}:\n\n${data.task_instances
            .map(
              (task: any) =>
                `• **${task.task_id}**\n` +
                ` State: ${task.state}\n` +
                ` Start: ${task.start_date || "Not started"}\n` +
                ` End: ${task.end_date || "Running"}\n` +
                ` Duration: ${task.duration ? Math.round(task.duration) + "s" : "N/A"}\n` +
                ` Try: ${task.try_number}\n`
            )
            .join("\n")}`,
        },
      ],
    };
  }

  async getTaskInstance(dagId: string, dagRunId: string, taskId: string) {
    const data = await this.makeRequest(
      `/dags/${dagId}/dagRuns/${dagRunId}/taskInstances/${taskId}`
    );

    return {
      content: [
        {
          type: "text" as const,
          text:
            `**Task Instance: ${data.task_id}**\n\n` +
            `DAG ID: ${data.dag_id}\n` +
            `Run ID: ${data.dag_run_id}\n` +
            `State: ${data.state}\n` +
            `Start Date: ${data.start_date || "Not started"}\n` +
            `End Date: ${data.end_date || "Running"}\n` +
            `Duration: ${data.duration ? Math.round(data.duration) + "s" : "N/A"}\n` +
            `Try Number: ${data.try_number}\n` +
            `Max Tries: ${data.max_tries}\n` +
            `Queue: ${data.queue}\n` +
            `Pool: ${data.pool}\n` +
            `Priority Weight: ${data.priority_weight}`,
        },
      ],
    };
  }

  async pauseDag(dagId: string) {
    await this.makeRequest(`/dags/${dagId}?update_mask=is_paused`, {
      method: "PATCH",
      body: JSON.stringify({ is_paused: true }),
    });

    return {
      content: [
        {
          type: "text" as const,
          text: `DAG "${dagId}" has been paused successfully.`,
        },
      ],
    };
  }

  async unpauseDag(dagId: string) {
    await this.makeRequest(`/dags/${dagId}?update_mask=is_paused`, {
      method: "PATCH",
      body: JSON.stringify({ is_paused: false }),
    });

    return {
      content: [
        {
          type: "text" as const,
          text: `DAG "${dagId}" has been unpaused successfully.`,
        },
      ],
    };
  }

  async getTaskLogs(
    dagId: string,
    dagRunId: string,
    taskId: string,
    taskTryNumber: number = 1,
    fullContent: boolean = true
  ) {
    const queryParams = new URLSearchParams();
    queryParams.append("full_content", fullContent.toString());

    const data = await this.makeRequest(
      `/dags/${dagId}/dagRuns/${dagRunId}/taskInstances/${taskId}/logs/${taskTryNumber}?${queryParams}`
    );

    // Decode the escaped content
    let decodedContent = data.content || "";
    try {
      // Handle escaped characters in log content
      decodedContent = decodedContent
        .replace(/\\n/g, '\n')
        .replace(/\\t/g, '\t')
        .replace(/\\r/g, '\r');
    } catch (error) {
      // If decoding fails, use raw content
      console.warn("Failed to decode log content:", error);
    }

    return {
      content: [
        {
          type: "text" as const,
          text:
            `**Task Logs: ${taskId}** (Try ${taskTryNumber})\n\n` +
            `DAG: ${dagId}\n` +
            `Run: ${dagRunId}\n` +
            `Task: ${taskId}\n` +
            `Try Number: ${taskTryNumber}\n\n` +
            `**Logs:**\n\`\`\`\n${decodedContent}\n\`\`\``,
        },
      ],
    };
  }

  async getDagRunLogs(dagId: string, dagRunId: string, limit: number = 10) {
    // First get all task instances for the DAG run
    const taskInstances = await this.makeRequest(
      `/dags/${dagId}/dagRuns/${dagRunId}/taskInstances`
    );

    const logs: string[] = [];
    logs.push(`**DAG Run Logs: ${dagId}/${dagRunId}**\n`);

    // Get logs for each task instance (limit to avoid overwhelming output)
    const tasksToShow = taskInstances.task_instances.slice(0, limit);

    for (const task of tasksToShow) {
      try {
        const taskLogs = await this.makeRequest(
          `/dags/${dagId}/dagRuns/${dagRunId}/taskInstances/${task.task_id}/logs/${task.try_number}?full_content=true`
        );

        let decodedContent = taskLogs.content || "";
        try {
          decodedContent = decodedContent
            .replace(/\\n/g, '\n')
            .replace(/\\t/g, '\t')
            .replace(/\\r/g, '\r');
        } catch (error) {
          // Use raw content if decoding fails
        }

        logs.push(`\n### Task: ${task.task_id} (${task.state}) - Try ${task.try_number}\n`);

        if (decodedContent.trim()) {
          // Truncate very long logs
          const truncatedContent =
            decodedContent.length > 2000
              ? decodedContent.substring(0, 2000) +
                "\n... (truncated, use get_task_logs for full content)"
              : decodedContent;
          logs.push(`\`\`\`\n${truncatedContent}\n\`\`\`\n`);
        } else {
          logs.push("*No logs available*\n");
        }
      } catch (error) {
        logs.push(`\n### Task: ${task.task_id} - Error fetching logs\n`);
        logs.push(
          `Error: ${error instanceof Error ? error.message : String(error)}\n`
        );
      }
    }

    if (taskInstances.task_instances.length > limit) {
      logs.push(
        `\n*Showing ${limit} of ${taskInstances.task_instances.length} tasks. Use get_task_logs for specific task logs.*`
      );
    }

    return {
      content: [
        {
          type: "text" as const,
          text: logs.join(""),
        },
      ],
    };
  }

  async tailDagRun(dagId: string, dagRunId: string, maxLines: number = 50) {
    // Get current DAG run status and task instances
    const dagRun = await this.makeRequest(`/dags/${dagId}/dagRuns/${dagRunId}`);
    const taskInstances = await this.makeRequest(
      `/dags/${dagId}/dagRuns/${dagRunId}/taskInstances`
    );

    const output: string[] = [];
    output.push(`**Tailing DAG Run: ${dagId}/${dagRunId}**\n`);
    output.push(`Status: ${dagRun.state}\n`);
    output.push(`Start: ${dagRun.start_date || "Not started"}\n`);
    output.push(`End: ${dagRun.end_date || "Running"}\n\n`);

    // Show recent task state changes and logs
    const runningOrRecentTasks = taskInstances.task_instances
      .filter(
        (task: any) =>
          task.state === 'running' ||
          task.state === 'failed' ||
          task.state === 'success' ||
          task.state === 'upstream_failed' ||
          task.state === 'skipped'
      )
      .sort((a: any, b: any) => {
        // Sort by start_date descending (most recent first)
        if (!a.start_date && !b.start_date) return 0;
        if (!a.start_date) return 1;
        if (!b.start_date) return -1;
        return new Date(b.start_date).getTime() - new Date(a.start_date).getTime();
      })
      .slice(0, 5); // Show top 5 most recent tasks

    output.push("**Recent Task Activity:**\n");

    for (const task of runningOrRecentTasks) {
      output.push(`\n### ${task.task_id} (${task.state})`);
      if (task.start_date) {
        output.push(` - Started: ${task.start_date}`);
      }
      if (task.end_date) {
        output.push(` - Ended: ${task.end_date}`);
      }
      output.push(`\n`);

      // For running or recently failed tasks, show tail of logs
      if (task.state === 'running' || task.state === 'failed') {
        try {
          const taskLogs = await this.makeRequest(
            `/dags/${dagId}/dagRuns/${dagRunId}/taskInstances/${task.task_id}/logs/${task.try_number}?full_content=true`
          );

          let decodedContent = taskLogs.content || "";
          try {
            decodedContent = decodedContent
              .replace(/\\n/g, '\n')
              .replace(/\\t/g, '\t')
              .replace(/\\r/g, '\r');
          } catch (error) {
            // Use raw content if decoding fails
          }

          if (decodedContent.trim()) {
            // Get last N lines
            const lines = decodedContent.split('\n');
            const tailLines = lines.slice(-maxLines);
            output.push(
              `**Recent logs (last ${Math.min(maxLines, lines.length)} lines):**\n`
            );
            output.push(`\`\`\`\n${tailLines.join('\n')}\n\`\`\`\n`);
          } else {
            output.push("*No logs available yet*\n");
          }
        } catch (error) {
          output.push(
            `*Error fetching logs: ${error instanceof Error ? error.message : String(error)}*\n`
          );
        }
      }
    }

    // Summary of all task states
    const taskStates = taskInstances.task_instances.reduce(
      (acc: any, task: any) => {
        acc[task.state] = (acc[task.state] || 0) + 1;
        return acc;
      },
      {}
    );

    output.push(`\n**Task Summary:**\n`);
    Object.entries(taskStates).forEach(([state, count]) => {
      output.push(`- ${state}: ${count}\n`);
    });

    output.push(`\n*Use get_task_logs for complete logs of specific tasks*`);

    return {
      content: [
        {
          type: "text" as const,
          text: output.join(""),
        },
      ],
    };
  }
}
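
For orientation, here is a minimal usage sketch (not part of the repository): it assumes AIRFLOW_BASE_URL and either AIRFLOW_TOKEN or AIRFLOW_USERNAME/AIRFLOW_PASSWORD are set in the environment, and that a DAG named example_dag exists on the target Airflow instance.

// usage-example.ts — illustrative only; "example_dag" is a placeholder.
import { AirflowClient } from "./airflow-client";

async function main() {
  // Throws immediately if no credentials are configured.
  const client = new AirflowClient();

  // Every method returns an MCP-style { content: [{ type: "text", text }] } result.
  const dags = await client.listDags({ limit: 5, order_by: "dag_id" });
  console.log(dags.content[0].text);

  // Trigger a run with an optional conf payload, then inspect its runs.
  const run = await client.triggerDag("example_dag", undefined, { note: "manual test" });
  console.log(run.content[0].text);

  const runs = await client.listDagRuns("example_dag", { limit: 3 });
  console.log(runs.content[0].text);
}

main().catch((err) => {
  console.error(err instanceof Error ? err.message : err);
  process.exit(1);
});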

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tomnagengast/mcp-server-airflow'
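
The same lookup can be made from code. Below is a minimal node-fetch sketch; the response schema is not documented here, so the payload is simply pretty-printed as JSON.

// Illustrative only: fetches this server's entry from the Glama MCP directory API.
import fetch from "node-fetch";

const url = "https://glama.ai/api/mcp/v1/servers/tomnagengast/mcp-server-airflow";

fetch(url)
  .then((res) => {
    if (!res.ok) throw new Error(`HTTP ${res.status} ${res.statusText}`);
    return res.json();
  })
  .then((data) => console.log(JSON.stringify(data, null, 2)))
  .catch((err) => console.error(err));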

If you have feedback or need assistance with the MCP directory API, please join our Discord server.