list_task_instances_all
Retrieve task instances from Apache Airflow with filtering by DAG, run, task, date ranges, duration, state, and pool parameters to monitor workflow execution.
Instructions
[Tool Role]: Lists task instances with comprehensive filtering options.
Input Schema
Table / JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| dag_id | No | ||
| dag_run_id | No | ||
| task_id | No | ||
| execution_date_gte | No | ||
| execution_date_lte | No | ||
| start_date_gte | No | ||
| start_date_lte | No | ||
| end_date_gte | No | ||
| end_date_lte | No | ||
| duration_gte | No | ||
| duration_lte | No | ||
| state | No | ||
| pool | No | ||
| limit | No | | 100 |
| offset | No | | 0 |
Implementation Reference
# Core handler function for the 'list_task_instances_all' tool. Queries the
# Airflow /taskInstances API endpoint with dynamic filtering parameters for
# comprehensive task instance listing.
@mcp.tool()
async def list_task_instances_all(
    dag_id: Optional[str] = None,
    dag_run_id: Optional[str] = None,
    task_id: Optional[str] = None,
    execution_date_gte: Optional[str] = None,
    execution_date_lte: Optional[str] = None,
    start_date_gte: Optional[str] = None,
    start_date_lte: Optional[str] = None,
    end_date_gte: Optional[str] = None,
    end_date_lte: Optional[str] = None,
    duration_gte: Optional[float] = None,
    duration_lte: Optional[float] = None,
    state: Optional[List[str]] = None,
    pool: Optional[List[str]] = None,
    limit: int = 100,
    offset: int = 0
) -> Dict[str, Any]:
    """[Tool Role]: Lists task instances with comprehensive filtering options.

    Args:
        dag_id: Only include task instances of this DAG.
        dag_run_id: Only include task instances of this DAG run.
        task_id: Only include task instances of this task.
        execution_date_gte: Lower bound for the execution date (sent as given).
        execution_date_lte: Upper bound for the execution date (sent as given).
        start_date_gte: Lower bound for the start date (sent as given).
        start_date_lte: Upper bound for the start date (sent as given).
        end_date_gte: Lower bound for the end date (sent as given).
        end_date_lte: Upper bound for the end date (sent as given).
        duration_gte: Lower bound for the duration; 0 is a valid bound.
        duration_lte: Upper bound for the duration; 0 is a valid bound.
        state: States to match; each entry is sent as its own `state` parameter.
        pool: Pools to match; each entry is sent as its own `pool` parameter.
        limit: Page size forwarded to the API (default: 100).
        offset: Pagination offset forwarded to the API (default: 0).

    Returns:
        The decoded JSON body of the API response.

    Raises:
        Whatever `resp.raise_for_status()` raises for a non-success response.
    """
    # Local import: the module's import block is not visible from here.
    from urllib.parse import urlencode

    params: Dict[str, Any] = {'limit': limit, 'offset': offset}

    # String filters: unset (None) and empty values are left out of the query.
    text_filters = {
        'dag_id': dag_id,
        'dag_run_id': dag_run_id,
        'task_id': task_id,
        'execution_date_gte': execution_date_gte,
        'execution_date_lte': execution_date_lte,
        'start_date_gte': start_date_gte,
        'start_date_lte': start_date_lte,
        'end_date_gte': end_date_gte,
        'end_date_lte': end_date_lte,
    }
    params.update({key: value for key, value in text_filters.items() if value})

    # Numeric filters: compare against None so an explicit 0 bound is kept
    # (a plain truthiness check silently dropped `duration_lte=0`).
    if duration_gte is not None:
        params['duration_gte'] = duration_gte
    if duration_lte is not None:
        params['duration_lte'] = duration_lte

    # List filters: empty lists carry no constraint and are omitted.
    if state:
        params['state'] = state
    if pool:
        params['pool'] = pool

    # urlencode percent-encodes reserved characters (e.g. the "+" of a
    # "+00:00" offset, which would otherwise decode as a space) and, with
    # doseq=True, expands lists into repeated "key=value" pairs instead of
    # embedding the Python list repr in the URL.
    query_string = urlencode(params, doseq=True)

    # NOTE(review): confirm the target API really serves GET /taskInstances;
    # the Airflow REST reference lists task instances under
    # /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances ("~" as wildcard).
    resp = await airflow_request("GET", f"/taskInstances?{query_string}")
    resp.raise_for_status()
    return resp.json()
