Skip to main content
Glama

MCP Server Airflow Token

get_dag_stats

Retrieve statistics and status information for Apache Airflow DAGs to monitor workflow execution and performance.

Instructions

Get DAG stats

Input Schema

NameRequiredDescriptionDefault
dag_idsNo

Input Schema (JSON Schema)

{ "properties": { "dag_ids": { "anyOf": [ { "items": { "type": "string" }, "type": "array" }, { "type": "null" } ], "default": null, "title": "Dag Ids" } }, "type": "object" }

Implementation Reference

  • The main handler function implementing the get_dag_stats tool. It accepts optional dag_ids, calls the Airflow DagStatsApi, and returns the stats as text content.
    async def get_dag_stats( dag_ids: Optional[List[str]] = None, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: # Build parameters dictionary kwargs: Dict[str, Any] = {} if dag_ids is not None: kwargs["dag_ids"] = dag_ids response = dag_stats_api.get_dag_stats(**kwargs) return [types.TextContent(type="text", text=str(response.to_dict()))]
  • Local registration helper that provides the tool function, name, description, and read-only flag for get_dag_stats.
    def get_all_functions() -> list[tuple[Callable, str, str, bool]]: """Return list of (function, name, description, is_read_only) tuples for registration.""" return [ (get_dag_stats, "get_dag_stats", "Get DAG stats", True), ]
  • src/main.py:22-38 (registration)
    Central mapping of APIType.DAGSTATS to dagstats get_all_functions for tool registration.
    APITYPE_TO_FUNCTIONS = { APIType.CONFIG: get_config_functions, APIType.CONNECTION: get_connection_functions, APIType.DAG: get_dag_functions, APIType.DAGRUN: get_dagrun_functions, APIType.DAGSTATS: get_dagstats_functions, APIType.DATASET: get_dataset_functions, APIType.EVENTLOG: get_eventlog_functions, APIType.IMPORTERROR: get_importerror_functions, APIType.MONITORING: get_monitoring_functions, APIType.PLUGIN: get_plugin_functions, APIType.POOL: get_pool_functions, APIType.PROVIDER: get_provider_functions, APIType.TASKINSTANCE: get_taskinstance_functions, APIType.VARIABLE: get_variable_functions, APIType.XCOM: get_xcom_functions, }
  • src/main.py:80-92 (registration)
    Code that invokes the get_all_functions for selected APIs and registers each tool using app.add_tool.
    get_function = APITYPE_TO_FUNCTIONS[APIType(api)] try: functions = get_function() except NotImplementedError: continue # Filter functions for read-only mode if requested if read_only: functions = filter_functions_for_read_only(functions) for func, name, description, *_ in functions: app.add_tool(func, name=name, description=description)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nikhil-ganage/mcp-server-airflow-token'

If you have feedback or need assistance with the MCP directory API, please join our Discord server