get_task_instance_logs
Retrieve logs for a specific task instance in Apache Airflow, including content and metadata. Specify the DAG, DAG run, task, and try number to fetch detailed execution logs or metadata only. Supports pagination for large logs.
Instructions
[Tool Role]: Retrieves logs for a specific task instance and its try number with content and metadata.
Args:
- dag_id: The DAG ID containing the task
- dag_run_id: The DAG run ID containing the task instance
- task_id: The task ID to get logs for
- try_number: The try number for the task instance (default: 1)
- full_content: Whether to return full log content or just metadata (default: False)
- token: Pagination token for large logs (optional)
Returns: Task instance logs with content and metadata, including the fields `task_id`, `dag_id`, `dag_run_id`, `try_number`, `content`, and `metadata`.
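As a rough illustration, a successful call returns a dictionary along these lines; the concrete values and the nested metadata keys are assumptions, since only the top-level field names above are documented:

```python
# Illustrative return value; all concrete values and the metadata keys are hypothetical.
example_response = {
    "task_id": "extract",
    "dag_id": "example_dag",
    "dag_run_id": "manual__2024-01-01T00:00:00+00:00",
    "try_number": 1,
    "content": "[2024-01-01, 00:00:05 UTC] {taskinstance.py} INFO - ...",  # full text only when full_content=True
    "metadata": {"end_of_log": True, "continuation_token": None},  # assumed keys
}
```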
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| dag_id | Yes | The DAG ID containing the task | |
| dag_run_id | Yes | The DAG run ID containing the task instance | |
| full_content | No | Whether to return full log content or just metadata | False |
| task_id | Yes | The task ID to get logs for | |
| token | No | Pagination token for large logs | |
| try_number | No | The try number for the task instance | 1 |
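For orientation, a minimal usage sketch, assuming the handler is in scope (e.g., inside the module that defines it) and the server's Airflow connection is already configured; in normal operation an MCP client calls the tool by name. All identifiers below are hypothetical:

```python
import asyncio

async def main() -> None:
    # Hypothetical DAG/run/task identifiers for illustration.
    logs = await get_task_instance_logs(
        dag_id="example_dag",
        dag_run_id="manual__2024-01-01T00:00:00+00:00",
        task_id="extract",
        try_number=1,
    )
    print(logs["content"])

asyncio.run(main())
```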
Implementation Reference
- The main handler function for the `get_task_instance_logs` tool. It makes a GET request to the Airflow API to fetch logs for a specific task instance by `dag_id`, `dag_run_id`, `task_id`, and `try_number`.

```python
async def get_task_instance_logs(dag_id: str, dag_run_id: str, task_id: str, try_number: int = 1) -> Dict[str, Any]:
    """[Tool Role]: Retrieves logs for a specific task instance."""
    resp = await airflow_request("GET", f"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{try_number}")
    resp.raise_for_status()
    return resp.json()
```
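The excerpt does not show how `full_content` and `token` from the input schema reach the Airflow log endpoint. A minimal sketch of how they could be forwarded as query parameters, assuming the same `airflow_request` helper; this is an illustration, not the project's actual code:

```python
from typing import Any, Dict, Optional

async def get_task_instance_logs_with_options(
    dag_id: str,
    dag_run_id: str,
    task_id: str,
    try_number: int = 1,
    full_content: bool = False,
    token: Optional[str] = None,
) -> Dict[str, Any]:
    """Illustrative variant that forwards full_content and the pagination token."""
    # Booleans are sent as lowercase strings in the query string.
    params = {"full_content": str(full_content).lower()}
    if token:
        params["token"] = token
    query_string = "&".join(f"{k}={v}" for k, v in params.items())
    resp = await airflow_request(
        "GET",
        f"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{try_number}?{query_string}",
    )
    resp.raise_for_status()
    return resp.json()
```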
- src/mcp_airflow_api/tools/v1_tools.py:13-28 (registration): Registration function for v1 tools that sets the v1-specific `airflow_request` and calls `register_common_tools(mcp)`, which defines and registers the `get_task_instance_logs` tool among others.

```python
def register_tools(mcp):
    """Register v1 tools by importing common tools with v1 request function."""
    logger.info("Initializing MCP server for Airflow API v1")
    logger.info("Loading Airflow API v1 tools (Airflow 2.x)")

    # Set the global request function to v1
    common_tools.airflow_request = airflow_request_v1

    # Register all 56 common tools (includes management tools)
    common_tools.register_common_tools(mcp)

    # V1 has no exclusive tools - all tools are shared with v2
    logger.info("Registered all Airflow API v1 tools (56 tools: 43 core + 13 management tools)")
```
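`register_common_tools` itself is not excerpted here. Based on the decorator pattern visible in the v2 registration below, it presumably defines the shared handlers inside the registration call; the sketch below is an assumption about its shape, not a copy of the real module:

```python
# Assumed shape of common_tools.register_common_tools (illustrative only).
def register_common_tools(mcp):
    @mcp.tool()
    async def get_task_instance_logs(dag_id: str, dag_run_id: str, task_id: str,
                                     try_number: int = 1) -> Dict[str, Any]:
        """[Tool Role]: Retrieves logs for a specific task instance."""
        resp = await airflow_request(
            "GET",
            f"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{try_number}",
        )
        resp.raise_for_status()
        return resp.json()

    # ...the remaining common tools are registered the same way...
```

If the handler resolves `airflow_request` from the module globals at call time, then reassigning `common_tools.airflow_request` before registration, as `register_tools` does above, is presumably what routes requests to the v1 or v2 endpoint.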
- src/mcp_airflow_api/tools/v2_tools.py:14-104 (registration): Registration function for v2 tools that sets the v2-specific `airflow_request` and calls `register_common_tools(mcp)`, which defines and registers the `get_task_instance_logs` tool among others.

```python
def register_tools(mcp):
    """Register v2 tools: common tools + v2-exclusive asset tools."""
    logger.info("Initializing MCP server for Airflow API v2")
    logger.info("Loading Airflow API v2 tools (Airflow 3.0+)")

    # Set the global request function to v2
    common_tools.airflow_request = airflow_request_v2

    # Register all 43 common tools
    common_tools.register_common_tools(mcp)

    # Add V2-exclusive tools (2 tools)
    @mcp.tool()
    async def list_assets(limit: int = 20, offset: int = 0, uri_pattern: Optional[str] = None) -> Dict[str, Any]:
        """
        [V2 New] List all assets in the system for data-aware scheduling.

        Assets are a key feature in Airflow 3.0 for data-aware scheduling.
        They enable workflows to be triggered by data changes rather than time schedules.

        Args:
            limit: Maximum number of assets to return (default: 20)
            offset: Number of assets to skip for pagination (default: 0)
            uri_pattern: Filter assets by URI pattern (optional)

        Returns:
            Dict containing assets list, pagination info, and metadata
        """
        params = {'limit': limit, 'offset': offset}
        if uri_pattern:
            params['uri_pattern'] = uri_pattern

        query_string = "&".join([f"{k}={v}" for k, v in params.items()])
        resp = await airflow_request_v2("GET", f"/assets?{query_string}")
        resp.raise_for_status()
        data = resp.json()

        return {
            "assets": data.get("assets", []),
            "total_entries": data.get("total_entries", 0),
            "limit": limit,
            "offset": offset,
            "api_version": "v2",
            "feature": "assets"
        }

    @mcp.tool()
    async def list_asset_events(limit: int = 20, offset: int = 0, asset_uri: Optional[str] = None, source_dag_id: Optional[str] = None) -> Dict[str, Any]:
        """
        [V2 New] List asset events for data lineage tracking.

        Asset events track when assets are created or updated by DAGs.
        This enables data lineage tracking and data-aware scheduling in Airflow 3.0.

        Args:
            limit: Maximum number of events to return (default: 20)
            offset: Number of events to skip for pagination (default: 0)
            asset_uri: Filter events by specific asset URI (optional)
            source_dag_id: Filter events by source DAG that produced the event (optional)

        Returns:
            Dict containing asset events list, pagination info, and metadata
        """
        params = {'limit': limit, 'offset': offset}
        if asset_uri:
            params['asset_uri'] = asset_uri
        if source_dag_id:
            params['source_dag_id'] = source_dag_id

        query_string = "&".join([f"{k}={v}" for k, v in params.items()])
        resp = await airflow_request_v2("GET", f"/assets/events?{query_string}")
        resp.raise_for_status()
        data = resp.json()

        return {
            "asset_events": data.get("asset_events", []),
            "total_entries": data.get("total_entries", 0),
            "limit": limit,
            "offset": offset,
            "api_version": "v2",
            "feature": "asset_events"
        }

    logger.info("Registered all Airflow API v2 tools (43 common + 2 assets + 4 management = 49 tools)")
```
- src/mcp_airflow_api/mcp_main.py:264-267 (registration): Top-level registration call for v1 tools in the main MCP server setup function, which indirectly registers the tool.

```python
        logger.info("Loading Airflow API v1 tools (Airflow 2.x)")
        from mcp_airflow_api.tools import v1_tools
        v1_tools.register_tools(mcp_instance)
    elif api_version == "v2":
```
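Only the v1 branch fits in the excerpt above; the surrounding dispatch presumably selects the tool set by the configured API version, roughly as follows (the v2 branch is reconstructed from the v2 registration shown earlier and is an assumption):

```python
# Assumed surrounding structure in mcp_main.py (illustrative).
if api_version == "v1":
    logger.info("Loading Airflow API v1 tools (Airflow 2.x)")
    from mcp_airflow_api.tools import v1_tools
    v1_tools.register_tools(mcp_instance)
elif api_version == "v2":
    logger.info("Loading Airflow API v2 tools (Airflow 3.0+)")
    from mcp_airflow_api.tools import v2_tools
    v2_tools.register_tools(mcp_instance)
```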
- Imports and setup of the `airflow_request` helper function used by the tool handler.

```python
from ..functions import airflow_request as airflow_request_v1
from . import common_tools
import logging

logger = logging.getLogger(__name__)


def register_tools(mcp):
    """Register v1 tools by importing common tools with v1 request function."""
    logger.info("Initializing MCP server for Airflow API v1")
    logger.info("Loading Airflow API v1 tools (Airflow 2.x)")

    # Set the global request function to v1
```