nikhil-ganage

MCP Server Airflow Token

clear_task_instances

Clear specific task instances in Airflow workflows to resolve failures or restart jobs, allowing precise control over which tasks to reset based on criteria like date ranges and dependencies.

Instructions

Clear a set of task instances

Input Schema

Name                 Required  Description  Default
dag_id               Yes
task_ids             No
start_date           No
end_date             No
include_subdags      No
include_parentdag    No
include_upstream     No
include_downstream   No
include_future       No
include_past         No
dry_run              No
reset_dag_runs       No
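
Only dag_id is required, so a caller can leave every other field out. The following is a minimal sketch of how the arguments for this tool might be assembled. The helper name build_clear_arguments is hypothetical, the DAG and task IDs are made up, and the ISO 8601 date strings are an assumption based on how the Airflow REST API usually represents date-times; the schema above does not state a format.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

# Boolean options listed in the input schema above.
ALLOWED_FLAGS = {
    "include_subdags", "include_parentdag", "include_upstream",
    "include_downstream", "include_future", "include_past",
    "dry_run", "reset_dag_runs",
}


def build_clear_arguments(
    dag_id: str,
    task_ids: Optional[List[str]] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    **flags: Optional[bool],
) -> Dict[str, Any]:
    """Hypothetical helper: assemble tool arguments, leaving out unset fields."""
    unknown = set(flags) - ALLOWED_FLAGS
    if unknown:
        raise ValueError(f"Unknown option(s): {sorted(unknown)}")

    arguments: Dict[str, Any] = {"dag_id": dag_id}
    if task_ids is not None:
        arguments["task_ids"] = task_ids
    # Assumption: dates are passed as ISO 8601 strings.
    if start_date is not None:
        arguments["start_date"] = start_date.isoformat()
    if end_date is not None:
        arguments["end_date"] = end_date.isoformat()
    # Keep only the flags the caller actually set.
    arguments.update({k: v for k, v in flags.items() if v is not None})
    return arguments


example = build_clear_arguments(
    "example_dag",                      # made-up DAG ID
    task_ids=["extract", "load"],       # made-up task IDs
    start_date=datetime(2024, 1, 1, tzinfo=timezone.utc),
    dry_run=True,
    include_downstream=None,            # unset, so it is omitted
)
print(example)
```

Passing dry_run=True first is a cautious way to try a request, assuming the underlying Airflow endpoint treats it as a preview that lists the affected task instances without clearing them.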

Implementation Reference

  • The main handler function for the 'clear_task_instances' tool. It constructs a ClearTaskInstances object from input parameters and calls the Airflow DAG API to clear the specified task instances.
    async def clear_task_instances(
        dag_id: str,
        task_ids: Optional[List[str]] = None,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        include_subdags: Optional[bool] = None,
        include_parentdag: Optional[bool] = None,
        include_upstream: Optional[bool] = None,
        include_downstream: Optional[bool] = None,
        include_future: Optional[bool] = None,
        include_past: Optional[bool] = None,
        dry_run: Optional[bool] = None,
        reset_dag_runs: Optional[bool] = None,
    ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
        clear_request = {}
        if task_ids is not None:
            clear_request["task_ids"] = task_ids
        if start_date is not None:
            clear_request["start_date"] = start_date
        if end_date is not None:
            clear_request["end_date"] = end_date
        if include_subdags is not None:
            clear_request["include_subdags"] = include_subdags
        if include_parentdag is not None:
            clear_request["include_parentdag"] = include_parentdag
        if include_upstream is not None:
            clear_request["include_upstream"] = include_upstream
        if include_downstream is not None:
            clear_request["include_downstream"] = include_downstream
        if include_future is not None:
            clear_request["include_future"] = include_future
        if include_past is not None:
            clear_request["include_past"] = include_past
        if dry_run is not None:
            clear_request["dry_run"] = dry_run
        if reset_dag_runs is not None:
            clear_request["reset_dag_runs"] = reset_dag_runs

        clear_task_instances = ClearTaskInstances(**clear_request)

        response = dag_api.post_clear_task_instances(dag_id=dag_id, clear_task_instances=clear_task_instances)
        return [types.TextContent(type="text", text=str(response.to_dict()))]
  • The get_all_functions function in dag.py returns the registration tuple for clear_task_instances, which main.py later uses to add the tool.
    def get_all_functions() -> list[tuple[Callable, str, str, bool]]:
        """Return list of (function, name, description, is_read_only) tuples for registration."""
        return [
            (get_dags, "fetch_dags", "Fetch all DAGs", True),
            (get_dag, "get_dag", "Get a DAG by ID", True),
            (get_dag_details, "get_dag_details", "Get a simplified representation of DAG", True),
            (get_dag_source, "get_dag_source", "Get a source code", True),
            (pause_dag, "pause_dag", "Pause a DAG by ID", False),
            (unpause_dag, "unpause_dag", "Unpause a DAG by ID", False),
            (get_dag_tasks, "get_dag_tasks", "Get tasks for DAG", True),
            (get_task, "get_task", "Get a task by ID", True),
            (get_tasks, "get_tasks", "Get tasks for DAG", True),
            (patch_dag, "patch_dag", "Update a DAG", False),
            (patch_dags, "patch_dags", "Update multiple DAGs", False),
            (delete_dag, "delete_dag", "Delete a DAG", False),
            (clear_task_instances, "clear_task_instances", "Clear a set of task instances", False),
            (set_task_instances_state, "set_task_instances_state", "Set a state of task instances", False),
            (reparse_dag_file, "reparse_dag_file", "Request re-parsing of a DAG file", False),
        ]
  • src/main.py:90-92 (registration)
    The generic registration loop in main.py where tools from all modules, including clear_task_instances from dag.py, are added to the MCP app using app.add_tool.
    for func, name, description, *_ in functions:
        app.add_tool(func, name=name, description=description)
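
The registration flow in the excerpts above can be tried without Airflow or an MCP runtime. In this minimal sketch, FakeApp, the stub tool functions, and the register helper are hypothetical stand-ins; only the tuple shape (function, name, description, is_read_only) and the add_tool(func, name=..., description=...) call come from the excerpts. The loop in main.py discards the trailing flag with `*_`; the sketch captures it instead to show how it could filter for read-only tools, which the excerpt itself does not do.

```python
from typing import Callable, Dict, List, Tuple

Registration = Tuple[Callable, str, str, bool]


class FakeApp:
    """Hypothetical stand-in for the MCP app; records tools instead of serving them."""

    def __init__(self) -> None:
        self.tools: Dict[str, Tuple[Callable, str]] = {}

    def add_tool(self, func: Callable, name: str, description: str) -> None:
        self.tools[name] = (func, description)


def get_dag(dag_id: str) -> str:
    # Stub: the real function calls the Airflow API.
    return f"details for {dag_id}"


def clear_task_instances(dag_id: str) -> str:
    # Stub: the real function posts a ClearTaskInstances request.
    return f"cleared task instances in {dag_id}"


def get_all_functions() -> List[Registration]:
    """Same tuple shape as in dag.py, trimmed to two entries."""
    return [
        (get_dag, "get_dag", "Get a DAG by ID", True),
        (clear_task_instances, "clear_task_instances", "Clear a set of task instances", False),
    ]


def register(app: FakeApp, functions: List[Registration], read_only: bool = False) -> None:
    # Star-unpacking absorbs any trailing fields, here the is_read_only flag.
    for func, name, description, *rest in functions:
        is_read_only = rest[0] if rest else False
        if read_only and not is_read_only:
            continue  # skip tools that modify state
        app.add_tool(func, name=name, description=description)


full_app = FakeApp()
register(full_app, get_all_functions())

safe_app = FakeApp()
register(safe_app, get_all_functions(), read_only=True)

print(sorted(full_app.tools))  # ['clear_task_instances', 'get_dag']
print(sorted(safe_app.tools))  # ['get_dag']
```

Because clear_task_instances is registered with is_read_only set to False, a filter like this would leave it out of a read-only deployment.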

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nikhil-ganage/mcp-server-airflow-token'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.