list_dataset_events
Retrieve dataset events for data lineage tracking in Apache Airflow, enabling monitoring of data dependencies and pipeline relationships.
Instructions
[Tool Role]: Lists dataset events for data lineage tracking (v1 API only - v2 uses Assets).
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| limit | No | Maximum number of dataset events to return. | 20 |
| offset | No | Number of events to skip, for pagination. | 0 |
| dataset_uri | No | Filter events to a specific dataset URI. | |
| source_dag_id | No | Filter events to those produced by a specific DAG. | |
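The parameters above map directly onto the query string of Airflow's `/datasets/events` endpoint. A minimal sketch of that mapping, assuming the handler's convention of dropping `offset` when it is 0 and omitting unset filters (the function name here is illustrative, not part of the tool's API):

```python
from typing import Optional

def build_dataset_events_endpoint(
    limit: int = 20,
    offset: int = 0,
    dataset_uri: Optional[str] = None,
    source_dag_id: Optional[str] = None,
) -> str:
    """Build the /datasets/events endpoint path with query parameters."""
    params = [f"limit={limit}"]
    if offset > 0:
        params.append(f"offset={offset}")
    if dataset_uri:
        params.append(f"dataset_uri={dataset_uri}")
    if source_dag_id:
        params.append(f"source_dag_id={source_dag_id}")
    return "/datasets/events?" + "&".join(params)

# With defaults, only limit=20 is sent:
print(build_dataset_events_endpoint())
# -> /datasets/events?limit=20
```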
Implementation Reference
- The main handler function for the `list_dataset_events` tool. The `@mcp.tool()` decorator registers it with the MCP server. It handles pagination, filters by `dataset_uri` or `source_dag_id`, checks the API version, and queries Airflow's `/datasets/events` endpoint.

```python
from typing import Any, Dict, Optional

@mcp.tool()
async def list_dataset_events(
    limit: int = 20,
    offset: int = 0,
    dataset_uri: Optional[str] = None,
    source_dag_id: Optional[str] = None,
) -> Dict[str, Any]:
    """[Tool Role]: Lists dataset events for data lineage tracking (v1 API only - v2 uses Assets)."""
    from ..functions import get_api_version

    api_version = get_api_version()
    if api_version == "v2":
        # Airflow 3.x replaced Datasets with Assets; point callers at the v2 tool.
        return {
            "error": "Dataset events API is not available in Airflow 3.x (API v2)",
            "available_in": "v1 only",
            "v2_alternative": "Use list_asset_events() for Airflow 3.x data lineage tracking",
        }

    # Build the query string, omitting parameters left at their defaults.
    params = [f"limit={limit}"]
    if offset > 0:
        params.append(f"offset={offset}")
    if dataset_uri:
        params.append(f"dataset_uri={dataset_uri}")
    if source_dag_id:
        params.append(f"source_dag_id={source_dag_id}")
    query_string = "&".join(params)
    endpoint = f"/datasets/events?{query_string}" if query_string else "/datasets/events"

    # airflow_request is the module's shared async HTTP helper.
    resp = await airflow_request("GET", endpoint)
    resp.raise_for_status()
    return resp.json()
```
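The version gate in the handler can be isolated to show its behavior: on API v2 (Airflow 3.x) the tool short-circuits with a structured error payload instead of calling the REST API. A small sketch under that assumption (the helper name is hypothetical):

```python
from typing import Any, Dict, Optional

def dataset_api_unavailable(api_version: str) -> Optional[Dict[str, Any]]:
    """Return the error payload for API v2, or None when the v1 dataset API is usable."""
    if api_version == "v2":
        return {
            "error": "Dataset events API is not available in Airflow 3.x (API v2)",
            "available_in": "v1 only",
            "v2_alternative": "Use list_asset_events() for Airflow 3.x data lineage tracking",
        }
    return None

# v1 proceeds to query /datasets/events; v2 gets the redirect payload.
print(dataset_api_unavailable("v1"))
# -> None
```

Returning a payload rather than raising keeps the response machine-readable, so an MCP client can detect `v2_alternative` and retry with the Assets tool.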