Celery という Python のタスクキューを使う機会があったので、簡単に使い方をメモします。

“タスクキュー” とは あるタスクを非同期に実行するための仕組みで、タスクの実行を依頼する Client と、タスクを実行する Worker というプロセスで構成されます。Client と Worker は “タスクキュー” と呼ばれるデータストアを介してやり取りします。

処理の流れは以下のようになります。

  1. まず Worker プロセスを起動する。Worker は継続的にタスクキューをモニタリングします。
  2. Client プロセスが実行したいタスクをキューに登録。
  3. Worker プロセスはキューにタスクが登録されたのを検知してタスクを実行。
  4. 以降 Client がタスクを登録する度に Worker がそのタスクを実行します。

このような仕組みを使うことでシステム間の依存を減らせたり、長時間かかる処理を非同期に別のマシンで実行できるなどのメリットがあります。

用語

システム構成

※ なお Celery は Windows をサポートしていません

フォルダ構成

- celery_tutorial
  |- celery_app.py
  |- client.py
  |- worker.py
  |- config
  |   -- celery_config.py
  -- tasks
      -- add.py

“celery_tutorial” フォルダを今回のルートフォルダとして、中に celery_app.py, client.py, worker.py の 3つの Python ファイルがあります。そして tasks フォルダの中に task.py がある、構成です。各ファイルの役割はそれぞれ以下の通りです。

Broker 用データストアのインストール

まず最初に Broker として使うデータストアを用意しておく必要があります。今回は Redis を使います。

shell

# Redis をインストール
$ sudo apt install redis-server

# Redis の初期設定
$ sudo nano /etc/redis/redis.conf
---
# systemd で実行制御できるよう設定
supervised systemd

# リモートからアクセスできるよう設定
protected-mode no
bind 0.0.0.0 ::0
---

shell

# Redis を再起動
$ sudo systemctl restart redis.service

# 接続テスト
$ redis-cli
127.0.0.1:6374>ping
PONG

Celery のインストール

Celery は pip でインストールできます。今回は Broker に Redis を使っているため、Python の Redis クライアントライブラリも一緒にインストールしておく必要があります。

shell

$ pip install celery redis

Celery を使ったアプリケーションの実装

タスク

それでは Celery を使ったアプリケーションを実装します。まず tasks/add.py からです。

Celery のタスクは “Annotation ベース” と “Class ベース” の2通りの定義の仕方があります。”Annotation ベース” は、@app.task というアノテーションを Python のファンクションにつけることで、そのファンクションを Celery のタスクとして実行できるようにする、というものです。

公式のチュートリアルではこの Annotation ベース のやり方が紹介されているので、今回は Class ベースのやり方を紹介します。

tasks/add.py

from celery import Task

class AddTask(Task):
  def run(self, x, y):
    result = x + y
    print(f"AddTask result: {result}")
    return result

Class ベースのタスク定義では、Task クラスを継承したクラスで run() メソッドを定義します。タスクが実行される時、この run() の中身が実行されます。今回は引数となる 2つの数値を足したものをコンソールに出力する、というタスクを定義しています。

Celery Application

次に “Application” と呼ばれる Celery の中心的なコンポーネントを定義します。Celery Application の設定を行う方法はいくつかあり、基本的なものでは、

  1. コンストラクタの引数に設定する方法と、
  2. config_from_object() メソッドを使って設定する方法

があります。

公式のチュートリアルではコンストラクタの引数に設定する方法が紹介されているので、今回は config_from_object() を使う方法を紹介します。

Celery Config

config/celery_config.py

broker_url = 'redis://192.168.1.2:6379'
result_backend = 'rpc://'
enable_utc = True
timezone = "Asia/Tokyo"

Config は Python ファイルに設定したい項目の値を定義すれば OK です。設定できる項目の一覧は Configuration and defaults に記載されています。今回は Broker のURL、Backend の URL、タイムゾーンを設定しています。なお Backend に設定した "rpc://" は、タスクの処理結果をデータストアに保存するのではなく、RPC メッセージとしてクライアントに送信するという設定です。

Celery Application

それではメインの Application の定義です。ファイルは celery_app.py です。

celery_app.py

from celery import Celery
from config import celery_config
from tasks.add import AddTask

# 1. Celery インスタンスを作成
app = Celery('tasks')

# 2. Config を設定
app.config_from_object(celery_config)

# 3. タスクを登録
add_task = app.register_task(AddTask())

処理の内容はコメントの通りです。Celery クラスのコンストラクタの第一引数には、タスクが定義されているパッケージ名を指定します。今回は tasks フォルダ内にタスクを定義しているため、tasks がパッケージ名になります。

また Class ベースのタスクの場合、register_task() でタスクを登録し、その戻り値 (今回のケースでは add_task変数) を使ってタスクの呼び出しを行います (Annotation ベースの場合、この処理は不要です)。

Worker 定義

次に Worker を定義します。Celery の Worker は Python プログラム内で起動する方法の他に、Celery に既に用意されている celery worker コマンドで起動することもできます。

公式のチュートリアルでは celery worker コマンドを使って Worker プロセスを起動する方法が紹介されていますので、今回は Python プログラムから起動する方法を紹介します。

worker.py

from celery_app import app
worker = app.Worker()
worker.start()

exitcode = worker.exitcode
print(f"Celery worker stopped. Exit code: {exitcode}")

処理の内容はそのままで、上記 celery_app.py で定義した app 変数をインポートし、Worker インスタンスを作成した後 start() を呼ぶことで実行できます。worker.start() が実行された後は、エラーが発生するかプロセスが中断されない限り、プロセスは継続して実行され続けます。

※ Worker インスタンスについては公式ドキュメントに詳細が記載されていないため、celery worker コマンドのソースコードが参考になります。

Client 定義

最後に Client の定義です。

client.py

from celery_app import add_task

def main():
  res = add_task.delay(5, 2)
  print(f"Result form add_task: {res.get()}")

if __name__ == '__main__':
  main()

これも処理内容はそのままです。上記の celery_app.py 内でタスクを登録 (12行目 app.register_task(AddTask()) の部分) した時の戻り値をここでインポートして、delay() メソッドを呼び出すことでタスクを実行できます。

なお delay() は簡易的な呼び出し方法で、タスクの実行を細かく制御したい場合は apply_async() メソッド (公式ドキュメント) を使う必要があります。

Backend を設定していてタスクが戻り値を返している場合、delay()apply_async() の戻り値の get() メソッドを呼ぶことで結果を取得することができます (ただしこれを行うと処理が非同期にならないため非推奨です)。

アプリケーションの実行

以上で実装は全て完了ですので、アプリケーションを実行してみます。

まず新しいシェルを開いて、以下のコマンドを実行して Worker を起動します (Broker は既に起動している前提です)。

shell

$ python3 worker.py

次に別のシェルを開いて、Client を実行します。

shell

$ python3 client.py
Result form add_task: 7

すると Client がタスクの結果を取得して表示しているのが確認できます。ここで Worker を起動したシェルを見てみると以下のように表示されていると思います。

$ python3 worker.py

-------------- celery@<hostnamep> v5.2.6
(省略)
[2022-05-07 18:01:18,224: WARNING/ForkPoolWorker-1] AddTask result: 7

これで Worker 側でもタスクが実行されていたのが確認できます。

Celery の基本的な使い方は以上です。Celery では Worker の並行性やタスクのリトライ回数など細かく設定できるため、公式ドキュメントを参照してください。