nikhil-ganage

MCP Server Airflow Token

get_dag_runs

List the runs of a DAG, identified by its DAG ID, with optional filters on execution, start, end, and update dates, run state, sort order, and pagination, to monitor Airflow workflow execution.

Instructions

Get DAG runs by ID

Input Schema

Name                Required  Description  Default
dag_id              Yes
limit               No
offset              No
execution_date_gte  No
execution_date_lte  No
start_date_gte      No
start_date_lte      No
end_date_gte        No
end_date_lte        No
updated_at_gte      No
updated_at_lte      No
state               No
order_by            No
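
To illustrate how these inputs fit together, the sketch below assembles an arguments payload for get_dag_runs. Only the parameter names come from the schema above. The DAG ID, the ISO 8601 timestamp format, the state value, and the leading "-" for descending order are assumptions about how Airflow's REST API usually treats the corresponding query parameters, and build_arguments is a hypothetical helper, not part of this server.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


def build_arguments(
    dag_id: str,
    since: datetime,
    states: Optional[List[str]] = None,
    limit: int = 25,
) -> Dict[str, Any]:
    """Hypothetical helper: assemble a get_dag_runs arguments payload."""
    arguments: Dict[str, Any] = {
        "dag_id": dag_id,  # the only required input
        "limit": limit,
        "offset": 0,
        # Assumption: date filters are passed as ISO 8601 strings.
        "start_date_gte": since.isoformat(),
        # Assumption: a leading "-" requests descending order.
        "order_by": "-start_date",
    }
    # Optional inputs are simply left out when they are not needed.
    if states:
        arguments["state"] = states
    return arguments


since = datetime(2024, 1, 1, tzinfo=timezone.utc)
arguments = build_arguments("example_dag", since, states=["failed"])
print(arguments)
```

Leaving an optional key out of the payload matches the handler's signature, where every input except dag_id defaults to None.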

Implementation Reference

  • The primary handler function implementing the 'get_dag_runs' tool. It accepts various filters, calls the Airflow DAGRunApi, adds UI URLs to the response, and returns formatted text content.
    async def get_dag_runs(
        dag_id: str,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        execution_date_gte: Optional[str] = None,
        execution_date_lte: Optional[str] = None,
        start_date_gte: Optional[str] = None,
        start_date_lte: Optional[str] = None,
        end_date_gte: Optional[str] = None,
        end_date_lte: Optional[str] = None,
        updated_at_gte: Optional[str] = None,
        updated_at_lte: Optional[str] = None,
        state: Optional[List[str]] = None,
        order_by: Optional[str] = None,
    ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
        # Build parameters dictionary
        kwargs: Dict[str, Any] = {}
        if limit is not None:
            kwargs["limit"] = limit
        if offset is not None:
            kwargs["offset"] = offset
        if execution_date_gte is not None:
            kwargs["execution_date_gte"] = execution_date_gte
        if execution_date_lte is not None:
            kwargs["execution_date_lte"] = execution_date_lte
        if start_date_gte is not None:
            kwargs["start_date_gte"] = start_date_gte
        if start_date_lte is not None:
            kwargs["start_date_lte"] = start_date_lte
        if end_date_gte is not None:
            kwargs["end_date_gte"] = end_date_gte
        if end_date_lte is not None:
            kwargs["end_date_lte"] = end_date_lte
        if updated_at_gte is not None:
            kwargs["updated_at_gte"] = updated_at_gte
        if updated_at_lte is not None:
            kwargs["updated_at_lte"] = updated_at_lte
        if state is not None:
            kwargs["state"] = state
        if order_by is not None:
            kwargs["order_by"] = order_by

        response = dag_run_api.get_dag_runs(dag_id=dag_id, **kwargs)

        # Convert response to dictionary for easier manipulation
        response_dict = response.to_dict()

        # Add UI links to each DAG run
        for dag_run in response_dict.get("dag_runs", []):
            dag_run["ui_url"] = get_dag_run_url(dag_id, dag_run["dag_run_id"])

        return [types.TextContent(type="text", text=str(response_dict))]
  • Module-level registration function whose return value includes the tuple for the 'get_dag_runs' tool: the handler reference, name, description, and read-only flag.
    def get_all_functions() -> list[tuple[Callable, str, str, bool]]:
        """Return list of (function, name, description, is_read_only) tuples for registration."""
        return [
            (post_dag_run, "post_dag_run", "Trigger a DAG by ID", False),
            (get_dag_runs, "get_dag_runs", "Get DAG runs by ID", True),
            (get_dag_runs_batch, "get_dag_runs_batch", "List DAG runs (batch)", True),
            (get_dag_run, "get_dag_run", "Get a DAG run by DAG ID and DAG run ID", True),
            (update_dag_run_state, "update_dag_run_state", "Update a DAG run state by DAG ID and DAG run ID", False),
            (delete_dag_run, "delete_dag_run", "Delete a DAG run by DAG ID and DAG run ID", False),
            (clear_dag_run, "clear_dag_run", "Clear a DAG run", False),
            (set_dag_run_note, "set_dag_run_note", "Update the DagRun note", False),
            (get_upstream_dataset_events, "get_upstream_dataset_events", "Get dataset events for a DAG run", True),
        ]
  • src/main.py:78-92 (registration)
    Top-level MCP server tool registration loop in main.py that calls get_dagrun_functions() (via APIType.DAGRUN mapping) and registers each tool, including 'get_dag_runs', using app.add_tool().
    for api in apis:
        logging.debug(f"Adding API: {api}")
        get_function = APITYPE_TO_FUNCTIONS[APIType(api)]
        try:
            functions = get_function()
        except NotImplementedError:
            continue

        # Filter functions for read-only mode if requested
        if read_only:
            functions = filter_functions_for_read_only(functions)

        for func, name, description, *_ in functions:
            app.add_tool(func, name=name, description=description)
  • Helper utility to generate the Airflow UI URL for a specific DAG run, used within the get_dag_runs handler to enhance the response.
    def get_dag_run_url(dag_id: str, dag_run_id: str) -> str:
        return f"{AIRFLOW_HOST}/dags/{dag_id}/grid?dag_run_id={dag_run_id}"
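
The handler's two data-shaping steps, dropping filters that were not supplied and attaching a ui_url to each run, can be tried in isolation. The following is a minimal sketch, not the server's code: drop_unset and add_ui_urls are hypothetical helpers, the AIRFLOW_HOST value and the response dictionary are made up for illustration, and no Airflow client is involved.

```python
from typing import Any, Dict

AIRFLOW_HOST = "http://localhost:8080"  # hypothetical value for illustration


def get_dag_run_url(dag_id: str, dag_run_id: str) -> str:
    # Same URL shape as the helper listed above.
    return f"{AIRFLOW_HOST}/dags/{dag_id}/grid?dag_run_id={dag_run_id}"


def drop_unset(**filters: Any) -> Dict[str, Any]:
    """Hypothetical helper: keep only the filters that were provided."""
    return {key: value for key, value in filters.items() if value is not None}


def add_ui_urls(dag_id: str, response_dict: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: attach a UI link to every run in the response."""
    for dag_run in response_dict.get("dag_runs", []):
        dag_run["ui_url"] = get_dag_run_url(dag_id, dag_run["dag_run_id"])
    return response_dict


kwargs = drop_unset(limit=10, offset=None, state=["success"], order_by=None)

# Stand-in for response.to_dict(); the real payload has more fields.
fake_response = {
    "dag_runs": [{"dag_run_id": "manual_1", "state": "success"}],
    "total_entries": 1,
}
enriched = add_ui_urls("example_dag", fake_response)
print(kwargs)
print(enriched["dag_runs"][0]["ui_url"])
```

Testing against None rather than truthiness means a value such as offset=0 would still be forwarded, which mirrors the explicit "is not None" checks in the handler. The source does not say whether run IDs containing characters such as "+" or ":" are percent-encoded before being placed in the URL; urllib.parse.quote would be one way to do that.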

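The registration loop relies on filter_functions_for_read_only, whose body is not shown on this page. One plausible reading, given the (function, name, description, is_read_only) tuples returned by get_all_functions, is sketched below. The body is an assumption, and placeholder stands in for the real handlers.

```python
from typing import Callable, List, Tuple

ToolEntry = Tuple[Callable, str, str, bool]


def filter_functions_for_read_only(functions: List[ToolEntry]) -> List[ToolEntry]:
    """Assumed behavior: keep entries whose is_read_only flag is True."""
    return [entry for entry in functions if entry[3]]


def placeholder() -> None:
    """Stand-in for the real tool handlers."""


functions: List[ToolEntry] = [
    (placeholder, "post_dag_run", "Trigger a DAG by ID", False),
    (placeholder, "get_dag_runs", "Get DAG runs by ID", True),
    (placeholder, "delete_dag_run", "Delete a DAG run by DAG ID and DAG run ID", False),
]

read_only_names = [name for _, name, _, _ in filter_functions_for_read_only(functions)]
print(read_only_names)
```

Under this reading, get_dag_runs stays registered in read-only mode because its tuple carries True, while tools that trigger or delete runs are skipped.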

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nikhil-ganage/mcp-server-airflow-token'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.