
create_environment

Create a new Amazon MWAA environment by configuring DAG storage, network settings, Airflow version, worker capacity, and access controls for workflow orchestration.

Instructions

Create a new MWAA environment.

Args:
    name: Environment name
    dag_s3_path: S3 path to DAGs folder (e.g., s3://bucket/dags)
    execution_role_arn: IAM role ARN for the environment
    network_configuration: VPC configuration with SubnetIds and SecurityGroupIds
    source_bucket_arn: ARN of the S3 bucket containing DAGs
    airflow_version: Apache Airflow version (e.g., "2.7.2")
    environment_class: Environment size (mw1.small, mw1.medium, mw1.large, mw1.xlarge, mw1.2xlarge)
    max_workers: Maximum number of workers (1-25)
    min_workers: Minimum number of workers (1-25)
    schedulers: Number of schedulers (2-5)
    webserver_access_mode: PUBLIC_ONLY or PRIVATE_ONLY
    weekly_maintenance_window_start: Maintenance window start (e.g., "SUN:03:00")
    tags: Resource tags
    airflow_configuration_options: Airflow configuration overrides
    logging_configuration: Logging settings for different components
    requirements_s3_path: S3 path to requirements.txt
    plugins_s3_path: S3 path to plugins.zip
    startup_script_s3_path: S3 path to startup script

Returns: Dictionary containing the ARN of the created environment
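The required arguments can be sketched as a minimal call payload. This is an illustrative sketch only: the environment name, bucket, subnet IDs, security group, and role ARN below are placeholder values, and the `network_configuration` shape follows the `SubnetIds`/`SecurityGroupIds` structure described in the Args above.

```python
# Minimal required arguments for create_environment.
# All AWS resource identifiers here are placeholders, not real resources.
required_args = {
    "name": "my-airflow-env",
    "dag_s3_path": "s3://my-airflow-bucket/dags",
    "execution_role_arn": "arn:aws:iam::123456789012:role/mwaa-execution-role",
    "source_bucket_arn": "arn:aws:s3:::my-airflow-bucket",
    # VPC configuration with SubnetIds and SecurityGroupIds, per the Args above
    "network_configuration": {
        "SubnetIds": ["subnet-0abc1234", "subnet-0def5678"],
        "SecurityGroupIds": ["sg-0abc1234"],
    },
}
```

The remaining parameters are optional and fall back to MWAA defaults when omitted.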

Input Schema

Name                              Required  Description  Default
name                              Yes
dag_s3_path                       Yes
execution_role_arn                Yes
network_configuration             Yes
source_bucket_arn                 Yes
airflow_version                   No
environment_class                 No
max_workers                       No
min_workers                       No
schedulers                        No
webserver_access_mode             No
weekly_maintenance_window_start   No
tags                              No
airflow_configuration_options     No
logging_configuration             No
requirements_s3_path              No
plugins_s3_path                   No
startup_script_s3_path            No

Implementation Reference

  • Actual implementation of create_environment tool in MWAATools class using boto3.
    async def create_environment(self, **kwargs: Any) -> Dict[str, Any]:
        """Create a new MWAA environment."""
        self._check_readonly("create_environment")
    
        try:
            params = {k: v for k, v in kwargs.items() if v is not None}
    
            boto_params: Dict[str, Any] = {}
            param_mapping = {
                "name": "Name",
                "dag_s3_path": "DagS3Path",
                "execution_role_arn": "ExecutionRoleArn",
                "network_configuration": "NetworkConfiguration",
                "source_bucket_arn": "SourceBucketArn",
                "airflow_version": "AirflowVersion",
                "environment_class": "EnvironmentClass",
                "max_workers": "MaxWorkers",
                "min_workers": "MinWorkers",
                "schedulers": "Schedulers",
                "webserver_access_mode": "WebserverAccessMode",
                "weekly_maintenance_window_start": "WeeklyMaintenanceWindowStart",
                "tags": "Tags",
                "airflow_configuration_options": "AirflowConfigurationOptions",
                "logging_configuration": "LoggingConfiguration",
                "requirements_s3_path": "RequirementsS3Path",
                "plugins_s3_path": "PluginsS3Path",
                "startup_script_s3_path": "StartupScriptS3Path",
            }
    
            for snake_key, value in params.items():
                if snake_key in param_mapping:
                    boto_params[param_mapping[snake_key]] = value
    
            response = self.mwaa_client.create_environment(**boto_params)
            return {"Arn": response["Arn"]}
    
        except (ClientError, BotoCoreError) as e:
            logger.error("Error creating environment: %s", e)
            return {"error": str(e)}
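The snake_case-to-PascalCase translation at the heart of the implementation above can be exercised in isolation. This standalone sketch reproduces the two steps (drop `None` values, then rename keys for boto3) with a trimmed-down mapping table:

```python
# Standalone reproduction of the parameter translation used above:
# filter out None values, then rename snake_case keys to the PascalCase
# names that boto3's create_environment expects.
PARAM_MAPPING = {
    "name": "Name",
    "dag_s3_path": "DagS3Path",
    "max_workers": "MaxWorkers",
}

def to_boto_params(kwargs):
    # Step 1: drop parameters the caller left unset.
    params = {k: v for k, v in kwargs.items() if v is not None}
    # Step 2: translate known keys to their boto3 names.
    return {PARAM_MAPPING[k]: v for k, v in params.items() if k in PARAM_MAPPING}

print(to_boto_params({"name": "my-env", "dag_s3_path": "dags", "max_workers": None}))
# → {'Name': 'my-env', 'DagS3Path': 'dags'}
```

Note that `max_workers` is silently dropped because it was `None`, mirroring how optional tool arguments never reach the boto3 call.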
  • Registration of create_environment tool in server.py using FastMCP.
    @mcp.tool(name="create_environment")
    async def create_environment(
        name: str,
        dag_s3_path: str,
        execution_role_arn: str,
        network_configuration: Dict[str, Any],
        source_bucket_arn: str,
        airflow_version: Optional[str] = None,
        environment_class: Optional[str] = None,
        max_workers: Optional[int] = None,
        min_workers: Optional[int] = None,
        schedulers: Optional[int] = None,
        webserver_access_mode: Optional[str] = None,
        weekly_maintenance_window_start: Optional[str] = None,
        tags: Optional[Dict[str, str]] = None,
        airflow_configuration_options: Optional[Dict[str, str]] = None,
        logging_configuration: Optional[Dict[str, Any]] = None,
        requirements_s3_path: Optional[str] = None,
        plugins_s3_path: Optional[str] = None,
        startup_script_s3_path: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Create a new MWAA environment.
    
        Args:
            name: Environment name
            dag_s3_path: S3 path to DAGs folder (e.g., s3://bucket/dags)
            execution_role_arn: IAM role ARN for the environment
            network_configuration: VPC configuration with SubnetIds and SecurityGroupIds
            source_bucket_arn: ARN of the S3 bucket containing DAGs
            airflow_version: Apache Airflow version (e.g., "2.7.2")
            environment_class: Environment size (mw1.small, mw1.medium, mw1.large, mw1.xlarge, mw1.2xlarge)
            max_workers: Maximum number of workers (1-25)
            min_workers: Minimum number of workers (1-25)
            schedulers: Number of schedulers (2-5)
            webserver_access_mode: PUBLIC_ONLY or PRIVATE_ONLY
            weekly_maintenance_window_start: Maintenance window start (e.g., "SUN:03:00")
            tags: Resource tags
            airflow_configuration_options: Airflow configuration overrides
            logging_configuration: Logging settings for different components
            requirements_s3_path: S3 path to requirements.txt
            plugins_s3_path: S3 path to plugins.zip
            startup_script_s3_path: S3 path to startup script
    
        Returns:
            Dictionary containing the ARN of the created environment
        """
        max_workers_int = int(max_workers) if max_workers is not None else None
        min_workers_int = int(min_workers) if min_workers is not None else None
        schedulers_int = int(schedulers) if schedulers is not None else None
    
        return await tools.create_environment(
            name=name,
            dag_s3_path=dag_s3_path,
            execution_role_arn=execution_role_arn,
            network_configuration=network_configuration,
            source_bucket_arn=source_bucket_arn,
            airflow_version=airflow_version,
            environment_class=environment_class,
            max_workers=max_workers_int,
            min_workers=min_workers_int,
            schedulers=schedulers_int,
            webserver_access_mode=webserver_access_mode,
            weekly_maintenance_window_start=weekly_maintenance_window_start,
            tags=tags,
            airflow_configuration_options=airflow_configuration_options,
            logging_configuration=logging_configuration,
            requirements_s3_path=requirements_s3_path,
            plugins_s3_path=plugins_s3_path,
            startup_script_s3_path=startup_script_s3_path,
        )
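The documented capacity ranges (workers 1-25, schedulers 2-5) are not enforced by the registration above; boto3 would surface a validation error from the service instead. A caller could pre-validate them with a small helper. This is a hypothetical sketch, not part of the server code:

```python
# Hypothetical pre-validation of the documented capacity ranges
# (workers 1-25, schedulers 2-5); not part of the MWAA MCP server itself.
def validate_capacity(min_workers=None, max_workers=None, schedulers=None):
    errors = []
    for label, value, lo, hi in (
        ("min_workers", min_workers, 1, 25),
        ("max_workers", max_workers, 1, 25),
        ("schedulers", schedulers, 2, 5),
    ):
        if value is not None and not lo <= value <= hi:
            errors.append(f"{label} must be between {lo} and {hi}, got {value}")
    if min_workers is not None and max_workers is not None and min_workers > max_workers:
        errors.append("min_workers cannot exceed max_workers")
    return errors

print(validate_capacity(min_workers=1, max_workers=30, schedulers=2))
# → ['max_workers must be between 1 and 25, got 30']
```

Running this before invoking the tool turns a round trip to the AWS API into an immediate local error message.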


MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/paschmaria/mwaa-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.