Введение в Python Celery

Практически каждый в сообществе Python хотя бы раз слышал о Celery, а может быть, даже уже работал с ним. По сути, это удобный инструмент, который помогает запускать скрипты или избранные части вашей программы в отдельном процессе или даже на отдельном компьютере или сервере, что экономит время и силы.

Скачивайте книги ТОЛЬКО на русском языке у нас в телеграм канале: PythonBooksRU

Celery снижает нагрузку на производительность, реализуя часть функциональности в виде отложенных задач на том же сервере, где расположено ваше приложение, либо на другом устройстве. Чаще всего разработчики используют его для отправки электронной почты. Однако Celery может предложить гораздо больше. В этой статье я покажу вам некоторые основы Celery, а также несколько полезных практик.

Основы Celery

Если вы уже работали с Celery, можете пропустить эту главу. Но если он для вас в новинку, здесь вы узнаете, как встроить его в свой проект и попробуете настроить Celery для Django. По сути, мы просто создадим экземпляр Celery и будем использовать его для пометки функций Python как задач.

Лучше создать экземпляр в отдельном файле, так как для работы с Celery так же, как и с WSGI в Django, потребуется запустить его. Например, если вы создадите два экземпляра (Flask и Celery) в одном файле в приложении Flask и запустите его, у вас будет создано два экземпляра, но использоваться будет только один. То же самое происходит, когда вы запускаете Celery в Python.

Базовые примеры использования Python Celery

Как уже было упомянуто выше, основным юзкейсом Celery является отправка электронной почты. Вот наглядный пример того, как это можно сделать:

from django.conf import settings
from django.core.mail import send_mail
from django.template import Engine, Context

from myproject.celery import app


def render_template(template, context):
    engine = Engine.get_default()

    tmpl = engine.get_template(template)

    return tmpl.render(Context(context))
    

@app.task
def send_mail_task(recipients, subject, template, context):
    send_mail(
        subject=subject,
        message=render_template(f'{template}.txt', context),
        from_email=settings.DEFAULT_FROM_EMAIL,
        recipient_list=recipients,
        fail_silently=False,
        html_message=render_template(f'{template}.html', context)
    )

В этом коде используется Django, поскольку это основной фреймворк для веб-приложений. Используя Celery, мы сокращаем время ответа клиенту, так как отделяем процесс отправки от основного кода, отвечающего за возврат ответа пользователю.

Самый простой способ выполнить эту задачу – вызвать метод delay, который предоставляется декоратором app.task.

send_mail_task.delay(('noreply@example.com', ), 'Celery cookbook test', 'test', {})

Кроме того, Celery предоставляет еще много возможностей. Например, мы можем настроить повторные попытки при ошибках.

@celery_app.task(bind=True, default_retry_delay=10 * 60)
def send_mail_task(self, recipients, subject, template, context):
    message = render_template(f'{template}.txt', context)
    html_message = render_template(f'{template}.html', context)
    try:
        send_mail(
            subject=subject,
            message=message,
            from_email=settings.DEFAULT_FROM_EMAIL,
            recipient_list=recipients,
            fail_silently=False,
            html_message=html_message
        )
    except smtplib.SMTPException as ex:
        self.retry(exc=ex)

Теперь, если отправка не удалась, задача будет перезапущена повторно через десять минут. Также вы сможете задать количество повторных попыток.

Некоторые из вас могут удивиться, почему render_template расположен за пределами вызова send_mail. Это потому, что мы обернули вызов send_mail в try/except (в нем лучше иметь конкретный код, который мы хотим обработать), а smtplib.SMTPException обрабатывает ошибку связанную именно с отправкой почты, не с созданием шаблона письма.

Celery для опытных пользователей

Задачи по расписанию Celery Django

Celery позволяет запускать задачи с помощью планировщиков, подобных crontab в Linux.

Прежде всего, если вы хотите выполнять какие-то задачи периодически, вы должны запустить Celery worker с флагом -beat, иначе Celery будет игнорировать планировщик. Следующий шаг – создание конфигурации, в которой будет указано, какая задача должна выполняться и когда. Вот пример:

