clear_task_instances
Clear selected task instances of an Airflow DAG so they can be rerun, for example after a failure. Optional filters narrow the selection by task ID, date range, and upstream or downstream dependencies.
Instructions
Clear a set of task instances
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| dag_id | Yes | | |
| task_ids | No | | |
| start_date | No | | |
| end_date | No | | |
| include_subdags | No | | |
| include_parentdag | No | | |
| include_upstream | No | | |
| include_downstream | No | | |
| include_future | No | | |
| include_past | No | | |
| dry_run | No | | |
| reset_dag_runs | No | | |
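As an illustration of the schema above, the following sketch builds one possible set of arguments for a call to this tool and checks the parameter names against the table. The helper `check_arguments`, the DAG and task names, and the ISO 8601 date format are assumptions made for the example; the schema itself gives no descriptions or formats.

```python
import json
from typing import Any, Dict

# Parameter names copied from the input schema table.
REQUIRED = {"dag_id"}
OPTIONAL = {
    "task_ids", "start_date", "end_date", "include_subdags",
    "include_parentdag", "include_upstream", "include_downstream",
    "include_future", "include_past", "dry_run", "reset_dag_runs",
}


def check_arguments(arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: reject missing or unknown parameter names."""
    names = set(arguments)
    missing = REQUIRED - names
    unknown = names - REQUIRED - OPTIONAL
    if missing:
        raise ValueError(f"missing required parameter(s): {sorted(missing)}")
    if unknown:
        raise ValueError(f"unknown parameter(s): {sorted(unknown)}")
    return arguments


# Example values are invented; the date format is an assumption.
arguments = check_arguments({
    "dag_id": "example_etl",
    "task_ids": ["load_orders"],
    "start_date": "2024-01-01T00:00:00Z",
    "end_date": "2024-01-07T00:00:00Z",
    "include_downstream": True,
    "dry_run": True,
})
print(json.dumps(arguments, indent=2))
```

Setting `dry_run` to true first is a cautious way to try a new set of filters, on the assumption that the underlying Airflow API then reports the matching task instances without clearing them.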
Implementation Reference
- src/airflow/dag.py:190-231 (handler): The main handler function for the 'clear_task_instances' tool. It constructs a ClearTaskInstances object from input parameters and calls the Airflow DAG API to clear the specified task instances.

  ```python
  async def clear_task_instances(
      dag_id: str,
      task_ids: Optional[List[str]] = None,
      start_date: Optional[str] = None,
      end_date: Optional[str] = None,
      include_subdags: Optional[bool] = None,
      include_parentdag: Optional[bool] = None,
      include_upstream: Optional[bool] = None,
      include_downstream: Optional[bool] = None,
      include_future: Optional[bool] = None,
      include_past: Optional[bool] = None,
      dry_run: Optional[bool] = None,
      reset_dag_runs: Optional[bool] = None,
  ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
      clear_request = {}
      if task_ids is not None:
          clear_request["task_ids"] = task_ids
      if start_date is not None:
          clear_request["start_date"] = start_date
      if end_date is not None:
          clear_request["end_date"] = end_date
      if include_subdags is not None:
          clear_request["include_subdags"] = include_subdags
      if include_parentdag is not None:
          clear_request["include_parentdag"] = include_parentdag
      if include_upstream is not None:
          clear_request["include_upstream"] = include_upstream
      if include_downstream is not None:
          clear_request["include_downstream"] = include_downstream
      if include_future is not None:
          clear_request["include_future"] = include_future
      if include_past is not None:
          clear_request["include_past"] = include_past
      if dry_run is not None:
          clear_request["dry_run"] = dry_run
      if reset_dag_runs is not None:
          clear_request["reset_dag_runs"] = reset_dag_runs

      clear_task_instances = ClearTaskInstances(**clear_request)

      response = dag_api.post_clear_task_instances(dag_id=dag_id, clear_task_instances=clear_task_instances)
      return [types.TextContent(type="text", text=str(response.to_dict()))]
  ```
- src/airflow/dag.py:15-33 (registration): The get_all_functions in dag.py includes the registration tuple for clear_task_instances, which is later used in main.py to add the tool.

  ```python
  def get_all_functions() -> list[tuple[Callable, str, str, bool]]:
      """Return list of (function, name, description, is_read_only) tuples for registration."""
      return [
          (get_dags, "fetch_dags", "Fetch all DAGs", True),
          (get_dag, "get_dag", "Get a DAG by ID", True),
          (get_dag_details, "get_dag_details", "Get a simplified representation of DAG", True),
          (get_dag_source, "get_dag_source", "Get a source code", True),
          (pause_dag, "pause_dag", "Pause a DAG by ID", False),
          (unpause_dag, "unpause_dag", "Unpause a DAG by ID", False),
          (get_dag_tasks, "get_dag_tasks", "Get tasks for DAG", True),
          (get_task, "get_task", "Get a task by ID", True),
          (get_tasks, "get_tasks", "Get tasks for DAG", True),
          (patch_dag, "patch_dag", "Update a DAG", False),
          (patch_dags, "patch_dags", "Update multiple DAGs", False),
          (delete_dag, "delete_dag", "Delete a DAG", False),
          (clear_task_instances, "clear_task_instances", "Clear a set of task instances", False),
          (set_task_instances_state, "set_task_instances_state", "Set a state of task instances", False),
          (reparse_dag_file, "reparse_dag_file", "Request re-parsing of a DAG file", False),
      ]
  ```
- src/main.py:90-92 (registration): The generic registration loop in main.py where tools from all modules, including clear_task_instances from dag.py, are added to the MCP app using app.add_tool.

  ```python
  for func, name, description, *_ in functions:
      app.add_tool(func, name=name, description=description)
  ```
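To see how the handler and the two registration steps listed above fit together, here is a minimal, self-contained sketch. `StubDagApi` and `StubApp` are hypothetical stand-ins for the Airflow client and the MCP app, a plain dict replaces the `ClearTaskInstances` model, and the handler is trimmed to three options, so this shows the pattern only and is not the project's actual code.

```python
import asyncio
from typing import Any, Callable, Dict, List, Optional, Tuple


class StubDagApi:
    """Hypothetical stand-in for the Airflow DAG API client; it only records calls."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Dict[str, Any]]] = []

    def post_clear_task_instances(
        self, dag_id: str, clear_task_instances: Dict[str, Any]
    ) -> Dict[str, Any]:
        self.calls.append((dag_id, clear_task_instances))
        return {"task_instances": []}


class StubApp:
    """Hypothetical stand-in for the MCP app; it stores tools by name."""

    def __init__(self) -> None:
        self.tools: Dict[str, Tuple[Callable[..., Any], str]] = {}

    def add_tool(self, func: Callable[..., Any], name: str, description: str) -> None:
        self.tools[name] = (func, description)


dag_api = StubDagApi()


async def clear_task_instances(
    dag_id: str,
    task_ids: Optional[List[str]] = None,
    include_downstream: Optional[bool] = None,
    dry_run: Optional[bool] = None,
) -> str:
    # Forward only the options the caller actually set. Testing "is not None"
    # rather than truthiness keeps an explicit False in the request.
    options = {
        "task_ids": task_ids,
        "include_downstream": include_downstream,
        "dry_run": dry_run,
    }
    clear_request = {key: value for key, value in options.items() if value is not None}
    response = dag_api.post_clear_task_instances(
        dag_id=dag_id, clear_task_instances=clear_request
    )
    return str(response)


def get_all_functions() -> List[Tuple[Callable[..., Any], str, str, bool]]:
    """Return (function, name, description, is_read_only) tuples."""
    return [
        (clear_task_instances, "clear_task_instances", "Clear a set of task instances", False),
    ]


app = StubApp()
for func, name, description, *_ in get_all_functions():
    # "*_" absorbs the trailing is_read_only flag, which add_tool does not take.
    app.add_tool(func, name=name, description=description)

handler, _description = app.tools["clear_task_instances"]
result = asyncio.run(handler("example_etl", task_ids=["load_orders"], dry_run=False))
print(dag_api.calls)
```

In this sketch `include_downstream` is left unset and therefore never reaches the request, while `dry_run=False` is passed through unchanged. That mirrors the handler's per-parameter `is not None` checks, which leave any unset option to whatever default the API applies.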