set_task_instances_state
Modify the state of task instances in Apache Airflow by specifying a DAG ID, task IDs, and an execution date. Optional parameters extend the change to upstream, downstream, past, or future task instances, and a dry-run flag previews the affected task instances without modifying them.
Instructions
Set a state of task instances
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| dag_id | Yes | ||
| dry_run | No | ||
| execution_date | No | ||
| include_downstream | No | ||
| include_future | No | ||
| include_past | No | ||
| include_upstream | No | ||
| state | Yes | ||
| task_ids | No | ||
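Since the table gives no descriptions or defaults, a concrete set of arguments may help. The sketch below shows one plausible call and checks it against the required and optional names listed above. The DAG ID, task ID, and date are made-up values, and `validate_arguments` is a hypothetical helper for illustration only, not part of the server.

```python
# Field names taken from the Input Schema table above.
REQUIRED = {"dag_id", "state"}
OPTIONAL = {
    "dry_run",
    "execution_date",
    "include_downstream",
    "include_future",
    "include_past",
    "include_upstream",
    "task_ids",
}


def validate_arguments(arguments: dict) -> list[str]:
    """Return a list of problems; an empty list means the names match the table."""
    provided = set(arguments)
    problems = [f"missing required field: {name}" for name in sorted(REQUIRED - provided)]
    problems += [f"unknown field: {name}" for name in sorted(provided - REQUIRED - OPTIONAL)]
    return problems


# Hypothetical arguments: preview marking one task (and everything
# downstream of it) as successful for a single run.
example_arguments = {
    "dag_id": "example_etl",
    "state": "success",
    "task_ids": ["load_orders"],
    "execution_date": "2024-01-01T00:00:00+00:00",
    "include_downstream": True,
    "dry_run": True,
}

print(validate_arguments(example_arguments))  # prints []
```

Starting with `dry_run` set to `True` is a cautious default for a tool that is registered as not read-only.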
Implementation Reference
- src/airflow/dag.py:236-269 (handler)
  The main handler function for the 'set_task_instances_state' tool. It constructs an UpdateTaskInstancesState object from the input parameters and calls the Airflow DAG API to set the state of task instances.

      async def set_task_instances_state(
          dag_id: str,
          state: str,
          task_ids: Optional[List[str]] = None,
          execution_date: Optional[str] = None,
          include_upstream: Optional[bool] = None,
          include_downstream: Optional[bool] = None,
          include_future: Optional[bool] = None,
          include_past: Optional[bool] = None,
          dry_run: Optional[bool] = None,
      ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
          state_request = {"state": state}
          if task_ids is not None:
              state_request["task_ids"] = task_ids
          if execution_date is not None:
              state_request["execution_date"] = execution_date
          if include_upstream is not None:
              state_request["include_upstream"] = include_upstream
          if include_downstream is not None:
              state_request["include_downstream"] = include_downstream
          if include_future is not None:
              state_request["include_future"] = include_future
          if include_past is not None:
              state_request["include_past"] = include_past
          if dry_run is not None:
              state_request["dry_run"] = dry_run

          update_task_instances_state = UpdateTaskInstancesState(**state_request)

          response = dag_api.post_set_task_instances_state(
              dag_id=dag_id,
              update_task_instances_state=update_task_instances_state,
          )
          return [types.TextContent(type="text", text=str(response.to_dict()))]
- src/airflow/dag.py:15-33 (registration)
  Registration of all Airflow DAG tools, including the tuple for 'set_task_instances_state' which provides the function reference, tool name, description, and read-only flag.

      def get_all_functions() -> list[tuple[Callable, str, str, bool]]:
          """Return list of (function, name, description, is_read_only) tuples for registration."""
          return [
              (get_dags, "fetch_dags", "Fetch all DAGs", True),
              (get_dag, "get_dag", "Get a DAG by ID", True),
              (get_dag_details, "get_dag_details", "Get a simplified representation of DAG", True),
              (get_dag_source, "get_dag_source", "Get a source code", True),
              (pause_dag, "pause_dag", "Pause a DAG by ID", False),
              (unpause_dag, "unpause_dag", "Unpause a DAG by ID", False),
              (get_dag_tasks, "get_dag_tasks", "Get tasks for DAG", True),
              (get_task, "get_task", "Get a task by ID", True),
              (get_tasks, "get_tasks", "Get tasks for DAG", True),
              (patch_dag, "patch_dag", "Update a DAG", False),
              (patch_dags, "patch_dags", "Update multiple DAGs", False),
              (delete_dag, "delete_dag", "Delete a DAG", False),
              (clear_task_instances, "clear_task_instances", "Clear a set of task instances", False),
              (set_task_instances_state, "set_task_instances_state", "Set a state of task instances", False),
              (reparse_dag_file, "reparse_dag_file", "Request re-parsing of a DAG file", False),
          ]
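The handler above forwards only the parameters that were explicitly set: anything left as `None` is omitted from the request, while an explicit `False` is still sent. The sketch below isolates that filtering step so it can be run without an Airflow client. `build_state_request` is a hypothetical name introduced here for illustration; the real handler builds the same dictionary inline with a chain of `if` statements before passing it to `UpdateTaskInstancesState`.

```python
from typing import Any, Optional


def build_state_request(
    state: str,
    task_ids: Optional[list[str]] = None,
    execution_date: Optional[str] = None,
    include_upstream: Optional[bool] = None,
    include_downstream: Optional[bool] = None,
    include_future: Optional[bool] = None,
    include_past: Optional[bool] = None,
    dry_run: Optional[bool] = None,
) -> dict[str, Any]:
    """Hypothetical helper mirroring the handler's None-filtering logic."""
    optional_fields = {
        "task_ids": task_ids,
        "execution_date": execution_date,
        "include_upstream": include_upstream,
        "include_downstream": include_downstream,
        "include_future": include_future,
        "include_past": include_past,
        "dry_run": dry_run,
    }
    request: dict[str, Any] = {"state": state}
    # Compare against None rather than truthiness so that an explicit
    # False (for example dry_run=False) still reaches the API.
    request.update({k: v for k, v in optional_fields.items() if v is not None})
    return request


payload = build_state_request(
    "failed",
    task_ids=["load_orders"],  # made-up task ID
    include_downstream=True,
    dry_run=False,
)
print(payload)
# {'state': 'failed', 'task_ids': ['load_orders'], 'include_downstream': True, 'dry_run': False}
```

Omitting unset fields leaves their defaults to the Airflow API rather than fixing them in the tool.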