from celery.schedules import crontab


CELERY_BEAT_SCHEDULE = {
    'monday-statistics-email': {
        'task': 'myproject.apps.statistics.tasks.monday_email',
        'schedule': crontab(day_of_week=1, hour=7),
    },
}

*Если вы не используете Django, вам следует использовать celery_app.conf.beat_schedule вместо CELERY_BEAT_SCHEDULE. В этой конфигурации мы запускаем только одну задачу, которая будет выполняться каждый понедельник в 7 утра. Главным ключом является имя или cronjob, а не задача. Вы можете добавлять аргументы к задачам и выбирать, что должно быть сделано в случае, если одна и та же задача должна выполняться в разное время с разными аргументами. Метод crontab поддерживает синтаксис системного crontab – например, crontab(minute='/15') – для запуска задачи каждые 15 минут.

Отложенное выполнение задач в Celery

Вы также можете установить для задач в очереди Celery тайм-аут перед выполнением. Например, когда вам нужно отправить уведомление после выполнения действия. Для этого используйте метод apply_async с аргументом eta или countdown:

  • eta – выполнение задачи в конкретное время
  • countdown – выполнение задачи через N секунд

Если нам нужно отправить 2 письма, одно через 15 минут, а второе – в 7 часов утра 20 мая, это можно сделать следующим образом:

from datetime import datetime


send_mail_task.apply_async(
    (('noreply@example.com', ), 'Celery cookbook test', 'test', {}),
    countdown=15 * 60
)

