MCP Server for Apache Airflow
by yangkyeongmo
- mcp-server-apache-airflow
- src
- airflow
from typing import Any, Callable, Dict, List, Optional, Union
import mcp.types as types
from airflow_client.client.api.dataset_api import DatasetApi
from src.airflow.airflow_client import api_client
dataset_api = DatasetApi(api_client)
def get_all_functions() -> list[tuple[Callable, str, str]]:
return [
(get_datasets, "get_datasets", "List datasets"),
(get_dataset, "get_dataset", "Get a dataset by URI"),
(get_dataset_events, "get_dataset_events", "Get dataset events"),
(create_dataset_event, "create_dataset_event", "Create dataset event"),
(get_dag_dataset_queued_event, "get_dag_dataset_queued_event", "Get a queued Dataset event for a DAG"),
(get_dag_dataset_queued_events, "get_dag_dataset_queued_events", "Get queued Dataset events for a DAG"),
(delete_dag_dataset_queued_event, "delete_dag_dataset_queued_event", "Delete a queued Dataset event for a DAG"),
(
delete_dag_dataset_queued_events,
"delete_dag_dataset_queued_events",
"Delete queued Dataset events for a DAG",
),
(get_dataset_queued_events, "get_dataset_queued_events", "Get queued Dataset events for a Dataset"),
(delete_dataset_queued_events, "delete_dataset_queued_events", "Delete queued Dataset events for a Dataset"),
]
async def get_datasets(
limit: Optional[int] = None,
offset: Optional[int] = None,
order_by: Optional[str] = None,
uri_pattern: Optional[str] = None,
dag_ids: Optional[str] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
# Build parameters dictionary
kwargs: Dict[str, Any] = {}
if limit is not None:
kwargs["limit"] = limit
if offset is not None:
kwargs["offset"] = offset
if order_by is not None:
kwargs["order_by"] = order_by
if uri_pattern is not None:
kwargs["uri_pattern"] = uri_pattern
if dag_ids is not None:
kwargs["dag_ids"] = dag_ids
response = dataset_api.get_datasets(**kwargs)
return [types.TextContent(type="text", text=str(response.to_dict()))]
async def get_dataset(
uri: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
response = dataset_api.get_dataset(uri=uri)
return [types.TextContent(type="text", text=str(response.to_dict()))]
async def get_dataset_events(
limit: Optional[int] = None,
offset: Optional[int] = None,
order_by: Optional[str] = None,
dataset_id: Optional[int] = None,
source_dag_id: Optional[str] = None,
source_task_id: Optional[str] = None,
source_run_id: Optional[str] = None,
source_map_index: Optional[int] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
# Build parameters dictionary
kwargs: Dict[str, Any] = {}
if limit is not None:
kwargs["limit"] = limit
if offset is not None:
kwargs["offset"] = offset
if order_by is not None:
kwargs["order_by"] = order_by
if dataset_id is not None:
kwargs["dataset_id"] = dataset_id
if source_dag_id is not None:
kwargs["source_dag_id"] = source_dag_id
if source_task_id is not None:
kwargs["source_task_id"] = source_task_id
if source_run_id is not None:
kwargs["source_run_id"] = source_run_id
if source_map_index is not None:
kwargs["source_map_index"] = source_map_index
response = dataset_api.get_dataset_events(**kwargs)
return [types.TextContent(type="text", text=str(response.to_dict()))]
async def create_dataset_event(
dataset_uri: str,
extra: Optional[Dict[str, Any]] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
event_request = {
"dataset_uri": dataset_uri,
}
if extra is not None:
event_request["extra"] = extra
response = dataset_api.create_dataset_event(create_dataset_event=event_request)
return [types.TextContent(type="text", text=str(response.to_dict()))]
async def get_dag_dataset_queued_event(
dag_id: str,
uri: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
response = dataset_api.get_dag_dataset_queued_event(dag_id=dag_id, uri=uri)
return [types.TextContent(type="text", text=str(response.to_dict()))]
async def get_dag_dataset_queued_events(
dag_id: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
response = dataset_api.get_dag_dataset_queued_events(dag_id=dag_id)
return [types.TextContent(type="text", text=str(response.to_dict()))]
async def delete_dag_dataset_queued_event(
dag_id: str,
uri: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
response = dataset_api.delete_dag_dataset_queued_event(dag_id=dag_id, uri=uri)
return [types.TextContent(type="text", text=str(response.to_dict()))]
async def delete_dag_dataset_queued_events(
dag_id: str,
before: Optional[str] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
kwargs: Dict[str, Any] = {}
if before is not None:
kwargs["before"] = before
response = dataset_api.delete_dag_dataset_queued_events(dag_id=dag_id, **kwargs)
return [types.TextContent(type="text", text=str(response.to_dict()))]
async def get_dataset_queued_events(
uri: str,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
response = dataset_api.get_dataset_queued_events(uri=uri)
return [types.TextContent(type="text", text=str(response.to_dict()))]
async def delete_dataset_queued_events(
uri: str,
before: Optional[str] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
kwargs: Dict[str, Any] = {}
if before is not None:
kwargs["before"] = before
response = dataset_api.delete_dataset_queued_events(uri=uri, **kwargs)
return [types.TextContent(type="text", text=str(response.to_dict()))]