Skip to main content
Glama
nikhil-ganage

MCP Server Airflow Token

post_dag_run

Trigger an Apache Airflow DAG by specifying its ID to initiate workflow execution.

Instructions

Trigger a DAG by ID

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
dag_idYes
dag_run_idNo
data_interval_endNo
data_interval_startNo
end_dateNo
execution_dateNo
external_triggerNo
last_scheduling_decisionNo
logical_dateNo
noteNo
run_typeNo
start_dateNo

Implementation Reference

  • The async handler function implementing the 'post_dag_run' tool logic. It creates a DAGRun object from input parameters and triggers it via the Airflow DAGRunApi.
    async def post_dag_run(
        dag_id: str,
        dag_run_id: Optional[str] = None,
        data_interval_end: Optional[datetime] = None,
        data_interval_start: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        execution_date: Optional[datetime] = None,
        external_trigger: Optional[bool] = None,
        last_scheduling_decision: Optional[datetime] = None,
        logical_date: Optional[datetime] = None,
        note: Optional[str] = None,
        run_type: Optional[str] = None,
        start_date: Optional[datetime] = None,
        # state: Optional[str] = None,  # TODO: add state
    ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
        dag_run = DAGRun(
            dag_id=dag_id,
            dag_run_id=dag_run_id,
            data_interval_end=data_interval_end,
            data_interval_start=data_interval_start,
            end_date=end_date,
            execution_date=execution_date,
            external_trigger=external_trigger,
            last_scheduling_decision=last_scheduling_decision,
            logical_date=logical_date,
            note=note,
            run_type=run_type,
            start_date=start_date,
            state=None,
        )
        response = dag_run_api.post_dag_run(dag_id=dag_id, dag_run=dag_run)
        return [types.TextContent(type="text", text=str(response.to_dict()))]
  • Registration of the 'post_dag_run' tool (and other DAG run tools) via the get_all_functions() which returns tuples used in src/main.py to register with the MCP app.add_tool().
    def get_all_functions() -> list[tuple[Callable, str, str, bool]]:
        """Return list of (function, name, description, is_read_only) tuples for registration."""
        return [
            (post_dag_run, "post_dag_run", "Trigger a DAG by ID", False),
            (get_dag_runs, "get_dag_runs", "Get DAG runs by ID", True),
            (get_dag_runs_batch, "get_dag_runs_batch", "List DAG runs (batch)", True),
            (get_dag_run, "get_dag_run", "Get a DAG run by DAG ID and DAG run ID", True),
            (update_dag_run_state, "update_dag_run_state", "Update a DAG run state by DAG ID and DAG run ID", False),
            (delete_dag_run, "delete_dag_run", "Delete a DAG run by DAG ID and DAG run ID", False),
            (clear_dag_run, "clear_dag_run", "Clear a DAG run", False),
            (set_dag_run_note, "set_dag_run_note", "Update the DagRun note", False),
            (get_upstream_dataset_events, "get_upstream_dataset_events", "Get dataset events for a DAG run", True),
        ]
  • src/main.py:8-98 (registration)
    Central registration logic in main.py that imports get_all_functions from dagrun module and uses it to add the 'post_dag_run' tool (among others) to the MCP server via app.add_tool().
    from src.airflow.dagrun import get_all_functions as get_dagrun_functions
    from src.airflow.dagstats import get_all_functions as get_dagstats_functions
    from src.airflow.dataset import get_all_functions as get_dataset_functions
    from src.airflow.eventlog import get_all_functions as get_eventlog_functions
    from src.airflow.importerror import get_all_functions as get_importerror_functions
    from src.airflow.monitoring import get_all_functions as get_monitoring_functions
    from src.airflow.plugin import get_all_functions as get_plugin_functions
    from src.airflow.pool import get_all_functions as get_pool_functions
    from src.airflow.provider import get_all_functions as get_provider_functions
    from src.airflow.taskinstance import get_all_functions as get_taskinstance_functions
    from src.airflow.variable import get_all_functions as get_variable_functions
    from src.airflow.xcom import get_all_functions as get_xcom_functions
    from src.enums import APIType
    
    APITYPE_TO_FUNCTIONS = {
        APIType.CONFIG: get_config_functions,
        APIType.CONNECTION: get_connection_functions,
        APIType.DAG: get_dag_functions,
        APIType.DAGRUN: get_dagrun_functions,
        APIType.DAGSTATS: get_dagstats_functions,
        APIType.DATASET: get_dataset_functions,
        APIType.EVENTLOG: get_eventlog_functions,
        APIType.IMPORTERROR: get_importerror_functions,
        APIType.MONITORING: get_monitoring_functions,
        APIType.PLUGIN: get_plugin_functions,
        APIType.POOL: get_pool_functions,
        APIType.PROVIDER: get_provider_functions,
        APIType.TASKINSTANCE: get_taskinstance_functions,
        APIType.VARIABLE: get_variable_functions,
        APIType.XCOM: get_xcom_functions,
    }
    
    
    def filter_functions_for_read_only(functions: list[tuple]) -> list[tuple]:
        """
        Filter functions to only include read-only operations.
    
        Args:
            functions: List of (func, name, description, is_read_only) tuples
    
        Returns:
            List of (func, name, description, is_read_only) tuples with only read-only functions
        """
        return [
            (func, name, description, is_read_only) for func, name, description, is_read_only in functions if is_read_only
        ]
    
    
    @click.command()
    @click.option(
        "--transport",
        type=click.Choice(["stdio", "sse"]),
        default="stdio",
        help="Transport type",
    )
    @click.option(
        "--apis",
        type=click.Choice([api.value for api in APIType]),
        default=[api.value for api in APIType],
        multiple=True,
        help="APIs to run, default is all",
    )
    @click.option(
        "--read-only",
        is_flag=True,
        help="Only expose read-only tools (GET operations, no CREATE/UPDATE/DELETE)",
    )
    def main(transport: str, apis: list[str], read_only: bool) -> None:
        from src.server import app
    
        for api in apis:
            logging.debug(f"Adding API: {api}")
            get_function = APITYPE_TO_FUNCTIONS[APIType(api)]
            try:
                functions = get_function()
            except NotImplementedError:
                continue
    
            # Filter functions for read-only mode if requested
            if read_only:
                functions = filter_functions_for_read_only(functions)
    
            for func, name, description, *_ in functions:
                app.add_tool(func, name=name, description=description)
    
        if transport == "sse":
            logging.debug("Starting MCP server for Apache Airflow with SSE transport")
            app.run(transport="sse")
        else:
            logging.debug("Starting MCP server for Apache Airflow with stdio transport")
            app.run(transport="stdio")
Behavior2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description carries the full burden of behavioral disclosure. 'Trigger' implies a write/mutation operation, but it doesn't specify permissions needed, side effects (e.g., if it starts execution immediately), error conditions, or response format. This is a significant gap for a tool with 12 parameters and no output schema.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is extremely concise with a single sentence, 'Trigger a DAG by ID', which is front-loaded and wastes no words. However, this conciseness comes at the cost of clarity and completeness for such a complex tool.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness1/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the complexity (12 parameters, 1 required), lack of annotations, 0% schema description coverage, and no output schema, the description is completely inadequate. It doesn't explain what 'trigger' entails, parameter meanings, or expected outcomes, making it insufficient for an agent to use the tool effectively.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters1/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema description coverage is 0%, meaning none of the 12 parameters are documented in the schema. The description adds no information about parameters beyond 'by ID', which only hints at 'dag_id'. It doesn't explain the purpose of other parameters like 'dag_run_id', 'data_interval_start', or 'run_type', leaving them completely undocumented.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose3/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description 'Trigger a DAG by ID' states a clear verb ('trigger') and resource ('DAG'), but it's vague about what 'trigger' means in this context (e.g., start execution, schedule, or create a run). It doesn't distinguish from siblings like 'update_dag_run_state' or 'delete_dag_run', which involve DAG runs but with different actions.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

No guidance is provided on when to use this tool versus alternatives. With siblings like 'update_dag_run_state' and 'delete_dag_run', the description doesn't indicate if this is for initial runs, rescheduling, or other scenarios, leaving the agent to guess based on the name alone.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

Install Server

Other Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nikhil-ganage/mcp-server-airflow-token'

If you have feedback or need assistance with the MCP directory API, please join our Discord server