#42. Celery

Celery

enter image description here

План

  1. Что надо знать до Celery
  2. Celery - распределённая очередь задач
    1. Как работает Celery
    2. Брокеры
    3. Установка Celery и брокера Redis
  3. Celery и Python
  4. Celery и Django
  5. Литература

Что надо знать до Celery

Процесс — экземпляр программы во время выполнения, независимый объект, которому выделены системные ресурсы (например, процессорное время и память). Каждый процесс выполняется в отдельном адресном пространстве: один процесс не может получить доступ к переменным и структурам данных другого. Если процесс хочет получить доступ к чужим ресурсам, необходимо использовать межпроцессное взаимодействие. Это могут быть конвейеры, файлы, каналы связи между компьютерами и многое другое.

Поток использует то же самое пространства стека, что и процесс, а множество потоков совместно используют данные своих состояний. Как правило, каждый поток может работать (читать и писать) с одной и той же областью памяти, в отличие от процессов, которые не могут просто так получить доступ к памяти другого процесса. У каждого потока есть собственные регистры и собственный стек, но другие потоки могут их использовать.
Поток — определенный способ выполнения процесса. Когда один поток изменяет ресурс процесса, это изменение сразу же становится видно другим потокам этого процесса.

Синхронным (synchronous) называется такое взаимодействие между компонентами, при котором клиент, отослав запрос, блокируется и может продолжать работу только после получения ответа от сервера. По этой причине такой вид взаимодействия называют иногда блокирующим (blocking).

В рамках асинхронного (asynchronous) или неблокирующего (non blocking) взаимодействия клиент после отправки запроса серверу может продолжать работу, даже если ответ на запрос еще не пришел. Асинхронное взаимодействие позволяет получить более высокую производительность системы за счет использования времени между отправкой запроса и получением ответа на него для выполнения других задач. Другое важное преимущество асинхронного взаимодействия — меньшая зависимость клиента от сервера, возможность продолжать работу, даже если машина, на которой находится сервер, стала недоступной. Это свойство используется для организации надежной связи между компонентами, даже если и клиент, и сервер не все время находятся в рабочем состоянии.
асинхронный запрос к серверу

NoSQL (от англ. not only SQL — не только SQL) — термин, обозначающий ряд подходов, направленных на реализацию систем управления базами данных, имеющих существенные отличия от моделей, используемых в традиционных реляционных СУБД с доступом к данным средствами языка SQL. Применяется к базам данных, в которых делается попытка решить проблемы масштабируемости и доступности за счёт атомарности (англ. atomicity) и согласованности данных (англ. consistency). Подробнее тут

NoSQL-базы данных

Celery - распределённая очередь задач

Logo
Celery это ничто иное как распределённая очередь задач, реализованная на языке Python.

Celery - это простая, гибкая и надежная распределенная система для обработки огромного количества сообщений, включаюзя в себя инструменты, необходимые для поддержки такой системы.

Это очередь задач с упором на обработку в реальном времени, а также с поддержкой планирования задач.

Celery имеет открытый исходный код и находится под лицензией BSD.

Итак, что же умеет Celery:

  • Выполнять асинхронно задания
  • Выполнять периодические задания(умная замена cron)
  • Выполнять отложенные задания
  • Распределенное выполнение (может быть запущен на N серверах)
  • В пределах одного worker’а возможно конкурентное выполнение нескольких задач(одновременно)
  • Выполнять задание повторно, если вылез exception
  • Ограничивать количество заданий в единицу времени (rate limit, для задания или глобально)
  • Несложно мониторить выполнение заданий
  • Выполнять подзадания
  • Присылать отчеты об exception’ах
  • Проверять выполнилось ли задание

Как работает Celery

start page

Brocker (Брокер)

Брокер сообщений (он же диспетчер очереди) — это посредник(транспорт), который принимает и отдает сообщения (задачи) между отдельными модулями/приложениями внутри некоторой сложной системы, где модули/приложения должны общаться между собой — то есть пересылать данные друг другу.

Worker (Воркер)

Воркер это отдельно запущенный процесс для выполнения определённых задач, Celery запускается на одном или нескольких воркерах, что бы выполнять задачи паралельно на каждом воркере.

Back-end (Бэкэнд)

Бэкэнд в случае с Celery выступает в качестве хранилища результатов выполнения задач.

  • Producer (поставщик) ‒ программа, отправляющая сообщения.

  • Queue (очередь) ‒ очередь сообщений(задач). Она существует внутри брокера. Любое количество поставщиков может отправлять сообщения в одну очередь, также любое количество подписчиков может получать сообщения из одной очереди. В схемах очередь будет обозначена стеком и подписана именем:

  • Consumer (подписчик) ‒ программа, принимающая сообщения. Обычно подписчик находится в состоянии ожидания сообщений.

Поставщик, подписчик и брокер не обязаны находиться на одной физической машине.

Брокеры

