get_dataset_events
Retrieve dataset events from Apache Airflow to monitor data dependencies and track dataset-related activities in workflows.
Instructions
Get dataset events
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| dataset_id | No | ||
| limit | No | ||
| offset | No | ||
| order_by | No | ||
| source_dag_id | No | ||
| source_map_index | No | ||
| source_run_id | No | ||
| source_task_id | No | ||
Input Schema (JSON Schema)
{
  "properties": {
    "dataset_id": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "title": "Dataset Id"
    },
    "limit": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "title": "Limit"
    },
    "offset": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "title": "Offset"
    },
    "order_by": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "title": "Order By"
    },
    "source_dag_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "title": "Source Dag Id"
    },
    "source_map_index": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "title": "Source Map Index"
    },
    "source_run_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "title": "Source Run Id"
    },
    "source_task_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "title": "Source Task Id"
    }
  },
  "type": "object"
}
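Because every property in the schema is optional and nullable, a caller only needs to send the filters it actually cares about. The following is a minimal client-side sketch, not part of the server: `PARAM_TYPES` and `build_arguments` are hypothetical names introduced here for illustration, and rejecting unknown parameter names is a choice of this sketch (the schema itself does not forbid extra properties).

```python
import json
from typing import Any, Dict

# Expected Python type for each property in the schema above
# (hypothetical helper table, derived by hand from the schema).
PARAM_TYPES = {
    "dataset_id": int,
    "limit": int,
    "offset": int,
    "order_by": str,
    "source_dag_id": str,
    "source_map_index": int,
    "source_run_id": str,
    "source_task_id": str,
}


def build_arguments(**params: Any) -> Dict[str, Any]:
    """Return a tool-arguments dict, dropping None values and checking types."""
    arguments: Dict[str, Any] = {}
    for name, value in params.items():
        if name not in PARAM_TYPES:
            raise KeyError(f"unknown parameter: {name}")
        if value is None:
            continue  # null is allowed and is the same as omitting the parameter
        expected = PARAM_TYPES[name]
        # bool is a subclass of int in Python, so exclude it explicitly.
        if isinstance(value, bool) or not isinstance(value, expected):
            raise TypeError(f"{name} must be {expected.__name__} or None")
        arguments[name] = value
    return arguments


# "example_dag" is a made-up DAG id used only for this example.
arguments = build_arguments(
    source_dag_id="example_dag", limit=10, offset=0, order_by=None
)
print(json.dumps(arguments, sort_keys=True))
# prints {"limit": 10, "offset": 0, "source_dag_id": "example_dag"}
```

Note that `offset=0` survives while `order_by=None` is dropped: only `None` counts as "not provided".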
Implementation Reference
- src/airflow/dataset.py:73-103 (handler) The handler function that executes the tool's logic: it builds a kwargs dict from the input parameters, calls the underlying Airflow DatasetApi.get_dataset_events, and returns the result as MCP TextContent.

      async def get_dataset_events(
          limit: Optional[int] = None,
          offset: Optional[int] = None,
          order_by: Optional[str] = None,
          dataset_id: Optional[int] = None,
          source_dag_id: Optional[str] = None,
          source_task_id: Optional[str] = None,
          source_run_id: Optional[str] = None,
          source_map_index: Optional[int] = None,
      ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
          # Build parameters dictionary
          kwargs: Dict[str, Any] = {}
          if limit is not None:
              kwargs["limit"] = limit
          if offset is not None:
              kwargs["offset"] = offset
          if order_by is not None:
              kwargs["order_by"] = order_by
          if dataset_id is not None:
              kwargs["dataset_id"] = dataset_id
          if source_dag_id is not None:
              kwargs["source_dag_id"] = source_dag_id
          if source_task_id is not None:
              kwargs["source_task_id"] = source_task_id
          if source_run_id is not None:
              kwargs["source_run_id"] = source_run_id
          if source_map_index is not None:
              kwargs["source_map_index"] = source_map_index

          response = dataset_api.get_dataset_events(**kwargs)
          return [types.TextContent(type="text", text=str(response.to_dict()))]

- src/airflow/dataset.py:16-16 (registration) The tuple in get_all_functions() that registers the get_dataset_events tool with its handler, name, description, and read-only flag.

      (get_dataset_events, "get_dataset_events", "Get dataset events", True),

- src/main.py:10-10 (registration) Imports get_all_functions from the dataset module under the alias get_dataset_functions. This function provides the tool list, including get_dataset_events, used to register tools with the MCP server.

      from src.airflow.dataset import get_all_functions as get_dataset_functions

- src/airflow/dataset.py:8-8 (helper) Initializes the DatasetApi client instance used by the get_dataset_events handler and other dataset tools.

      dataset_api = DatasetApi(api_client)

- src/main.py:91-91 (helper) The generic registration call in the main server setup that adds each tool (including get_dataset_events) to the MCP app.

      app.add_tool(func, name=name, description=description)
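
The pieces referenced above fit together roughly as follows. This is a self-contained sketch under stated assumptions, not the project's actual code: `FakeDatasetApi` and `FakeApp` are hypothetical stand-ins for the Airflow `DatasetApi` client and the MCP app, the handler is trimmed to three parameters, and it returns a plain string instead of MCP `TextContent`. The filtering of `None` values, the four-element registration tuple, and the `add_tool` call mirror the snippets in the list.

```python
import asyncio
from typing import Any, Callable, Dict, List, Optional, Tuple


class FakeDatasetApi:
    """Hypothetical stand-in for the Airflow DatasetApi client."""

    def get_dataset_events(self, **kwargs: Any) -> Dict[str, Any]:
        # Echo back the filters so the example can show what was forwarded.
        return {"received": kwargs}


dataset_api = FakeDatasetApi()


async def get_dataset_events(
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    source_dag_id: Optional[str] = None,
) -> str:
    # Forward only the parameters the caller actually set.
    candidates = {"limit": limit, "offset": offset, "source_dag_id": source_dag_id}
    kwargs = {key: value for key, value in candidates.items() if value is not None}
    return str(dataset_api.get_dataset_events(**kwargs))


def get_all_functions() -> List[Tuple[Callable[..., Any], str, str, bool]]:
    # (handler, tool name, description, read-only flag)
    return [(get_dataset_events, "get_dataset_events", "Get dataset events", True)]


class FakeApp:
    """Hypothetical stand-in for the MCP app; stores tools by name."""

    def __init__(self) -> None:
        self.tools: Dict[str, Tuple[Callable[..., Any], str]] = {}

    def add_tool(self, func: Callable[..., Any], name: str, description: str) -> None:
        self.tools[name] = (func, description)


app = FakeApp()
for func, name, description, _read_only in get_all_functions():
    app.add_tool(func, name=name, description=description)

handler, _ = app.tools["get_dataset_events"]
result = asyncio.run(handler(limit=5, offset=0))
print(result)
# prints {'received': {'limit': 5, 'offset': 0}}
```

The `is not None` check matters here: a truthiness test would silently drop `offset=0`, whereas the handler's explicit comparison keeps it.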