Celery 다수의 Worker & Queue 사용하는 방법
목차
Celery Multiple Worker
다수의 Worker를 사용하려면
이전에 /etc/systemd/system/celery.service
를 만들었는데 해당 파일을 수정해야 한다.
/etc/systemd/system/celery-default.service
를 새로 생성한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[Unit]
Description=Celery Default Service
After=network.target
[Service]
Type=forking
User=user_name
Group=user_name
Environment="PATH=/home/user_name/project_folder/venv/bin:/usr/bin:/bin:/usr/sbin:/sbin"
WorkingDirectory=/home/user_name/project_folder
ExecStart=/home/user_name/project_folder/venv/bin/celery -A celery_app.celery worker --loglevel=info -f celery.logs -E -n work-default@%h
Restart=always
[Install]
WantedBy=multi-user.target
/etc/systemd/system/celery-other.service
를 새로 생성한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[Unit]
Description=Celery Other Service
After=network.target
[Service]
Type=forking
User=user_name
Group=user_name
Environment="PATH=/home/user_name/project_folder/venv/bin:/usr/bin:/bin:/usr/sbin:/sbin"
WorkingDirectory=/home/user_name/project_folder
ExecStart=/home/user_name/project_folder/venv/bin/celery -A celery_app.celery worker --loglevel=info -f celery.logs -E -n work-other@%h
Restart=always
[Install]
WantedBy=multi-user.target
이후에 systemctl daemon-reload
를 실행한 뒤에
systemctl restart celery-default.service
, systemctl restart celery-other.service
를 각각 실행한다.
그렇게 하면 하나의 서버에서 work-default@/root
, work-other@/root
라는 워커가 생성된 것을 flower에서 확인할 수 있다.
Celery Multiple Queue
이제 각각의 Worker가 서로 다른 Queue를 사용하게 설정을 할 것 이다.
celery_app.py
파일을 아래와 같이 수정한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from kombu import Exchange, Queue
from celery import Celery
CELERY_QUEUE_DEFAULT = 'default'
CELERY_QUEUE_OTHER = 'other'
celery = Celery(__name__,
broker=f'amqp://{RABBITMQ_USER}:{RABBITMQ_PASSWORD}@{RABBITMQ_HOST}:{RABBITMQ_PORT}/{RABBITMQ_VHOST}',
backend='rpc://')
celery.conf["task_default_queue"] = CELERY_QUEUE_DEFAULT
celery.conf["task_default_exchange_type"] = 'direct'
celery.conf["task_queues"] = (
Queue(
CELERY_QUEUE_DEFAULT,
Exchange(CELERY_QUEUE_DEFAULT, type='direct'),
routing_key=CELERY_QUEUE_DEFAULT,
),
Queue(
CELERY_QUEUE_OTHER,
Exchange(CELERY_QUEUE_OTHER, type='direct'),
routing_key=CELERY_QUEUE_OTHER,
),
)
celery.conf["task_routes"] = {
'app.directory_a.function_a': {
'queue': CELERY_QUEUE_OTHER,
'routing_key': CELERY_QUEUE_OTHER,
'exchnage': CELERY_QUEUE_OTHER
},
'app.directory_b.function_b': {
'queue': CELERY_QUEUE_DEFAULT,
'routing_key': CELERY_QUEUE_DEFAULT,
'exchnage': CELERY_QUEUE_DEFAULT
}
}
task_default_queue
로 기본 queue를 default
로 지정하고 task_default_exchange_type
를 direct
로 설정한다.
task_default_exchange_type
는 Celery에서 작업 큐를 처리할 때 교환 타입을 설정해서 메시지를 어떻게 분배할지를 결정하는데 사용된다.
- Direct Exchange: 메시지가 라우팅 키와 일치하는 큐로 직접 전달된다.
- Topic Exchange: 메시지가 라우팅 키의 패턴과 일치하는 큐로 전달된다.
- Fanout Exchange: 모든 바인딩된 큐로 메시지를 브로드캐스트된다.
- Headers Exchange: 메시지 헤더에 기반해 큐로 라우팅한다.
그 다음 systemctl restart celery-default.service
파일을 수정한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[Unit]
Description=Celery Default Service
After=network.target
[Service]
Type=forking
User=user_name
Group=user_name
Environment="PATH=/home/user_name/project_folder/venv/bin:/usr/bin:/bin:/usr/sbin:/sbin"
WorkingDirectory=/home/user_name/project_folder
ExecStart=/home/user_name/project_folder/venv/bin/celery -A celery_app.celery worker --loglevel=info -f celery.logs -E -n work-default@%h --queues default --concurrency=4
Restart=always
[Install]
WantedBy=multi-user.target
/etc/systemd/system/celery-other.service
파일도 수정해준다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[Unit]
Description=Celery Other Service
After=network.target
[Service]
Type=forking
User=user_name
Group=user_name
Environment="PATH=/home/user_name/project_folder/venv/bin:/usr/bin:/bin:/usr/sbin:/sbin"
WorkingDirectory=/home/user_name/project_folder
ExecStart=/home/user_name/project_folder/venv/bin/celery -A celery_app.celery worker --loglevel=info -f celery.logs -E -n work-other@%h --queues other --concurrency=2
Restart=always
[Install]
WantedBy=multi-user.target
마찬가지로 systemctl daemon-reload
를 실행한 뒤에
systemctl restart celery-default.service
, systemctl restart celery-other.service
를 각각 실행한다.
그렇게 하고 flower에 들어가서 각 worker를 클릭해서 보면
Pool
탭에서 Max concurrency
각 데몬에서 설정한 것 처럼 4, 2로 설정된 것을 확인할 수 있다.
그리고 Queue
탭에서 데몬에서 설정한 Queue의 이름이 보이는 것을 확인할 수 있다.
celery.conf[“task_routes”]에 디렉토리별로 queue, exchange를 설정하면 설정한 값대로 자동으로 큐를 선택한다.
하지만 task_routes에 지정되지 않는 작업을 특정 큐로 보내고 싶다면
task에 @celery.task(bind=True, max_retries=5, acks_late=True, queue="new", exchange="new")
이런식으로 직접 설정을 하면 된다.
이렇게 하면 함수별로 명시적으로 지정이 된다.
Celery에서 다수의 Worker & Queue 사용하는 방법하는 방법을 알아봤습니다.
상황과 서버의 CPU, 메모리 스펙을 고려해서 Worker, Queue, concurrency를 조절해서 사용하면 될 것 같습니다.