dag_design_guidance

Provides expert guidance on designing efficient Apache Airflow DAGs, covering task dependencies, dynamic generation, sensor patterns, XCom usage, testing strategies, and common pitfalls to avoid.

Instructions

Get detailed guidance on designing efficient Airflow DAGs.

Returns expert guidance on:

  • Task dependencies and parallelism

  • Dynamic DAG generation

  • Sensor patterns

  • XCom usage

  • Testing strategies

  • Common pitfalls to avoid

Input Schema

No arguments

Output Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| result | Yes | | |

Implementation Reference

  • The handler function for the `dag_design_guidance` tool, which returns the guidance text defined in `DAG_DESIGN_GUIDANCE`.
    @mcp.tool(name="dag_design_guidance")
    async def dag_design_guidance() -> str:
        """Get detailed guidance on designing efficient Airflow DAGs.
    
        Returns expert guidance on:
        - Task dependencies and parallelism
        - Dynamic DAG generation
        - Sensor patterns
        - XCom usage
        - Testing strategies
        - Common pitfalls to avoid
        """
        return DAG_DESIGN_GUIDANCE
  • The definition/schema of the guidance text used by the `dag_design_guidance` tool.
    DAG_DESIGN_GUIDANCE = """
    # Airflow DAG Design and Optimization Guide
    
    ## DAG Structure Fundamentals
    
    ### 1. Basic DAG Anatomy
    ```python
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from datetime import datetime, timedelta
    
    # Define default arguments
    default_args = {
        'owner': 'data-team',
        'depends_on_past': False,
        'start_date': datetime(2024, 1, 1),
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }
    
    # Instantiate DAG
    dag = DAG(
        'example_dag',
        default_args=default_args,
        description='A simple example DAG',
        schedule_interval='@daily',
        catchup=False,
        max_active_runs=1,
        tags=['example', 'tutorial'],
    )
    ```
    
    ### 2. Task Dependencies Patterns
    
    #### Linear Dependencies
    ```python
    # Simple chain
    task1 >> task2 >> task3
    
    # Alternative syntax
    chain(task1, task2, task3)
    ```
    
    #### Parallel Processing
    ```python
    # Fan-out/Fan-in pattern
    start >> [process_a, process_b, process_c] >> combine_results
    
    # Complex dependencies
    extract >> validate
    [transform_1, transform_2] >> aggregate
    validate >> [transform_1, transform_2]
    aggregate >> load
    ```
    
    #### Dynamic Task Generation
    ```python
    # Airflow 2.3+ Dynamic Task Mapping
    from airflow.decorators import task

    @task
    def get_files():
        return ['file1.csv', 'file2.csv', 'file3.csv']
    
    @task
    def process_file(filename: str):
        # Process individual file
        return f"Processed {filename}"
    
    @task
    def combine_results(results: list):
        return f"Combined {len(results)} results"
    
    with DAG('dynamic_tasks', ...) as dag:
        files = get_files()
        processed = process_file.expand(filename=files)
        combine_results(processed)
    ```
    
    ## Advanced Patterns
    
    ### 1. Conditional Execution
    ```python
    # Using BranchPythonOperator
    from airflow.operators.python import BranchPythonOperator
    
    def choose_branch(**context):
        if context['execution_date'].weekday() == 0:  # Monday
            return 'monday_task'
        return 'other_day_task'
    
    branching = BranchPythonOperator(
        task_id='branching',
        python_callable=choose_branch,
    )
    
    monday_task = PythonOperator(task_id='monday_task', ...)
    other_day_task = PythonOperator(task_id='other_day_task', ...)
    join = PythonOperator(
        task_id='join',
        trigger_rule='none_failed_min_one_success',
        ...
    )
    
    branching >> [monday_task, other_day_task] >> join
    ```
    
    ### 2. SubDAGs and Task Groups
    ```python
    # Task Groups (preferred over SubDAGs)
    from airflow.utils.task_group import TaskGroup
    
    with TaskGroup('processing_group') as processing:
        extract = PythonOperator(task_id='extract', ...)
        transform = PythonOperator(task_id='transform', ...)
        extract >> transform
    
    start >> processing >> end
    ```
    
    ### 3. Cross-DAG Dependencies
    ```python
    # Using ExternalTaskSensor
    from airflow.sensors.external_task import ExternalTaskSensor
    
    wait_for_upstream = ExternalTaskSensor(
        task_id='wait_for_upstream_dag',
        external_dag_id='upstream_dag',
        external_task_id='final_task',
        mode='reschedule',
    )
    
    # Using TriggerDagRunOperator
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator
    
    trigger_downstream = TriggerDagRunOperator(
        task_id='trigger_downstream',
        trigger_dag_id='downstream_dag',
        conf={'key': 'value'},
        wait_for_completion=True,
    )
    ```
    
    ## Sensor Patterns
    
    ### 1. Efficient Sensor Usage
    ```python
    # S3 Key Sensor with reschedule mode
    from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
    
    wait_for_file = S3KeySensor(
        task_id='wait_for_s3_file',
        bucket_name='my-bucket',
        bucket_key='data/{{ ds }}/input.csv',
        mode='reschedule',  # Frees up worker slot
        poke_interval=300,
        timeout=3600,
        soft_fail=True,  # Don't fail the DAG if timeout
    )
    
    # Custom sensor with exponential backoff between pokes
    from airflow.sensors.base import BaseSensorOperator

    class CustomSensor(BaseSensorOperator):
        def poke(self, context):
            # Custom readiness check; return True when the condition is met
            return check_condition()

    custom_wait = CustomSensor(
        task_id='custom_wait',
        poke_interval=60,           # Start with 1 minute
        exponential_backoff=True,   # Grow the wait between pokes
        timeout=1800,
        mode='reschedule',
    )
    
    ### 2. Smart Waiting Patterns
    ```python
    # Time-of-day sensor
    from datetime import time
    from airflow.sensors.time_sensor import TimeSensor

    wait_for_time = TimeSensor(
        task_id='wait_for_0300',
        target_time=time(3, 0),  # 3:00 AM
        mode='reschedule',
    )
    
    # Timeout with fallback
    wait_with_fallback = S3KeySensor(
        task_id='wait_or_continue',
        bucket_name='my-bucket',
        bucket_key='optional-file.csv',
        timeout=1800,  # 30 minutes
        soft_fail=True,  # Task is skipped on timeout instead of failing
    )
    ```
    
    ## XCom Best Practices
    
    ### 1. Proper XCom Usage
    ```python
    # Pushing to XCom
    @task
    def push_value():
        # Automatically pushed as return value
        return {'key': 'value', 'count': 42}
    
    # Explicit push
    def push_explicit(**context):
        context['task_instance'].xcom_push(key='my_key', value='my_value')
    
    # Pulling from XCom
    @task
    def pull_value(**context):
        # Pull from specific task
        value = context['task_instance'].xcom_pull(
            task_ids='push_value',
            key='return_value'
        )
        
        # Pull from multiple tasks
        values = context['task_instance'].xcom_pull(
            task_ids=['task1', 'task2']
        )
    ```
    
    ### 2. XCom Alternatives for Large Data
    ```python
    # Store reference in XCom, not data
    import uuid

    @task
    def process_large_data():
        # Process data and save to S3
        s3_key = f"s3://bucket/processed/{uuid.uuid4()}.parquet"
        # ... save data to s3_key
        
        # Return reference, not data
        return {'s3_key': s3_key, 'record_count': 1000000}
    
    # Use Airflow Variables for shared config
    from airflow.models import Variable
    
    @task
    def get_config():
        config = Variable.get('etl_config', deserialize_json=True)
        return config
    ```
    
    ## Testing Strategies
    
    ### 1. Unit Testing DAGs
    ```python
    import pytest
    from airflow.models import DagBag
    
    def test_dag_loaded():
        dagbag = DagBag()
        dag = dagbag.get_dag('my_dag')
        assert dag is not None
        assert len(dag.tasks) > 0
    
    def test_dag_structure():
        dag = DagBag().get_dag('my_dag')
        
        # Check task dependencies
        task1 = dag.get_task('task1')
        task2 = dag.get_task('task2')
        
        assert task2 in task1.downstream_list
        
        # Check default args
        assert dag.default_args['retries'] == 2
    ```
    
    ### 2. Testing Individual Tasks
    ```python
    from datetime import datetime
    from airflow.models import DagBag, TaskInstance
    from airflow.utils.state import State

    def test_task_execution():
        dag = DagBag().get_dag('my_dag')
        task = dag.get_task('process_data')
        
        ti = TaskInstance(task=task, execution_date=datetime.now())
        ti.run(ignore_task_deps=True, test_mode=True)
        
        assert ti.state == State.SUCCESS
    ```
    
    ## Common Anti-Patterns to Avoid
    
    ### 1. ❌ Top-level Operations
    ```python
    # BAD: Executes during DAG parsing
    data = fetch_from_database()  # This runs every time DAG is parsed!
    
    with DAG('bad_dag', ...) as dag:
        process = PythonOperator(
            task_id='process',
            python_callable=lambda: process_data(data)
        )
    ```
    
    ### 2. ✅ Proper Pattern
    ```python
    # GOOD: Executes only when task runs
    def fetch_and_process():
        data = fetch_from_database()
        return process_data(data)
    
    with DAG('good_dag', ...) as dag:
        process = PythonOperator(
            task_id='process',
            python_callable=fetch_and_process
        )
    ```
    
    ### 3. ❌ Non-Idempotent Tasks
    ```python
    # BAD: Creates duplicate records on retry
    def insert_records():
        db.insert(generate_records())
    ```
    
    ### 4. ✅ Idempotent Alternative
    ```python
    # GOOD: Safe to retry
    def upsert_records(**context):
        execution_date = context['execution_date']
        records = generate_records(execution_date)
        
        # Delete existing records for this date
        db.delete(date=execution_date)
        # Insert new records
        db.insert(records)
    ```
    
    ## Performance Optimization Tips
    
    1. **Minimize DAG Parsing Time**: Keep imports light, avoid complex logic
    2. **Use Task Concurrency**: Set appropriate `max_active_tasks_per_dag`
    3. **Batch Operations**: Process multiple records in single task
    4. **Leverage Parallelism**: Use `max_active_runs` and parallel task design
    5. **Smart Scheduling**: Avoid overlapping schedule intervals
    6. **Connection Pooling**: Reuse database connections
    7. **Efficient Sensors**: Always use `mode='reschedule'` for long waits
    
    Remember: The goal is to create maintainable, efficient, and reliable workflows that scale with your data needs!
    """
