MCP Server Airflow Token

list_task_instances

Retrieve Airflow task instances for a specific DAG and DAG run, with filtering options for execution dates, state, duration, and other parameters to monitor workflow execution.

Instructions

List task instances by DAG ID and DAG run ID

Input Schema

Name                Required  Description  Default
dag_id              Yes
dag_run_id          Yes
duration_gte        No
duration_lte        No
end_date_gte        No
end_date_lte        No
execution_date_gte  No
execution_date_lte  No
limit               No
offset              No
pool                No
queue               No
start_date_gte      No
start_date_lte      No
state               No
updated_at_gte      No
updated_at_lte      No
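
The table lists names only; the types come from the JSON Schema further down (strings for the date filters, numbers for durations, integers for paging, string arrays for `pool`, `queue`, and `state`). As a rough sketch, a client could check its arguments before calling the tool. `check_arguments` and the example values are hypothetical, not part of the server, and the check flags unknown field names, which is stricter than the schema itself.

```python
from typing import Any, Dict, List

REQUIRED_FIELDS = ("dag_id", "dag_run_id")
STRING_FIELDS = {
    "end_date_gte", "end_date_lte",
    "execution_date_gte", "execution_date_lte",
    "start_date_gte", "start_date_lte",
    "updated_at_gte", "updated_at_lte",
}
NUMBER_FIELDS = {"duration_gte", "duration_lte"}
INTEGER_FIELDS = {"limit", "offset"}
STRING_ARRAY_FIELDS = {"pool", "queue", "state"}


def _is_int(value: Any) -> bool:
    # bool is a subclass of int in Python, so exclude it explicitly.
    return isinstance(value, int) and not isinstance(value, bool)


def check_arguments(arguments: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the arguments look valid."""
    problems: List[str] = []
    for name in REQUIRED_FIELDS:
        if not isinstance(arguments.get(name), str):
            problems.append(f"{name}: required string")
    for name, value in arguments.items():
        if name in REQUIRED_FIELDS or value is None:
            continue  # required fields are handled above; null is allowed for filters
        if name in STRING_FIELDS:
            ok = isinstance(value, str)
        elif name in NUMBER_FIELDS:
            ok = _is_int(value) or isinstance(value, float)
        elif name in INTEGER_FIELDS:
            ok = _is_int(value)
        elif name in STRING_ARRAY_FIELDS:
            ok = isinstance(value, list) and all(isinstance(v, str) for v in value)
        else:
            problems.append(f"{name}: not in the schema")
            continue
        if not ok:
            problems.append(f"{name}: wrong type")
    return problems


# Illustrative values only; the DAG and run IDs are made up.
example = {
    "dag_id": "example_dag",
    "dag_run_id": "manual__2024-01-01T00:00:00+00:00",
    "state": ["failed", "upstream_failed"],
    "limit": 50,
}
print(check_arguments(example))
```

Note that `state` must be a list even when filtering on a single value; passing a bare string such as `"failed"` would be reported as a wrong type.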

Input Schema (JSON Schema)

{
  "properties": {
    "dag_id": { "title": "Dag Id", "type": "string" },
    "dag_run_id": { "title": "Dag Run Id", "type": "string" },
    "duration_gte": { "anyOf": [ { "type": "number" }, { "type": "null" } ], "default": null, "title": "Duration Gte" },
    "duration_lte": { "anyOf": [ { "type": "number" }, { "type": "null" } ], "default": null, "title": "Duration Lte" },
    "end_date_gte": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "End Date Gte" },
    "end_date_lte": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "End Date Lte" },
    "execution_date_gte": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Execution Date Gte" },
    "execution_date_lte": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Execution Date Lte" },
    "limit": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "title": "Limit" },
    "offset": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "title": "Offset" },
    "pool": { "anyOf": [ { "items": { "type": "string" }, "type": "array" }, { "type": "null" } ], "default": null, "title": "Pool" },
    "queue": { "anyOf": [ { "items": { "type": "string" }, "type": "array" }, { "type": "null" } ], "default": null, "title": "Queue" },
    "start_date_gte": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Start Date Gte" },
    "start_date_lte": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Start Date Lte" },
    "state": { "anyOf": [ { "items": { "type": "string" }, "type": "array" }, { "type": "null" } ], "default": null, "title": "State" },
    "updated_at_gte": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Updated At Gte" },
    "updated_at_lte": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Updated At Lte" }
  },
  "required": [ "dag_id", "dag_run_id" ],
  "type": "object"
}

Implementation Reference

  • The main handler function for the 'list_task_instances' tool. It accepts a DAG ID, a DAG run ID, and the optional filters, copies only the filters that are not None into a kwargs dict, calls the Airflow TaskInstanceApi.get_task_instances, and returns the response dict, converted to a string, as a single TextContent item.
    async def list_task_instances(
        dag_id: str,
        dag_run_id: str,
        execution_date_gte: Optional[str] = None,
        execution_date_lte: Optional[str] = None,
        start_date_gte: Optional[str] = None,
        start_date_lte: Optional[str] = None,
        end_date_gte: Optional[str] = None,
        end_date_lte: Optional[str] = None,
        updated_at_gte: Optional[str] = None,
        updated_at_lte: Optional[str] = None,
        duration_gte: Optional[float] = None,
        duration_lte: Optional[float] = None,
        state: Optional[List[str]] = None,
        pool: Optional[List[str]] = None,
        queue: Optional[List[str]] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
    ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
        # Build parameters dictionary
        kwargs: Dict[str, Any] = {}
        if execution_date_gte is not None:
            kwargs["execution_date_gte"] = execution_date_gte
        if execution_date_lte is not None:
            kwargs["execution_date_lte"] = execution_date_lte
        if start_date_gte is not None:
            kwargs["start_date_gte"] = start_date_gte
        if start_date_lte is not None:
            kwargs["start_date_lte"] = start_date_lte
        if end_date_gte is not None:
            kwargs["end_date_gte"] = end_date_gte
        if end_date_lte is not None:
            kwargs["end_date_lte"] = end_date_lte
        if updated_at_gte is not None:
            kwargs["updated_at_gte"] = updated_at_gte
        if updated_at_lte is not None:
            kwargs["updated_at_lte"] = updated_at_lte
        if duration_gte is not None:
            kwargs["duration_gte"] = duration_gte
        if duration_lte is not None:
            kwargs["duration_lte"] = duration_lte
        if state is not None:
            kwargs["state"] = state
        if pool is not None:
            kwargs["pool"] = pool
        if queue is not None:
            kwargs["queue"] = queue
        if limit is not None:
            kwargs["limit"] = limit
        if offset is not None:
            kwargs["offset"] = offset

        response = task_instance_api.get_task_instances(dag_id=dag_id, dag_run_id=dag_run_id, **kwargs)
        return [types.TextContent(type="text", text=str(response.to_dict()))]
  • get_all_functions() returns the task instance tools for registration as (function, name, description, is_read_only) tuples. The entry for 'list_task_instances' sets is_read_only to True.
    def get_all_functions() -> list[tuple[Callable, str, str, bool]]:
        """Return list of (function, name, description, is_read_only) tuples for registration."""
        return [
            (get_task_instance, "get_task_instance", "Get a task instance by DAG ID, task ID, and DAG run ID", True),
            (list_task_instances, "list_task_instances", "List task instances by DAG ID and DAG run ID", True),
            (
                update_task_instance,
                "update_task_instance",
                "Update a task instance by DAG ID, DAG run ID, and task ID",
                False,
            ),
        ]
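
The handler's fifteen `if ... is not None` checks all follow one pattern: forward a filter only when the caller supplied it. A minimal sketch of that pattern in isolation; `drop_none` is a hypothetical helper name and does not appear in the server's code.

```python
from typing import Any, Dict


def drop_none(**filters: Any) -> Dict[str, Any]:
    """Keep only the filters whose value is not None."""
    return {name: value for name, value in filters.items() if value is not None}


# Falsy values such as 0 or [] are kept, because the test is "is not None".
kwargs = drop_none(state=["failed"], limit=0, offset=None, pool=[])
print(kwargs)
```

Testing against None rather than truthiness matters here: `limit=0` or an empty `pool` list is still passed through, which matches what the handler's explicit checks do.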
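
The source does not show how the tuples from get_all_functions() are consumed. One plausible use of the is_read_only flag, sketched under that assumption, is to skip mutating tools when the server runs in a read-only mode. `select_tools`, the `read_only_mode` switch, and the stand-in functions below are all hypothetical.

```python
from typing import Callable, Dict, List, Tuple

# Same tuple shape as in get_all_functions(): (function, name, description, is_read_only).
ToolSpec = Tuple[Callable, str, str, bool]


def select_tools(specs: List[ToolSpec], read_only_mode: bool) -> Dict[str, Callable]:
    """Map tool names to functions, dropping write tools in read-only mode."""
    return {
        name: func
        for func, name, _description, is_read_only in specs
        if is_read_only or not read_only_mode
    }


# Stand-ins for the real handlers, only so the sketch runs on its own.
def get_task_instance() -> str:
    return "get"


def list_task_instances() -> str:
    return "list"


def update_task_instance() -> str:
    return "update"


specs: List[ToolSpec] = [
    (get_task_instance, "get_task_instance", "Get a task instance", True),
    (list_task_instances, "list_task_instances", "List task instances", True),
    (update_task_instance, "update_task_instance", "Update a task instance", False),
]

print(sorted(select_tools(specs, read_only_mode=True)))
print(sorted(select_tools(specs, read_only_mode=False)))
```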

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nikhil-ganage/mcp-server-airflow-token'
