포스트

Celery

목차

  1. Celery?
  2. Celery 설치
  3. Celery Redis 연결
  4. Celery Daemon
  5. Celery 예시

Celery?

Celery는 분산 작업 큐다.
비동기 작업을 처리하고 스케줄링하기 위한 도구로, 주로 웹 애플리케이션에서 시간이 오래 걸리는 작업을 비동기적으로 처리할 때 사용된다.

Celery의 주요 구성 요소는 다음과 같다.

  1. Tasks: Celery에서 실행할 작업을 나타낸다.
    각 작업은 일반적으로 Python 함수로 정의되며, @app.task 데코레이터를 사용하여 Celery 애플리케이션에 등록된다.

  2. Workers: Celery 작업을 처리하는 프로세스다.
    Workers는 Celery 애플리케이션과 연결되어 작업을 가져와 실행하고, 작업 결과를 반환한다.

  3. Broker: 작업 실행자와 애플리케이션 간의 통신을 관리하는 중간 매개체다.
    Celery는 메시지 브로커를 사용하여 작업을 보내고 받는다.
    RabbitMQ, Redis, Amazon SQS 등의 다양한 브로커를 지원한다.

4.Result Backend: Celery 작업의 결과를 저장하는 데 사용되는 백엔드.
작업의 결과는 주로 데이터베이스나 캐시 저장소에 저장된다.

Celery 설치

pip install celery로 설치하면 된다.

Celery Redis 연결

celery_app.py를 생성한다.

1
2
3
4
5
6
7
from celery import Celery

celery = Celery(__name__,
                broker=f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/0",
                backend=f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/0")
celery.conf.update(CELERY_TIMEZONE='Asia/Seoul',
                   CELERY_ENABLE_UTC=False)           

이렇게 생성하고

celery -A celery_app.celery worker --loglevel=info -f celery.logs -E를 실행해보면 celery가 잘 작동하는 것을 확인할 수 있다.
또 celery.logs 파일이 생성되는 것을 확인할 수 있다.

Celery Daemon

/etc/systemd/system/celery.service 파일을 생성한다.

1
2
3
4
5
6
7
8
9
10
11
12
[Unit]
Description=Celery Service
After=network.target

[Service]
Type=simple
User=juhopark
WorkingDirectory=디렉토리
ExecStart=디렉토리/celery -A celery_app.celery worker --loglevel=info -f celery.logs -E

[Install]
WantedBy=multi-user.target

시스템 등록

1
2
3
sudo systemctl daemon-reload
sudo systemctl enable celery
sudo systemctl start celery

동작 확인
sudo service celery status

Celery 예시

1
2
3
4
5
app
-- test
---- test_worker.py
celery_app.py
main.py

이런 예시 구조를 가지고 있는 상황이다.
그럼 celery_app.py를

1
2
3
4
celery = Celery(__name__,
                broker=f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/",
                backend=f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}",
                include=['app.test.test_worker'])

이렇게 추가해주면 test_worker.py에 추가한 celery task가 celery에 등록이 된다.

test_worker.py에는 아래와 같은 코드를 작성한다.

1
2
3
4
5
6
from celery_app import celery


@celery.task
def add(x, y):
    return x + y

main.py에서는 아래와 같은 코드를 작성한다.

1
2
3
4
5
6
7
8
9
10
11
12
from app.test.test_worker import add
import time

result = add.apply_async(args=[1, 2]) # 1,2를 파라미터로 전달하여 task를 queue에 등록

while not result.ready(): # 작업이 완료되었는지 확인
  pass
else:
  if result.successful(): # 작업이 성공적으로 완료했는지 확인
    result_value = result.get()
  else:
    result_value = None # 작업 실패