
airflow_best_practices

Provides best practices guidance for designing, optimizing, and securing Apache Airflow workflows on Amazon MWAA, covering DAG patterns, performance, resource management, error handling, and security.

Instructions

Get MWAA and Apache Airflow best practices guidance.

Returns comprehensive guidance on:

  • DAG design patterns

  • Performance optimization

  • Resource management

  • Error handling

  • Security best practices

  • MWAA-specific considerations

Input Schema


No arguments

Implementation Reference

  • The tool handler that registers 'airflow_best_practices' and returns the guidance content.
    @mcp.tool(name="airflow_best_practices")
    async def airflow_best_practices() -> str:
        """Get MWAA and Apache Airflow best practices guidance.
    
        Returns comprehensive guidance on:
        - DAG design patterns
        - Performance optimization
        - Resource management
        - Error handling
        - Security best practices
        - MWAA-specific considerations
        """
        return AIRFLOW_BEST_PRACTICES
  • The constant containing the actual best practices text content returned by the handler.
    AIRFLOW_BEST_PRACTICES = """
    # MWAA and Apache Airflow Best Practices Guide
    
    ## Environment Setup and Configuration
    
    ### 1. MWAA Environment Sizing
    - **Start Small**: Begin with mw1.small and scale up based on actual usage
    - **Monitor Metrics**: Use CloudWatch metrics to track worker utilization
    - **Auto-scaling**: Configure min/max workers appropriately (typically 1-10 for most workloads)
    - **Scheduler Count**: Use 2 schedulers for HA, increase only for very large deployments
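
    These knobs map directly onto the MWAA `UpdateEnvironment` API. A minimal
    boto3 sketch (the environment name and values are placeholders; tune them
    against your own CloudWatch metrics):

    ```python
    import boto3

    mwaa = boto3.client("mwaa")

    # Bound auto-scaling with MinWorkers/MaxWorkers; Schedulers=2 gives HA.
    mwaa.update_environment(
        Name="my-mwaa-env",            # placeholder environment name
        EnvironmentClass="mw1.small",  # start small, scale up as needed
        MinWorkers=1,
        MaxWorkers=10,
        Schedulers=2,
    )
    ```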
    
    ### 2. S3 Bucket Organization
    ```
    s3://your-mwaa-bucket/
    ├── dags/
    │   ├── main_workflow.py
    │   └── utils/
    │       └── helpers.py
    ├── plugins/
    │   └── custom_operators.zip
    ├── requirements/
    │   └── requirements.txt
    └── scripts/
        └── startup.sh
    ```
    
    ### 3. Requirements Management
    - Pin all package versions in requirements.txt
    - Test requirements locally before deploying
    - Use constraints files for Airflow providers
    - Keep requirements minimal to reduce startup time
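
    For example, versions can be pinned and the Airflow constraints file
    referenced at the top of requirements.txt. The versions and constraints URL
    below are illustrative placeholders; match them to your Airflow and Python
    versions:

    ```
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.11.txt"
    apache-airflow-providers-amazon==8.16.0
    pandas==2.1.4
    ```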
    
    ## DAG Design Patterns
    
    ### 1. Idempotency
    - Design tasks to be re-runnable without side effects
    - Use upsert operations instead of inserts
    - Include date partitioning in data operations
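
    A minimal sketch of an idempotent, date-partitioned load (the table name and
    `run_sql` helper are hypothetical; the point is that re-running the task for
    the same logical date replaces the partition instead of duplicating it):

    ```python
    from airflow.decorators import task

    @task
    def load_partition(ds=None):
        # ds is the logical date (YYYY-MM-DD) injected from the task context.
        # Delete-then-insert makes the load safe to re-run.
        run_sql(f"DELETE FROM sales_daily WHERE partition_date = '{ds}'")
        run_sql(f"INSERT INTO sales_daily SELECT * FROM staging WHERE event_date = '{ds}'")
    ```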
    
    ### 2. Task Dependencies
    ```python
    # Good: Clear linear dependencies
    extract >> transform >> load
    
    # Better: Parallel where possible
    extract >> [transform_a, transform_b] >> load
    
    # Best: Dynamic task mapping (Airflow 2.3+)
    from airflow.decorators import task

    @task
    def get_files():
        return ["a.csv", "b.csv"]  # e.g. discover S3 keys here

    @task
    def process_file(filename):
        # Process an individual file
        pass

    process_file.expand(filename=get_files())
    ```
    
    ### 3. Error Handling
    ```python
    from datetime import timedelta

    from airflow.operators.empty import EmptyOperator

    # Set appropriate retries
    default_args = {
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        'retry_exponential_backoff': True,
        'max_retry_delay': timedelta(minutes=30),
    }
    
    # Use trigger rules for error paths
    error_handler = EmptyOperator(
        task_id='handle_errors',
        trigger_rule='one_failed'
    )
    ```
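
    Retries cover transient failures; for terminal failures, an
    `on_failure_callback` can push an alert. A minimal sketch (the SNS topic ARN
    is a placeholder):

    ```python
    import boto3

    # The callback receives the task-instance context dict.
    def notify_failure(context):
        ti = context["task_instance"]
        boto3.client("sns").publish(
            TopicArn="arn:aws:sns:us-east-1:123456789012:airflow-alerts",  # placeholder
            Message=f"Task {ti.task_id} in DAG {ti.dag_id} failed.",
        )

    default_args = {"on_failure_callback": notify_failure}
    ```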
    
    ## Performance Optimization
    
    ### 1. DAG Loading Time
    - Keep DAG files small and focused
    - Avoid heavy imports at module level
    - Use Jinja templating instead of Python loops for static DAGs
    - Limit the number of DAGs per file
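
    For example, moving a heavy import from module level into the task body
    keeps the scheduler's parse loop fast, because the import only runs when the
    task executes:

    ```python
    from airflow.decorators import task

    # Bad: `import pandas as pd` at module level runs on every DAG parse.

    @task
    def transform():
        import pandas as pd  # imported only at task run time
        return pd.DataFrame({"a": [1, 2]}).to_json()
    ```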
    
    ### 2. Task Execution
    - Use appropriate task concurrency limits
    - Implement connection pooling for databases
    - Batch operations where possible
    - Use XCom sparingly (max 48KB per XCom)
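
    Concurrency limits can be applied per task; a minimal sketch (the pool name
    and callable are placeholders, and the pool must be created in the Airflow
    UI or CLI first):

    ```python
    from airflow.operators.python import PythonOperator

    def write_batch():
        ...  # your batched write logic

    write_task = PythonOperator(
        task_id="write_to_db",
        python_callable=write_batch,
        pool="db_pool",             # shared pool that caps DB connections
        max_active_tis_per_dag=4,   # cap concurrent instances of this task
    )
    ```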
    
    ### 3. Sensor Optimization
    ```python
    from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

    # Use reschedule mode for long-running sensors
    s3_sensor = S3KeySensor(
        task_id='wait_for_file',
        bucket_key='s3://bucket/key',
        mode='reschedule',  # Don't occupy worker slot
        poke_interval=300,  # Check every 5 minutes
        timeout=3600,       # 1 hour timeout
    )
    ```
    
    ## Security Best Practices
    
    ### 1. IAM Roles
    - Use separate execution role with minimal permissions
    - Implement role assumption for cross-account access
    - Avoid hardcoding credentials
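
    Cross-account access is typically done by assuming a role in the target
    account rather than storing extra credentials; a minimal sketch (the role
    ARN is a placeholder):

    ```python
    import boto3

    # Assume a role in the target account using the execution role's identity.
    creds = boto3.client("sts").assume_role(
        RoleArn="arn:aws:iam::123456789012:role/cross-account-role",  # placeholder
        RoleSessionName="mwaa-task",
    )["Credentials"]

    s3 = boto3.client(
        "s3",
        aws_access_key_id=creds["AccessKeyId"],
        aws_secret_access_key=creds["SecretAccessKey"],
        aws_session_token=creds["SessionToken"],
    )
    ```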
    
    ### 2. Secrets Management
    ```python
    # Use Airflow connections and variables
    from airflow.models import Variable
    from airflow.hooks.base import BaseHook
    
    # Get connection
    conn = BaseHook.get_connection('my_db')
    
    # Get variable (with default)
    api_key = Variable.get('api_key', default_var='')
    
    # Use AWS Secrets Manager
    from airflow.providers.amazon.aws.hooks.secrets_manager import SecretsManagerHook
    hook = SecretsManagerHook(aws_conn_id='aws_default')
    secret = hook.get_secret('my-secret')
    ```
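
    On MWAA, the Secrets Manager backend is enabled through the environment's
    Airflow configuration options. A minimal boto3 sketch (the environment name
    is a placeholder; the prefixes follow the convention from the MWAA
    documentation):

    ```python
    import boto3

    boto3.client("mwaa").update_environment(
        Name="my-mwaa-env",  # placeholder
        AirflowConfigurationOptions={
            "secrets.backend": "airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend",
            "secrets.backend_kwargs": '{"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}',
        },
    )
    ```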
    
    ### 3. Network Security
    - Use private subnets for MWAA
    - Implement VPC endpoints for AWS services
    - Configure security groups with minimal access
    
    ## Monitoring and Alerting
    
    ### 1. CloudWatch Integration
    - Monitor key metrics: CPU, memory, task duration
    - Set up alarms for failed tasks and DAG failures
    - Use custom metrics for business KPIs
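
    Custom business KPIs can be published from a task with boto3; the namespace
    and metric name below are placeholders:

    ```python
    import boto3

    def publish_kpi(record_count: int) -> None:
        # Emit a custom metric that CloudWatch alarms and dashboards can use.
        boto3.client("cloudwatch").put_metric_data(
            Namespace="MyTeam/Pipelines",  # placeholder namespace
            MetricData=[{
                "MetricName": "RecordsProcessed",
                "Value": record_count,
                "Unit": "Count",
            }],
        )
    ```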
    
    ### 2. Logging Best Practices
    ```python
    import logging

    from airflow.decorators import task

    logger = logging.getLogger(__name__)

    @task
    def process_data():
        logger.info("Starting data processing")
        try:
            record_count = run_transformations()  # hypothetical processing step
            logger.info(f"Processed {record_count} records")
        except Exception:
            logger.exception("Processing failed")  # logs the traceback
            raise
    ```
    
    ### 3. SLA Management
    ```python
    from datetime import timedelta

    from airflow.operators.python import PythonOperator

    # Set SLA for critical tasks
    critical_task = PythonOperator(
        task_id='critical_process',
        python_callable=process_critical_data,
        sla=timedelta(hours=2),
    )
    
    # Define an SLA miss callback; conventionally this is passed to the
    # DAG constructor as DAG(..., sla_miss_callback=sla_miss_callback)
    def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
        # Send notification
        pass

    dag.sla_miss_callback = sla_miss_callback
    ```
    
    ## Cost Optimization
    
    ### 1. Environment Management
    - Tear down non-production environments outside business hours (MWAA has no native pause, so automate delete and re-create)
    - Use smaller environments for dev/test
    - Clean up old logs and artifacts
    
    ### 2. Task Efficiency
    - Minimize task runtime
    - Use appropriate instance types
    - Batch small tasks together
    
    ### 3. Data Transfer
    - Process data in the same region
    - Use VPC endpoints to avoid NAT gateway costs
    - Compress data before transfer
    
    ## Common Pitfalls to Avoid
    
    1. **Top-level Code**: Avoid database queries or API calls at DAG parse time (see the sketch after this list)
    2. **Large XComs**: Don't pass large data through XCom
    3. **Dynamic DAGs**: Dynamic DAG generation runs on every scheduler parse, so keep it cheap
    4. **Missing Cleanup**: Always clean up temporary resources
    5. **Hardcoded Dates**: Use the logical date from the task context (`ds`/`logical_date`, formerly `execution_date`) instead of `datetime.now()`
    6. **Ignoring Idempotency**: Ensure all tasks can be safely re-run
    7. **Over-scheduling**: Don't schedule DAGs more frequently than needed
    8. **Resource Leaks**: Close connections and clean up resources
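
    A minimal sketch of avoiding pitfalls 1 and 5 together: the expensive call
    lives inside the task body, and the date comes from the task context rather
    than `datetime.now()` (`fetch_orders_for_day` is a hypothetical helper):

    ```python
    from airflow.decorators import task

    # Bad: calling the orders API here would run on every scheduler parse.

    @task
    def extract_orders(ds=None):
        # ds is the logical date string injected from the context, so
        # backfills and re-runs process the intended day.
        return fetch_orders_for_day(ds)
    ```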
    
    ## MWAA-Specific Considerations
    
    ### 1. Limitations
    - No kubectl access to underlying Kubernetes
    - Limited pip packages (must be compatible with Amazon Linux)
    - Maximum environment size constraints
    - No direct database access
    
    ### 2. Migration Tips
    - Test DAGs in MWAA development environment
    - Verify all dependencies are available
    - Update connection strings and credentials
    - Plan for downtime during migration
    
    ### 3. Troubleshooting
    - Check CloudWatch logs for detailed errors
    - Verify S3 permissions and bucket policies
    - Ensure VPC configuration allows internet access (for PyPI)
    - Monitor environment health metrics
    
    Remember: Always test changes in a development environment before deploying to production!
    """