AMQP
(Advanced Message Queuing Protocol) — открытый протокол для передачи сообщений между компонентами системы. Основная идея состоит в том, что отдельные подсистемы (или независимые приложения) могут обмениваться произвольным образом сообщениями через AMQP-брокер, который осуществляет маршрутизацию, возможно гарантирует доставку, распределение потоков данных, подписку на нужные типы сообщений.

RabbitMQ
RabbitMQ – это брокер сообщений с открытым исходным кодом. Он маршрутизирует собщения по всем базовым принципам протокола AMQP описанным в спецификации. Отправитель передает сообщение брокеру а тот доставляет его получателю. RabbitMQ реализует и дополняет протокол AM

Redis
Redis (расшифровывается как Remote Dictionary Server) – это быстрое хранилище данных типа «ключ‑значение» в памяти с открытым исходным кодом для использования в качестве базы данных, кэша, брокера сообщений или очереди.

Redis это NoSQL база данных! Для Celery крайне рекомендую использовать именно его.

Implementing Celery using Django for Background Task Processing - BoTree  Technologies

Установка Celery и брокера Redis

pip install celery

Celery 4.0+ официально уже не поддерживается для Windows
Варианты запуска
0. Использовать Linux

  1. Docker
  2. WSL 2 (для Windows 10)
  3. Переменная окружения или прямо в коде

Redis
Установка самого сервиса

sudo apt install redis-server

artem@HP:~$ redis-server
17624:C 01 Mar 2021 02:50:02.381 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
17624:C 01 Mar 2021 02:50:02.381 # Redis version=5.0.7, bits=64, commit=00000000, modified=0, pid=17624, just started
17624:C 01 Mar 2021 02:50:02.381 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
17624:M 01 Mar 2021 02:50:02.382 * Increased maximum number of open files to 10032 (it was originally set to 1024).
17624:M 01 Mar 2021 02:50:02.383 # Could not create server TCP listening socket *:6379: bind: Address already in use

artem@HP:~$ redis-cli
127.0.0.1:6379> ping
PONG
127.0.0.1:6379> set foo bar
OK
127.0.0.1:6379> get foo
"bar"
127.0.0.1:6379> 

Для работы необходимо так же необходима и библиотека

pip install redis

Celery и Python

Создаем файл tasks.py и “приложение”, в котором необходимо указать название (понадобится для указания брокеру) и брокера.

from celery import Celery  
  
broker_url = 'redis://localhost'  
app = Celery('tasks', broker=broker_url)  
  
  
@app.task  
def add(x, y):  
    return x + y

Мы описали задачу, и обозначили её через декоратор приложения селери.

Для того, что бы мы могли вызвать задачу, необходимо запустить селери как отдельное приложение:

celery -A tasks worker --loglevel=INFO

консольная команда будет доступна после установки celery

-A app_name - указать имя приложения,
worker - запустить один воркер,
loglevel - уровень подробностей

Запуск и обработка результата

Для запуска задач, есть много разных способов, рассмотрим базовый.

from tasks import add
add.delay(4, 4)

Для запуска задачи немедленно используется метод delay (сокращенный метод apply_async()).

Запуск задач возвращает не результат, а AsyncResult, для того что бы получать значения необходимо при создании приложения указать парамет backend который отвечает за место хранения результатов, таким параметром может быть Redis:

broker_url = 'redis://localhost'
app = Celery('tasks', broker=broker_url, backend=broker_url)

Результат будет иметь достаточно большое кол-во методов и атрибутов.

Основные два метода это ready и get

ready - отвечает за то завершилась задача или еще в процессе.

get - ждет выполнения задачи и возвращает результат. Рекомендуется использовать после ready, что бы не ждать выполнения впустую.

>>> result = add.delay(4, 4)
>>> result.ready()
False
>>> result.get()
8

Иногда описание парметров задачи и ей вызов могут быть в совершенно разных местах, для этого существует механизм подписи:

s1 = add.s(2, 2)
res = s1.delay()
res.get()

в этом примере s1 это подпись задачи, тоесть задача заготовленная для выполнения, её можно сериализовать и отправить по сети, например, а выполнить в уже совершенно других местах.

Задачи можно группировать:

from celery import group
from proj.tasks import add

group(add.s(i, i) for i in range(10))().get()

Виды запуска

Есть три варианта запуска тасков:

apply_async(args[, kwargs[, …]])

Отправка сообщения с указанием дополнительных параметров

delay(*args, **kwargs)

Отправка сообщения без каких либо параметров самого сообщения

calling (__call__)

Просто вызов, декоратор не мешает нам просто вызвать функцию без селери :)

Основные параметры apply_async()

  1. сountdown - отправить через
add.apply_async((2,2), countdown=10)
# отправить через 10 секунд
  1. eta - отправить в конкретное время
add.apply_async((2,2), eta=now() + timedelta(seconds=10))
# отправить через 10 секунд
  1. expires - время после которого перестать выполнять задачу, можно указать как цифру так и время
