Celery という Python のタスクキューを使う機会があったので、簡単に使い方をメモします。
“タスクキュー” とは あるタスクを非同期に実行するための仕組みで、タスクの実行を依頼する Client と、タスクを実行する Worker というプロセスで構成されます。Client と Worker は “タスクキュー” と呼ばれるデータストアを介してやり取りします。
処理の流れは以下のようになります。
- まず Worker プロセスを起動する。Worker は継続的にタスクキューをモニタリングします。
- Client プロセスが実行したいタスクをキューに登録。
- Worker プロセスはキューにタスクが登録されたのを検知してタスクを実行。
- 以降 Client がタスクを登録する度に Worker がそのタスクを実行します。
このような仕組みを使うことでシステム間の依存を減らせたり、長時間かかる処理を非同期に別のマシンで実行できるなどのメリットがあります。
用語
- Celery Application: Celery のメインとなるコンポーネントを “Application” と呼びます。 実体は
Celery
クラスのインスタンスです。 - Broker: 先に “Client と Worker はデータストアを介してやり取りする” と書きましたが、このデータストアのことを Celery では Broker と呼んでいます。RabbitMQ や Redis が使われることが多いです。
- Backend: 通常タスクは非同期で実行されるため Client はタスクの処理結果を知ることができませんが、Celery ではタスクの処理結果を Broker とは別のデータストアに保存しておいて、Client がタスクの処理結果を問い合わせることができるようにする仕組みがあります。この処理結果を保存しておくデータストアのことを Celery では Backend と呼びます。
システム構成
- OS: Ubuntu 20.04
※ なお Celery は Windows をサポートしていません。
フォルダ構成
- celery_tutorial
|- celery_app.py
|- client.py
|- worker.py
|- config
| -- celery_config.py
-- tasks
-- add.py
“celery_tutorial” フォルダを今回のルートフォルダとして、中に celery_app.py
, client.py
, worker.py
の 3つの Python ファイルがあります。そして tasks
フォルダの中に task.py
がある、構成です。各ファイルの役割はそれぞれ以下の通りです。
celery_app.py
: Celery では “Application” と呼ばれるコンポーネントを中心に構成されます。このファイルでは Celery Application を定義して他のファイルから参照できるようにします。worker.py
: Worker プロセスを実行します。client.py
: Client プロセスを実行します。config/celery_config.py
: Celery Application の設定を定義します。tasks/add.py
: Client により実行されるタスクを定義します。
Broker 用データストアのインストール
まず最初に Broker として使うデータストアを用意しておく必要があります。今回は Redis を使います。
shell
# Redis をインストール
$ sudo apt install redis-server
# Redis の初期設定
$ sudo nano /etc/redis/redis.conf
---
# systemd で実行制御できるよう設定
supervised systemd
# リモートからアクセスできるよう設定
protected-mode no
bind 0.0.0.0 ::0
---
shell
# Redis を再起動
$ sudo systemctl restart redis.service
# 接続テスト
$ redis-cli
127.0.0.1:6374>ping
PONG
Celery のインストール
Celery は pip
でインストールできます。今回は Broker に Redis を使っているため、Python の Redis クライアントライブラリも一緒にインストールしておく必要があります。
shell
$ pip install celery redis
Celery を使ったアプリケーションの実装
タスク
それでは Celery を使ったアプリケーションを実装します。まず tasks/add.py
からです。
Celery のタスクは “Annotation ベース” と “Class ベース” の2通りの定義の仕方があります。”Annotation ベース” は、@app.task
というアノテーションを Python のファンクションにつけることで、そのファンクションを Celery のタスクとして実行できるようにする、というものです。
公式のチュートリアルではこの Annotation ベース のやり方が紹介されているので、今回は Class ベースのやり方を紹介します。
tasks/add.py
from celery import Task
class AddTask(Task):
def run(self, x, y):
result = x + y
print(f"AddTask result: {result}")
return result
Class ベースのタスク定義では、Task
クラスを継承したクラスで run()
メソッドを定義します。タスクが実行される時、この run()
の中身が実行されます。今回は引数となる 2つの数値を足したものをコンソールに出力する、というタスクを定義しています。
Celery Application
次に “Application” と呼ばれる Celery の中心的なコンポーネントを定義します。Celery Application の設定を行う方法はいくつかあり、基本的なものでは、
- コンストラクタの引数に設定する方法と、
config_from_object()
メソッドを使って設定する方法
があります。
公式のチュートリアルではコンストラクタの引数に設定する方法が紹介されているので、今回は config_from_object()
を使う方法を紹介します。
Celery Config
config/celery_config.py
broker_url = 'redis://192.168.1.2:6379'
result_backend = 'rpc://'
enable_utc = True
timezone = "Asia/Tokyo"
Config は Python ファイルに設定したい項目の値を定義すれば OK です。設定できる項目の一覧は Configuration and defaults に記載されています。今回は Broker のURL、Backend の URL、タイムゾーンを設定しています。なお Backend に設定した "rpc://"
は、タスクの処理結果をデータストアに保存するのではなく、RPC メッセージとしてクライアントに送信するという設定です。
Celery Application
それではメインの Application の定義です。ファイルは celery_app.py
です。
celery_app.py
from celery import Celery
from config import celery_config
from tasks.add import AddTask
# 1. Celery インスタンスを作成
app = Celery('tasks')
# 2. Config を設定
app.config_from_object(celery_config)
# 3. タスクを登録
add_task = app.register_task(AddTask())
処理の内容はコメントの通りです。Celery
クラスのコンストラクタの第一引数には、タスクが定義されているパッケージ名を指定します。今回は tasks
フォルダ内にタスクを定義しているため、tasks
がパッケージ名になります。
また Class ベースのタスクの場合、register_task()
でタスクを登録し、その戻り値 (今回のケースでは add_task
変数) を使ってタスクの呼び出しを行います (Annotation ベースの場合、この処理は不要です)。
Worker 定義
次に Worker を定義します。Celery の Worker は Python プログラム内で起動する方法の他に、Celery に既に用意されている celery worker
コマンドで起動することもできます。
公式のチュートリアルでは celery worker
コマンドを使って Worker プロセスを起動する方法が紹介されていますので、今回は Python プログラムから起動する方法を紹介します。
worker.py
from celery_app import app
worker = app.Worker()
worker.start()
exitcode = worker.exitcode
print(f"Celery worker stopped. Exit code: {exitcode}")
処理の内容はそのままで、上記 celery_app.py
で定義した app
変数をインポートし、Worker インスタンスを作成した後 start()
を呼ぶことで実行できます。worker.start()
が実行された後は、エラーが発生するかプロセスが中断されない限り、プロセスは継続して実行され続けます。
※ Worker インスタンスについては公式ドキュメントに詳細が記載されていないため、celery worker
コマンドのソースコードが参考になります。
Client 定義
最後に Client の定義です。
client.py
from celery_app import add_task
def main():
res = add_task.delay(5, 2)
print(f"Result form add_task: {res.get()}")
if __name__ == '__main__':
main()
これも処理内容はそのままです。上記の celery_app.py
内でタスクを登録 (12行目 app.register_task(AddTask())
の部分) した時の戻り値をここでインポートして、delay()
メソッドを呼び出すことでタスクを実行できます。
なお delay()
は簡易的な呼び出し方法で、タスクの実行を細かく制御したい場合は apply_async()
メソッド (公式ドキュメント) を使う必要があります。
Backend を設定していてタスクが戻り値を返している場合、delay()
や apply_async()
の戻り値の get()
メソッドを呼ぶことで結果を取得することができます (ただしこれを行うと処理が非同期にならないため非推奨です)。
アプリケーションの実行
以上で実装は全て完了ですので、アプリケーションを実行してみます。
まず新しいシェルを開いて、以下のコマンドを実行して Worker を起動します (Broker は既に起動している前提です)。
shell
$ python3 worker.py
次に別のシェルを開いて、Client を実行します。
shell
$ python3 client.py
Result form add_task: 7
すると Client がタスクの結果を取得して表示しているのが確認できます。ここで Worker を起動したシェルを見てみると以下のように表示されていると思います。
$ python3 worker.py
-------------- celery@<hostnamep> v5.2.6
(省略)
[2022-05-07 18:01:18,224: WARNING/ForkPoolWorker-1] AddTask result: 7
これで Worker 側でもタスクが実行されていたのが確認できます。
Celery の基本的な使い方は以上です。Celery では Worker の並行性やタスクのリトライ回数など細かく設定できるため、公式ドキュメントを参照してください。