- src/mcp_airflow_api/tools/v1_tools.py:19-27 (registration) Registers the common tools (including list_task_instances_all) for Airflow API v1 by setting the v1-specific airflow_request function and calling register_common_tools.
  ```python
  # Set the global request function to v1
  common_tools.airflow_request = airflow_request_v1

  # Register all 56 common tools (includes management tools)
  common_tools.register_common_tools(mcp)

  # V1 has no exclusive tools - all tools are shared with v2
  logger.info("Registered all Airflow API v1 tools (56 tools: 43 core + 13 management tools)")
  ```
- src/mcp_airflow_api/tools/v2_tools.py:20-25 (registration) Registers the common tools (including list_task_instances_all) for Airflow API v2 by setting the v2-specific airflow_request function and calling register_common_tools.
  ```python
  # Set the global request function to v2
  common_tools.airflow_request = airflow_request_v2

  # Register all 43 common tools
  common_tools.register_common_tools(mcp)
  ```
- src/mcp_airflow_api/tools/common_tools.py:21-744 (registration)The register_common_tools function defines and registers all common MCP tools using @mcp.tool() decorators, including the list_task_instances_all handler.def register_common_tools(mcp): """Register all 43 common tools that work with both v1 and v2 APIs.""" if airflow_request is None: raise RuntimeError("airflow_request function must be set before registering common tools") logger.info("Registering common tools shared between v1 and v2") # Template & Prompt Management @mcp.tool() async def get_prompt_template(section: Optional[str] = None, mode: Optional[str] = None) -> str: """ [Tool Role]: Provides comprehensive prompt template for LLM interactions with Airflow operations. Args: section: Optional section name to get specific part of template mode: Optional mode (summary/detailed) to control response verbosity Returns: Comprehensive template or specific section for optimal LLM guidance """ template = read_prompt_template(PROMPT_TEMPLATE_PATH) if section: sections = parse_prompt_sections(template) for i, s in enumerate(sections): if section.lower() in s.lower(): return sections[i + 1] # +1 to skip the title section return f"Section '{section}' not found." return template # DAG Management (11 tools) @mcp.tool() async def list_dags(limit: int = 20, offset: int = 0, fetch_all: bool = False, id_contains: Optional[str] = None, name_contains: Optional[str] = None) -> Dict[str, Any]: """ [Tool Role]: Lists all DAGs registered in the Airflow cluster with pagination support. 
Args: limit: Maximum number of DAGs to return (default: 20) offset: Number of DAGs to skip for pagination (default: 0) fetch_all: If True, fetches all DAGs regardless of limit/offset id_contains: Filter DAGs by ID containing this string name_contains: Filter DAGs by display name containing this string Returns: Dict containing dags list, pagination info, and total counts """ return await list_dags_internal(limit, offset, fetch_all, id_contains, name_contains) @mcp.tool() async def get_dag(dag_id: str) -> Dict[str, Any]: """ [Tool Role]: Retrieves detailed information for a specific DAG. Args: dag_id: The DAG ID to get details for Returns: Comprehensive DAG details """ return await get_dag_detailed_info(dag_id) @mcp.tool() async def get_dags_detailed_batch( limit: int = 100, offset: int = 0, fetch_all: bool = False, id_contains: Optional[str] = None, name_contains: Optional[str] = None, is_active: Optional[bool] = None, is_paused: Optional[bool] = None ) -> Dict[str, Any]: """ [Tool Role]: Retrieves detailed information for multiple DAGs in batch with latest run information. 
""" dag_list_result = await list_dags_internal( limit=limit, offset=offset, fetch_all=fetch_all, id_contains=id_contains, name_contains=name_contains ) dags_basic = dag_list_result.get("dags", []) detailed_dags = [] success_count = 0 error_count = 0 errors = [] skipped_count = 0 for dag_basic in dags_basic: dag_id = dag_basic.get("dag_id") if not dag_id: skipped_count += 1 continue if is_active is not None and dag_basic.get("is_active") != is_active: skipped_count += 1 continue if is_paused is not None and dag_basic.get("is_paused") != is_paused: skipped_count += 1 continue try: detailed_dag = await get_dag_detailed_info(dag_id) try: latest_run_resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns?limit=1&order_by=-execution_date") latest_run_resp.raise_for_status() latest_runs = latest_run_resp.json().get("dag_runs", []) if latest_runs: latest_run = latest_runs[0] detailed_dag["latest_dag_run"] = { "run_id": latest_run.get("run_id"), "run_type": latest_run.get("run_type"), "state": latest_run.get("state"), "execution_date": latest_run.get("execution_date"), "start_date": latest_run.get("start_date"), "end_date": latest_run.get("end_date"), "data_interval_start": latest_run.get("data_interval_start"), "data_interval_end": latest_run.get("data_interval_end"), "external_trigger": latest_run.get("external_trigger"), "conf": latest_run.get("conf"), "note": latest_run.get("note") } else: detailed_dag["latest_dag_run"] = None except Exception: detailed_dag["latest_dag_run"] = None detailed_dags.append(detailed_dag) success_count += 1 except Exception as e: error_count += 1 errors.append({ "dag_id": dag_id, "error": str(e) }) return { "dags_detailed": detailed_dags, "total_processed": success_count, "total_available": dag_list_result.get("total_entries", 0), "returned_count": len(detailed_dags), "processing_stats": { "success_count": success_count, "error_count": error_count, "skipped_count": skipped_count, "errors": errors }, "applied_filters": { "id_contains": 
id_contains, "name_contains": name_contains, "is_active": is_active, "is_paused": is_paused, "limit": limit, "offset": offset, "fetch_all": fetch_all }, "pagination_info": dag_list_result.get("pagination_info", {}), "has_more_pages": dag_list_result.get("has_more_pages", False), "next_offset": dag_list_result.get("next_offset") } @mcp.tool() async def running_dags() -> Dict[str, Any]: """[Tool Role]: Lists all currently running DAG runs in the Airflow cluster.""" resp = await airflow_request("GET", "/dags/~/dagRuns?state=running&limit=1000&order_by=-start_date") resp.raise_for_status() data = resp.json() running_runs = [] for run in data.get("dag_runs", []): run_info = { "dag_id": run.get("dag_id"), "dag_display_name": run.get("dag_display_name"), "run_id": run.get("run_id"), "run_type": run.get("run_type"), "state": run.get("state"), "execution_date": run.get("execution_date"), "start_date": run.get("start_date"), "end_date": run.get("end_date"), "data_interval_start": run.get("data_interval_start"), "data_interval_end": run.get("data_interval_end"), "external_trigger": run.get("external_trigger"), "conf": run.get("conf"), "note": run.get("note") } running_runs.append(run_info) return { "dag_runs": running_runs, "total_running": len(running_runs), "query_info": { "state_filter": "running", "limit": 1000, "order_by": "start_date (descending)" } } @mcp.tool() async def failed_dags() -> Dict[str, Any]: """[Tool Role]: Lists all recently failed DAG runs in the Airflow cluster.""" resp = await airflow_request("GET", "/dags/~/dagRuns?state=failed&limit=1000&order_by=-start_date") resp.raise_for_status() data = resp.json() failed_runs = [] for run in data.get("dag_runs", []): run_info = { "dag_id": run.get("dag_id"), "dag_display_name": run.get("dag_display_name"), "run_id": run.get("run_id"), "run_type": run.get("run_type"), "state": run.get("state"), "execution_date": run.get("execution_date"), "start_date": run.get("start_date"), "end_date": run.get("end_date"), 
"data_interval_start": run.get("data_interval_start"), "data_interval_end": run.get("data_interval_end"), "external_trigger": run.get("external_trigger"), "conf": run.get("conf"), "note": run.get("note") } failed_runs.append(run_info) return { "dag_runs": failed_runs, "total_failed": len(failed_runs), "query_info": { "state_filter": "failed", "limit": 1000, "order_by": "start_date (descending)" } } @mcp.tool() async def trigger_dag(dag_id: str) -> Dict[str, Any]: """[Tool Role]: Triggers a new DAG run for a specified Airflow DAG.""" if not dag_id: raise ValueError("dag_id must not be empty") resp = await airflow_request("POST", f"/dags/{dag_id}/dagRuns", json={"conf": {}}) resp.raise_for_status() run = resp.json() return { "dag_id": dag_id, "run_id": run.get("run_id"), "state": run.get("state"), "execution_date": run.get("execution_date"), "start_date": run.get("start_date"), "end_date": run.get("end_date") } @mcp.tool() async def pause_dag(dag_id: str) -> Dict[str, Any]: """[Tool Role]: Pauses the specified Airflow DAG (prevents scheduling new runs).""" if not dag_id: raise ValueError("dag_id must not be empty") resp = await airflow_request("PATCH", f"/dags/{dag_id}", json={"is_paused": True}) resp.raise_for_status() dag_data = resp.json() return { "dag_id": dag_id, "is_paused": dag_data.get("is_paused") } @mcp.tool() async def unpause_dag(dag_id: str) -> Dict[str, Any]: """[Tool Role]: Unpauses the specified Airflow DAG (allows scheduling new runs).""" if not dag_id: raise ValueError("dag_id must not be empty") resp = await airflow_request("PATCH", f"/dags/{dag_id}", json={"is_paused": False}) resp.raise_for_status() dag_data = resp.json() return { "dag_id": dag_id, "is_paused": dag_data.get("is_paused") } @mcp.tool() async def dag_graph(dag_id: str) -> Dict[str, Any]: """[Tool Role]: Retrieves task graph structure for the specified DAG.""" if not dag_id: raise ValueError("dag_id must not be empty") resp = await airflow_request("GET", f"/dags/{dag_id}/tasks") 
resp.raise_for_status() tasks_data = resp.json() tasks = tasks_data.get("tasks", []) task_graph = {} for task in tasks: task_id = task.get("task_id") task_graph[task_id] = { "task_id": task_id, "task_type": task.get("class_ref", {}).get("class_name"), "downstream_task_ids": task.get("downstream_task_ids", []), "upstream_task_ids": task.get("upstream_task_ids", []) } return { "dag_id": dag_id, "task_graph": task_graph, "total_tasks": len(tasks), "task_relationships": { "nodes": list(task_graph.keys()), "edges": [(task_id, downstream) for task_id, task_info in task_graph.items() for downstream in task_info["downstream_task_ids"]] } } @mcp.tool() async def list_tasks(dag_id: str) -> Dict[str, Any]: """[Tool Role]: Lists all tasks within the specified DAG.""" if not dag_id: raise ValueError("dag_id must not be empty") resp = await airflow_request("GET", f"/dags/{dag_id}/tasks") resp.raise_for_status() return resp.json() @mcp.tool() async def dag_code(dag_id: str) -> Dict[str, Any]: """[Tool Role]: Retrieves the source code for the specified DAG.""" if not dag_id: raise ValueError("dag_id must not be empty") resp = await airflow_request("GET", f"/dagSources/{dag_id}") resp.raise_for_status() return resp.json() # Continue with remaining tools... 
# Event/Log Management (3 tools) @mcp.tool() async def list_event_logs(dag_id: Optional[str] = None, limit: int = 20, offset: int = 0) -> Dict[str, Any]: """[Tool Role]: Lists event logs from Airflow.""" params = {'limit': limit, 'offset': offset} if dag_id: params['dag_id'] = dag_id query_string = "&".join([f"{k}={v}" for k, v in params.items()]) resp = await airflow_request("GET", f"/eventLogs?{query_string}") resp.raise_for_status() return resp.json() @mcp.tool() async def get_event_log(event_log_id: int) -> Dict[str, Any]: """[Tool Role]: Retrieves a specific event log entry.""" resp = await airflow_request("GET", f"/eventLogs/{event_log_id}") resp.raise_for_status() return resp.json() @mcp.tool() async def all_dag_event_summary() -> Dict[str, Any]: """[Tool Role]: Provides summary of event logs across all DAGs.""" resp = await airflow_request("GET", "/eventLogs?limit=1000") resp.raise_for_status() data = resp.json() event_summary = {} for event in data.get("event_logs", []): dag_id = event.get("dag_id", "unknown") event_type = event.get("event", "unknown") if dag_id not in event_summary: event_summary[dag_id] = {} if event_type not in event_summary[dag_id]: event_summary[dag_id][event_type] = 0 event_summary[dag_id][event_type] += 1 return { "event_summary": event_summary, "total_events": len(data.get("event_logs", [])), "unique_dags": len(event_summary) } # Import Error Management (3 tools) @mcp.tool() async def list_import_errors(limit: int = 20, offset: int = 0) -> Dict[str, Any]: """[Tool Role]: Lists import errors in Airflow.""" params = {'limit': limit, 'offset': offset} query_string = "&".join([f"{k}={v}" for k, v in params.items()]) resp = await airflow_request("GET", f"/importErrors?{query_string}") resp.raise_for_status() return resp.json() @mcp.tool() async def get_import_error(import_error_id: int) -> Dict[str, Any]: """[Tool Role]: Retrieves a specific import error.""" resp = await airflow_request("GET", f"/importErrors/{import_error_id}") 
resp.raise_for_status() return resp.json() @mcp.tool() async def all_dag_import_summary() -> Dict[str, Any]: """[Tool Role]: Provides summary of import errors across all DAGs.""" resp = await airflow_request("GET", "/importErrors?limit=1000") resp.raise_for_status() data = resp.json() import_summary = {} for error in data.get("import_errors", []): filename = error.get("filename", "unknown") if filename not in import_summary: import_summary[filename] = [] import_summary[filename].append({ "id": error.get("id"), "timestamp": error.get("timestamp"), "stacktrace": error.get("stacktrace", "")[:200] + "..." }) return { "import_errors_summary": import_summary, "total_errors": len(data.get("import_errors", [])), "files_with_errors": len(import_summary) } # Analysis/Statistics (3 tools) @mcp.tool() async def dag_run_duration(dag_id: str, limit: int = 10) -> Dict[str, Any]: """[Tool Role]: Analyzes DAG run durations and performance metrics.""" if not dag_id: raise ValueError("dag_id must not be empty") resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns?limit={limit}&order_by=-execution_date") resp.raise_for_status() data = resp.json() runs = data.get("dag_runs", []) durations = [] for run in runs: start_date = run.get("start_date") end_date = run.get("end_date") if start_date and end_date: from datetime import datetime start = datetime.fromisoformat(start_date.replace('Z', '+00:00')) end = datetime.fromisoformat(end_date.replace('Z', '+00:00')) duration_seconds = (end - start).total_seconds() durations.append({ "run_id": run.get("run_id"), "duration_seconds": duration_seconds, "state": run.get("state"), "execution_date": run.get("execution_date") }) avg_duration = sum(d["duration_seconds"] for d in durations) / len(durations) if durations else 0 return { "dag_id": dag_id, "run_durations": durations, "statistics": { "average_duration_seconds": avg_duration, "total_analyzed_runs": len(durations), "fastest_run": min(durations, key=lambda x: x["duration_seconds"]) if 
durations else None, "slowest_run": max(durations, key=lambda x: x["duration_seconds"]) if durations else None } } @mcp.tool() async def dag_task_duration(dag_id: str, dag_run_id: Optional[str] = None) -> Dict[str, Any]: """[Tool Role]: Analyzes task durations within a DAG run.""" if not dag_id: raise ValueError("dag_id must not be empty") if not dag_run_id: # Get the latest run resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns?limit=1&order_by=-execution_date") resp.raise_for_status() runs = resp.json().get("dag_runs", []) if not runs: return {"error": f"No DAG runs found for DAG {dag_id}"} dag_run_id = runs[0]["run_id"] resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances") resp.raise_for_status() data = resp.json() task_durations = [] for task in data.get("task_instances", []): start_date = task.get("start_date") end_date = task.get("end_date") if start_date and end_date: from datetime import datetime start = datetime.fromisoformat(start_date.replace('Z', '+00:00')) end = datetime.fromisoformat(end_date.replace('Z', '+00:00')) duration_seconds = (end - start).total_seconds() task_durations.append({ "task_id": task.get("task_id"), "duration_seconds": duration_seconds, "state": task.get("state"), "start_date": start_date, "end_date": end_date }) return { "dag_id": dag_id, "dag_run_id": dag_run_id, "task_durations": task_durations, "total_tasks": len(task_durations) } @mcp.tool() async def dag_calendar(dag_id: str, start_date: str, end_date: str) -> Dict[str, Any]: """[Tool Role]: Shows DAG schedule and execution calendar for a date range.""" if not dag_id: raise ValueError("dag_id must not be empty") params = { 'start_date_gte': start_date, 'start_date_lte': end_date, 'limit': 1000 } query_string = "&".join([f"{k}={v}" for k, v in params.items()]) resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns?{query_string}") resp.raise_for_status() data = resp.json() calendar_data = [] for run in data.get("dag_runs", 
[]): calendar_data.append({ "execution_date": run.get("execution_date"), "start_date": run.get("start_date"), "end_date": run.get("end_date"), "state": run.get("state"), "run_type": run.get("run_type") }) return { "dag_id": dag_id, "date_range": {"start": start_date, "end": end_date}, "calendar_entries": calendar_data, "total_runs": len(calendar_data) } # System Information (6 tools) @mcp.tool() async def get_health() -> Dict[str, Any]: """[Tool Role]: Checks Airflow cluster health status.""" # Import here to avoid circular imports from ..functions import get_api_version api_version = get_api_version() if api_version == "v2": # v2 API: Use /monitor/health endpoint (Airflow 3.x) resp = await airflow_request("GET", "/monitor/health") else: # v1 API: Use /health endpoint (Airflow 2.x) resp = await airflow_request("GET", "/health") resp.raise_for_status() return resp.json() @mcp.tool() async def get_version() -> Dict[str, Any]: """[Tool Role]: Gets Airflow version information.""" resp = await airflow_request("GET", "/version") resp.raise_for_status() return resp.json() @mcp.tool() async def get_config() -> Dict[str, Any]: """[Tool Role]: Retrieves Airflow configuration.""" resp = await airflow_request("GET", "/config") resp.raise_for_status() return resp.json() @mcp.tool() async def list_config_sections() -> Dict[str, Any]: """[Tool Role]: Lists all configuration sections with summary.""" try: resp = await airflow_request("GET", "/config") resp.raise_for_status() config_data = resp.json() sections_summary = {} for section_name, section_data in config_data.get("sections", {}).items(): options_count = len(section_data.get("options", {})) sections_summary[section_name] = { "options_count": options_count, "sample_options": list(section_data.get("options", {}).keys())[:5] } return { "sections_summary": sections_summary, "total_sections": len(sections_summary) } except Exception as e: return { "error": f"Configuration access denied: {str(e)}", "note": "This requires 
'expose_config = True' in airflow.cfg [webserver] section" } @mcp.tool() async def get_config_section(section_name: str) -> Dict[str, Any]: """[Tool Role]: Gets all options within a specific configuration section.""" try: resp = await airflow_request("GET", "/config") resp.raise_for_status() config_data = resp.json() section_data = config_data.get("sections", {}).get(section_name) if not section_data: return {"error": f"Section '{section_name}' not found"} return { "section_name": section_name, "options": section_data.get("options", {}), "options_count": len(section_data.get("options", {})) } except Exception as e: return { "error": f"Configuration access denied: {str(e)}", "note": "This requires 'expose_config = True' in airflow.cfg [webserver] section" } @mcp.tool() async def search_config_options(search_term: str) -> Dict[str, Any]: """[Tool Role]: Searches for configuration options matching a term.""" try: resp = await airflow_request("GET", "/config") resp.raise_for_status() config_data = resp.json() matching_options = {} for section_name, section_data in config_data.get("sections", {}).items(): section_matches = {} for option_name, option_data in section_data.get("options", {}).items(): if search_term.lower() in option_name.lower() or search_term.lower() in str(option_data.get("value", "")).lower(): section_matches[option_name] = option_data if section_matches: matching_options[section_name] = section_matches return { "search_term": search_term, "matching_options": matching_options, "total_matches": sum(len(section) for section in matching_options.values()) } except Exception as e: return { "error": f"Configuration access denied: {str(e)}", "note": "This requires 'expose_config = True' in airflow.cfg [webserver] section" } # Pool Management (2 tools) @mcp.tool() async def list_pools(limit: int = 20, offset: int = 0) -> Dict[str, Any]: """[Tool Role]: Lists all pools in Airflow.""" params = {'limit': limit, 'offset': offset} query_string = "&".join([f"{k}={v}" 
for k, v in params.items()]) resp = await airflow_request("GET", f"/pools?{query_string}") resp.raise_for_status() return resp.json() @mcp.tool() async def get_pool(pool_name: str) -> Dict[str, Any]: """[Tool Role]: Gets details for a specific pool.""" resp = await airflow_request("GET", f"/pools/{pool_name}") resp.raise_for_status() return resp.json() # Task Instance Management (5 tools) @mcp.tool() async def list_task_instances_all( dag_id: Optional[str] = None, dag_run_id: Optional[str] = None, task_id: Optional[str] = None, execution_date_gte: Optional[str] = None, execution_date_lte: Optional[str] = None, start_date_gte: Optional[str] = None, start_date_lte: Optional[str] = None, end_date_gte: Optional[str] = None, end_date_lte: Optional[str] = None, duration_gte: Optional[float] = None, duration_lte: Optional[float] = None, state: Optional[List[str]] = None, pool: Optional[List[str]] = None, limit: int = 100, offset: int = 0 ) -> Dict[str, Any]: """[Tool Role]: Lists task instances with comprehensive filtering options.""" params = {'limit': limit, 'offset': offset} # Add all filter parameters if dag_id: params['dag_id'] = dag_id if dag_run_id: params['dag_run_id'] = dag_run_id if task_id: params['task_id'] = task_id if execution_date_gte: params['execution_date_gte'] = execution_date_gte if execution_date_lte: params['execution_date_lte'] = execution_date_lte if start_date_gte: params['start_date_gte'] = start_date_gte if start_date_lte: params['start_date_lte'] = start_date_lte if end_date_gte: params['end_date_gte'] = end_date_gte if end_date_lte: params['end_date_lte'] = end_date_lte if duration_gte: params['duration_gte'] = duration_gte if duration_lte: params['duration_lte'] = duration_lte if state: params['state'] = state if pool: params['pool'] = pool query_string = "&".join([f"{k}={v}" for k, v in params.items()]) resp = await airflow_request("GET", f"/taskInstances?{query_string}") resp.raise_for_status() return resp.json()