Skip to main content
Glama
yangkyeongmo

MCP Server for Apache Airflow

by yangkyeongmo

get_event_logs

Retrieve and filter Apache Airflow event logs to monitor DAG runs, task executions, and system events for debugging and auditing purposes.

Instructions

List log entries from event log

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
limitNo
offsetNo
order_byNo
dag_idNo
task_idNo
run_idNo
map_indexNo
try_numberNo
eventNo
ownerNo
beforeNo
afterNo
included_eventsNo
excluded_eventsNo

Implementation Reference

  • The async handler function implementing the get_event_logs tool logic by building query parameters and calling the EventLogApi.
    async def get_event_logs( limit: Optional[int] = None, offset: Optional[int] = None, order_by: Optional[str] = None, dag_id: Optional[str] = None, task_id: Optional[str] = None, run_id: Optional[str] = None, map_index: Optional[int] = None, try_number: Optional[int] = None, event: Optional[str] = None, owner: Optional[str] = None, before: Optional[datetime] = None, after: Optional[datetime] = None, included_events: Optional[str] = None, excluded_events: Optional[str] = None, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: # Build parameters dictionary kwargs: Dict[str, Any] = {} if limit is not None: kwargs["limit"] = limit if offset is not None: kwargs["offset"] = offset if order_by is not None: kwargs["order_by"] = order_by if dag_id is not None: kwargs["dag_id"] = dag_id if task_id is not None: kwargs["task_id"] = task_id if run_id is not None: kwargs["run_id"] = run_id if map_index is not None: kwargs["map_index"] = map_index if try_number is not None: kwargs["try_number"] = try_number if event is not None: kwargs["event"] = event if owner is not None: kwargs["owner"] = owner if before is not None: kwargs["before"] = before if after is not None: kwargs["after"] = after if included_events is not None: kwargs["included_events"] = included_events if excluded_events is not None: kwargs["excluded_events"] = excluded_events response = event_log_api.get_event_logs(**kwargs) return [types.TextContent(type="text", text=str(response.to_dict()))]
  • Module registration function returning the tuple for registering the get_event_logs tool.
    def get_all_functions() -> list[tuple[Callable, str, str, bool]]: """Return list of (function, name, description, is_read_only) tuples for registration.""" return [ (get_event_logs, "get_event_logs", "List log entries from event log", True), (get_event_log, "get_event_log", "Get a specific log entry by ID", True), ]
  • src/main.py:95-96 (registration)
    Top-level registration loop where tools from get_all_functions() including get_event_logs are added to the MCP app.
    for func, name, description, *_ in functions: app.add_tool(Tool.from_function(func, name=name, description=description))
  • src/main.py:12-12 (registration)
    Import of the eventlog module's get_all_functions for top-level tool registration.
    from src.airflow.eventlog import get_all_functions as get_eventlog_functions

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/yangkyeongmo/mcp-server-apache-airflow'

If you have feedback or need assistance with the MCP directory API, please join our Discord server