포스트

Airflow 동적으로 DAG 생성하기

목차

  1. Airflow 동적으로 DAG 생성

Airflow 동적으로 DAG 생성

지난 글에서 설정한 Variable을 이용해서 동적으로 DAG을 생성하려고 합니다.

1) 동적 DAG 생성 코드

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from datetime import datetime, timedelta
from airflow.settings import Session
from airflow.models import Variable


session = Session()
variables = session.query(Variable).all()
dag_info_list = {var.key: var.val for var in variables}

def create_dag(dag_id, schedule_interval, default_args, start_date, task_infos):
    with DAG(dag_id=dag_id,
            schedule=schedule_interval,
            default_args=default_args,
            start_date=start_date,
            tags=[task_infos[0]['project']],
            is_paused_upon_creation=False, # unpause 상태로 생성
            catchup=False) as dag:

      task_map = {}
      for task_info in task_infos:
        current_task = PythonOperator(task_id=task_id,
                                      python_callable=test_function1,
                                      op_args=[task_info],
                                      dag=dag,
                                      trigger_rule=TriggerRule.ALL_DONE,
                                      queue="airflow_queue")

        task_map[task_id] = current_task
    
    # dependency 설정
    task_ids = list(task_map.keys())
    for i in range(len(task_ids) - 1):
        task_map[task_ids[i]] >> task_map[task_ids[i + 1]]

    return dag

def main():
  for dag_info in dag_info_list:
    schedule_interval= '0 0 * * *'
    default_args = {'owner': 'owner',
                    'retries': 5,
                    'retry_delay': timedelta(minutes=1),
                    'execution_timeout': timedelta(hours=1),
                    'email_on_failure': True,
                    'email_on_retry': True,
                    'depends_on_past': True}
    start_date = datetime(2025, 1, 1, 0, 0)
    info = Variable.get(dag_info, deserialize_json=True)

    globals()[dag_info] = create_dag(dag_info, schedule_interval, default_args, start_date, info)

main()


다음 글에서는 생성한 DAG을 중지, 삭제 하는 방법에 대해서 알아보겠습니다.