get_upstream_asset_events
Retrieve the asset events that triggered a specific DAG run, helping you understand why the pipeline started and which data changes caused the execution.
Instructions
Get asset events that triggered a specific DAG run.
Use this tool when the user asks about:
- "What triggered this DAG run?"
- "Which asset events caused this run to start?"
- "Why did DAG X start running?"
- "Show me the upstream triggers for this run"
- "What data changes triggered this pipeline run?"
This is useful for understanding causation in data-aware scheduling. When a DAG is scheduled based on asset updates, this tool shows which specific asset events triggered the run.
Returns information including:
- dag_id: The DAG that was triggered
- dag_run_id: The specific run
- triggered_by_events: List of asset events that caused this run
- event_count: Number of triggering events
Each event includes:
- uri: The asset that was updated (the adapters normalize dataset_uri on Airflow 2.x and asset_uri on Airflow 3.x to uri)
- source_dag_id: The DAG that produced the event
- source_run_id: The run that produced the event
- timestamp: When the event occurred
Args:
- dag_id: The ID of the DAG
- dag_run_id: The ID of the DAG run (e.g., "scheduled__2024-01-01T00:00:00+00:00")
Returns: JSON with the asset events that triggered this DAG run
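These fields are enough to answer "which upstream DAGs caused this run?". The following is a minimal sketch, not part of the server: a hypothetical summarize_triggers helper parses the JSON the tool returns and groups the triggering asset URIs by source_dag_id. It assumes the event URI is stored under uri, the key both adapter implementations normalize to; the sample payload, DAG IDs, and URIs are invented for illustration.

```python
import json
from collections import defaultdict


def summarize_triggers(result_json: str) -> dict[str, list[str]]:
    """Group triggering asset URIs by the DAG that produced each event.

    Hypothetical helper: assumes result_json is the JSON text returned by
    get_upstream_asset_events, with events under "triggered_by_events".
    """
    payload = json.loads(result_json)
    by_source: dict[str, list[str]] = defaultdict(list)
    for event in payload.get("triggered_by_events", []):
        # Assumption: source_dag_id may be null (e.g. for events not emitted
        # by a task), so fall back to a placeholder label.
        source = event.get("source_dag_id") or "<external>"
        by_source[source].append(event.get("uri", "<unknown>"))
    return dict(by_source)


# Hypothetical sample payload shaped like the documented response fields.
sample = json.dumps({
    "dag_id": "reporting",
    "dag_run_id": "example_run_id",
    "triggered_by_events": [
        {"uri": "s3://bucket/orders", "source_dag_id": "ingest_orders",
         "source_run_id": "example_upstream_run", "timestamp": "2024-01-01T00:05:00+00:00"},
        {"uri": "s3://bucket/customers", "source_dag_id": "ingest_customers",
         "source_run_id": "example_upstream_run", "timestamp": "2024-01-01T00:07:00+00:00"},
    ],
    "event_count": 2,
})

print(summarize_triggers(sample))
```

Grouping by source DAG keeps the answer compact when one upstream DAG emits several asset events that jointly trigger the run.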
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| dag_id | Yes | The ID of the DAG | |
| dag_run_id | Yes | The ID of the DAG run (e.g., "scheduled__2024-01-01T00:00:00+00:00") | |
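MCP clients invoke a tool with a JSON-RPC 2.0 tools/call request whose arguments match the input schema. The sketch below only builds that request body and checks the two required arguments; build_tool_call, the request id, and the DAG ID "reporting" are hypothetical, and connecting to the server (stdio or HTTP) is left out.

```python
import json

# Required arguments, taken from the input schema table.
REQUIRED_ARGS = ("dag_id", "dag_run_id")


def build_tool_call(request_id: int, dag_id: str, dag_run_id: str) -> str:
    """Build a JSON-RPC 2.0 "tools/call" request for get_upstream_asset_events.

    Sketch only: transport and session initialization are out of scope.
    """
    arguments = {"dag_id": dag_id, "dag_run_id": dag_run_id}
    # Reject empty values for required arguments before sending anything.
    missing = [name for name in REQUIRED_ARGS if not arguments.get(name)]
    if missing:
        raise ValueError(f"missing required arguments: {', '.join(missing)}")
    request = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": "get_upstream_asset_events", "arguments": arguments},
    }
    return json.dumps(request)


print(build_tool_call(1, "reporting", "scheduled__2024-01-01T00:00:00+00:00"))
```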
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | JSON string with the asset events that triggered the DAG run | |
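Per the implementation reference below, the result text is not always the documented JSON: if the adapter response has no asset_events key, the adapter data is returned as-is, and any exception is returned as a plain error string. A caller can classify the result before using it. This is a sketch assuming the result field carries the tool's return string unchanged; parse_result and its status labels are hypothetical.

```python
import json
from typing import Any


def parse_result(result: str) -> dict[str, Any]:
    """Classify the text in the "result" field of get_upstream_asset_events.

    The tool can return:
      * JSON with triggered_by_events/event_count (events were found),
      * the adapter's JSON unchanged (e.g. a not-found explanation),
      * a plain, non-JSON error string (str(e) from the except branch).
    """
    try:
        data = json.loads(result)
    except json.JSONDecodeError:
        # Not JSON at all: treat it as an error message.
        return {"status": "error", "message": result}
    if isinstance(data, dict) and "triggered_by_events" in data:
        return {"status": "ok", "events": data["triggered_by_events"]}
    # Valid JSON but not the formatted response shape.
    return {"status": "other", "payload": data}
```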
Implementation Reference
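- The internal implementation function _get_upstream_asset_events_impl that executes the tool logic. It calls adapter.get_dag_run_upstream_asset_events() and, when the adapter returns an asset_events list, formats the response with dag_id, dag_run_id, triggered_by_events, and event_count. Otherwise it returns the adapter data unchanged, and on any exception it returns the error message as a plain string.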
```python
def _get_upstream_asset_events_impl(
    dag_id: str,
    dag_run_id: str,
) -> str:
    """Internal implementation for getting upstream asset events for a DAG run.

    Args:
        dag_id: The DAG ID
        dag_run_id: The DAG run ID

    Returns:
        JSON string containing the asset events that triggered this run
    """
    try:
        adapter = _get_adapter()
        data = adapter.get_dag_run_upstream_asset_events(dag_id, dag_run_id)
        if "asset_events" in data:
            return json.dumps(
                {
                    "dag_id": dag_id,
                    "dag_run_id": dag_run_id,
                    "triggered_by_events": data["asset_events"],
                    "event_count": len(data["asset_events"]),
                },
                indent=2,
            )
        return json.dumps(data, indent=2)
    except Exception as e:
        return str(e)
```

- src/astro_airflow_mcp/server.py:1512-1549 (registration): The MCP tool registration via the @mcp.tool() decorator, whose docstring describes when to use the tool to get the upstream asset events that triggered a DAG run. It calls _get_upstream_asset_events_impl.
```python
@mcp.tool()
def get_upstream_asset_events(
    dag_id: str,
    dag_run_id: str,
) -> str:
    """Get asset events that triggered a specific DAG run.

    Use this tool when the user asks about:
    - "What triggered this DAG run?"
    - "Which asset events caused this run to start?"
    - "Why did DAG X start running?"
    - "Show me the upstream triggers for this run"
    - "What data changes triggered this pipeline run?"

    This is useful for understanding causation in data-aware scheduling.
    When a DAG is scheduled based on asset updates, this tool shows which
    specific asset events triggered the run.

    Returns information including:
    - dag_id: The DAG that was triggered
    - dag_run_id: The specific run
    - triggered_by_events: List of asset events that caused this run
    - event_count: Number of triggering events

    Each event includes:
    - asset_uri or dataset_uri: The asset that was updated
    - source_dag_id: The DAG that produced the event
    - source_run_id: The run that produced the event
    - timestamp: When the event occurred

    Args:
        dag_id: The ID of the DAG
        dag_run_id: The ID of the DAG run (e.g., "scheduled__2024-01-01T00:00:00+00:00")

    Returns:
        JSON with the asset events that triggered this DAG run
    """
    return _get_upstream_asset_events_impl(dag_id, dag_run_id)
```

- Abstract method definition in the base AirflowAdapter class, defining the interface for get_dag_run_upstream_asset_events.
```python
def get_dag_run_upstream_asset_events(
    self,
    dag_id: str,
    dag_run_id: str,
) -> dict[str, Any]:
    """Get asset events that triggered a specific DAG run.

    This is used to verify causation - which asset events caused this DAG run
    to be scheduled (data-aware scheduling).

    Args:
        dag_id: The DAG ID
        dag_run_id: The DAG run ID

    Returns:
        Dict with 'asset_events' list showing which events triggered this run
    """
```

- Airflow 2.x implementation of get_dag_run_upstream_asset_events. Calls the 'upstreamDatasetEvents' endpoint and normalizes field names (dataset_events->asset_events, dataset_uri->uri, dataset_id->asset_id).
```python
def get_dag_run_upstream_asset_events(
    self,
    dag_id: str,
    dag_run_id: str,
) -> dict[str, Any]:
    """Get upstream dataset events that triggered a DAG run (Airflow 2.x).

    Normalizes field names for consistency with Airflow 3:
    - 'dataset_events' -> 'asset_events'
    - 'dataset_uri' -> 'uri'
    - 'dataset_id' -> 'asset_id'
    """
    try:
        data = self._call(f"dags/{dag_id}/dagRuns/{dag_run_id}/upstreamDatasetEvents")
        # Normalize field names
        if "dataset_events" in data:
            data["asset_events"] = data.pop("dataset_events")
        for event in data.get("asset_events", []):
            if "dataset_uri" in event:
                event["uri"] = event.pop("dataset_uri")
            if "dataset_id" in event:
                event["asset_id"] = event.pop("dataset_id")
        return data
    except NotFoundError:
        return self._handle_not_found(
            "upstreamDatasetEvents",
            alternative="This endpoint requires Airflow 2.4+ and a dataset-triggered run",
        )
```

- Airflow 3.x implementation of get_dag_run_upstream_asset_events. Calls the 'upstreamAssetEvents' endpoint and normalizes asset_uri to uri for consistency.
```python
def get_dag_run_upstream_asset_events(
    self,
    dag_id: str,
    dag_run_id: str,
) -> dict[str, Any]:
    """Get upstream asset events that triggered a DAG run (Airflow 3.x).

    Normalizes field names for consistency:
    - 'asset_uri' -> 'uri'
    """
    try:
        data = self._call(f"dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents")
        # Normalize field names
        if "asset_events" in data:
            for event in data.get("asset_events", []):
                if "asset_uri" in event:
                    event["uri"] = event.pop("asset_uri")
        return data
    except NotFoundError:
        return self._handle_not_found(
            "upstreamAssetEvents",
            alternative="This endpoint requires an asset-triggered DAG run",
        )
```
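Because both adapters rename their version-specific keys, the tool emits the same event shape on Airflow 2.x and 3.x. The sketch below reproduces that renaming and the response shaping of _get_upstream_asset_events_impl on plain dictionaries, with the HTTP call (self._call) and the NotFoundError handling dropped. normalize_v2, normalize_v3, format_response, and the sample payloads are hypothetical names and data, not part of the server.

```python
import json
from typing import Any


def normalize_v2(data: dict[str, Any]) -> dict[str, Any]:
    """Mirror of the Airflow 2.x renaming: dataset_* keys -> asset_* / uri."""
    if "dataset_events" in data:
        data["asset_events"] = data.pop("dataset_events")
    for event in data.get("asset_events", []):
        if "dataset_uri" in event:
            event["uri"] = event.pop("dataset_uri")
        if "dataset_id" in event:
            event["asset_id"] = event.pop("dataset_id")
    return data


def normalize_v3(data: dict[str, Any]) -> dict[str, Any]:
    """Mirror of the Airflow 3.x renaming: asset_uri -> uri."""
    for event in data.get("asset_events", []):
        if "asset_uri" in event:
            event["uri"] = event.pop("asset_uri")
    return data


def format_response(dag_id: str, dag_run_id: str, data: dict[str, Any]) -> str:
    """Same shaping as _get_upstream_asset_events_impl, minus the adapter call."""
    if "asset_events" in data:
        return json.dumps(
            {
                "dag_id": dag_id,
                "dag_run_id": dag_run_id,
                "triggered_by_events": data["asset_events"],
                "event_count": len(data["asset_events"]),
            },
            indent=2,
        )
    # No asset_events key: pass the adapter data through unchanged.
    return json.dumps(data, indent=2)


# Hypothetical raw payloads using each version's field names.
v2_raw = {"dataset_events": [{"dataset_id": 7, "dataset_uri": "s3://bucket/orders"}]}
v3_raw = {"asset_events": [{"asset_id": 7, "asset_uri": "s3://bucket/orders"}]}

v2_out = format_response("reporting", "run_1", normalize_v2(v2_raw))
v3_out = format_response("reporting", "run_1", normalize_v3(v3_raw))
# Compare parsed JSON: key order inside each event differs between versions.
print(json.loads(v2_out) == json.loads(v3_out))  # True
```

The raw strings are not byte-identical, since key insertion order differs after renaming, but the parsed objects are equal, which is what a client consuming the JSON relies on.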