Glama

dag_design_guidance

Provides expert guidance on designing efficient Apache Airflow DAGs, covering task dependencies, dynamic generation, sensor patterns, XCom usage, testing strategies, and common pitfalls to avoid.

Instructions

Get detailed guidance on designing efficient Airflow DAGs.

Returns expert guidance on:

  • Task dependencies and parallelism

  • Dynamic DAG generation

  • Sensor patterns

  • XCom usage

  • Testing strategies

  • Common pitfalls to avoid

Input Schema


No arguments

Implementation Reference

  • The handler function for the `dag_design_guidance` tool, which returns the guidance text defined in `DAG_DESIGN_GUIDANCE`.
    @mcp.tool(name="dag_design_guidance")
    async def dag_design_guidance() -> str:
        """Get detailed guidance on designing efficient Airflow DAGs.
    
        Returns expert guidance on:
        - Task dependencies and parallelism
        - Dynamic DAG generation
        - Sensor patterns
        - XCom usage
        - Testing strategies
        - Common pitfalls to avoid
        """
        return DAG_DESIGN_GUIDANCE
  • The definition/schema of the guidance text used by the `dag_design_guidance` tool.
    DAG_DESIGN_GUIDANCE = """
    # Airflow DAG Design and Optimization Guide
    
    ## DAG Structure Fundamentals
    
    ### 1. Basic DAG Anatomy
    ```python
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from datetime import datetime, timedelta
    
    # Define default arguments
    default_args = {
        'owner': 'data-team',
        'depends_on_past': False,
        'start_date': datetime(2024, 1, 1),
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }
    
    # Instantiate DAG
    dag = DAG(
        'example_dag',
        default_args=default_args,
        description='A simple example DAG',
        schedule='@daily',  # Airflow 2.4+; older versions use schedule_interval
        catchup=False,
        max_active_runs=1,
        tags=['example', 'tutorial'],
    )
    ```
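    
    With `catchup=False` the scheduler creates a run only for the most recent data interval; with `catchup=True` it creates one run for every interval that has ended since `start_date`. The plain-Python sketch below (a hypothetical helper, not an Airflow API) counts those intervals for a `@daily` schedule, which is roughly how many runs a catch-up would queue.
    
    ```python
    from datetime import datetime, timedelta
    
    def completed_daily_intervals(start_date, now):
        # Hypothetical helper: number of whole @daily data intervals that have
        # ended between start_date and now (the runs catchup=True would create)
        if now < start_date:
            return 0
        return (now - start_date) // timedelta(days=1)
    
    # start_date=2024-01-01, checked on 2024-01-10 at 06:00
    print(completed_daily_intervals(datetime(2024, 1, 1), datetime(2024, 1, 10, 6)))  # 9
    ```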
    
    ### 2. Task Dependencies Patterns
    
    #### Linear Dependencies
    ```python
    # Simple chain
    task1 >> task2 >> task3
    
    # Alternative syntax (equivalent to the chain above)
    from airflow.models.baseoperator import chain
    chain(task1, task2, task3)
    ```
    
    #### Parallel Processing
    ```python
    # Fan-out/Fan-in pattern
    start >> [process_a, process_b, process_c] >> combine_results
    
    # Complex dependencies: extract, validate, two parallel transforms, then aggregate and load
    extract >> validate >> [transform_1, transform_2] >> aggregate >> load
    ```
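    
    The dependency graph above can be reasoned about as a topological sort. The framework-free sketch below uses the standard-library `graphlib` module (Python 3.9+), not Airflow's scheduler, to group the example's tasks into waves: tasks in the same wave have no dependency on each other, so they could run in parallel, subject to pool and concurrency limits.
    
    ```python
    from graphlib import TopologicalSorter
    
    # Hypothetical graph mirroring the "Complex dependencies" example:
    # each task maps to the set of tasks it depends on (its upstreams).
    graph = {
        'validate': {'extract'},
        'transform_1': {'validate'},
        'transform_2': {'validate'},
        'aggregate': {'transform_1', 'transform_2'},
        'load': {'aggregate'},
    }
    
    def parallel_stages(deps):
        # Every task in a wave has all of its upstreams finished,
        # so the tasks within one wave can run concurrently.
        sorter = TopologicalSorter(deps)
        sorter.prepare()
        stages = []
        while sorter.is_active():
            ready = sorted(sorter.get_ready())
            stages.append(ready)
            sorter.done(*ready)
        return stages
    
    print(parallel_stages(graph))
    # [['extract'], ['validate'], ['transform_1', 'transform_2'], ['aggregate'], ['load']]
    ```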
    
    #### Dynamic Task Generation
    ```python
    # Airflow 2.3+ Dynamic Task Mapping
    from airflow import DAG
    from airflow.decorators import task
    
    @task
    def get_files():
        return ['file1.csv', 'file2.csv', 'file3.csv']
    
    @task
    def process_file(filename: str):
        # Process individual file
        return f"Processed {filename}"
    
    @task
    def combine_results(results: list):
        return f"Combined {len(results)} results"
    
    with DAG('dynamic_tasks', ...) as dag:
        files = get_files()
        processed = process_file.expand(filename=files)
        combine_results(processed)
    ```
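    
    Mapping over more than one keyword argument expands to the cross product of the inputs (use `.partial()` for arguments that stay constant), and the number of mapped instances is capped by the `[core] max_map_length` setting. The plain-Python sketch below mimics that fan-out; the `mode` argument is hypothetical and the code does not call Airflow.
    
    ```python
    from itertools import product
    
    # Framework-free illustration, not Airflow code. A call such as
    #   process_file.expand(filename=files, mode=modes)
    # creates one mapped task instance per combination of the mapped arguments.
    files = ['file1.csv', 'file2.csv', 'file3.csv']
    modes = ['full', 'sample']  # hypothetical second mapped argument
    
    mapped_calls = [{'filename': f, 'mode': m} for f, m in product(files, modes)]
    print(len(mapped_calls))  # 6 = 3 files x 2 modes
    ```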
    
    ## Advanced Patterns
    
    ### 1. Conditional Execution
    ```python
    # Using BranchPythonOperator
    from airflow.operators.python import BranchPythonOperator
    
    def choose_branch(**context):
        # Return the task_id to follow; the other branch is skipped
        if context['logical_date'].weekday() == 0:  # Monday ('execution_date' before Airflow 2.2)
            return 'monday_task'
        return 'other_day_task'
    
    branching = BranchPythonOperator(
        task_id='branching',
        python_callable=choose_branch,
    )
    
    monday_task = PythonOperator(task_id='monday_task', ...)
    other_day_task = PythonOperator(task_id='other_day_task', ...)
    join = PythonOperator(
        task_id='join',
        trigger_rule='none_failed_min_one_success',  # Run even though one branch was skipped
        ...
    )
    
    branching >> [monday_task, other_day_task] >> join
    ```
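    
    Because a branch callable is plain Python, its decision logic can be unit-tested without a scheduler by passing a fake context. The sketch below re-declares the Monday check keyed on `logical_date` (the context key Airflow 2.2+ provides for the run's date) so that it runs on its own.
    
    ```python
    from datetime import datetime
    
    def choose_branch(**context):
        # Return the task_id of the branch to follow
        if context['logical_date'].weekday() == 0:  # Monday
            return 'monday_task'
        return 'other_day_task'
    
    # A fake context only needs the keys the callable actually reads
    assert choose_branch(logical_date=datetime(2024, 1, 1)) == 'monday_task'     # Monday
    assert choose_branch(logical_date=datetime(2024, 1, 2)) == 'other_day_task'  # Tuesday
    ```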
    
    ### 2. SubDAGs and Task Groups
    ```python
    # Task Groups (use these instead of SubDAGs, which are deprecated)
    from airflow.utils.task_group import TaskGroup
    
    with TaskGroup('processing_group') as processing:
        extract = PythonOperator(task_id='extract', ...)
        transform = PythonOperator(task_id='transform', ...)
        extract >> transform
    
    start >> processing >> end
    ```
    
    ### 3. Cross-DAG Dependencies
    ```python
    # Using ExternalTaskSensor
    from airflow.sensors.external_task import ExternalTaskSensor
    
    wait_for_upstream = ExternalTaskSensor(
        task_id='wait_for_upstream_dag',
        external_dag_id='upstream_dag',
        external_task_id='final_task',
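        # Waits for the upstream run with the same logical date; set execution_delta
        # if the two DAGs run on different schedules
        mode='reschedule',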
    )
    
    # Using TriggerDagRunOperator
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator
    
    trigger_downstream = TriggerDagRunOperator(
        task_id='trigger_downstream',
        trigger_dag_id='downstream_dag',
        conf={'key': 'value'},
        wait_for_completion=True,  # Keeps this task running until the triggered run finishes
    )
    ```
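    
    When the upstream and downstream DAGs run on different schedules, `ExternalTaskSensor` needs `execution_delta` (or `execution_date_fn`): by default it looks for the upstream run with the same logical date, and with `execution_delta` it looks at this run's logical date minus that delta. The plain-Python sketch below (a hypothetical helper, not Airflow code) shows that arithmetic for an upstream DAG at 01:00 and a downstream DAG at 02:00.
    
    ```python
    from datetime import datetime, timedelta
    
    def upstream_logical_date(logical_date, execution_delta=None):
        # Mirrors the sensor's lookup rule: same logical date by default,
        # otherwise logical_date - execution_delta
        if execution_delta is None:
            return logical_date
        return logical_date - execution_delta
    
    print(upstream_logical_date(datetime(2024, 1, 1, 2), timedelta(hours=1)))
    # 2024-01-01 01:00:00
    ```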
    
    ## Sensor Patterns
    
    ### 1. Efficient Sensor Usage
    ```python
    # S3 Key Sensor with reschedule mode
    from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
    
    wait_for_file = S3KeySensor(
        task_id='wait_for_s3_file',
        bucket_name='my-bucket',
        bucket_key='data/{{ ds }}/input.csv',
        mode='reschedule',  # Frees up worker slot
        poke_interval=300,
        timeout=3600,
        soft_fail=True,  # On timeout, mark the task skipped instead of failed
    )
    
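    # Custom sensor with exponential backoff (built into BaseSensorOperator)
    from datetime import timedelta
    from airflow.sensors.base import BaseSensorOperator
    
    class CustomSensor(BaseSensorOperator):
        def poke(self, context):
            # Custom logic: return True once the condition is met
            return check_condition()
    
    wait_for_condition = CustomSensor(
        task_id='wait_for_condition',
        poke_interval=60,                # Start with roughly 1 minute
        exponential_backoff=True,        # Grow the interval between pokes
        max_wait=timedelta(minutes=30),  # Cap the interval at 30 minutes
        mode='reschedule',
    )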
    ```
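    
    A capped doubling schedule (start at 60 seconds, double after each poke, never exceed 30 minutes) is easy to reason about in plain Python. The sketch below is a hypothetical helper, not Airflow's implementation: the built-in `exponential_backoff` option on `BaseSensorOperator` uses its own formula, so real intervals will differ, but the arithmetic shows how quickly the interval reaches its cap.
    
    ```python
    def backoff_intervals(initial, cap, pokes):
        # Seconds to wait before each successive poke: double each time, never above cap
        intervals = []
        current = initial
        for _ in range(pokes):
            intervals.append(current)
            current = min(current * 2, cap)
        return intervals
    
    schedule = backoff_intervals(60, 1800, 8)
    print(schedule)       # [60, 120, 240, 480, 960, 1800, 1800, 1800]
    print(sum(schedule))  # 7260 seconds of waiting across 8 pokes
    ```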
    
    ### 2. Smart Waiting Patterns
    ```python
    # Date-aware sensor
    from airflow.sensors.date_time import DateTimeSensor
    
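    wait_for_time = DateTimeSensor(
        task_id='wait_for_0300',
        # target_time takes a datetime or a templated string, not a datetime.time;
        # this waits until 03:00 on the day the data interval ends
        target_time='{{ data_interval_end.replace(hour=3, minute=0) }}',
        mode='reschedule',
    )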
    
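    # Timeout with fallback: the sensor is skipped (not failed) if the file never arrives;
    # downstream tasks need a trigger rule such as 'none_failed' to run after the skip
    wait_with_fallback = S3KeySensor(
        task_id='wait_or_continue',
        bucket_name='my-bucket',
        bucket_key='optional-file.csv',
        timeout=1800,  # 30 minutes
        soft_fail=True,
        mode='reschedule',
    )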
    ```
    
    ## XCom Best Practices
    
    ### 1. Proper XCom Usage
    ```python
    # Pushing to XCom
    @task
    def push_value():
        # Automatically pushed as return value
        return {'key': 'value', 'count': 42}
    
    # Explicit push
    def push_explicit(**context):
        context['task_instance'].xcom_push(key='my_key', value='my_value')
    
    # Pulling from XCom
    @task
    def pull_value(**context):
        # Pull from specific task
        value = context['task_instance'].xcom_pull(
            task_ids='push_value',
            key='return_value'
        )
        
        # Pull from multiple tasks
        values = context['task_instance'].xcom_pull(
            task_ids=['task1', 'task2']
        )
    ```
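    
    XCom values are stored in Airflow's metadata database, so they should stay small and serializable. A conservative pre-check such as the sketch below catches oversized return values before they reach the database; the helper and the 48 KB budget are hypothetical choices, not Airflow settings.
    
    ```python
    import json
    
    MAX_XCOM_BYTES = 48 * 1024  # hypothetical budget, not an Airflow setting
    
    def check_xcom_payload(value):
        # Plain json.dumps accepts fewer types than Airflow's own XCom serializer,
        # so a value that passes here is a conservative, safe choice
        size = len(json.dumps(value).encode('utf-8'))
        if size > MAX_XCOM_BYTES:
            raise ValueError(f'XCom payload is {size} bytes; store the data externally and pass a reference')
        return value
    
    check_xcom_payload({'key': 'value', 'count': 42})  # small dict: returned unchanged
    ```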
    
    ### 2. XCom Alternatives for Large Data
    ```python
    # Store reference in XCom, not data
    @task
    def process_large_data(ds=None):
        # Process data and save to S3; key by logical date (ds) so a retry
        # overwrites the same object instead of creating a new one
        s3_key = f"s3://bucket/processed/{ds}.parquet"
        # ... save data to s3_key
        
        # Return reference, not data
        return {'s3_key': s3_key, 'record_count': 1000000}
    
    # Use Airflow Variables for shared config
    from airflow.models import Variable
    
    @task
    def get_config():
        config = Variable.get('etl_config', deserialize_json=True)
        return config
    ```
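    
    The reference-passing pattern can be sketched without Airflow: one function writes the bulky data to storage under a deterministic key and returns only a small reference, which another function uses to read the data back. A local temporary directory stands in for S3 here, and the function and path names are hypothetical.
    
    ```python
    import json
    import os
    import tempfile
    
    STORAGE_ROOT = tempfile.mkdtemp()  # stand-in for an object store such as S3
    
    def produce(ds):
        records = [{'id': i} for i in range(1000)]
        # Deterministic path per logical date: a retry overwrites the same object
        path = os.path.join(STORAGE_ROOT, f'processed_{ds}.json')
        with open(path, 'w') as fh:
            json.dump(records, fh)
        return {'path': path, 'record_count': len(records)}  # small enough for XCom
    
    def consume(ref):
        with open(ref['path']) as fh:
            records = json.load(fh)
        return len(records)
    
    ref = produce('2024-01-01')
    print(consume(ref))  # 1000
    ```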
    
    ## Testing Strategies
    
    ### 1. Unit Testing DAGs
    ```python
    import pytest
    from airflow.models import DagBag
    
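    def get_dag(dag_id):
        # Parse the DAGs folder, excluding Airflow's bundled example DAGs
        dagbag = DagBag(include_examples=False)
        assert not dagbag.import_errors, dagbag.import_errors
        return dagbag.get_dag(dag_id)
    
    def test_dag_loaded():
        dag = get_dag('my_dag')
        assert dag is not None
        assert len(dag.tasks) > 0
    
    def test_dag_structure():
        dag = get_dag('my_dag')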
        
        # Check task dependencies
        task1 = dag.get_task('task1')
        task2 = dag.get_task('task2')
        
        assert task2 in task1.downstream_list
        
        # Check default args
        assert dag.default_args['retries'] == 2
    ```
    
    ### 2. Testing Individual Tasks
    ```python
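    from airflow.models import DagBag
    
    def test_process_data_logic():
        dag = DagBag(include_examples=False).get_dag('my_dag')
        task = dag.get_task('process_data')
        
        # Assumes 'process_data' is a PythonOperator whose callable needs no arguments:
        # call the underlying function directly, with no scheduler or metadata DB
        result = task.python_callable()
        assert result is not None  # replace with a real expectation for your task
    
    # To run one task without checking dependencies or recording its state, use the CLI:
    #   airflow tasks test my_dag process_data 2024-01-01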
    ```
    
    ## Common Anti-Patterns to Avoid
    
    ### 1. ❌ Top-level Operations
    ```python
    # BAD: Executes during DAG parsing
    data = fetch_from_database()  # This runs every time DAG is parsed!
    
    with DAG('bad_dag', ...) as dag:
        process = PythonOperator(
            task_id='process',
            python_callable=lambda: process_data(data)
        )
    ```
    
    ### 2. ✅ Proper Pattern
    ```python
    # GOOD: Executes only when task runs
    def fetch_and_process():
        data = fetch_from_database()
        return process_data(data)
    
    with DAG('good_dag', ...) as dag:
        process = PythonOperator(
            task_id='process',
            python_callable=fetch_and_process
        )
    ```
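    
    A rough way to catch the top-level anti-pattern is a static check over the DAG file's source. The sketch below uses the standard-library `ast` module to list function calls that would execute at import time (and therefore on every parse), skipping function and lambda bodies; the allow-list is a hypothetical starting point, and calls on attributes (such as `obj.method()`) are deliberately not inspected.
    
    ```python
    import ast
    
    # Hypothetical allow-list of calls that are normal at module level in a DAG file
    ALLOWED_TOP_LEVEL_CALLS = {'DAG', 'PythonOperator', 'datetime', 'timedelta'}
    
    class TopLevelCallFinder(ast.NodeVisitor):
        # Collects calls to plain names that would run when the file is imported
        def __init__(self):
            self.flagged = []
    
        def skip_deferred(self, node):
            # Function and lambda bodies run only when a task executes
            # (simplification: decorators and default values are not inspected)
            pass
    
        visit_FunctionDef = skip_deferred
        visit_AsyncFunctionDef = skip_deferred
        visit_Lambda = skip_deferred
    
        def visit_Call(self, node):
            if isinstance(node.func, ast.Name) and node.func.id not in ALLOWED_TOP_LEVEL_CALLS:
                self.flagged.append(node.func.id)
            self.generic_visit(node)
    
    def top_level_calls(source):
        finder = TopLevelCallFinder()
        finder.visit(ast.parse(source))
        return finder.flagged
    ```
    
    Run against the BAD example above, this flags `fetch_from_database`; against the GOOD example it returns an empty list.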
    
    ### 3. ❌ Non-Idempotent Tasks
    ```python
    # BAD: Creates duplicate records on retry
    def insert_records():
        db.insert(generate_records())
    ```
    
    ### 4. ✅ Idempotent Alternative
    ```python
    # GOOD: Safe to retry
    def upsert_records(**context):
        logical_date = context['logical_date']  # 'execution_date' before Airflow 2.2
        records = generate_records(logical_date)
        
        # Replace this date's records; ideally run both steps in one transaction
        # so a failure in between cannot leave the date half-loaded
        db.delete(date=logical_date)
        db.insert(records)
    ```
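    
    The delete-then-insert pattern can be demonstrated end to end with Python's built-in `sqlite3` module standing in for the real database; the table and function names are hypothetical. Wrapping both statements in one transaction means each attempt either replaces the whole partition or changes nothing, so retries never duplicate rows.
    
    ```python
    import sqlite3
    
    def load_partition(conn, ds, values):
        # `with conn` commits both statements together, or rolls both back on error
        with conn:
            conn.execute('DELETE FROM events WHERE ds = ?', (ds,))
            conn.executemany(
                'INSERT INTO events (ds, value) VALUES (?, ?)',
                [(ds, v) for v in values],
            )
    
    conn = sqlite3.connect(':memory:')
    conn.execute('CREATE TABLE events (ds TEXT, value INTEGER)')
    for _attempt in range(3):  # simulate the task being retried
        load_partition(conn, '2024-01-01', [1, 2, 3])
    
    count = conn.execute('SELECT COUNT(*) FROM events').fetchone()[0]
    print(count)  # 3, not 9
    ```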
    
    ## Performance Optimization Tips
    
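    1. **Minimize DAG Parsing Time**: Keep top-level imports light; avoid database calls and heavy logic at module level
    2. **Control Task Concurrency**: Set `max_active_tasks` on the DAG (the default comes from `[core] max_active_tasks_per_dag`) and use pools to protect shared resources
    3. **Batch Operations**: Process multiple records in a single task instead of one task per record
    4. **Leverage Parallelism**: Design independent tasks so they can run side by side, and use `max_active_runs` to limit how many runs of a DAG execute at once
    5. **Smart Scheduling**: Stagger heavy DAGs so their schedules don't overlap and compete for worker slots
    6. **Connection Pooling**: Reuse database connections
    7. **Efficient Sensors**: Use `mode='reschedule'` (or a deferrable sensor, where available) for long waits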
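    
    For the batching tip, a small chunking helper lets one task write records in batches (for example, one `executemany` or bulk API call per chunk) instead of one call per record. A minimal standard-library sketch, with a hypothetical helper name:
    
    ```python
    from itertools import islice
    
    def batched(iterable, size):
        # Yield lists of up to `size` items; Python 3.12+ also ships
        # itertools.batched, which yields tuples instead
        it = iter(iterable)
        while chunk := list(islice(it, size)):
            yield chunk
    
    records = range(10)
    print([len(chunk) for chunk in batched(records, 4)])  # [4, 4, 2]
    ```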
    
    Remember: The goal is to create maintainable, efficient, and reliable workflows that scale with your data needs!
    """


MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/paschmaria/mwaa-mcp-server'