Behavior 3/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description carries the full burden. 'Returns expert guidance' signals a read-only tool, but there is no detail on whether the guidance is static or dynamic, cached, or rate-limited. Adequate but minimal behavioral disclosure.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness 5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

Front-loaded purpose statement followed by bulleted list of specific guidance topics. No redundant text. Each sentence earns its place by defining scope or output content.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness 4/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Appropriate for a zero-parameter tool with output schema. Description adequately summarizes output content (guidance topics) without needing to duplicate full schema definitions. Could improve by mentioning this requires no inputs.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters 4/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Zero parameters present per schema (empty object with additionalProperties: false). Per rubric, 0 params = baseline 4. Schema requires no additional semantic explanation in description.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose 4/5

Does the description clearly state what the tool does and how it differs from similar tools?

States clear verb+resource ('Get detailed guidance on designing efficient Airflow DAGs') and distinguishes from operational siblings like get_dag or create_environment via specific topic coverage (XCom, Sensors, etc.). However, does not explicitly differentiate from sibling 'airflow_best_practices' which may overlap in scope.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines 2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

Lists output topics but provides no guidance on when to use this tool versus 'airflow_best_practices' or other alternatives. No mention of prerequisites or when NOT to use (e.g., when seeking actual DAG code vs design guidance).

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
