MCP Server for Apache Airflow

by yangkyeongmo

get_event_logs

Retrieve and filter Apache Airflow event logs to monitor DAG runs, task executions, and system events for debugging and auditing purposes.

Instructions

List log entries from event log

Input Schema

Name             Required  Description  Default
limit            No
offset           No
order_by         No
dag_id           No
task_id          No
run_id           No
map_index        No
try_number       No
event            No
owner            No
before           No
after            No
included_events  No
excluded_events  No
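
Because every parameter is optional, a caller can pass any subset of the filters. The following is a minimal sketch, not part of the server: `build_event_log_arguments` is a hypothetical helper that assembles an arguments dictionary and drops unset values, mirroring the `is not None` checks in the handler shown under Implementation Reference. Only five of the fourteen parameters are included for brevity, and the DAG name `example_dag` is an assumed placeholder.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def build_event_log_arguments(
    limit: Optional[int] = None,
    dag_id: Optional[str] = None,
    event: Optional[str] = None,
    after: Optional[datetime] = None,
    order_by: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: keep only the filters that were actually set."""
    candidates = {
        "limit": limit,
        "dag_id": dag_id,
        "event": event,
        "after": after,
        "order_by": order_by,
    }
    # Compare against None so that falsy but valid values such as 0 survive.
    return {name: value for name, value in candidates.items() if value is not None}


arguments = build_event_log_arguments(
    limit=20,
    dag_id="example_dag",  # assumed DAG name, for illustration only
    after=datetime(2024, 1, 1, tzinfo=timezone.utc),
)
print(sorted(arguments))
```

Filtering on `is not None` rather than on truthiness matters here: `limit=0` or `map_index=0` would be silently dropped by a plain `if value:` check.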

Implementation Reference

  • The async handler for the get_event_logs tool. It collects every argument that is not None into a kwargs dictionary, passes them to EventLogApi.get_event_logs, and returns the response as text.
    async def get_event_logs(
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        order_by: Optional[str] = None,
        dag_id: Optional[str] = None,
        task_id: Optional[str] = None,
        run_id: Optional[str] = None,
        map_index: Optional[int] = None,
        try_number: Optional[int] = None,
        event: Optional[str] = None,
        owner: Optional[str] = None,
        before: Optional[datetime] = None,
        after: Optional[datetime] = None,
        included_events: Optional[str] = None,
        excluded_events: Optional[str] = None,
    ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
        # Build parameters dictionary
        kwargs: Dict[str, Any] = {}
        if limit is not None:
            kwargs["limit"] = limit
        if offset is not None:
            kwargs["offset"] = offset
        if order_by is not None:
            kwargs["order_by"] = order_by
        if dag_id is not None:
            kwargs["dag_id"] = dag_id
        if task_id is not None:
            kwargs["task_id"] = task_id
        if run_id is not None:
            kwargs["run_id"] = run_id
        if map_index is not None:
            kwargs["map_index"] = map_index
        if try_number is not None:
            kwargs["try_number"] = try_number
        if event is not None:
            kwargs["event"] = event
        if owner is not None:
            kwargs["owner"] = owner
        if before is not None:
            kwargs["before"] = before
        if after is not None:
            kwargs["after"] = after
        if included_events is not None:
            kwargs["included_events"] = included_events
        if excluded_events is not None:
            kwargs["excluded_events"] = excluded_events
    
        response = event_log_api.get_event_logs(**kwargs)
        return [types.TextContent(type="text", text=str(response.to_dict()))]
  • Module registration function returning the list of (function, name, description, is_read_only) tuples used to register get_event_logs and get_event_log.
    def get_all_functions() -> list[tuple[Callable, str, str, bool]]:
        """Return list of (function, name, description, is_read_only) tuples for registration."""
        return [
            (get_event_logs, "get_event_logs", "List log entries from event log", True),
            (get_event_log, "get_event_log", "Get a specific log entry by ID", True),
        ]
  • src/main.py:95-96 (registration)
    Top-level registration loop where the tools returned by get_all_functions(), including get_event_logs, are added to the MCP app.
    for func, name, description, *_ in functions:
        app.add_tool(Tool.from_function(func, name=name, description=description))
  • src/main.py:12-12 (registration)
    Import of the eventlog module's get_all_functions for top-level tool registration.
    from src.airflow.eventlog import get_all_functions as get_eventlog_functions
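
The registration loop above unpacks four-element tuples with `func, name, description, *_`, so the trailing `is_read_only` flag is ignored at that point. The sketch below illustrates just that unpacking pattern in plain Python, under stated assumptions: the two handlers are synchronous stand-ins for the real async functions, and `registry` is a hypothetical dictionary standing in for the MCP app's tool table (the real code calls `app.add_tool(Tool.from_function(...))`).

```python
from typing import Callable, Dict, List, Tuple


def get_event_logs() -> str:
    """Stand-in for the real async handler."""
    return "logs"


def get_event_log() -> str:
    """Stand-in for the real async handler."""
    return "log"


def get_all_functions() -> List[Tuple[Callable, str, str, bool]]:
    """Return (function, name, description, is_read_only) tuples."""
    return [
        (get_event_logs, "get_event_logs", "List log entries from event log", True),
        (get_event_log, "get_event_log", "Get a specific log entry by ID", True),
    ]


# Hypothetical stand-in for the MCP app's tool table.
registry: Dict[str, Tuple[Callable, str]] = {}

for func, name, description, *_ in get_all_functions():
    # *_ absorbs is_read_only and any fields appended to the tuple later.
    registry[name] = (func, description)

print(sorted(registry))
```

Using `*_` keeps the loop working if more fields are appended to the tuples, at the cost of discarding `is_read_only` in this loop.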

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/yangkyeongmo/mcp-server-apache-airflow'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.