get_dataset_events
Retrieve events for a specific dataset in Apache Airflow using the v1 API to monitor dataset activity and track changes.
Instructions
[Tool Role]: Gets events for a specific dataset (v1 API only - v2 uses Assets).
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| dataset_uri | Yes | ||
| limit | No | ||
| offset | No |
Implementation Reference
- The get_dataset_events tool handler function, decorated with @mcp.tool() for automatic registration in the MCP server. It fetches events for the specified dataset URI using the Airflow API (v1 only, with fallback message for v2). Parameters: dataset_uri (str), limit (int=20), offset (int=0). Returns dict of events.@mcp.tool() async def get_dataset_events(dataset_uri: str, limit: int = 20, offset: int = 0) -> Dict[str, Any]: """[Tool Role]: Gets events for a specific dataset (v1 API only - v2 uses Assets).""" from ..functions import get_api_version api_version = get_api_version() if api_version == "v2": return { "error": "Dataset events API is not available in Airflow 3.x (API v2)", "available_in": "v1 only", "v2_alternative": "Use list_asset_events() for Airflow 3.x data lineage tracking" } import urllib.parse encoded_uri = urllib.parse.quote(dataset_uri, safe='') params = [] params.append(f"limit={limit}") if offset > 0: params.append(f"offset={offset}") query_string = "&".join(params) if params else "" endpoint = f"/datasets/{encoded_uri}/events?{query_string}" if query_string else f"/datasets/{encoded_uri}/events" resp = await airflow_request("GET", endpoint) resp.raise_for_status() return resp.json()