get_task_logs
Retrieve execution logs for specific Apache Airflow tasks in MWAA environments to monitor workflow performance and troubleshoot issues.
Instructions
Get logs for a specific task instance.
Args:
- environment_name: Name of the MWAA environment
- dag_id: The DAG ID
- dag_run_id: The DAG run ID
- task_id: The task ID
- task_try_number: Specific try number (optional; the handler uses try 1 when this is omitted)
Returns: Dictionary containing task logs
Input Schema
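| Name | Required | Description | Default |
|---|---|---|---|
| environment_name | Yes | Name of the MWAA environment | |
| dag_id | Yes | The DAG ID | |
| dag_run_id | Yes | The DAG run ID | |
| task_id | Yes | The task ID | |
| task_try_number | No | Specific try number | None (the handler falls back to 1) |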
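As a rough illustration of the schema above, the arguments for a call might be assembled and checked as follows. This is only a sketch: the helper `build_get_task_logs_args` and the sample values are hypothetical and are not part of the server.

```python
from typing import Any, Dict, Optional

# The four fields the input schema marks as required.
REQUIRED_FIELDS = ("environment_name", "dag_id", "dag_run_id", "task_id")


def build_get_task_logs_args(
    environment_name: str,
    dag_id: str,
    dag_run_id: str,
    task_id: str,
    task_try_number: Optional[int] = None,
) -> Dict[str, Any]:
    """Assemble an arguments dict matching the input schema (hypothetical helper)."""
    args: Dict[str, Any] = {
        "environment_name": environment_name,
        "dag_id": dag_id,
        "dag_run_id": dag_run_id,
        "task_id": task_id,
    }
    # Reject missing or empty required values before anything is sent.
    for name in REQUIRED_FIELDS:
        if not isinstance(args[name], str) or not args[name]:
            raise ValueError(f"{name} is required and must be a non-empty string")
    # The optional field is only included when the caller supplies it.
    if task_try_number is not None:
        args["task_try_number"] = int(task_try_number)
    return args


# Sample values are made up for illustration.
example_args = build_get_task_logs_args(
    "my-mwaa-env", "daily_etl", "manual_run_1", "load_table", task_try_number=2
)
```

Leaving `task_try_number` out of the dict when it is not given keeps the payload aligned with the schema, where the field is optional.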
Implementation Reference
- awslabs/mwaa_mcp_server/tools.py:353-369 (handler): The actual implementation of the get_task_logs logic, which constructs the Airflow API endpoint and calls the API.

      async def get_task_logs(
          self,
          environment_name: str,
          dag_id: str,
          dag_run_id: str,
          task_id: str,
          task_try_number: Optional[int] = None,
      ) -> Dict[str, Any]:
          """Get task logs via Airflow API."""
          if task_try_number is None:
              task_try_number = 1

          endpoint = (
              f"/dags/{dag_id}/dagRuns/{dag_run_id}"
              f"/taskInstances/{task_id}/logs/{task_try_number}"
          )
          return self._invoke_airflow_api(environment_name, "GET", endpoint)

- awslabs/mwaa_mcp_server/server.py:409-433 (registration): The MCP tool registration and wrapper function that handles input processing before calling the tools implementation.

      @mcp.tool(name="get_task_logs")
      async def get_task_logs(
          environment_name: str,
          dag_id: str,
          dag_run_id: str,
          task_id: str,
          task_try_number: Optional[int] = None,
      ) -> Dict[str, Any]:
          """Get logs for a specific task instance.

          Args:
              environment_name: Name of the MWAA environment
              dag_id: The DAG ID
              dag_run_id: The DAG run ID
              task_id: The task ID
              task_try_number: Specific try number (optional)

          Returns:
              Dictionary containing task logs
          """
          task_try_number_int = int(task_try_number) if task_try_number is not None else None
          return await tools.get_task_logs(
              environment_name, dag_id, dag_run_id, task_id, task_try_number_int
          )
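
To make the endpoint construction in the handler easier to see in isolation, here is a minimal standalone sketch of the same path-building step. The function name `task_logs_endpoint` is hypothetical; the sketch makes no API call and only mirrors the path format and the default try number of 1 shown in the handler.

```python
from typing import Optional


def task_logs_endpoint(
    dag_id: str,
    dag_run_id: str,
    task_id: str,
    task_try_number: Optional[int] = None,
) -> str:
    """Build the Airflow REST path for a task's logs (hypothetical helper)."""
    # Same fallback as the handler: no try number means the first try.
    if task_try_number is None:
        task_try_number = 1
    return (
        f"/dags/{dag_id}/dagRuns/{dag_run_id}"
        f"/taskInstances/{task_id}/logs/{task_try_number}"
    )


# Sample identifiers are made up for illustration.
print(task_logs_endpoint("daily_etl", "manual_run_1", "load_table"))
# prints: /dags/daily_etl/dagRuns/manual_run_1/taskInstances/load_table/logs/1
```

Like the handler, this sketch interpolates the identifiers as given. Whether run IDs that contain characters such as `:` or `+` need percent-encoding before the request is sent is not shown in the source, so that step is left out here.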