
dag_task_duration

Retrieves task duration data for a specific DAG run in Apache Airflow. Provide a DAG ID and an optional run ID; if no run ID is given, the latest run is analyzed.

Instructions

[Tool Role]: Retrieves task duration information for a specific DAG run.

Args:
- dag_id: The DAG ID to get task durations for
- run_id: Specific run ID (if not provided, the latest run is used)

Returns:
- Task duration data: dag_id, run_id, tasks, statistics
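A minimal sketch of calling this tool from the official MCP Python SDK over stdio. The launch command (uvx mcp-airflow-api) and the DAG ID are illustrative assumptions, not taken from this server's documentation; only the tool name and argument names follow the schema below.

    import asyncio
    from mcp import ClientSession, StdioServerParameters
    from mcp.client.stdio import stdio_client

    # Assumed launch command; replace with however you actually start the MCP-Airflow-API server.
    server_params = StdioServerParameters(command="uvx", args=["mcp-airflow-api"])

    async def main():
        async with stdio_client(server_params) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()
                # run_id is optional; omit it to analyze the latest DAG run.
                result = await session.call_tool(
                    "dag_task_duration",
                    arguments={"dag_id": "example_dag"},
                )
                print(result.content)

    asyncio.run(main())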

Input Schema

| Name   | Required | Description                                              | Default |
|--------|----------|----------------------------------------------------------|---------|
| dag_id | Yes      | The DAG ID to get task durations for                     | —       |
| run_id | No       | Specific run ID; if not provided, the latest run is used | —       |
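Example argument payloads that satisfy this schema; the DAG ID and run ID values are placeholders:

    {"dag_id": "example_dag"}
    {"dag_id": "example_dag", "run_id": "manual__2024-01-01T00:00:00+00:00"}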

Implementation Reference

  • The handler function decorated with @mcp.tool() that implements the dag_task_duration tool logic, calculating task execution durations for a given DAG run. (A hedged sketch of the airflow_request helper it relies on appears after this list.)

    @mcp.tool()
    async def dag_task_duration(dag_id: str, dag_run_id: Optional[str] = None) -> Dict[str, Any]:
        """[Tool Role]: Analyzes task durations within a DAG run."""
        if not dag_id:
            raise ValueError("dag_id must not be empty")
        if not dag_run_id:
            # Get the latest run
            resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns?limit=1&order_by=-execution_date")
            resp.raise_for_status()
            runs = resp.json().get("dag_runs", [])
            if not runs:
                return {"error": f"No DAG runs found for DAG {dag_id}"}
            dag_run_id = runs[0]["run_id"]
        resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances")
        resp.raise_for_status()
        data = resp.json()
        from datetime import datetime
        task_durations = []
        for task in data.get("task_instances", []):
            start_date = task.get("start_date")
            end_date = task.get("end_date")
            if start_date and end_date:
                start = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
                end = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
                duration_seconds = (end - start).total_seconds()
                task_durations.append({
                    "task_id": task.get("task_id"),
                    "duration_seconds": duration_seconds,
                    "state": task.get("state"),
                    "start_date": start_date,
                    "end_date": end_date
                })
        return {
            "dag_id": dag_id,
            "dag_run_id": dag_run_id,
            "task_durations": task_durations,
            "total_tasks": len(task_durations)
        }
  • Registration entry point for v1 API tools, which sets the v1 airflow_request function and calls register_common_tools(mcp) to register dag_task_duration among others.

    def register_tools(mcp):
        """Register v1 tools by importing common tools with v1 request function."""
        logger.info("Initializing MCP server for Airflow API v1")
        logger.info("Loading Airflow API v1 tools (Airflow 2.x)")
        # Set the global request function to v1
        common_tools.airflow_request = airflow_request_v1
        # Register all 56 common tools (includes management tools)
        common_tools.register_common_tools(mcp)
        # V1 has no exclusive tools - all tools are shared with v2
        logger.info("Registered all Airflow API v1 tools (56 tools: 43 core + 13 management tools)")
  • Registration entry point for v2 API tools, which sets the v2 airflow_request function and calls register_common_tools(mcp) to register dag_task_duration among others, then adds the v2-exclusive asset tools.

    def register_tools(mcp):
        """Register v2 tools: common tools + v2-exclusive asset tools."""
        logger.info("Initializing MCP server for Airflow API v2")
        logger.info("Loading Airflow API v2 tools (Airflow 3.0+)")
        # Set the global request function to v2
        common_tools.airflow_request = airflow_request_v2
        # Register all 43 common tools
        common_tools.register_common_tools(mcp)

        # Add V2-exclusive tools (2 tools)
        @mcp.tool()
        async def list_assets(limit: int = 20, offset: int = 0, uri_pattern: Optional[str] = None) -> Dict[str, Any]:
            """
            [V2 New] List all assets in the system for data-aware scheduling.

            Assets are a key feature in Airflow 3.0 for data-aware scheduling. They enable
            workflows to be triggered by data changes rather than time schedules.

            Args:
                limit: Maximum number of assets to return (default: 20)
                offset: Number of assets to skip for pagination (default: 0)
                uri_pattern: Filter assets by URI pattern (optional)

            Returns:
                Dict containing assets list, pagination info, and metadata
            """
            params = {'limit': limit, 'offset': offset}
            if uri_pattern:
                params['uri_pattern'] = uri_pattern
            query_string = "&".join([f"{k}={v}" for k, v in params.items()])
            resp = await airflow_request_v2("GET", f"/assets?{query_string}")
            resp.raise_for_status()
            data = resp.json()
            return {
                "assets": data.get("assets", []),
                "total_entries": data.get("total_entries", 0),
                "limit": limit,
                "offset": offset,
                "api_version": "v2",
                "feature": "assets"
            }

        @mcp.tool()
        async def list_asset_events(limit: int = 20, offset: int = 0, asset_uri: Optional[str] = None, source_dag_id: Optional[str] = None) -> Dict[str, Any]:
            """
            [V2 New] List asset events for data lineage tracking.

            Asset events track when assets are created or updated by DAGs. This enables
            data lineage tracking and data-aware scheduling in Airflow 3.0.

            Args:
                limit: Maximum number of events to return (default: 20)
                offset: Number of events to skip for pagination (default: 0)
                asset_uri: Filter events by specific asset URI (optional)
                source_dag_id: Filter events by source DAG that produced the event (optional)

            Returns:
                Dict containing asset events list, pagination info, and metadata
            """
            params = {'limit': limit, 'offset': offset}
            if asset_uri:
                params['asset_uri'] = asset_uri
            if source_dag_id:
                params['source_dag_id'] = source_dag_id
            query_string = "&".join([f"{k}={v}" for k, v in params.items()])
            resp = await airflow_request_v2("GET", f"/assets/events?{query_string}")
            resp.raise_for_status()
            data = resp.json()
            return {
                "asset_events": data.get("asset_events", []),
                "total_entries": data.get("total_entries", 0),
                "limit": limit,
                "offset": offset,
                "api_version": "v2",
                "feature": "asset_events"
            }

        logger.info("Registered all Airflow API v2 tools (43 common + 2 assets + 4 management = 49 tools)")
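The common airflow_request helper awaited by the handler above is referenced on this page but not shown. Below is a minimal sketch of what such a helper and the server wiring could look like, assuming httpx for the HTTP layer and FastMCP from the official MCP Python SDK; the environment variable names, base URL, credentials, and the airflow_api_v1 module path are illustrative assumptions, not this project's actual code.

    import os
    from typing import Any

    import httpx
    from mcp.server.fastmcp import FastMCP

    from airflow_api_v1 import register_tools  # hypothetical module path for the entry point shown above

    # Assumed configuration names; the real server may read different variables.
    AIRFLOW_BASE_URL = os.environ.get("AIRFLOW_API_URL", "http://localhost:8080/api/v1")
    AIRFLOW_USER = os.environ.get("AIRFLOW_API_USERNAME", "airflow")
    AIRFLOW_PASSWORD = os.environ.get("AIRFLOW_API_PASSWORD", "airflow")

    async def airflow_request_v1(method: str, path: str, **kwargs: Any) -> httpx.Response:
        """Send an authenticated request to the Airflow 2.x REST API and return the raw response."""
        async with httpx.AsyncClient(
            base_url=AIRFLOW_BASE_URL,
            auth=(AIRFLOW_USER, AIRFLOW_PASSWORD),
            timeout=30.0,
        ) as client:
            # Callers such as dag_task_duration invoke resp.raise_for_status() and resp.json() on this response.
            return await client.request(method, path, **kwargs)

    # Wiring sketch: build the FastMCP server and hand it to the version-specific register_tools entry point.
    mcp = FastMCP("airflow")
    register_tools(mcp)

    if __name__ == "__main__":
        mcp.run()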
