list_task_instances
Retrieve task instances for Apache Airflow DAG runs by specifying DAG ID and run ID, with optional filters for execution dates, states, durations, and other parameters to monitor workflow execution.
Instructions
List task instances by DAG ID and DAG run ID
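A common monitoring task is counting how many task instances in a run are in each state. The sketch below assumes the caller already has the response as a dict shaped like Airflow's `TaskInstanceCollection`, with a `task_instances` list and a `total_entries` count. That would be, for example, the value of `response.to_dict()` before the tool turns it into a string. `summarize_states`, `failed_task_ids`, and the sample data are hypothetical illustrations, not part of this server.

```python
from collections import Counter
from typing import Any, Dict, List


def summarize_states(collection: Dict[str, Any]) -> Dict[str, int]:
    """Count task instances per state in a TaskInstanceCollection-shaped dict."""
    instances: List[Dict[str, Any]] = collection.get("task_instances", [])
    # Task instances that have not been scheduled yet can have a null state.
    counts = Counter(ti.get("state") or "none" for ti in instances)
    return dict(counts)


def failed_task_ids(collection: Dict[str, Any]) -> List[str]:
    """Return the task_id of every task instance whose state is 'failed'."""
    return [
        ti["task_id"]
        for ti in collection.get("task_instances", [])
        if ti.get("state") == "failed"
    ]


# Hypothetical sample data, not real Airflow output.
sample = {
    "task_instances": [
        {"task_id": "extract", "state": "success"},
        {"task_id": "transform", "state": "failed"},
        {"task_id": "load", "state": None},
    ],
    "total_entries": 3,
}
print(summarize_states(sample))  # {'success': 1, 'failed': 1, 'none': 1}
print(failed_task_ids(sample))  # ['transform']
```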
Input Schema
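| Name | Required | Description | Default |
|---|---|---|---|
| dag_id | Yes | ID of the DAG | |
| dag_run_id | Yes | ID of the DAG run | |
| execution_date_gte | No | Only task instances with an execution date on or after this ISO 8601 datetime | None |
| execution_date_lte | No | Only task instances with an execution date on or before this ISO 8601 datetime | None |
| start_date_gte | No | Only task instances that started on or after this ISO 8601 datetime | None |
| start_date_lte | No | Only task instances that started on or before this ISO 8601 datetime | None |
| end_date_gte | No | Only task instances that ended on or after this ISO 8601 datetime | None |
| end_date_lte | No | Only task instances that ended on or before this ISO 8601 datetime | None |
| updated_at_gte | No | Only task instances last updated on or after this ISO 8601 datetime | None |
| updated_at_lte | No | Only task instances last updated on or before this ISO 8601 datetime | None |
| duration_gte | No | Only task instances that ran for at least this many seconds (number) | None |
| duration_lte | No | Only task instances that ran for at most this many seconds (number) | None |
| state | No | List of states to match (for example `success`, `failed`) | None |
| pool | No | List of pool names to match | None |
| queue | No | List of queue names to match | None |
| limit | No | Maximum number of task instances to return (integer) | None |
| offset | No | Number of task instances to skip before the first result (integer) | None |

Optional parameters left as `None` are not sent to the Airflow API, so Airflow's own server-side defaults apply.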
Implementation Reference
- src/airflow/taskinstance.py:32-85 (handler): The main handler function for the `list_task_instances` tool. It collects every filter that is not `None` into a keyword-argument dict and calls `get_task_instances` on the Airflow `TaskInstanceApi` client. It then returns `str(response.to_dict())`, which is a Python dict repr rather than JSON, as a single text content item.

```python
# Imports assumed from the top of the module (outside lines 32-85).
from typing import Any, Dict, List, Optional, Union

import mcp.types as types

# task_instance_api is a TaskInstanceApi client created at module level
# elsewhere in src/airflow/taskinstance.py (not shown here).


async def list_task_instances(
    dag_id: str,
    dag_run_id: str,
    execution_date_gte: Optional[str] = None,
    execution_date_lte: Optional[str] = None,
    start_date_gte: Optional[str] = None,
    start_date_lte: Optional[str] = None,
    end_date_gte: Optional[str] = None,
    end_date_lte: Optional[str] = None,
    updated_at_gte: Optional[str] = None,
    updated_at_lte: Optional[str] = None,
    duration_gte: Optional[float] = None,
    duration_lte: Optional[float] = None,
    state: Optional[List[str]] = None,
    pool: Optional[List[str]] = None,
    queue: Optional[List[str]] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
    # Build parameters dictionary: only filters that were set are forwarded
    kwargs: Dict[str, Any] = {}
    if execution_date_gte is not None:
        kwargs["execution_date_gte"] = execution_date_gte
    if execution_date_lte is not None:
        kwargs["execution_date_lte"] = execution_date_lte
    if start_date_gte is not None:
        kwargs["start_date_gte"] = start_date_gte
    if start_date_lte is not None:
        kwargs["start_date_lte"] = start_date_lte
    if end_date_gte is not None:
        kwargs["end_date_gte"] = end_date_gte
    if end_date_lte is not None:
        kwargs["end_date_lte"] = end_date_lte
    if updated_at_gte is not None:
        kwargs["updated_at_gte"] = updated_at_gte
    if updated_at_lte is not None:
        kwargs["updated_at_lte"] = updated_at_lte
    if duration_gte is not None:
        kwargs["duration_gte"] = duration_gte
    if duration_lte is not None:
        kwargs["duration_lte"] = duration_lte
    if state is not None:
        kwargs["state"] = state
    if pool is not None:
        kwargs["pool"] = pool
    if queue is not None:
        kwargs["queue"] = queue
    if limit is not None:
        kwargs["limit"] = limit
    if offset is not None:
        kwargs["offset"] = offset

    response = task_instance_api.get_task_instances(dag_id=dag_id, dag_run_id=dag_run_id, **kwargs)
    # The response is stringified with str(), so clients receive a Python repr, not JSON
    return [types.TextContent(type="text", text=str(response.to_dict()))]
```
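The chain of `if ... is not None` checks forwards only the filters a caller actually set. A dict comprehension expresses the same rule more compactly. This sketch pairs one with a hypothetical `RecordingApi` stub, not the real `TaskInstanceApi`, so the forwarded keyword arguments can be inspected. `build_filter_kwargs` is likewise an illustrative name.

```python
from typing import Any, Dict, List, Optional


class RecordingApi:
    """Hypothetical stand-in for TaskInstanceApi that records the kwargs it receives."""

    def __init__(self) -> None:
        self.calls: List[Dict[str, Any]] = []

    def get_task_instances(self, dag_id: str, dag_run_id: str, **kwargs: Any) -> Dict[str, Any]:
        self.calls.append({"dag_id": dag_id, "dag_run_id": dag_run_id, **kwargs})
        return {"task_instances": [], "total_entries": 0}


def build_filter_kwargs(**filters: Optional[Any]) -> Dict[str, Any]:
    """Keep only the filters that were set (same effect as the if-chain)."""
    return {name: value for name, value in filters.items() if value is not None}


api = RecordingApi()
kwargs = build_filter_kwargs(state=["failed"], duration_gte=60.0, pool=None, limit=None)
api.get_task_instances(dag_id="example_dag", dag_run_id="manual__2024-01-01", **kwargs)
print(api.calls[0])
# {'dag_id': 'example_dag', 'dag_run_id': 'manual__2024-01-01', 'state': ['failed'], 'duration_gte': 60.0}
```

Like the original checks, the comprehension uses `is not None` rather than truthiness, so values such as `offset=0` or an empty `state` list are still sent.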
- src/airflow/taskinstance.py:11-22 (registration): Local registration of the `list_task_instances` tool. `get_all_functions` returns the `(function, name, description, is_read_only)` tuples used for MCP tool registration, and `list_task_instances` is flagged as read-only.

```python
def get_all_functions() -> list[tuple[Callable, str, str, bool]]:
    """Return list of (function, name, description, is_read_only) tuples for registration."""
    return [
        (get_task_instance, "get_task_instance", "Get a task instance by DAG ID, task ID, and DAG run ID", True),
        (list_task_instances, "list_task_instances", "List task instances by DAG ID and DAG run ID", True),
        (
            update_task_instance,
            "update_task_instance",
            "Update a task instance by DAG ID, DAG run ID, and task ID",
            False,
        ),
    ]
```
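Because `is_read_only` is the fourth tuple element, a server can drop write tools with a simple check on that element. The repository's own filtering code is not shown here, so the following is only a guess at equivalent behavior. It uses a hypothetical `filter_read_only` function and placeholder handlers in place of the real ones.

```python
from typing import Callable, List, Tuple

# (function, name, description, is_read_only), as described for get_all_functions
ToolSpec = Tuple[Callable, str, str, bool]


def filter_read_only(functions: List[ToolSpec]) -> List[ToolSpec]:
    """Keep only tools whose fourth tuple element (is_read_only) is True."""
    return [spec for spec in functions if spec[3]]


# Hypothetical placeholders standing in for the real handlers.
def get_task_instance() -> None: ...
def list_task_instances() -> None: ...
def update_task_instance() -> None: ...


specs: List[ToolSpec] = [
    (get_task_instance, "get_task_instance", "Get a task instance by DAG ID, task ID, and DAG run ID", True),
    (list_task_instances, "list_task_instances", "List task instances by DAG ID and DAG run ID", True),
    (update_task_instance, "update_task_instance", "Update a task instance by DAG ID, DAG run ID, and task ID", False),
]
print([name for _, name, _, _ in filter_read_only(specs)])
# ['get_task_instance', 'list_task_instances']
```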
- src/main.py:56-99 (registration): Global registration loop in the main CLI entry point. For each selected API, the tools from its `get_all_functions` list (including `list_task_instances`, via the `taskinstance` import) are added to the MCP app with `app.add_tool`. With `--read-only`, only read-only tools are exposed.

```python
@click.command()
@click.option(
    "--transport",
    type=click.Choice(["stdio", "sse"]),
    default="stdio",
    help="Transport type",
)
@click.option(
    "--apis",
    type=click.Choice([api.value for api in APIType]),
    default=[api.value for api in APIType],
    multiple=True,
    help="APIs to run, default is all",
)
@click.option(
    "--read-only",
    is_flag=True,
    help="Only expose read-only tools (GET operations, no CREATE/UPDATE/DELETE)",
)
def main(transport: str, apis: list[str], read_only: bool) -> None:
    from src.server import app

    for api in apis:
        logging.debug(f"Adding API: {api}")
        get_function = APITYPE_TO_FUNCTIONS[APIType(api)]
        try:
            functions = get_function()
        except NotImplementedError:
            continue

        # Filter functions for read-only mode if requested
        if read_only:
            functions = filter_functions_for_read_only(functions)

        for func, name, description, *_ in functions:
            app.add_tool(func, name=name, description=description)

    if transport == "sse":
        logging.debug("Starting MCP server for Apache Airflow with SSE transport")
        app.run(transport="sse")
    else:
        logging.debug("Starting MCP server for Apache Airflow with stdio transport")
        app.run(transport="stdio")
```
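The registration loop in `main` can be exercised without Airflow or a running MCP server. This sketch rests on several assumptions. It uses a hypothetical two-member `APIType` and a `pool` group whose function list raises `NotImplementedError`. A `StubApp` only records tool names. The read-only step is assumed to keep the tuples whose `is_read_only` flag is `True`. None of these names reflect the real module contents.

```python
from enum import Enum
from typing import Callable, Dict, List, Tuple

ToolSpec = Tuple[Callable, str, str, bool]


class APIType(Enum):  # hypothetical subset, not the real enum
    TASKINSTANCE = "taskinstance"
    POOL = "pool"


def taskinstance_functions() -> List[ToolSpec]:
    def list_task_instances() -> None: ...
    def update_task_instance() -> None: ...
    return [
        (list_task_instances, "list_task_instances", "List task instances by DAG ID and DAG run ID", True),
        (update_task_instance, "update_task_instance", "Update a task instance by DAG ID, DAG run ID, and task ID", False),
    ]


def pool_functions() -> List[ToolSpec]:
    raise NotImplementedError  # hypothetical API group with no tools; it gets skipped


APITYPE_TO_FUNCTIONS: Dict[APIType, Callable[[], List[ToolSpec]]] = {
    APIType.TASKINSTANCE: taskinstance_functions,
    APIType.POOL: pool_functions,
}


class StubApp:
    """Records add_tool calls instead of registering real MCP tools."""

    def __init__(self) -> None:
        self.tools: List[str] = []

    def add_tool(self, func: Callable, name: str, description: str) -> None:
        self.tools.append(name)


def register(app: StubApp, apis: List[str], read_only: bool) -> None:
    for api in apis:
        try:
            functions = APITYPE_TO_FUNCTIONS[APIType(api)]()
        except NotImplementedError:
            continue
        if read_only:
            # Assumed read-only filter: keep tuples whose is_read_only flag is True
            functions = [spec for spec in functions if spec[3]]
        for func, name, description, *_ in functions:
            app.add_tool(func, name=name, description=description)


app = StubApp()
register(app, ["taskinstance", "pool"], read_only=True)
print(app.tools)  # ['list_task_instances']
```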