Skip to main content
Glama
nikhil-ganage

MCP Server Airflow Token

list_task_instances

Retrieve task instances for Apache Airflow DAG runs by specifying DAG ID and run ID, with optional filters for execution dates, states, durations, and other parameters to monitor workflow execution.

Instructions

List task instances by DAG ID and DAG run ID

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
dag_idYes
dag_run_idYes
execution_date_gteNo
execution_date_lteNo
start_date_gteNo
start_date_lteNo
end_date_gteNo
end_date_lteNo
updated_at_gteNo
updated_at_lteNo
duration_gteNo
duration_lteNo
stateNo
poolNo
queueNo
limitNo
offsetNo

Implementation Reference

  • The main handler function for the 'list_task_instances' tool. It constructs filter parameters and calls the Airflow TaskInstanceApi to retrieve task instances, returning the response as text content.
    async def list_task_instances( dag_id: str, dag_run_id: str, execution_date_gte: Optional[str] = None, execution_date_lte: Optional[str] = None, start_date_gte: Optional[str] = None, start_date_lte: Optional[str] = None, end_date_gte: Optional[str] = None, end_date_lte: Optional[str] = None, updated_at_gte: Optional[str] = None, updated_at_lte: Optional[str] = None, duration_gte: Optional[float] = None, duration_lte: Optional[float] = None, state: Optional[List[str]] = None, pool: Optional[List[str]] = None, queue: Optional[List[str]] = None, limit: Optional[int] = None, offset: Optional[int] = None, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: # Build parameters dictionary kwargs: Dict[str, Any] = {} if execution_date_gte is not None: kwargs["execution_date_gte"] = execution_date_gte if execution_date_lte is not None: kwargs["execution_date_lte"] = execution_date_lte if start_date_gte is not None: kwargs["start_date_gte"] = start_date_gte if start_date_lte is not None: kwargs["start_date_lte"] = start_date_lte if end_date_gte is not None: kwargs["end_date_gte"] = end_date_gte if end_date_lte is not None: kwargs["end_date_lte"] = end_date_lte if updated_at_gte is not None: kwargs["updated_at_gte"] = updated_at_gte if updated_at_lte is not None: kwargs["updated_at_lte"] = updated_at_lte if duration_gte is not None: kwargs["duration_gte"] = duration_gte if duration_lte is not None: kwargs["duration_lte"] = duration_lte if state is not None: kwargs["state"] = state if pool is not None: kwargs["pool"] = pool if queue is not None: kwargs["queue"] = queue if limit is not None: kwargs["limit"] = limit if offset is not None: kwargs["offset"] = offset response = task_instance_api.get_task_instances(dag_id=dag_id, dag_run_id=dag_run_id, **kwargs) return [types.TextContent(type="text", text=str(response.to_dict()))]
  • Local registration of the 'list_task_instances' tool as part of the get_all_functions list, which provides the function, name, description, and read-only flag for MCP tool registration.
    def get_all_functions() -> list[tuple[Callable, str, str, bool]]: """Return list of (function, name, description, is_read_only) tuples for registration.""" return [ (get_task_instance, "get_task_instance", "Get a task instance by DAG ID, task ID, and DAG run ID", True), (list_task_instances, "list_task_instances", "List task instances by DAG ID and DAG run ID", True), ( update_task_instance, "update_task_instance", "Update a task instance by DAG ID, DAG run ID, and task ID", False, ), ]
  • src/main.py:56-99 (registration)
    Global registration loop in the main CLI entrypoint where tools from get_all_functions (including list_task_instances via taskinstance import) are added to the MCP app using app.add_tool.
    @click.command() @click.option( "--transport", type=click.Choice(["stdio", "sse"]), default="stdio", help="Transport type", ) @click.option( "--apis", type=click.Choice([api.value for api in APIType]), default=[api.value for api in APIType], multiple=True, help="APIs to run, default is all", ) @click.option( "--read-only", is_flag=True, help="Only expose read-only tools (GET operations, no CREATE/UPDATE/DELETE)", ) def main(transport: str, apis: list[str], read_only: bool) -> None: from src.server import app for api in apis: logging.debug(f"Adding API: {api}") get_function = APITYPE_TO_FUNCTIONS[APIType(api)] try: functions = get_function() except NotImplementedError: continue # Filter functions for read-only mode if requested if read_only: functions = filter_functions_for_read_only(functions) for func, name, description, *_ in functions: app.add_tool(func, name=name, description=description) if transport == "sse": logging.debug("Starting MCP server for Apache Airflow with SSE transport") app.run(transport="sse") else: logging.debug("Starting MCP server for Apache Airflow with stdio transport") app.run(transport="stdio")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nikhil-ganage/mcp-server-airflow-token'

If you have feedback or need assistance with the MCP directory API, please join our Discord server