list_task_instances
Retrieve Airflow task instances for a specific DAG and DAG run, with filtering options for execution dates, state, duration, and other parameters to monitor workflow execution.
Instructions
List task instances by DAG ID and DAG run ID
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| dag_id | Yes | ||
| dag_run_id | Yes | ||
| duration_gte | No | ||
| duration_lte | No | ||
| end_date_gte | No | ||
| end_date_lte | No | ||
| execution_date_gte | No | ||
| execution_date_lte | No | ||
| limit | No | ||
| offset | No | ||
| pool | No | ||
| queue | No | ||
| start_date_gte | No | ||
| start_date_lte | No | ||
| state | No | ||
| updated_at_gte | No | ||
| updated_at_lte | No |
Input Schema (JSON Schema)
{
"properties": {
"dag_id": {
"title": "Dag Id",
"type": "string"
},
"dag_run_id": {
"title": "Dag Run Id",
"type": "string"
},
"duration_gte": {
"anyOf": [
{
"type": "number"
},
{
"type": "null"
}
],
"default": null,
"title": "Duration Gte"
},
"duration_lte": {
"anyOf": [
{
"type": "number"
},
{
"type": "null"
}
],
"default": null,
"title": "Duration Lte"
},
"end_date_gte": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "End Date Gte"
},
"end_date_lte": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "End Date Lte"
},
"execution_date_gte": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Execution Date Gte"
},
"execution_date_lte": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Execution Date Lte"
},
"limit": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"title": "Limit"
},
"offset": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"title": "Offset"
},
"pool": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"title": "Pool"
},
"queue": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"title": "Queue"
},
"start_date_gte": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Start Date Gte"
},
"start_date_lte": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Start Date Lte"
},
"state": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"title": "State"
},
"updated_at_gte": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Updated At Gte"
},
"updated_at_lte": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Updated At Lte"
}
},
"required": [
"dag_id",
"dag_run_id"
],
"type": "object"
}
Implementation Reference
- src/airflow/taskinstance.py:32-85 (handler)The main handler function for the 'list_task_instances' tool. It accepts DAG ID, DAG run ID, and various optional filters, builds a kwargs dict for them, calls the Airflow TaskInstanceApi.get_task_instances, and returns the response as TextContent.async def list_task_instances( dag_id: str, dag_run_id: str, execution_date_gte: Optional[str] = None, execution_date_lte: Optional[str] = None, start_date_gte: Optional[str] = None, start_date_lte: Optional[str] = None, end_date_gte: Optional[str] = None, end_date_lte: Optional[str] = None, updated_at_gte: Optional[str] = None, updated_at_lte: Optional[str] = None, duration_gte: Optional[float] = None, duration_lte: Optional[float] = None, state: Optional[List[str]] = None, pool: Optional[List[str]] = None, queue: Optional[List[str]] = None, limit: Optional[int] = None, offset: Optional[int] = None, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: # Build parameters dictionary kwargs: Dict[str, Any] = {} if execution_date_gte is not None: kwargs["execution_date_gte"] = execution_date_gte if execution_date_lte is not None: kwargs["execution_date_lte"] = execution_date_lte if start_date_gte is not None: kwargs["start_date_gte"] = start_date_gte if start_date_lte is not None: kwargs["start_date_lte"] = start_date_lte if end_date_gte is not None: kwargs["end_date_gte"] = end_date_gte if end_date_lte is not None: kwargs["end_date_lte"] = end_date_lte if updated_at_gte is not None: kwargs["updated_at_gte"] = updated_at_gte if updated_at_lte is not None: kwargs["updated_at_lte"] = updated_at_lte if duration_gte is not None: kwargs["duration_gte"] = duration_gte if duration_lte is not None: kwargs["duration_lte"] = duration_lte if state is not None: kwargs["state"] = state if pool is not None: kwargs["pool"] = pool if queue is not None: kwargs["queue"] = queue if limit is not None: kwargs["limit"] = limit if offset is not None: kwargs["offset"] = offset response = task_instance_api.get_task_instances(dag_id=dag_id, dag_run_id=dag_run_id, **kwargs) return [types.TextContent(type="text", text=str(response.to_dict()))]
- src/airflow/taskinstance.py:11-22 (registration)The get_all_functions() lists all task instance tools for registration, including the tuple for 'list_task_instances' (function, name, description, read_only=True).def get_all_functions() -> list[tuple[Callable, str, str, bool]]: """Return list of (function, name, description, is_read_only) tuples for registration.""" return [ (get_task_instance, "get_task_instance", "Get a task instance by DAG ID, task ID, and DAG run ID", True), (list_task_instances, "list_task_instances", "List task instances by DAG ID and DAG run ID", True), ( update_task_instance, "update_task_instance", "Update a task instance by DAG ID, DAG run ID, and task ID", False, ), ]