Provides tools to interact with Apache Airflow instances to inspect DAGs, monitor DAG runs, retrieve task logs, and perform write operations such as triggering DAGs and clearing task instances.
Click on "Install Server".
Wait a few minutes for the server to deploy. Once ready, it will show a "Started" state.
In the chat, type
@followed by the MCP server name and your instructions, e.g., "@Apache Airflow MCP Servershow me the logs for the failed task in the daily_etl DAG"
That's it! The server will respond to your query, and you can continue using it as needed.
Here is a step-by-step guide with screenshots.
Airflow MCP Server
Human-facing guide to running and using the Apache Airflow MCP server. This server exposes safe, focused tools to inspect Airflow DAGs, runs, and logs (with optional write operations gated by client approval). Responses are structured JSON objects (dicts) and include a request_id for traceability.
Quickstart
1) Install the server (PyPI)
Install with uv so you are exercising the exact bits that ship to users and get reproducible virtualenvs:
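For example (the distribution name apache-airflow-mcp-server is an assumption; substitute the package name actually published to PyPI):

```bash
uv venv
uv pip install apache-airflow-mcp-server
```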
No uv? Fall back to pip:
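The same package-name assumption applies:

```bash
python -m venv .venv && source .venv/bin/activate
pip install apache-airflow-mcp-server
```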
2) Configure instances (required)
Set AIRFLOW_MCP_INSTANCES_FILE to a YAML file listing available Airflow instances. Values may reference environment variables using ${VAR} syntax. Missing variables cause startup errors.
Example (examples/instances.yaml):
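A minimal sketch — host, api_version, verify_ssl, and auth_type mirror what airflow_describe_instance reports; the surrounding key names and credential fields are assumptions, so treat the shipped examples/instances.yaml as authoritative:

```yaml
# Illustrative only — check the shipped examples/instances.yaml for the exact schema.
instances:
  prod:
    host: https://airflow.prod.example.com
    api_version: v1
    verify_ssl: true
    auth_type: basic
    username: ${AIRFLOW_PROD_USER}      # resolved from the environment at startup
    password: ${AIRFLOW_PROD_PASSWORD}
  staging:
    host: https://airflow.staging.example.com
    auth_type: bearer                   # experimental; basic auth is the primary path
    token: ${AIRFLOW_STAGING_TOKEN}
```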
Bearer token auth is experimental; basic auth remains the primary, well-tested path.
Kubernetes deployment tip: Provide instances.yaml via a Secret and mount it at /config/instances.yaml (set AIRFLOW_MCP_INSTANCES_FILE=/config/instances.yaml).
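A sketch of that setup (resource names and the surrounding Deployment spec are illustrative):

```yaml
# Sketch only — resource names are illustrative.
apiVersion: v1
kind: Secret
metadata:
  name: airflow-mcp-instances
stringData:
  instances.yaml: |
    # paste your registry here (see the example above)
---
# In the server's pod spec:
#   volumes:
#     - name: instances
#       secret:
#         secretName: airflow-mcp-instances
#   containers:
#     - name: airflow-mcp
#       volumeMounts:
#         - name: instances
#           mountPath: /config
#           readOnly: true
#       env:
#         - name: AIRFLOW_MCP_INSTANCES_FILE
#           value: /config/instances.yaml
```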
Environment variables:
- AIRFLOW_MCP_INSTANCES_FILE (required): path to registry YAML
- AIRFLOW_MCP_DEFAULT_INSTANCE (optional): default instance key
- AIRFLOW_MCP_HTTP_HOST (default: 127.0.0.1)
- AIRFLOW_MCP_HTTP_PORT (default: 8765)
- AIRFLOW_MCP_TIMEOUT_SECONDS (default: 30)
- AIRFLOW_MCP_LOG_FILE (optional)
- AIRFLOW_MCP_HTTP_BLOCK_GET_ON_MCP (default: true)
3) Run the server
HTTP (recommended for tooling):
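A sketch — the console-script name and the transport flag below are assumptions; check the installed package's --help for the real invocation:

```bash
export AIRFLOW_MCP_INSTANCES_FILE=/path/to/instances.yaml
export AIRFLOW_MCP_HTTP_HOST=127.0.0.1   # defaults shown for clarity
export AIRFLOW_MCP_HTTP_PORT=8765

# Hypothetical entrypoint — substitute the script or module the package actually installs.
airflow-mcp --transport http
```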
STDIO (CLI/terminal workflows):
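Same caveat on the entrypoint and flag names:

```bash
AIRFLOW_MCP_INSTANCES_FILE=/path/to/instances.yaml airflow-mcp --transport stdio
```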
Health check (HTTP): GET /health → 200 OK.
Tip: a fastmcp.json is included for discovery/configuration by FastMCP-aware tooling.
4) Typical incident workflow
Start from an Airflow UI URL (often in a Datadog alert):
1. airflow_resolve_url(url) → resolve instance, dag_id, dag_run_id, task_id.
2. airflow_list_dag_runs(instance|ui_url, dag_id) → confirm recent state.
3. airflow_get_task_instance(instance|ui_url, dag_id, dag_run_id, task_id, include_rendered?, max_rendered_bytes?) → inspect task metadata, attempts, and optional rendered fields.
4. airflow_get_task_instance_logs(instance|ui_url, dag_id, dag_run_id, task_id, try_number, filter_level?, context_lines?, tail_lines?, max_bytes?) → inspect failure with optional filtering and truncation.
All tools accept either instance or ui_url. If both are given and disagree, the call fails with INSTANCE_MISMATCH. ui_url must be a fully qualified http(s) Airflow URL; use airflow_list_instances() to discover valid hosts when you only have an instance key.
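A minimal sketch of the first two steps using the FastMCP Python client against a locally running HTTP server; the /mcp path, the example URL and identifiers, and .data as the accessor for the structured payload (recent FastMCP releases) are assumptions about your setup:

```python
# Illustrative sketch — the endpoint path and all identifiers are assumptions.
import asyncio
from fastmcp import Client

async def triage(ui_url: str) -> None:
    async with Client("http://127.0.0.1:8765/mcp") as client:
        # 1) Resolve instance and identifiers from the Airflow UI URL in the alert
        resolved = (await client.call_tool("airflow_resolve_url", {"url": ui_url})).data

        # 2) Confirm recent run state for that DAG
        runs = (await client.call_tool(
            "airflow_list_dag_runs",
            {"instance": resolved["instance"], "dag_id": resolved["dag_id"], "limit": 5},
        )).data
        print(runs)

asyncio.run(triage("https://airflow.prod.example.com/dags/daily_etl/grid"))
```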
Tool Reference (Structured JSON)
Discovery and URL utilities:
- airflow_list_instances() → list configured instance keys and default
- airflow_describe_instance(instance) → host, api_version, verify_ssl, auth_type (redacted)
- airflow_resolve_url(url) → resolve instance and identifiers from an Airflow UI URL
Read-only tools:
- airflow_list_dags(instance|ui_url, limit?, offset?, state?/filters) → compact DAGs with UI links
- airflow_get_dag(instance|ui_url, dag_id) → DAG details + UI link
- airflow_list_dag_runs(instance|ui_url, dag_id, state?, limit?, offset?, order_by?, descending?) → runs + per-run UI links. Defaults to execution_date descending ("latest first"). Accepts explicit ordering by start_date, end_date, or execution_date
- airflow_get_dag_run(instance|ui_url, dag_id, dag_run_id) → run details + UI link
- airflow_list_task_instances(instance|ui_url, dag_id, dag_run_id, limit?, offset?, state?, task_ids?) → task attempts for a run, with per-attempt log URLs, optional server-side filtering by state or task id, and a filters echo describing applied filters (count reflects filtered results; total_entries mirrors the API response when available)
- airflow_get_task_instance(instance|ui_url, dag_id, dag_run_id, task_id, include_rendered?, max_rendered_bytes?) → concise task metadata (state, timings, attempts, config) with optional rendered template fields and direct UI links; sensors increment try_number on every reschedule, so treat it as an attempt index (derived retries_* fields are heuristic)
- airflow_get_task_instance_logs(instance|ui_url, dag_id, dag_run_id, task_id, try_number, filter_level?, context_lines?, tail_lines?, max_bytes?) → log text with optional filtering; response includes truncated, auto_tailed, and stats
- airflow_dataset_events(instance|ui_url, dataset_uri, limit?) → dataset events (optional capability)
Write tools (require client approval; destructive):
- airflow_trigger_dag(instance|ui_url, dag_id, conf?, logical_date?, dag_run_id?, note?)
- airflow_clear_task_instances(instance|ui_url, dag_id, task_ids?, start_date?, end_date?, include_*?, dry_run?)
- airflow_clear_dag_run(instance|ui_url, dag_id, dag_run_id, include_*?, dry_run?, reset_dag_runs?)
- airflow_pause_dag(instance|ui_url, dag_id) / airflow_unpause_dag(instance|ui_url, dag_id)
Contract:
- Success: dict payloads include a request_id (traceable in logs). FastMCP serializes them automatically.
- Failure: tools raise an MCP ToolError whose payload remains a compact JSON string { "code": "INVALID_INPUT", "message": "...", "request_id": "...", "context"?: {...} }.
Log Filtering (airflow_get_task_instance_logs)
Efficient incident triage with server-side filtering and normalized payloads:
Typical incident workflow is a two-call pattern (required to obtain the correct try_number), sketched below:

- ti = airflow_get_task_instance(...) → read ti["attempts"]["try_number"].
- airflow_get_task_instance_logs(..., try_number=ti["attempts"]["try_number"]).
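A minimal sketch with the FastMCP Python client; the /mcp path and all identifiers are illustrative, and .data as the structured-payload accessor assumes a recent FastMCP release:

```python
# Illustrative sketch of the two-call pattern; identifiers and the /mcp path are assumptions.
import asyncio
from fastmcp import Client

ARGS = {
    "instance": "prod",
    "dag_id": "daily_etl",
    "dag_run_id": "scheduled__2024-05-01T00:00:00+00:00",
    "task_id": "load_warehouse",
}

async def fetch_failure_logs() -> None:
    async with Client("http://127.0.0.1:8765/mcp") as client:
        # Call 1: task metadata, including the authoritative try_number
        ti = (await client.call_tool("airflow_get_task_instance", ARGS)).data
        try_number = ti["attempts"]["try_number"]

        # Call 2: logs for that exact attempt, filtered to errors with a little context
        logs = (await client.call_tool(
            "airflow_get_task_instance_logs",
            {**ARGS, "try_number": try_number, "filter_level": "error", "context_lines": 5},
        )).data
        print(logs["log"])

asyncio.run(fetch_failure_logs())
```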
Sensors and reschedules can increment try_number without consuming retries; keeping this explicit prevents the server from making incorrect assumptions about the “latest attempt.”
Sensors treat each reschedule as another attempt, so try_number is best interpreted as an attempt index rather than “number of retries consumed.” The derived retries_consumed / retries_remaining fields are heuristics based on configured retries and may not match Airflow’s notion for long-running sensors—always inspect the task metadata if you need authoritative counts.
Parameters:
filter_level:"error"(strict) |"warning"(includes errors) |"info"(all levels)context_lines: Symmetric context (N before + N after each match), clamped to [0, 1000]tail_lines: Extract last N lines before filtering, clamped to [0, 100K]max_bytes: Hard cap (default: 100KB ≈ 25K tokens)Numeric inputs provided as floats/strings are coerced and clamped server-side so clients can pass user-entered values safely.
Response shape:
- log: Single normalized string. When the Airflow API returns host-segmented logs, headers of the form --- [worker-1] --- (or --- [unknown-host] ---) are inserted with blank lines between segments so LLMs can reason about execution locality.
- truncated: true if output exceeded max_bytes
- auto_tailed: true if log >100MB triggered automatic tail to last 10K lines
- match_count: Number of lines matching filter_level (before context expansion)
- meta.filters: Echo of effective filters applied (shows clamped values)
- bytes_returned, original_lines, returned_lines: stats that align with the normalized text seen by the client
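An illustrative (not verbatim) response, trimmed to the fields above; values and the exact nesting under meta are assumptions:

```json
{
  "request_id": "req-7f3a2c",
  "log": "--- [worker-1] ---\n\n[2024-05-01, 02:13:44 UTC] ERROR - Task failed with exit code 1\n...",
  "truncated": false,
  "auto_tailed": false,
  "match_count": 3,
  "bytes_returned": 4821,
  "original_lines": 1240,
  "returned_lines": 18,
  "meta": {
    "filters": {
      "filter_level": "error",
      "context_lines": 5,
      "max_bytes": 102400
    }
  }
}
```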
Failed Task Discovery (airflow_list_task_instances)
airflow_list_task_instances now exposes server-side filters that make the dedicated airflow_get_failed_task_instance helper unnecessary. Use the same primitive for every failed-task workflow:
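For example, a failed-task query might pass arguments like these (the instance key, DAG, and run id are illustrative):

```json
{
  "instance": "prod",
  "dag_id": "daily_etl",
  "dag_run_id": "scheduled__2024-05-01T00:00:00+00:00",
  "state": ["failed"],
  "limit": 50
}
```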
Why the change?
- A single tool stays composable: filter by state, task_ids, or both.
- The response includes filters (echo) and total_entries when the API shares it, so callers can detect whether additional pagination is needed while still getting a filtered count.
- Agents that previously relied on airflow_get_failed_task_instance should migrate to airflow_list_task_instances(state=["failed"]) (optionally preceded by airflow_list_dag_runs to resolve the relevant run).
Task Instance Metadata (airflow_get_task_instance)
Pair metadata with logs for faster triage:
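An illustrative (not verbatim) response shape, trimmed to the highlights listed below; exact key names inside each section may differ:

```json
{
  "request_id": "req-1b9d04",
  "task_instance": {
    "state": "failed",
    "host": "worker-1",
    "operator": "PythonOperator",
    "start_date": "2024-05-01T02:10:12+00:00",
    "end_date": "2024-05-01T02:13:44+00:00",
    "duration_ms": 212000
  },
  "task_config": { "owner": "data-eng", "retries": 2, "retry_delay": "0:05:00" },
  "attempts": { "try_number": 2, "retries_consumed": 1, "retries_remaining": 1 },
  "ui_url": {
    "grid": "https://airflow.prod.example.com/dags/daily_etl/grid",
    "log": "https://airflow.prod.example.com/dags/daily_etl/grid?tab=logs"
  }
}
```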
Highlights:
- task_instance: State, host, operator, timings (with computed duration_ms)
- task_config: Owner, retries, retry_delay (when available)
- attempts: Current try, retries consumed/remaining
- rendered_fields (optional): Rendered template values with byte cap (max_rendered_bytes, default 100KB; truncated payload returns { "_truncated": "Increase max_rendered_bytes" })
- ui_url: Direct grid and log links for the task attempt
Client Notes
MCP clients (e.g., IDEs) should call tools by name with JSON args and handle JSON responses.
For URL-based calls, the server validates the hostname against the configured registry (SSRF guard).
Write tools are annotated so MCP clients will prompt for confirmation before execution.
Add to MCP clients
Cursor – stdio (spawn on demand)
Add an entry to ~/.cursor/mcp.json. Cursor will spawn the server process and communicate over stdio.
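A sketch of the entry — the uv invocation and the python -m airflow_mcp entrypoint are assumptions; use whatever command launches the server in your setup:

```json
{
  "mcpServers": {
    "airflow": {
      "command": "uv",
      "args": [
        "run",
        "--directory",
        "/path/to/apache-airflow-mcp-server",
        "python",
        "-m",
        "airflow_mcp"
      ],
      "env": {
        "AIRFLOW_MCP_INSTANCES_FILE": "/path/to/instances.yaml"
      }
    }
  }
}
```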
Notes:
- Replace /path/to/apache-airflow-mcp-server and /path/to/instances.yaml with your paths.
- No long-running server is required; Cursor starts the process per session.
Cursor – local HTTP
Run the server yourself, then point Cursor at the HTTP endpoint.
Start the server over HTTP as described in "3) Run the server" above.
Then, in ~/.cursor/mcp.json:
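A sketch — the /mcp path is an assumption; use the path the server logs on startup:

```json
{
  "mcpServers": {
    "airflow": {
      "url": "http://127.0.0.1:8765/mcp"
    }
  }
}
```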
Cursor – remote HTTP
Point Cursor at a deployed server. Optional headers can be added if fronted by an auth proxy.
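A sketch — the hostname, path, and header value are illustrative, and the headers block only applies when an auth proxy fronts the server (and your Cursor version supports custom headers):

```json
{
  "mcpServers": {
    "airflow": {
      "url": "https://airflow-mcp.internal.example.com/mcp",
      "headers": {
        "Authorization": "Bearer <token-from-your-auth-proxy>"
      }
    }
  }
}
```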
Other clients
Many MCP clients accept either a stdio command or an HTTP URL; the examples above generalize.
A fastmcp.json is included for FastMCP-aware tooling that can auto-discover entrypoints.
Development
Testing strategy:
- No real network calls; tests patch airflow_mcp.client_factory._import_airflow_client.
- Reset registry cache in fixtures; assert request_id presence, URL precedence, and structured log fields.
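A sketch of the patching approach described above (the fixture name and the stub's return shape are illustrative):

```python
# test_tools.py — illustrative; assumes the package is importable as airflow_mcp.
from unittest.mock import MagicMock

import pytest


@pytest.fixture
def fake_airflow_client(monkeypatch):
    """Stub the Airflow client import so tests never touch the network."""
    fake = MagicMock()
    monkeypatch.setattr(
        "airflow_mcp.client_factory._import_airflow_client",
        lambda *args, **kwargs: fake,  # return shape is an assumption
    )
    return fake
```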
Observability:
- Structured logs with tool_start, tool_success/tool_error, duration_ms, response_bytes, and context fields.
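An illustrative record with those fields (exact key names and output format are assumptions):

```json
{
  "event": "tool_success",
  "tool": "airflow_get_task_instance_logs",
  "request_id": "req-7f3a2c",
  "instance": "prod",
  "dag_id": "daily_etl",
  "duration_ms": 412,
  "response_bytes": 4821
}
```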
License
Apache 2.0 - see LICENSE for details.