Django に Celery タスクキューを導入し、遅い処理を利用者に体感させないようにする


Django でウェブアプリを作る際、遅い処理をタスクキューにするには、celery が便利です。今回、社内勉強会で Django + celery のチュートリアルを行ったので、celery で簡単なタスクを動かすまでを書いておきます。

内容としては Celery ドキュメントの First steps with Django をなぞっています。

環境

  • MacOS
  • Python 3.6.5
  • Django 2.1
  • Celery 4.2.1
  • Redis

Redisは、キューのブローカーとして使います。Redis以外にも、RabbitMQ やAmazon SQS が使えます。

Redis サーバの起動方法は書いていませんので、適宜起動してください。

プロジェクトフォルダの作成

$ mkdir celery_handson
$ cd celery_handson

venv (仮想環境) の作成

$ python3.6 -m venv venv
$ . venv/bin/activate

ライブラリのインストール

$ pip install django
$ pip install celery django-celery-results redis django-redis
$ pip install ipython

django-admin.py を認識させるため、仮想環境に入り直しておきます。

$ deactivate
$ . venv/bin/activate

Djangoプロジェクトの作成

(今、celery_handson ディレクトリにいますが、さらにその中に celery_handson という名前の Django プロジェクトを作ります)

$ django-admin.py startproject celery_handson
$ cd celery_handson

Django_celery_results の導入

settings.py を編集し、INSTALLED_APPS に django_celery_results を追加します。

INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'django_celery_results', # ←追加
]

DBの作成

$ ./manage.py migrate

これで、Djangoの初期設定は完了しました。

試しに、

$ ./manage.py runserver

を起動し、http://127.0.0.1:8000/ にアクセスしてみると、テストページが表示されます。

Celery 設定の追加

settings.py に追加します。

CELERY_BROKER_URL = "redis://<Redisサーバのホスト>:6379/1"
CELERY_RESULT_BACKEND = "django-db"

Celery ファイルの作成

celery_handson/celery.py を作成

urls.py の並びに、celery.py を作ります。

# celery_handson/celery_handson/celery.py
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
settings = os.getenv(
"DJANGO_SETTINGS_MODULE", "celery_handson.settings")
os.environ.setdefault("DJANGO_SETTINGS_MODULE", settings)

app = Celery('celery_handson')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks(['celery_handson'])

公式ドキュメントにあるファイルほぼそのままです。

タスクスクリプトの作成

urls.py, celery.py の並びに、tasks ディレクトリを作り、その中に __init__.py を作ります。

# celery_handson/celery_handson/tasks/__init__.py
from ..celery import app


@app.task()
def add_numbers(a, b):
print('Request: {}'.format(a + b))
return a + b

celery ワーカーの起動

./manage.py があるディレクトリで行います。

celery -A celery_handson worker --concurrency=1

--concurrency は並列性のオプションで、省略しても問題ありませんが開発中は1が扱いやすいように思います。

ターミナル内で実行し続けますので、そのターミナルはそのままにして、次の「タスクの実行」からは新しいターミナルを起動してください。

(ちなみに celery 実行バイナリは、venv/bin/celery にいます。)

(iTerm2 を使っている場合、ターミナルに画像が表示されます)

タスクの実行

同期処理

$ ./manage.py shell
>>> from celery_handson.tasks import add_numbers
>>> add_numbers(3, 4)
Request: 7
7

普通に同プロセス内で add_numbers をコールしただけです。これは celery は関係ありません。

非同期処理

>>> add_numbers.delay(5, 6)
<AsyncResult: deeddee2-74a6-48ec-9f22-987417135592>

@app.task() デコレータがついた関数は Celery のタスクとして登録され、.delay メソッドが使えるようになっています。.delay をコールすることで、非同期実行のタスクキューとしてブローカーに登録され、ワーカーはそれを拾って処理を行います。

.delay メソッドの戻り値である AsyncResult は、result プロパティの中に結果が入っているかもしれません。

また、django_celery_results を使っていれば処理結果は DB (モデル) に記録され、task_id の値を元に取得することができます。

>>> tr = TaskResult.objects.get(task_id='2781413b-282e-4de8-b134-b338b592ad02')
>>> tr.status
'SUCCESS'
>>> tr.result
'11'

ビュー内で行う重い処理や、マネジメントコマンドから実行するタスクで並列したいものは、このタスクの delay メソッドを使うことで簡単に非同期化でき、使用者のストレスを下げることができます。

PyCharm の設定

タスクの開発は、他のスクリプトと同様にIDEでブレイクポイントを設定すると効率的に開発できます。

Django の環境設定

一番上の celery_handson ディレクトリを PyCharm で開きます。

Project Interpreter

⌘+, で設定を開き、検索窓に interpreter と入力。

Project Interpreter を、先程作った venv 内のPython を指定します。最近の PyCharm は自動認識します。

Django Support

検索窓に django と入力し、Languages & Frameworks ➝ Django の設定をします。Enable Django Support にチェックを入れ、Settings に celery_handson/settings.py を指定。

Project Structure

検索窓に structure と入力し、./manage.py が入っている celery_handson ディレクトリを Sources に追加しておきます。

Run/Debug Configurations の登録

エディタ右上の、Edit Configurations... を選択し、+ ボタンから Django Server を選択 して設定を登録します。

Enironment variables (環境変数) に、DJANGO_SETTINGS_MODULE celery_handson.settings を登録しておきます。(無くても動くかもしれません。PyCharm の他の設定によります。設定しとくと無難です)

この設定を実行すると、Django テストサーバが起動します。

Celery用の設定

Run/Debug Configurations の + をクリック ➝ Python で、

Script path に venv/bin/celery、parameters に -A celery_handson worker --concurrency=1 を、念の為 Environment variables: に DJANGO_SETTINGS_MODULE celery_handson.settings を登録しておきます。

この環境をデバッグ起動すれば、ワーカープロセス内でブレイクポイントが使えるようになり、開発がしやすくなります。

Celery ワーカーの状態取得

from celery.task import control
inspect = control.inspect()
inspect.stats()  # ステータスを取得
inspect.ping()  # 疎通確認
inspect.active()  # 現在実行中のタスク

Supervisor の設定サンプル

[program:celery-handson]
autostart = true
autorestart = true
user = ubuntu
environment = DJANGO_SETTINGS_MODULE=celery_handson.settings
command = /var/django/celery_handson/venv/bin/celery -A celery_handson worker --concurrency=8
directory = /var/django/celery_handson/celery_handson
stopasgroup = true

; logs
stdout_logfile = /var/log/supervisor/celery_handson.log
stdout_logfile_maxbytes = 1MB
stdout_logfile_backups = 5
stdout_capture_maxbytes = 1MB
redirect_stderr = true


; プロセス再起動:
; sudo supervisorctl restart celery-handson


メモ: django_celery_beat でDBで設定したタスクを実行させるには

command = /var/django/celery_handson/venv/bin/celery -A celery_handson beat --scheduler=django_celery_beat.schedulers:DatabaseScheduler
現在の評価: 3.3

コメント

コメントを投稿
コメントするには TORICO-ID にログインしてください。
ログイン コメント利用規約