# dag_design_guidance
Provides expert guidance on designing efficient Apache Airflow DAGs, covering task dependencies, dynamic generation, sensor patterns, XCom usage, testing strategies, and common pitfalls to avoid.
## Instructions
Get detailed guidance on designing efficient Airflow DAGs.
Returns expert guidance on:

- Task dependencies and parallelism
- Dynamic DAG generation
- Sensor patterns
- XCom usage
- Testing strategies
- Common pitfalls to avoid
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| _No arguments_ | | | |
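Because the schema declares no parameters, a client invokes this tool with an empty arguments object. A minimal sketch of the JSON-RPC `tools/call` request an MCP client would send, built as a plain Python dict (the `id` value is arbitrary):

```python
import json

# MCP "tools/call" request for this tool; "arguments" stays empty
# because the input schema declares no parameters.
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {
        "name": "dag_design_guidance",
        "arguments": {},
    },
}

print(json.dumps(request, indent=2))
```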
## Implementation Reference
- The handler function for the `dag_design_guidance` tool, which returns the guidance text defined in `DAG_DESIGN_GUIDANCE`:

```python
@mcp.tool(name="dag_design_guidance")
async def dag_design_guidance() -> str:
    """Get detailed guidance on designing efficient Airflow DAGs.

    Returns expert guidance on:
    - Task dependencies and parallelism
    - Dynamic DAG generation
    - Sensor patterns
    - XCom usage
    - Testing strategies
    - Common pitfalls to avoid
    """
    return DAG_DESIGN_GUIDANCE
```

- The definition of the guidance text used by the `dag_design_guidance` tool:
````python
DAG_DESIGN_GUIDANCE = """
# Airflow DAG Design and Optimization Guide

## DAG Structure Fundamentals

### 1. Basic DAG Anatomy

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Define default arguments
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Instantiate DAG
dag = DAG(
    'example_dag',
    default_args=default_args,
    description='A simple example DAG',
    schedule_interval='@daily',
    catchup=False,
    max_active_runs=1,
    tags=['example', 'tutorial'],
)
```

### 2. Task Dependencies Patterns

#### Linear Dependencies

```python
from airflow.models.baseoperator import chain

# Simple chain
task1 >> task2 >> task3

# Alternative syntax
chain(task1, task2, task3)
```

#### Parallel Processing

```python
# Fan-out/Fan-in pattern
start >> [process_a, process_b, process_c] >> combine_results

# Complex dependencies
extract >> validate
validate >> [transform_1, transform_2]
[transform_1, transform_2] >> aggregate
aggregate >> load
```

#### Dynamic Task Generation

```python
from airflow.decorators import task

# Airflow 2.3+ Dynamic Task Mapping
@task
def get_files():
    return ['file1.csv', 'file2.csv', 'file3.csv']

@task
def process_file(filename: str):
    # Process individual file
    return f"Processed {filename}"

@task
def combine_results(results: list):
    return f"Combined {len(results)} results"

with DAG('dynamic_tasks', ...) as dag:
    files = get_files()
    processed = process_file.expand(filename=files)
    combine_results(processed)
```

## Advanced Patterns

### 1. Conditional Execution

```python
# Using BranchPythonOperator
from airflow.operators.python import BranchPythonOperator

def choose_branch(**context):
    if context['execution_date'].weekday() == 0:  # Monday
        return 'monday_task'
    return 'other_day_task'

branching = BranchPythonOperator(
    task_id='branching',
    python_callable=choose_branch,
)

monday_task = PythonOperator(task_id='monday_task', ...)
other_day_task = PythonOperator(task_id='other_day_task', ...)

join = PythonOperator(
    task_id='join',
    # Renamed to 'none_failed_min_one_success' in Airflow 2.2+
    trigger_rule='none_failed_or_skipped',
    ...
)

branching >> [monday_task, other_day_task] >> join
```

### 2. SubDAGs and Task Groups

```python
# Task Groups (preferred over SubDAGs)
from airflow.utils.task_group import TaskGroup

with TaskGroup('processing_group') as processing:
    extract = PythonOperator(task_id='extract', ...)
    transform = PythonOperator(task_id='transform', ...)
    extract >> transform

start >> processing >> end
```

### 3. Cross-DAG Dependencies

```python
# Using ExternalTaskSensor
from airflow.sensors.external_task import ExternalTaskSensor

wait_for_upstream = ExternalTaskSensor(
    task_id='wait_for_upstream_dag',
    external_dag_id='upstream_dag',
    external_task_id='final_task',
    mode='reschedule',
)

# Using TriggerDagRunOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_downstream = TriggerDagRunOperator(
    task_id='trigger_downstream',
    trigger_dag_id='downstream_dag',
    conf={'key': 'value'},
    wait_for_completion=True,
)
```

## Sensor Patterns

### 1. Efficient Sensor Usage

```python
# S3 Key Sensor with reschedule mode
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_file = S3KeySensor(
    task_id='wait_for_s3_file',
    bucket_name='my-bucket',
    bucket_key='data/{{ ds }}/input.csv',
    mode='reschedule',  # Frees up the worker slot between pokes
    poke_interval=300,
    timeout=3600,
    soft_fail=True,  # Mark the task skipped instead of failed on timeout
)

# Custom sensor with exponential backoff
# (Airflow 2 sensors also support this natively via exponential_backoff=True)
from airflow.sensors.base import BaseSensorOperator

class CustomSensor(BaseSensorOperator):
    def poke(self, context):
        # Double the wait between pokes, capped at 30 minutes
        self.poke_interval = min(self.poke_interval * 2, 1800)
        return check_condition()  # Custom check logic
```

### 2. Smart Waiting Patterns

```python
from datetime import time

# Time-aware sensor
from airflow.sensors.time_sensor import TimeSensor

wait_for_time = TimeSensor(
    task_id='wait_for_0300',
    target_time=time(3, 0),  # 3:00 AM
    mode='reschedule',
)

# Timeout with fallback
wait_with_fallback = S3KeySensor(
    task_id='wait_or_continue',
    bucket_key='optional-file.csv',
    timeout=1800,  # 30 minutes
    soft_fail=True,  # Downstream continues if the file never arrives
)
```

## XCom Best Practices

### 1. Proper XCom Usage

```python
from airflow.decorators import task

# Pushing to XCom
@task
def push_value():
    # The return value is automatically pushed to XCom
    return {'key': 'value', 'count': 42}

# Explicit push (classic operators)
def push_explicit(**context):
    context['task_instance'].xcom_push(key='my_key', value='my_value')

# Pulling from XCom
@task
def pull_value(**context):
    # Pull from a specific task
    value = context['task_instance'].xcom_pull(
        task_ids='push_value',
        key='return_value'
    )
    # Pull from multiple tasks
    values = context['task_instance'].xcom_pull(
        task_ids=['task1', 'task2']
    )
```

### 2. XCom Alternatives for Large Data

```python
import uuid

from airflow.decorators import task

# Store a reference in XCom, not the data itself
@task
def process_large_data():
    # Process data and save to S3
    s3_key = f"s3://bucket/processed/{uuid.uuid4()}.parquet"
    # ... save data to s3_key
    # Return the reference, not the data
    return {'s3_key': s3_key, 'record_count': 1000000}

# Use Airflow Variables for shared config
from airflow.models import Variable

@task
def get_config():
    config = Variable.get('etl_config', deserialize_json=True)
    return config
```

## Testing Strategies

### 1. Unit Testing DAGs

```python
import pytest
from airflow.models import DagBag

def test_dag_loaded():
    dagbag = DagBag()
    dag = dagbag.get_dag('my_dag')
    assert dag is not None
    assert len(dag.tasks) > 0

def test_dag_structure():
    dag = DagBag().get_dag('my_dag')
    # Check task dependencies
    task1 = dag.get_task('task1')
    task2 = dag.get_task('task2')
    assert task2 in task1.downstream_list
    # Check default args
    assert dag.default_args['retries'] == 2
```

### 2. Testing Individual Tasks

```python
from datetime import datetime

from airflow.models import DagBag, TaskInstance
from airflow.utils.state import State

def test_task_execution():
    dag = DagBag().get_dag('my_dag')
    task = dag.get_task('process_data')
    ti = TaskInstance(task=task, execution_date=datetime.now())
    ti.run(ignore_task_deps=True, test_mode=True)
    assert ti.state == State.SUCCESS
```

## Common Anti-Patterns to Avoid

### 1. ❌ Top-level Operations

```python
# BAD: Executes during DAG parsing
data = fetch_from_database()  # This runs every time the DAG file is parsed!

with DAG('bad_dag', ...) as dag:
    process = PythonOperator(
        task_id='process',
        python_callable=lambda: process_data(data)
    )
```

### 2. ✅ Proper Pattern

```python
# GOOD: Executes only when the task runs
def fetch_and_process():
    data = fetch_from_database()
    return process_data(data)

with DAG('good_dag', ...) as dag:
    process = PythonOperator(
        task_id='process',
        python_callable=fetch_and_process
    )
```

### 3. ❌ Non-Idempotent Tasks

```python
# BAD: Creates duplicate records on retry
def insert_records():
    db.insert(generate_records())
```

### 4. ✅ Idempotent Alternative

```python
# GOOD: Safe to retry
def upsert_records(**context):
    execution_date = context['execution_date']
    records = generate_records(execution_date)
    # Delete existing records for this date
    db.delete(date=execution_date)
    # Insert new records
    db.insert(records)
```

## Performance Optimization Tips

1. **Minimize DAG Parsing Time**: Keep imports light, avoid complex logic at module level
2. **Use Task Concurrency**: Set an appropriate `max_active_tasks_per_dag`
3. **Batch Operations**: Process multiple records in a single task
4. **Leverage Parallelism**: Use `max_active_runs` and parallel task design
5. **Smart Scheduling**: Avoid overlapping schedule intervals
6. **Connection Pooling**: Reuse database connections
7. **Efficient Sensors**: Always use `mode='reschedule'` for long waits

Remember: The goal is to create maintainable, efficient, and reliable workflows that scale with your data needs!
"""
````
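To make tip 3 ("Batch Operations") concrete, here is a minimal pure-Python sketch with no Airflow dependency; `chunked` is a hypothetical helper name, not part of any library. Splitting a record list into fixed-size batches lets one task call process many records, cutting per-task scheduling overhead:

```python
from typing import Iterator

def chunked(records: list, batch_size: int) -> Iterator[list]:
    """Yield successive fixed-size batches from a list of records."""
    for start in range(0, len(records), batch_size):
        yield records[start:start + batch_size]

# One task invocation handles a whole batch instead of one record.
records = list(range(10))
batches = list(chunked(records, batch_size=4))
# batches -> [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```

In a DAG, each batch could then be fed to dynamic task mapping (e.g. `process_batch.expand(batch=batches)`), bounding the number of mapped tasks regardless of record count.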