# MCP Server for Apache Airflow

from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Union
from urllib.parse import quote

import mcp.types as types
from airflow_client.client.api.dag_run_api import DAGRunApi
from airflow_client.client.model.clear_dag_run import ClearDagRun
from airflow_client.client.model.dag_run import DAGRun
from airflow_client.client.model.set_dag_run_note import SetDagRunNote
from airflow_client.client.model.update_dag_run_state import UpdateDagRunState

from src.airflow.airflow_client import api_client
from src.envs import AIRFLOW_HOST

# Module-level DAGRun API wrapper shared by every tool function in this module.
dag_run_api = DAGRunApi(api_client)


def get_all_functions() -> list[tuple[Callable, str, str]]:
    """Return the ``(callable, tool name, description)`` triples exposed by this module."""
    return [
        (post_dag_run, "post_dag_run", "Trigger a DAG by ID"),
        (get_dag_runs, "get_dag_runs", "Get DAG runs by ID"),
        (get_dag_runs_batch, "get_dag_runs_batch", "List DAG runs (batch)"),
        (get_dag_run, "get_dag_run", "Get a DAG run by DAG ID and DAG run ID"),
        (update_dag_run_state, "update_dag_run_state", "Update a DAG run state by DAG ID and DAG run ID"),
        (delete_dag_run, "delete_dag_run", "Delete a DAG run by DAG ID and DAG run ID"),
        (clear_dag_run, "clear_dag_run", "Clear a DAG run"),
        (set_dag_run_note, "set_dag_run_note", "Update the DagRun note"),
        (get_upstream_dataset_events, "get_upstream_dataset_events", "Get dataset events for a DAG run"),
    ]


def get_dag_run_url(dag_id: str, dag_run_id: str) -> str:
    """Return the Airflow UI grid-view URL for a single DAG run.

    Both identifiers are percent-encoded. Run IDs may contain characters such
    as ``+`` and ``:`` (e.g. ``manual__2024-01-01T00:00:00+00:00``). Left raw,
    a ``+`` in the query string is decoded as a space, and the link would
    point at a run that does not exist.
    """
    return (
        f"{AIRFLOW_HOST}/dags/{quote(dag_id, safe='')}/grid"
        f"?dag_run_id={quote(dag_run_id, safe='')}"
    )


def _drop_none(**params: Any) -> Dict[str, Any]:
    """Return ``params`` without the entries whose value is ``None``.

    Used so that only the filters/fields a caller actually supplied are
    forwarded to the generated Airflow client.
    """
    return {key: value for key, value in params.items() if value is not None}


def _attach_ui_urls(response_dict: Dict[str, Any], default_dag_id: Optional[str] = None) -> None:
    """Add a ``ui_url`` entry, in place, to every run under ``response_dict["dag_runs"]``.

    Each run's own ``dag_id`` is preferred, so links stay correct when one
    query spans several DAGs. ``default_dag_id`` is only the fallback for a
    run that carries no ``dag_id``.
    """
    for dag_run in response_dict.get("dag_runs", []):
        run_dag_id = dag_run.get("dag_id") or default_dag_id
        dag_run["ui_url"] = get_dag_run_url(run_dag_id, dag_run["dag_run_id"])


