trigger_dag_run
Initiate workflow execution in Amazon MWAA by starting a new DAG run with optional configuration and custom run ID.
Instructions
Trigger a new DAG run.
Args:
- environment_name: Name of the MWAA environment
- dag_id: The DAG ID to trigger
- dag_run_id: Custom run ID (optional, will be auto-generated if not provided)
- conf: Configuration JSON for the DAG run
- note: Optional note for the DAG run
Returns: Dictionary containing the created DAG run details
Input Schema
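| Name | Required | Description | Default |
|---|---|---|---|
| environment_name | Yes | Name of the MWAA environment | |
| dag_id | Yes | The DAG ID to trigger | |
| dag_run_id | No | Custom run ID; auto-generated if not provided | |
| conf | No | Configuration JSON for the DAG run | |
| note | No | Optional note for the DAG run | |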
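
As a rough illustration of the schema above, the sketch below checks an arguments dictionary before it is passed to the tool. `REQUIRED`, `OPTIONAL`, and `check_arguments` are hypothetical names introduced here and are not part of the MWAA MCP server; the environment name, DAG ID, and `conf` contents are placeholders.

```python
from typing import Any, Dict, List

# Field names come from the input schema; the constants themselves are
# illustrative and do not exist in the server code.
REQUIRED = ("environment_name", "dag_id")
OPTIONAL = ("dag_run_id", "conf", "note")


def check_arguments(args: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in a trigger_dag_run argument dict."""
    problems = []
    for name in REQUIRED:
        if not args.get(name):
            problems.append(f"missing required argument: {name}")
    for name in args:
        if name not in REQUIRED + OPTIONAL:
            problems.append(f"unknown argument: {name}")
    conf = args.get("conf")
    if conf is not None and not isinstance(conf, dict):
        problems.append("conf must be a JSON object (dict)")
    return problems


# Placeholder values; substitute a real environment and DAG.
example_args = {
    "environment_name": "my-mwaa-env",
    "dag_id": "example_dag",
    "conf": {"target_date": "2024-01-01"},
    "note": "Triggered manually",
}

print(check_arguments(example_args))
```

Only `environment_name` and `dag_id` must be present; the remaining fields can be omitted entirely.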
Implementation Reference
- awslabs/mwaa_mcp_server/tools.py:285-310 (handler) The actual implementation of the trigger_dag_run tool logic, which invokes the Airflow API.

    async def trigger_dag_run(
        self,
        environment_name: str,
        dag_id: str,
        dag_run_id: Optional[str] = None,
        conf: Optional[Dict[str, Any]] = None,
        note: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Trigger a DAG run via Airflow API."""
        self._check_readonly("trigger_dag_run")

        data: Dict[str, Any] = {}
        if dag_run_id:
            data["dag_run_id"] = dag_run_id
        else:
            data["dag_run_id"] = f"manual__{datetime.now(timezone.utc).isoformat()}"

        if conf:
            data["conf"] = conf
        if note:
            data["note"] = note

        return self._invoke_airflow_api(
            environment_name, "POST", f"/dags/{dag_id}/dagRuns", json_data=data
        )

- awslabs/mwaa_mcp_server/server.py:312-332 (registration) Tool registration for trigger_dag_run in the MCP server.

    @mcp.tool(name="trigger_dag_run")
    async def trigger_dag_run(
        environment_name: str,
        dag_id: str,
        dag_run_id: Optional[str] = None,
        conf: Optional[Dict[str, Any]] = None,
        note: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Trigger a new DAG run.

        Args:
            environment_name: Name of the MWAA environment
            dag_id: The DAG ID to trigger
            dag_run_id: Custom run ID (optional, will be auto-generated if not provided)
            conf: Configuration JSON for the DAG run
            note: Optional note for the DAG run

        Returns:
            Dictionary containing the created DAG run details
        """
        return await tools.trigger_dag_run(environment_name, dag_id, dag_run_id, conf, note)
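
The request-body logic in the handler can be tried out in isolation. The following is a minimal sketch that mirrors that logic without contacting Airflow; `build_dag_run_body` is a hypothetical helper written for this illustration, not a function in the server, and the sample arguments are placeholders.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def build_dag_run_body(
    dag_run_id: Optional[str] = None,
    conf: Optional[Dict[str, Any]] = None,
    note: Optional[str] = None,
) -> Dict[str, Any]:
    """Mirror the handler's request-body construction (illustrative only)."""
    data: Dict[str, Any] = {}
    # Fall back to a UTC-timestamped ID when the caller supplies none.
    if dag_run_id:
        data["dag_run_id"] = dag_run_id
    else:
        data["dag_run_id"] = f"manual__{datetime.now(timezone.utc).isoformat()}"
    # Missing or empty values are left out of the body entirely.
    if conf:
        data["conf"] = conf
    if note:
        data["note"] = note
    return data


# With no arguments, only an auto-generated dag_run_id is present.
print(build_dag_run_body())
# With every argument supplied, all three keys are present.
print(build_dag_run_body("backfill-001", {"target_date": "2024-01-01"}, "rerun"))
```

Because the handler tests `conf` and `note` for truthiness, an empty dictionary or empty string is treated the same as an omitted argument and is not sent in the request body.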