Celery
목차
Celery?
Celery는 분산 작업 큐다.
비동기 작업을 처리하고 스케줄링하기 위한 도구로, 주로 웹 애플리케이션에서 시간이 오래 걸리는 작업을 비동기적으로 처리할 때 사용된다.
Celery의 주요 구성 요소는 다음과 같다.
Tasks: Celery에서 실행할 작업을 나타낸다.
각 작업은 일반적으로 Python 함수로 정의되며, @app.task 데코레이터를 사용하여 Celery 애플리케이션에 등록된다.Workers: Celery 작업을 처리하는 프로세스다.
Workers는 Celery 애플리케이션과 연결되어 작업을 가져와 실행하고, 작업 결과를 반환한다.Broker: 작업 실행자와 애플리케이션 간의 통신을 관리하는 중간 매개체다.
Celery는 메시지 브로커를 사용하여 작업을 보내고 받는다.
RabbitMQ, Redis, Amazon SQS 등의 다양한 브로커를 지원한다.
4.Result Backend: Celery 작업의 결과를 저장하는 데 사용되는 백엔드.
작업의 결과는 주로 데이터베이스나 캐시 저장소에 저장된다.
Celery 설치
pip install celery
로 설치하면 된다.
Celery Redis 연결
celery_app.py
를 생성한다.
1
2
3
4
5
6
7
from celery import Celery
celery = Celery(__name__,
broker=f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/0",
backend=f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/0")
celery.conf.update(CELERY_TIMEZONE='Asia/Seoul',
CELERY_ENABLE_UTC=False)
이렇게 생성하고
celery -A celery_app.celery worker --loglevel=info -f celery.logs -E
를 실행해보면 celery가 잘 작동하는 것을 확인할 수 있다.
또 celery.logs 파일이 생성되는 것을 확인할 수 있다.
Celery Daemon
/etc/systemd/system/celery.service
파일을 생성한다.
1
2
3
4
5
6
7
8
9
10
11
12
[Unit]
Description=Celery Service
After=network.target
[Service]
Type=simple
User=juhopark
WorkingDirectory=디렉토리
ExecStart=디렉토리/celery -A celery_app.celery worker --loglevel=info -f celery.logs -E
[Install]
WantedBy=multi-user.target
시스템 등록
1
2
3
sudo systemctl daemon-reload
sudo systemctl enable celery
sudo systemctl start celery
동작 확인
sudo service celery status
Celery 예시
1
2
3
4
5
app
-- test
---- test_worker.py
celery_app.py
main.py
이런 예시 구조를 가지고 있는 상황이다.
그럼 celery_app.py를
1
2
3
4
celery = Celery(__name__,
broker=f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/",
backend=f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}",
include=['app.test.test_worker'])
이렇게 추가해주면 test_worker.py에 추가한 celery task가 celery에 등록이 된다.
test_worker.py에는 아래와 같은 코드를 작성한다.
1
2
3
4
5
6
from celery_app import celery
@celery.task
def add(x, y):
return x + y
main.py에서는 아래와 같은 코드를 작성한다.
1
2
3
4
5
6
7
8
9
10
11
12
from app.test.test_worker import add
import time
result = add.apply_async(args=[1, 2]) # 1,2를 파라미터로 전달하여 task를 queue에 등록
while not result.ready(): # 작업이 완료되었는지 확인
pass
else:
if result.successful(): # 작업이 성공적으로 완료했는지 확인
result_value = result.get()
else:
result_value = None # 작업 실패