async def post_dag_run(
    dag_id: str,
    dag_run_id: Optional[str] = None,
    data_interval_end: Optional[datetime] = None,
    data_interval_start: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    execution_date: Optional[datetime] = None,
    external_trigger: Optional[bool] = None,
    last_scheduling_decision: Optional[datetime] = None,
    logical_date: Optional[datetime] = None,
    note: Optional[str] = None,
    run_type: Optional[str] = None,
    start_date: Optional[datetime] = None,
    # state: Optional[str] = None,  # TODO: add state
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """Trigger a new run of ``dag_id`` and return the created run as text.

    Only writable DAGRun fields that the caller actually supplied go into the
    request body. ``dag_id`` is sent as the path parameter.

    ``end_date``, ``external_trigger``, ``last_scheduling_decision``,
    ``run_type`` and ``start_date`` are still accepted for backward
    compatibility, but they are not sent. They are marked readOnly in the
    Airflow REST API DAGRun schema, and the generated ``DAGRun`` model refuses
    read-only attributes in its constructor. The old explicit ``state=None``
    is also dropped, because ``state`` is not a nullable field.
    """
    dag_run = DAGRun(
        **_drop_none(
            dag_run_id=dag_run_id,
            data_interval_end=data_interval_end,
            data_interval_start=data_interval_start,
            execution_date=execution_date,
            logical_date=logical_date,
            note=note,
        )
    )
    response = dag_run_api.post_dag_run(dag_id=dag_id, dag_run=dag_run)
    return [types.TextContent(type="text", text=str(response.to_dict()))]


async def get_dag_runs(
    dag_id: str,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    execution_date_gte: Optional[str] = None,
    execution_date_lte: Optional[str] = None,
    start_date_gte: Optional[str] = None,
    start_date_lte: Optional[str] = None,
    end_date_gte: Optional[str] = None,
    end_date_lte: Optional[str] = None,
    updated_at_gte: Optional[str] = None,
    updated_at_lte: Optional[str] = None,
    state: Optional[List[str]] = None,
    order_by: Optional[str] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """List runs of ``dag_id`` and annotate each with a ``ui_url`` link.

    Filters left as ``None`` are not sent. Links use each run's own
    ``dag_id``, so they stay correct when ``dag_id`` is the wildcard ``~``
    (which the Airflow REST API accepts to mean "all DAGs").
    """
    kwargs = _drop_none(
        limit=limit,
        offset=offset,
        execution_date_gte=execution_date_gte,
        execution_date_lte=execution_date_lte,
        start_date_gte=start_date_gte,
        start_date_lte=start_date_lte,
        end_date_gte=end_date_gte,
        end_date_lte=end_date_lte,
        updated_at_gte=updated_at_gte,
        updated_at_lte=updated_at_lte,
        state=state,
        order_by=order_by,
    )
    response = dag_run_api.get_dag_runs(dag_id=dag_id, **kwargs)

    # Work on a plain dict so extra keys (the UI links) can be added.
    response_dict = response.to_dict()
    _attach_ui_urls(response_dict, default_dag_id=dag_id)
    return [types.TextContent(type="text", text=str(response_dict))]


async def get_dag_runs_batch(
    dag_ids: Optional[List[str]] = None,
    execution_date_gte: Optional[str] = None,
    execution_date_lte: Optional[str] = None,
    start_date_gte: Optional[str] = None,
    start_date_lte: Optional[str] = None,
    end_date_gte: Optional[str] = None,
    end_date_lte: Optional[str] = None,
    state: Optional[List[str]] = None,
    order_by: Optional[str] = None,
    page_offset: Optional[int] = None,
    page_limit: Optional[int] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """List DAG runs across DAGs via the batch endpoint, each with a ``ui_url``.

    The request body contains only the filters the caller supplied.
    """
    request = _drop_none(
        dag_ids=dag_ids,
        execution_date_gte=execution_date_gte,
        execution_date_lte=execution_date_lte,
        start_date_gte=start_date_gte,
        start_date_lte=start_date_lte,
        end_date_gte=end_date_gte,
        end_date_lte=end_date_lte,
        state=state,
        order_by=order_by,
        page_offset=page_offset,
        page_limit=page_limit,
    )
    response = dag_run_api.get_dag_runs_batch(list_dag_runs_form=request)

    # Work on a plain dict so extra keys (the UI links) can be added.
    response_dict = response.to_dict()
    _attach_ui_urls(response_dict)
    return [types.TextContent(type="text", text=str(response_dict))]
async def get_dag_run(
    dag_id: str, dag_run_id: str
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """Fetch one DAG run and add a ``ui_url`` linking to it in the Airflow UI."""
    response = dag_run_api.get_dag_run(dag_id=dag_id, dag_run_id=dag_run_id)

    # Work on a plain dict so the extra UI link can be added.
    response_dict = response.to_dict()
    response_dict["ui_url"] = get_dag_run_url(dag_id, dag_run_id)
    return [types.TextContent(type="text", text=str(response_dict))]


async def update_dag_run_state(
    dag_id: str, dag_run_id: str, state: Optional[str] = None
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """Set the state of a DAG run and return the updated run as text.

    ``state`` is passed through unchanged. Presumably it must be one of the
    states the Airflow API accepts for this endpoint; TODO confirm how a
    ``None`` value should be handled.
    """
    state_body = UpdateDagRunState(state=state)
    response = dag_run_api.update_dag_run_state(
        dag_id=dag_id,
        dag_run_id=dag_run_id,
        update_dag_run_state=state_body,
    )
    return [types.TextContent(type="text", text=str(response.to_dict()))]


async def delete_dag_run(
    dag_id: str, dag_run_id: str
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """Delete a DAG run and return a confirmation message.

    A successful delete has an empty response body, and the generated client
    returns ``None`` for it. Calling ``.to_dict()`` unconditionally would
    therefore fail on every successful delete.
    """
    response = dag_run_api.delete_dag_run(dag_id=dag_id, dag_run_id=dag_run_id)
    if response is None:
        text = f"DAG run '{dag_run_id}' of DAG '{dag_id}' was deleted."
    else:
        text = str(response.to_dict())
    return [types.TextContent(type="text", text=text)]


async def clear_dag_run(
    dag_id: str, dag_run_id: str, dry_run: Optional[bool] = None
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """Clear a DAG run, or preview the clear when ``dry_run`` is true.

    When ``dry_run`` is ``None``, the field is left out of the request so the
    server-side default applies. An explicit ``None`` is not sent because the
    generated model types ``dry_run`` as a plain bool and does not accept
    ``None``.
    """
    clear_body = ClearDagRun() if dry_run is None else ClearDagRun(dry_run=dry_run)
    response = dag_run_api.clear_dag_run(dag_id=dag_id, dag_run_id=dag_run_id, clear_dag_run=clear_body)
    return [types.TextContent(type="text", text=str(response.to_dict()))]


async def set_dag_run_note(
    dag_id: str, dag_run_id: str, note: str
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """Replace the note attached to a DAG run and return the API response as text."""
    note_body = SetDagRunNote(note=note)
    response = dag_run_api.set_dag_run_note(dag_id=dag_id, dag_run_id=dag_run_id, set_dag_run_note=note_body)
    return [types.TextContent(type="text", text=str(response.to_dict()))]


async def get_upstream_dataset_events(
    dag_id: str, dag_run_id: str
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    """Return, as text, the dataset events the API reports for a DAG run."""
    response = dag_run_api.get_upstream_dataset_events(dag_id=dag_id, dag_run_id=dag_run_id)
    return [types.TextContent(type="text", text=str(response.to_dict()))]