Django でウェブアプリを作る際、遅い処理をタスクキューにするには、celery が便利です。今回、社内勉強会で Django + celery のチュートリアルを行ったので、celery で簡単なタスクを動かすまでを書いておきます。
内容としては Celery ドキュメントの First steps with Django をなぞっています。
環境
- MacOS
- Python 3.6.5
- Django 2.1
- Celery 4.2.1
- Redis
Redisは、キューのブローカーとして使います。Redis以外にも、RabbitMQ やAmazon SQS が使えます。
Redis サーバの起動方法は書いていませんので、適宜起動してください。
プロジェクトフォルダの作成
$ mkdir celery_handson
$ cd celery_handson
venv (仮想環境) の作成
$ python3.6 -m venv venv
$ . venv/bin/activate
ライブラリのインストール
$ pip install django
$ pip install celery django-celery-results redis django-redis
$ pip install ipython
django-admin.py を認識させるため、仮想環境に入り直しておきます。
$ deactivate
$ . venv/bin/activate
Djangoプロジェクトの作成
(今、celery_handson ディレクトリにいますが、さらにその中に celery_handson という名前の Django プロジェクトを作ります)
$ django-admin.py startproject celery_handson
$ cd celery_handson
Django_celery_results の導入
settings.py を編集し、INSTALLED_APPS
に django_celery_results
を追加します。
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'django_celery_results', # ←追加
]
DBの作成
$ ./manage.py migrate
これで、Djangoの初期設定は完了しました。
試しに、
$ ./manage.py runserver
を起動し、http://127.0.0.1:8000/
にアクセスしてみると、テストページが表示されます。
Celery 設定の追加
settings.py に追加します。
CELERY_BROKER_URL = "redis://<Redisサーバのホスト>:6379/1"
CELERY_RESULT_BACKEND = "django-db"
Celery ファイルの作成
celery_handson/celery.py を作成
urls.py の並びに、celery.py を作ります。
# celery_handson/celery_handson/celery.py
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
settings = os.getenv(
"DJANGO_SETTINGS_MODULE", "celery_handson.settings")
os.environ.setdefault("DJANGO_SETTINGS_MODULE", settings)
app = Celery('celery_handson')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks(['celery_handson'])
※ 公式ドキュメントにあるファイルほぼそのままです。
タスクスクリプトの作成
urls.py, celery.py の並びに、tasks ディレクトリを作り、その中に __init__.py を作ります。
# celery_handson/celery_handson/tasks/__init__.py
from ..celery import app
@app.task()
def add_numbers(a, b):
print('Request: {}'.format(a + b))
return a + b
celery ワーカーの起動
./manage.py があるディレクトリで行います。
$ celery -A celery_handson worker --concurrency=1
--concurrency
は並列性のオプションで、省略しても問題ありませんが開発中は1が扱いやすいように思います。
ターミナル内で実行し続けますので、そのターミナルはそのままにして、次の「タスクの実行」からは新しいターミナルを起動してください。
(ちなみに celery 実行バイナリは、venv/bin/celery にいます。)
(iTerm2 を使っている場合、ターミナルに画像が表示されます)
タスクの実行
同期処理
$ ./manage.py shell
>>> from celery_handson.tasks import add_numbers
>>> add_numbers(3, 4)
Request: 7
7
普通に同プロセス内で add_numbers をコールしただけです。これは celery は関係ありません。
非同期処理
>>> add_numbers.delay(5, 6)
<AsyncResult: deeddee2-74a6-48ec-9f22-987417135592>
@app.task()
デコレータがついた関数は Celery のタスクとして登録され、.delay
メソッドが使えるようになっています。.delay
をコールすることで、非同期実行のタスクキューとしてブローカーに登録され、ワーカーはそれを拾って処理を行います。
.delay
メソッドの戻り値である AsyncResult
は、result
プロパティの中に結果が入っているかもしれません。
また、django_celery_results
を使っていれば処理結果は DB (モデル) に記録され、task_id
の値を元に取得することができます。
>>> tr = TaskResult.objects.get(task_id='2781413b-282e-4de8-b134-b338b592ad02')
>>> tr.status
'SUCCESS'
>>> tr.result
'11'
ビュー内で行う重い処理や、マネジメントコマンドから実行するタスクで並列したいものは、このタスクの delay メソッドを使うことで簡単に非同期化でき、使用者のストレスを下げることができます。
PyCharm の設定
タスクの開発は、他のスクリプトと同様にIDEでブレイクポイントを設定すると効率的に開発できます。
Django の環境設定
一番上の celery_handson ディレクトリを PyCharm で開きます。
Project Interpreter
⌘+, で設定を開き、検索窓に interpreter と入力。
Project Interpreter を、先程作った venv 内のPython を指定します。最近の PyCharm は自動認識します。
Django Support
検索窓に django と入力し、Languages & Frameworks ➝ Django の設定をします。Enable Django Support にチェックを入れ、Settings に celery_handson/settings.py を指定。
Project Structure
検索窓に structure と入力し、./manage.py が入っている celery_handson ディレクトリを Sources に追加しておきます。
Run/Debug Configurations の登録
エディタ右上の、Edit Configurations... を選択し、+ ボタンから Django Server を選択 して設定を登録します。
Enironment variables (環境変数) に、DJANGO_SETTINGS_MODULE celery_handson.settings を登録しておきます。(無くても動くかもしれません。PyCharm の他の設定によります。設定しとくと無難です)
この設定を実行すると、Django テストサーバが起動します。
Celery用の設定
Run/Debug Configurations の + をクリック ➝ Python で、
Script path に venv/bin/celery、parameters に -A celery_handson worker --concurrency=1 を、念の為 Environment variables: に DJANGO_SETTINGS_MODULE celery_handson.settings を登録しておきます。
この環境をデバッグ起動すれば、ワーカープロセス内でブレイクポイントが使えるようになり、開発がしやすくなります。
Celery ワーカーの状態取得
from celery.task import control
inspect = control.inspect()
inspect.stats() # ステータスを取得
inspect.ping() # 疎通確認
inspect.active() # 現在実行中のタスク
Supervisor の設定サンプル
[program:celery-handson]
autostart = true
autorestart = true
user = ubuntu
environment = DJANGO_SETTINGS_MODULE=celery_handson.settings
command = /var/django/celery_handson/venv/bin/celery -A celery_handson worker --concurrency=8
directory = /var/django/celery_handson/celery_handson
stopasgroup = true
; logs
stdout_logfile = /var/log/supervisor/celery_handson.log
stdout_logfile_maxbytes = 1MB
stdout_logfile_backups = 5
stdout_capture_maxbytes = 1MB
redirect_stderr = true
; プロセス再起動:
; sudo supervisorctl restart celery-handson
メモ: django_celery_beat でDBで設定したタスクを実行させるには
command = /var/django/celery_handson/venv/bin/celery -A celery_handson beat --scheduler=django_celery_beat.schedulers:DatabaseScheduler