send_mail_task.apply_async(
    (('noreply@example.com', ), 'Celery cookbook test', 'test', {}),
    eta=datetime(2019, 5, 20, 7, 0)

Настройка очередей Python Celery

Celery может помочь с распределением задач, когда у вас есть несколько воркеров на разных серверах, которые используют одну очередь сообщений для планирования задач. Вы можете настроить дополнительную очередь для вашей задачи/воркера.

Например, отправка электронной почты является критической частью вашей системы, и вы не хотите, чтобы другие задачи влияли на отправку. Тогда вы можете добавить новую очередь, назовем ее mail, и использовать ее только для отправки электронных писем.

CELERY_TASK_ROUTES = {
    'myproject.apps.mail.tasks.send_mail_task': {'queue': 'mail', },
}

*Если вы не используете Django, используйте celery_app.conf.task_routes вместо CELERY_TASK_ROUTES.

Запустите два отдельных celery worker для очереди по умолчанию и новой очереди:

celery -A myproject worker -l info -Q celery
celery -A myproject worker -l info -Q mail

Первая строка запустит worker для очереди по умолчанию под названием celery, а вторая строка запустит worker для очереди mail. Вы можете использовать первый worker без аргумента -Q, тогда он будет использовать все настроенные очереди.

Длительные задачи Python Celery

Иногда приходится иметь дело с задачами, написанными для просмотра записей в базе данных и выполнения некоторых операций. Довольно часто разработчики забывают о росте размера данных, что может привести к очень долгому времени выполнения задачи.

Всегда лучше прописывать подобные задачи таким образом, чтобы они позволяли работать с кусочками данных. Самый простой способ – добавить в задачу параметры offset и limit. Это позволит вам указать размер куска, а также курсор для получения новой части данных.

@celery_app.task
def send_good_morning_mail_task(offset=0, limit=100):
    users = User.objects.filter(is_active=True).order_by('id')[offset:offset + limit]
    for user in users:
        send_good_morning_mail(user)

    if len(users) >= limit:
        send_good_morning_mail_task.delay(offset + limit, limit)

Это очень простой пример того, как может быть реализована подобная задача. В конце мы проверяем, сколько пользователей мы нашли в базе данных. Если число равно лимиту, то, вероятно, у нас есть новые пользователи для обработки. Поэтому мы запускаем задачу снова, с новым смещением.

Если количество пользователей меньше лимита, значит, это последняя часть, и нам не нужно продолжать. Внимание: при такой реализации задачи записи должны иметь одинаковый порядок.

Получение результатов выполнения задачи

Большинство разработчиков не записывают результаты, полученные после выполнения задачи. Но представьте, что вы можете взять часть кода, использовать ее для задачи и затем выполнять эту задачу отдельно, как только получите запрос от пользователя.

Когда нам нужны результаты задачи, мы либо получаем их сразу же (если задача завершена), либо ждем завершения задачи. Затем мы включаем результат в ответ. Используя такой подход, можно сократить время отклика, что очень полезно для ваших пользователей и качества сайта.

Мы используем этот подход для выполнения одновременных операций. В одном из наших проектов у нас много пользовательских данных и много поставщиков услуг. В поисках лучшего поставщика услуг мы проводим много расчетов и проверок. Чтобы сделать это быстрее, мы создаем задачи для пользователя с каждым поставщиком услуг, запускаем их и собираем результаты, чтобы показать пользователю. Это очень легко сделать с помощью групп задач Celery.

from celery import group

@celery_app.task
def calculate_service_provider_task(user_id, provider_id):
    user = User.objects.get(pk=user_id)
    provider = ServiceProvider.objects.get(pk=provider_id)

    return calculate_service_provider(user, provider)


@celery_app.task
def find_best_service_provider_for_user(user_id):
    user = User.objects.get(pk=user_id)
    providers = ServiceProvider.objects.related_to_user(user)

    calc_group = group([
        calculate_service_provider_task.s(user.pk, provider.pk)
        for provider in providers
    ]).apply_async()

    return calc_group

Итак, зачем мы вообще запускаем две задачи? Мы используем вторую задачу для формирования групп задач расчета, их дельнейшего использования и результата. Кроме того, во второй задаче можно назначить фильтрацию проектов – например, поставщиков услуг, которые необходимо предоставить для данного пользователя. Все это можно делать, пока Celery выполняет другую работу. Когда группа задач возвращается, результат первой задачи будет являться нужным для нас расчетом.

Вот пример использования этого подхода в коде:

def view(request):
    find_job = find_best_service_provider_for_user.delay(request.user.pk)

    # do other stuff

    calculations_results = find_job.get().join()

    # process calculations_results and send response

Здесь мы выполняем вычисления как можно быстрее, ждем результатов в конце работы метода, затем формируем ответ и отправляем его пользователю.

Лучшие практики Python Celery

Tiny data

Возможно, я уже упоминал, что в качестве аргументов задачи я использую идентификаторы записей базы данных, а не полные объекты. Это хороший способ уменьшить размер очереди сообщений. Но важнее то, что при выполнении задачи данные в базе данных могут быть изменены. А когда у вас есть только идентификаторы, вы получаете свежие данные, а не устаревшие, как при передаче объектов.

Транзакции

Еще одна отличная практика Celery. Иногда могут возникнуть проблемы, когда выполняемая задача не может найти объект в базе данных. Почему это происходит? В Django, если вы хотите выполнять задачи после регистрации пользователя, такие как отправка приветственного письма, настройки Django оборачивают все запросы в транзакцию. В Celery же задачи выполняются быстро, еще до завершения транзакции. Поэтому, если вы используете Celery при работе в Django, вы можете столкнуться с тем, что пользователь не существует в базе данных (пока).

Чтобы справиться с этим, вы можете набрать в Google “task transaction implementation”. Вкратце, это переписанный метод apply_async в task, классе, который устанавливает задачу в сигнале transaction.on_commit вместо того, чтобы делать это немедленно.

Заключение

Как видите, Celery имеет гораздо больше возможностей, чем просто отправка электронной почты.

Вы можете одновременно выполнять различные задачи с помощью основного процесса, и пока код выполнят основной функционал, Celery будет выполнять более мелкие задачи.

Вы можете настроить очереди, работать с частями данных в длительно выполняемых задачах и устанавливать время выполнения задач. Это позволит вам лучше планировать ход работы и тратить сэкономленное время на более важные вещи, пока группы задач Celery творят свою магию.

Перевод статьи «The Python Celery Cookbook: Small Tool, Big Possibilities».