get_task
Retrieve detailed task definition from an Airflow DAG: operator type, upstream/downstream dependencies, retries, execution timeout, trigger rule, and more. Use to inspect task configuration and relationships.
Instructions
Get detailed information about a specific task definition in a DAG.
Use this tool when the user asks about:
"Show me details for task X in DAG Y" or "What does task Z do?"
"What operator does task A use?" or "What's the configuration of task B?"
"Tell me about task C" or "Get task definition for D"
"What are the dependencies of task E?" or "Which tasks does F depend on?"
Returns task definition information including:
task_id: Unique identifier for the task
task_display_name: Human-readable display name
owner: Who owns this task
start_date: When this task becomes active
end_date: When this task becomes inactive (if set)
trigger_rule: When this task should run (all_success, one_failed, etc.)
depends_on_past: Whether task depends on previous run's success
wait_for_downstream: Whether to wait for downstream tasks
retries: Number of retry attempts
retry_delay: Time between retries
execution_timeout: Maximum execution time
operator_name: Type of operator (PythonOperator, BashOperator, etc.)
pool: Resource pool assignment
queue: Queue for executor
downstream_task_ids: List of tasks that depend on this task
upstream_task_ids: List of tasks this task depends on
Args: dag_id: The ID of the DAG containing the task task_id: The ID of the task to get details for
Returns: JSON with complete task definition details
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| dag_id | Yes | ||
| task_id | Yes |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes |
Implementation Reference
- src/astro_airflow_mcp/server.py:742-777 (registration)The MCP tool registration and public interface for 'get_task' using the @mcp.tool() decorator. Delegates to _get_task_impl.
@mcp.tool() def get_task(dag_id: str, task_id: str) -> str: """Get detailed information about a specific task definition in a DAG. Use this tool when the user asks about: - "Show me details for task X in DAG Y" or "What does task Z do?" - "What operator does task A use?" or "What's the configuration of task B?" - "Tell me about task C" or "Get task definition for D" - "What are the dependencies of task E?" or "Which tasks does F depend on?" Returns task definition information including: - task_id: Unique identifier for the task - task_display_name: Human-readable display name - owner: Who owns this task - start_date: When this task becomes active - end_date: When this task becomes inactive (if set) - trigger_rule: When this task should run (all_success, one_failed, etc.) - depends_on_past: Whether task depends on previous run's success - wait_for_downstream: Whether to wait for downstream tasks - retries: Number of retry attempts - retry_delay: Time between retries - execution_timeout: Maximum execution time - operator_name: Type of operator (PythonOperator, BashOperator, etc.) - pool: Resource pool assignment - queue: Queue for executor - downstream_task_ids: List of tasks that depend on this task - upstream_task_ids: List of tasks this task depends on Args: dag_id: The ID of the DAG containing the task task_id: The ID of the task to get details for Returns: JSON with complete task definition details """ return _get_task_impl(dag_id=dag_id, task_id=task_id) - src/astro_airflow_mcp/server.py:651-666 (handler)The handler function that executes the get_task logic. Gets the appropriate versioned adapter and calls adapter.get_task(dag_id, task_id), then serializes the result as JSON.
def _get_task_impl(dag_id: str, task_id: str) -> str: """Internal implementation for getting task details from Airflow. Args: dag_id: The ID of the DAG task_id: The ID of the task Returns: JSON string containing the task details """ try: adapter = _get_adapter() data = adapter.get_task(dag_id, task_id) return json.dumps(data, indent=2) except Exception as e: return str(e) - Abstract method definition in the base adapter interface, defining the contract: takes dag_id and task_id strings, returns a dict.
@abstractmethod def get_task(self, dag_id: str, task_id: str) -> dict[str, Any]: """Get details of a specific task.""" - Airflow 2.x implementation: calls GET /api/v1/dags/{dag_id}/tasks/{task_id}
def get_task(self, dag_id: str, task_id: str) -> dict[str, Any]: """Get details of a specific task.""" return self._call(f"dags/{dag_id}/tasks/{task_id}") - Airflow 3.x implementation: calls GET /api/v2/dags/{dag_id}/tasks/{task_id}
def get_task(self, dag_id: str, task_id: str) -> dict[str, Any]: """Get details of a specific task.""" return self._call(f"dags/{dag_id}/tasks/{task_id}")