add.apply_async((4,5), countdown=60, expires=120)
add.apply_async((4,5), expires=now() + timedelta(days=2))
  1. link - выполнить другую задачу по завершению текущей, основываясь на результатах текущей
add.apply_async((2, 2), link=add.s(16))
# ( 2 + 2 ) + 16

Сelery beat - Переодические задачи

Селери может выполнять какие-либо задачи просто по графику

Для этого нужно настроить приложение:

app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
}
app.conf.timezone = 'UTC'

add-every-30-seconds Ключ словаря, это только название, можно указать что угодно.

task это выполняемый таск

args его аргументы

schedule: частота выполнения в секундах

Выполнение по крону

from celery.schedules import crontab

app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}

Cron - система задания расписания, можно сделать практически какое угодно.

Для расписания нужно запускать отдельный воркер (beat) для расписания.

Celery и Django

Для использования селери в django рекомендуется создать еще один файл celery.py на одном уровне с settings.py

- proj/
  - manage.py
  - proj/
    - celery.py
    - __init__.py
    - settings.py
    - urls.py
import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

Параметр namespace при указании конфига будет отвечать за то с какого слова будут начинатся настройки в settings.py

Например:

# Celery Configuration Options
CELERY_BROKER_URL = 'redis://localhost'  
CELERY_RESULT_BACKEND = 'redis://localhost'
CELERY_TIMEZONE = 'America/New_York'
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60

app.autodiscover_tasks() - эта строчка будет отвечать за автоматический поиск таков во всех приложениях.

- app1/
    - tasks.py
    - models.py
- app2/
    - tasks.py
    - models.py

На тоже же уровне где и settings.py создать\использовать файл __init__.py в зависимости от версии python

# __init__.py
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

Все задачи необходимо покрывать не стандартным декоратором task, а декоратором shared_task тогда django сможет автоматически найти все таски в приложении.

# tasks.py

from celery import shared_task
from demoapp.models import Widget


@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)


@shared_task
def count_widgets():
    return Widget.objects.count()


@shared_task
def rename_widget(widget_id, name):
    w = Widget.objects.get(id=widget_id)
    w.name = name
    w.save()

Так же для Django существует много различных расширений, например:

django-celery-results - что бы хранить резльутаты в бд или кеше джанго.

django-celery-beat - настройка для переодических задач, сразу вшитая в админку джаго.

Классическим примером использования Celery является отправка электронной почты. Я использую этот пример, чтобы показать вам основы использования Celery. Для начало создадим view и задачу:

from django.conf import settings
from django.core.mail import send_mail
from django.template import Engine, Context

from myproject.celery import app


def render_template(template, context):
    engine = Engine.get_default()
    tmpl = engine.get_template(template)
    return tmpl.render(Context(context))
    
    
@celery_app.task
def send_mail_task(recipients, subject, template, context):
    send_mail(
        subject=subject,
        message=render_template(f'{template}.txt', context),
        from_email=settings.DEFAULT_FROM_EMAIL,
        recipient_list=recipients,
        fail_silently=False,
        html_message=render_template(f'{template}.html', context)
)

Используя Celery, мы сокращаем время ответа клиенту, поскольку отделяем процесс отправки от основного кода, отвечающего за возврат ответа.

Самый простой способ выполнить эту задачу — вызвать метод delay, предоставляемый декоратором app.task.

send_mail_task.delay(('noreply@example.com', ), 'Celery cookbook test', 'test', {})

Celery так же позволяет настроить повторные попытки после сбоя.

@celery_app.task(bind=True, default_retry_delay=10 * 60)
def send_mail_task(self, recipients, subject, template, context):
    message = render_template(f'{template}.txt', context)
    html_message = render_template(f'{template}.html', context)
    try:
        send_mail(
            subject=subject,
            message=message,
            from_email=settings.DEFAULT_FROM_EMAIL,
            recipient_list=recipients,
            fail_silently=False,
            html_message=html_message
        )
    except smtplib.SMTPException as ex:
        self.retry(exc=ex)

Теперь задача будет перезапущена через десять минут, в случае если отправка не будет удачной. Кроме того, вы сможете установить количество повторных попыток.
Hендеринг шаблона вынесен за пределы вызова send_mail. Это потому, что мы заключаем вызов send_mail в try / except, и лучше иметь как можно меньше кода в try / except.

Flower - инструмент для мониторинга Celery

pip install flower

запуск

flower -A proj --port=5555

и из Celery

celery flower -A proj --address=127.0.0.1 --port=5555

enter image description here

Попрактиваться в качесвтве Д.З.

Повторить то, что сделано в видео

Литература

  1. Введение в Celery Python
  2. Celery: начинаем правильно
  3. О RabbitMQ habr
  4. Redis Quick Start
  5. First Steps with Celery and Next Steps
  6. Видео, но лучше сразу со второй части и перематывать