포스트

Celery Airflow

목차

  1. Celery Airflow

Celery Airflow

Airflow Celery Executor에 라이브러리 버전 요구사항을 확인해야한다.
AIRFLOW_HOME 환경변수를 설정한다. export AIRFLOW_HOME=~/airflow
이후 아래의 명령어를 통해서 설치한다.
pip install "apache-airflow[celery]==2.10.4" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.4/constraints-3.8.txt"
airflow가 설치되었는지 airflow version으로 확인한다.

1) airflow 데이터베이스 초기화

airflow db migrate 명령어를 실행한다.
다른 블로그에서는 airflow db init 명령어를 사용하는데 버전 업데이트 되면서 init 함수는 deprecated 되었다.

2) airflow 관리자 계정 생성

1
2
3
4
5
6
airflow users create \
    --username admin \
    --firstname name \
    --lastname name \
    --role Admin \
    --email admin@example.com

user 생성이 잘 되었는지 확인하려면 airflow users list 명령어를 실행한다.

3) airflow. cfg 설정하기

airflow.cfg 파일에서 executor를 Celery로 변경한다.

1
executor = CeleryExecutor

그리고 broker_url과 result_backend를 설정한다.

1
2
broker_url = amqp://user:id@localhost:port/vhost
result_backend = redis://:id@localhost:port/0

Celery Executor를 사용하려면 mysql이나 postgresql을 사용해야한다.
postgresql을 설치한다.

1
2
3
4
sudo apt update
sudo apt install postgresql postgresql-contrib
pip install psycopg2-binary
sudo -u postgres psql

데이터베이스 및 사용자 생성해준다.

1
2
3
4
5
6
CREATE DATABASE airflow_db;
CREATE USER airflow_user WITH PASSWORD 'your_password';
ALTER ROLE airflow_user SET client_encoding TO 'utf8';
ALTER ROLE airflow_user SET default_transaction_isolation TO 'read committed';
ALTER ROLE airflow_user SET timezone TO 'Asia/Seoul';
GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;

그리고 나서 다시 airflow.cfg 파일을 열고 수정한다.
sql_alchemy_conn = postgresql+psycopg2://airflow_user:your_password@localhost/airflow_db

airflow 시간대를 default_timezone = Asia/Seoul로 변경한다.
그 다음 example DAG을 보이지 않게 하기 위해서 load_examples = False로 수정한다.

그리고 새 데이터베이스로 마이그레이션을 초기화한다.
airflow db migrate

4) airflow 실행

1
2
3
airflow scheduler -D
airflow webserver -p 8080 -d
airflow celery worker -D -q airflow_queue

localhost:8080에 접속해서 접속이 잘 되는지 확인하면 된다.
localhost:5555에 접속해서 flower에서 celery worker와 queue를 확인해본다.
localhost:15672에 접속해서 rabbitmq에서 queue를 확인해본다.
이상이 없으면 잘 생성이 되었다.

나는 -D 옵션을 줘서 데몬으로 실행해도 되지만 systemd 서비스 파일을 만들기로 했다.
1) airflow scheduler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[Unit]
Description=Airflow Scheduler Daemon
After=network.target

[Service]
User={user}
Group={user}
WorkingDirectory=/home/{user}/{project_folder}
Environment="PATH=/home/{user}/{project_folder}/venv/bin:/usr/bin:/bin"
ExecStart=/home/{user}/{project_folder}/venv/bin/airflow scheduler
Restart=always
RestartSec=5s

[Install]
WantedBy=multi-user.target

2) airflow celery worker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[Unit]
Description=Airflow Worker Daemon
After=network.target

[Service]
User={user}
Group={user}
WorkingDirectory=/home/{user}/{project_folder}
Environment="PATH=/home/{user}/{project_folder}/venv/bin:/usr/bin:/bin"
ExecStart=/home/{user}/{project_folder}/venv/bin/airflow celery worker -q airflow_queue
Restart=always
RestartSec=5s

[Install]
WantedBy=multi-user.target

3) airflow webserver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[Unit]
Description=Airflow Webserver Daemon
After=network.target

[Service]
User={user}
Group={user}
WorkingDirectory=/home/{user}/{project_folder}
Environment="PATH=/home/{user}/{project_folder}/venv/bin:/usr/bin:/bin"
ExecStart=/home/{user}/{project_folder}/venv/bin/airflow webserver -p 8080
Restart=always
RestartSec=5s

[Install]
WantedBy=multi-user.target

이 후 작성한 서비스를 systmed에 등록한다.
sudo systemctl daemon-reload

서버 재부팅 시 자동으로 실행되도록 설정한다.

1
2
3
sudo systemctl enable airflow-scheduler
sudo systemctl enable airflow-celery-worker
sudo systemctl enable airflow-webserver

각 데몬을 실행한다.

1
2
3
sudo systemctl start airflow-scheduler
sudo systemctl start airflow-celery-worker
sudo systemctl start airflow-webserver

서비스 상태를 확인한다.

1
2
3
sudo systemctl status airflow-scheduler
sudo systemctl status airflow-celery-worker
sudo systemctl status airflow-webserver

로그를 확인하고 싶으면 아래 명령어를 실행한다.

1
2
3
sudo journalctl -u airflow-scheduler -f
sudo journalctl -u airflow-celery-worker -f
sudo journalctl -u airflow-webserver -f

5) airflow dag 생성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from random import randrange
import sys

# Python Path 추가
sys.path.append('/home/{user}/{project_folder}')

# Celery 작업 가져오기
from app.test.task import add

# Celery 작업을 호출하는 래퍼 함수
def trigger_celery_task(**kwargs):
    a = randrange(10)
    b = randrange(10)
    result = add.delay(a, b)  # Celery 작업 호출
    return result.id  # 작업 ID 반환

# DAG 정의
with DAG(
    dag_id='celery_integration_dag',
    schedule_interval=timedelta(minutes=1),  # 1분마다 실행
    start_date=datetime(2025, 1, 1),
    tags=['example'],
    catchup=False
) as dag:
    
    celery_task = PythonOperator(
        task_id='trigger_celery_task',
        python_callable=trigger_celery_task,
        queue="airflow_queue"
    )

celery_task


다음 글로는 airflow retry 관련해서 작성하겠습니다.