get_event_logs
Retrieve and filter Apache Airflow event logs to monitor DAG runs, task executions, and system events for debugging and auditing purposes.
Instructions
List log entries from event log
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| limit | No | | |
| offset | No | | |
| order_by | No | | |
| dag_id | No | | |
| task_id | No | | |
| run_id | No | | |
| map_index | No | | |
| try_number | No | | |
| event | No | | |
| owner | No | | |
| before | No | | |
| after | No | | |
| included_events | No | | |
| excluded_events | No | | |
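Every parameter in the table is optional, and the handler listed in the implementation reference forwards only the ones that are not `None`. A minimal sketch of that filtering, assuming a hypothetical helper named `build_query` (not part of the server) and a made-up DAG id, might look like this:

```python
from datetime import datetime, timezone
from typing import Any, Dict

# Parameter names copied from the input schema table.
EVENT_LOG_PARAMS = (
    "limit", "offset", "order_by", "dag_id", "task_id", "run_id",
    "map_index", "try_number", "event", "owner", "before", "after",
    "included_events", "excluded_events",
)


def build_query(**params: Any) -> Dict[str, Any]:
    """Hypothetical helper: keep only the known parameters that were set."""
    unknown = set(params) - set(EVENT_LOG_PARAMS)
    if unknown:
        raise TypeError(f"unexpected parameters: {sorted(unknown)}")
    # Compare against None rather than truthiness so that 0 is preserved.
    return {name: value for name, value in params.items() if value is not None}


query = build_query(
    dag_id="example_dag",  # hypothetical DAG id, for illustration only
    event=None,            # unset, so it is dropped from the query
    after=datetime(2024, 1, 1, tzinfo=timezone.utc),
    limit=20,
)
print(sorted(query))  # ['after', 'dag_id', 'limit']
```

Checking `is not None` instead of truthiness matters for values such as `offset=0`, which should still be forwarded.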
Implementation Reference
- src/airflow/eventlog.py:20-69 (handler): The async handler function implementing the get_event_logs tool logic by building query parameters and calling the EventLogApi.

      async def get_event_logs(
          limit: Optional[int] = None,
          offset: Optional[int] = None,
          order_by: Optional[str] = None,
          dag_id: Optional[str] = None,
          task_id: Optional[str] = None,
          run_id: Optional[str] = None,
          map_index: Optional[int] = None,
          try_number: Optional[int] = None,
          event: Optional[str] = None,
          owner: Optional[str] = None,
          before: Optional[datetime] = None,
          after: Optional[datetime] = None,
          included_events: Optional[str] = None,
          excluded_events: Optional[str] = None,
      ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
          # Build parameters dictionary
          kwargs: Dict[str, Any] = {}
          if limit is not None:
              kwargs["limit"] = limit
          if offset is not None:
              kwargs["offset"] = offset
          if order_by is not None:
              kwargs["order_by"] = order_by
          if dag_id is not None:
              kwargs["dag_id"] = dag_id
          if task_id is not None:
              kwargs["task_id"] = task_id
          if run_id is not None:
              kwargs["run_id"] = run_id
          if map_index is not None:
              kwargs["map_index"] = map_index
          if try_number is not None:
              kwargs["try_number"] = try_number
          if event is not None:
              kwargs["event"] = event
          if owner is not None:
              kwargs["owner"] = owner
          if before is not None:
              kwargs["before"] = before
          if after is not None:
              kwargs["after"] = after
          if included_events is not None:
              kwargs["included_events"] = included_events
          if excluded_events is not None:
              kwargs["excluded_events"] = excluded_events

          response = event_log_api.get_event_logs(**kwargs)
          return [types.TextContent(type="text", text=str(response.to_dict()))]
- src/airflow/eventlog.py:12-17 (registration): Module registration function returning the tuples used to register the get_event_logs tool.

      def get_all_functions() -> list[tuple[Callable, str, str, bool]]:
          """Return list of (function, name, description, is_read_only) tuples for registration."""
          return [
              (get_event_logs, "get_event_logs", "List log entries from event log", True),
              (get_event_log, "get_event_log", "Get a specific log entry by ID", True),
          ]
- src/main.py:95-96 (registration): Top-level registration loop where tools from get_all_functions(), including get_event_logs, are added to the MCP app.

      for func, name, description, *_ in functions:
          app.add_tool(Tool.from_function(func, name=name, description=description))
- src/main.py:12-12 (registration): Import of the eventlog module's get_all_functions for top-level tool registration.

      from src.airflow.eventlog import get_all_functions as get_eventlog_functions
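The registration flow described above can be sketched end to end without the MCP SDK. In this sketch `FakeApp` and both handler bodies are hypothetical stand-ins; only the tuple layout and the `*_` unpacking mirror the excerpts:

```python
from typing import Callable, Dict, List, Tuple

# (function, name, description, is_read_only), as in get_all_functions().
ToolSpec = Tuple[Callable, str, str, bool]


def get_event_logs() -> str:
    """Stand-in for the real handler; returns a fixed string."""
    return "logs"


def get_event_log() -> str:
    """Stand-in for the single-entry handler."""
    return "log"


def get_all_functions() -> List[ToolSpec]:
    return [
        (get_event_logs, "get_event_logs", "List log entries from event log", True),
        (get_event_log, "get_event_log", "Get a specific log entry by ID", True),
    ]


class FakeApp:
    """Hypothetical stand-in for the MCP app; records tools by name."""

    def __init__(self) -> None:
        self.tools: Dict[str, Tuple[Callable, str]] = {}

    def add_tool(self, func: Callable, name: str, description: str) -> None:
        self.tools[name] = (func, description)


app = FakeApp()
for func, name, description, *_ in get_all_functions():
    # `*_` absorbs is_read_only (and any trailing fields), so this loop ignores it.
    app.add_tool(func, name=name, description=description)

print(sorted(app.tools))  # ['get_event_log', 'get_event_logs']
```

Because the starred target discards everything after the description, the `is_read_only` flag has no effect in a loop of this shape. Whether the real server uses the flag elsewhere is not shown in the excerpts.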