Рубріки: Теория

Оптимизация очередей через Celery и Flask: делаем это правильно

Ігор Грегорченко

Если вы наткнулись на эту статью, скорее всего, вы знакомы с Flask и работаете над добавлением функции в свое веб-приложение, на выполнение которой уходит довольно много минут (если не больше). Возможно, вы хотите узнать, есть ли лучший или более быстрый способ сделать это. Да, именно так — такой способ есть, мы его подробно обсудим в этом справочном материале для всех любителей Python.

Если в вашем приложении есть долго выполняющаяся задача, например, обработка загруженных данных или отправка электронной почты, вы не хотите ждать ее завершения во время запроса. Вместо этого используйте очередь задач для отправки необходимых данных другому процессу, который будет выполнять задачу в фоновом режиме.

Celery — это мощная очередь задач, которую можно использовать как для простых фоновых задач, так и для сложных многоэтапных программ и расписаний. Это руководство покажет вам, как использовать Celery с помощью Flask, но предполагает, что вы уже прочитали руководство «Первые шаги с Celery» в документации по Celery.

Celery — это отдельный пакет Python. Установите его из PyPI с помощью pip:

$ pip install celery

Введение

Некоторые распространенные примеры, которые мы обсудим ниже,  включают в себя:

  • Вызов стороннего API для получения некоторых данных на основе пользовательского ввода.
  • Отправка электронного письма пользователю при регистрации.
  • Генерация отчета в формате PDF.

Все эти типы задач блокируют цикл запрос/ответ до его завершения, что означает, что пользователю придется подождать некоторое время. Чтобы разгрузить длительно выполняющиеся задачи, подобные этим, вы можете использовать Celery, который предоставляет механизм для разгрузки этих задач на отдельные рабочие потоки.

Celery взаимодействует с помощью сообщений, обычно используя брокер для посредничества между клиентами и рабочими потоками. Чтобы инициировать задачу, клиент Celery добавляет сообщение в очередь, а брокер затем доставляет это сообщение рабочему потоку.

Наиболее часто используемыми брокерами являются Redis и RabbitMQ. Мы установим сервер Redis локально, чтобы использовать этот механизм.

Необходимые стартовые условия

  • Python 3.6+
  • Virtualenv v20+
  • Кроме того, ожидается промежуточное знание Python и Flask. Все остальное будет объяснено по ходу статьи.

Настройка проекта

Скачайте стартовый проект и настройте его с помощью следующих команд:

git clone -b step_1 https://github.com/raunaqness/flask_celery_tutorial.git
cd flask_celery_tutorial

# make virtualenv
virtualenv v
source v/bin/activate

# install dependencies
pip install -r requirements.txt

# start server
export FLASK_APP=app; python -m flask run

Откройте 127.0.0.1:5000 в браузере, и, если все работает нормально, вы должны увидеть надпись «Hello, world!».

Далее добавим маршрут, который будет содержать кнопку Button, при нажатии на которую будет запускаться имитация длительной задачи, например, отправка электронного письма, создание отчета в формате PDF, вызов API стороннего разработчика и т.д.

Мы сымитируем этот API с помощью time.sleep(), который заблокирует работу приложения на 15 секунд.

Откройте файл app.py и добавьте следующий блок кода.

# route that will show will simply render an HTML template
@app.route("/tasks")
def tasks():
    return render_template("tasks.html")

# route that will execute a long-running task
@app.route("/long_running_task")
def long_running_task():
    # time in seconds 
    time_to_wait = 15

    print(f"This task will take {time_to_wait} seconds to complete...")
    time.sleep(time_to_wait)

    return f"<p>The task completed in {time_to_wait} seconds!"

Убедитесь, что импортировали модуль времени, добавив следующее, вместе с утверждениями импорта в верхней части файла:

import time

Далее создайте каталог с именем templates в корне проекта. Внутри него создайте новый файл tasks.html и добавьте в него следующее:

<!DOCTYPE html>
<html>

<head>
    <title>Tasks</title>
    <meta charset="utf-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1" />

    <!-- Bootstrap CSS -->
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" rel="stylesheet"
        integrity="sha384-1BmE4kWBq78iYhFldvKuhfTAU6auU8tT94WrHftjDbrCEXSU1oBoqyl2QvZ6jIW3" crossorigin="anonymous" />
</head>

<body>
    <script src="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/js/bootstrap.bundle.min.js"
        integrity="sha384-ka7Sk0Gln4gmtz2MlQnikT1wXgYsOg+OMhuP+IlRH9sENBO0LRn5q+8nbTov4+1p"
        crossorigin="anonymous"></script>

    <div>
        <a class="btn btn-primary" href="/long_running_task" role="button">Trigger Long Running Task</a>
    </div>
</body>

</html>

Структура (дерево) вашего проекта должна выглядеть примерно так:

code
├── __pycache__
│   └── app.cpython-38.pyc
├── app.py
├── requirements.txt
└── templates
    └── tasks.html

2 directories, 4 files

Вернитесь в терминал, остановите и снова запустите сервер Flask, затем откройте 127.0.0.1:5000/tasks в браузере. Вы должны увидеть страницу tasks.html с единственной кнопкой.

Теперь, когда вы нажмете на кнопку Trigger Long-Running Task, произойдет переход к маршруту /long_running_task, который выполнит функцию def long_running_task(), определенную в файле app.py.

Обратите внимание, что страница будет находиться в состоянии «загрузки» в течение 15 секунд, так что ваше приложение застрянет в этом состоянии и не сможет выполнить никакую другую операцию, пока не завершится текущая.

Через 15 секунд вы должны увидеть выполненную задачу и ожидаемый ответ в браузере.

Также обратите внимание, что вы сможете видеть операторы печати в окне терминала во время выполнения длительного задания.

Теперь давайте посмотрим, как мы можем использовать Celery для выполнения этой задачи в фоновом режиме. Если у вас возникли проблемы, вы можете посмотреть текущее состояние вашего проекта вот здесь.

Настройка Celery и Redis

Вы уже установили пакет Celery python в начальной настройке. Чтобы подтвердить установку пакета, вы можете запустить pip freeze в окне терминала с активированным virtualenv, чтобы увидеть все установленные пакеты.

Далее необходимо установить Redis Server на локальной машине. Официальную инструкцию по установке можно найти вот здесь.

Теперь давайте настроим Celery.

Начало работы с Celery

Создайте новый файл в корне проекта под названием celery_utils.py. Он будет использоваться для инициализации экземпляра приложения Celery, аналогично тому, как мы инициализируем приложение Flask в app.py. Добавьте в файл следующий код:

from celery import Celery

# celery config
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'

# initialize celery app
def get_celery_app_instance(app):
    celery = Celery(
        app.import_name,
        backend=CELERY_BROKER_URL,
        broker=CELERY_BROKER_URL
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

Вот краткое объяснение:

  • Импортируется пакет Celery python.
  • Определяется функция get_celery_app_instance, которая возвращает экземпляр Celery, который, в свою очередь, требует следующих параметров для инициализации:
    • name: это имя рабочего Celery.
    • backend: это URL используемого бэкенда, которым в данном случае является Redis, а URL хоста определяется переменной CELERY_BROKER_URL.
    • брокер: аналогично бэкенду, необходимо определить URL брокера, который также является сервером Redis.
  • <объяснить ContextTask>.
  • <возвратить экземпляр приложения celery>.

Далее, давайте используем Celery для определения долгоиграющей задачи. Внесите следующие изменения в app.py:

Добавьте следующее рядом с операторами импорта.

# importing function to get celery app instance
from celery_utils import get_celery_app_instance

Добавьте следующее после утверждений, инициализирующих приложение Flask:

# celery app instance
celery = get_celery_app_instance(app)

Затем добавьте следующий блок кода в нижнюю часть файла:

# celery tasks
@celery.task
def sending_email_with_celery():
    print("Executing Long running task : Sending email with celery...")
    time.sleep(15)
    print("Task complete!")

Здесь мы просто определили функцию с именем sending_email_with_celery(), которая будет имитировать отправку электронного письма, которая может занять 15 секунд.

Однако для того, чтобы эта функция выполнялась как фоновая задача, в строке чуть выше определения функции добавлен декоратор @celery.task.

Наконец, определите маршрут для запуска этой функции:

# route to trigger celery task
@app.route("/long_running_task_celery")
def long_running_task_celery():
    # function.delay() is used to trigger function as celery task
    sending_email_with_celery.delay()
    return f"Long running task triggered with Celery! Check terminal to see the logs..."

В этом блоке кода мы определяем маршрут /long_running_task_celery, который запускает функцию как задачу Celery.

Обратите внимание, что функция вызывается с помощью метода delay(). Это указывает на то, что мы хотим запустить эту функцию как задачу Celery, а не как обычную функцию Python.

Наконец, чтобы увидеть это в действии, давайте добавим еще одну кнопку в файл tasks.html для запуска этой функции.

<div>
    <a class="btn btn-primary" href="/long_running_task" role="button">Trigger Long Running Task</a>
</div>

<!-- new code -->
<div>
    <a class="btn btn-primary" href="/long_running_task_celery" role="button">Trigger Long Running Task with Celery</a>
</div>

Пришло время увидеть его в действии!

Убедитесь, что у вас запущен сервер Flask в окне терминала.

В другом окне терминала перейдите (cd) в корень проекта и выполните следующую команду для запуска Celery worker.

celery -A app.celery worker --loglevel=info

Откройте 127.0.0.1:5000/tasks в браузере, где вы должны увидеть две кнопки:

  • Запуск долго выполняющейся функции с помощью Python.
  • Запуск долго выполняющейся функции с помощью Celery.

Мы уже видели, что если мы запускаем долго выполняющуюся функцию с помощью Python, сервер замирает до завершения выполнения этой функции.

Теперь, если вы нажмете на кнопку Trigger Long-Running Task with Celery, вы увидите, что страница мгновенно перенаправляется на маршрут /long_running_task_celery, а в окне браузера появится ожидаемый результат.

В фоновом режиме выполнение функции осуществляется Celery. Чтобы посмотреть журнал выполнения функции, переключитесь в окно терминала, в котором вы запустили Celery worker. Оно должно выглядеть примерно так:

Заключение

Вот и все! Теперь вы знаете, как настраивать и запускать длительно выполняющиеся задачи с помощью Celery в вашем веб-приложении Flask.

Вот краткий обзор того, что мы обсудили выше. Чтобы запустить функцию как задачу Celery, вам необходимо:

  • Импортируйте экземпляр приложения Celery в ваш файл.
  • Добавьте декоратор @celery.task поверх определения функции.
  • Запустите функцию, используя метод function_name.delay().

Останні статті

Что такое прокси-сервер: пояснение простыми словами, зачем нужны прокси

Прокси (proxy), или прокси-сервер — это программа-посредник, которая обеспечивает соединение между пользователем и интернет-ресурсом. Принцип…

21.11.2024

Что такое PWA приложение? Зачем необходимо прогрессивное веб-приложение

Согласитесь, было бы неплохо соединить в одно сайт и приложение для смартфона. Если вы еще…

19.11.2024

Как создать игру на телефоне: программирование с помощью конструктора

Повсеместное распространение смартфонов привело к огромному спросу на мобильные игры и приложения. Миллиарды пользователей гаджетов…

17.11.2024

Google Bard: эффективный аналог ChatGPT

В перечне популярных чат-ботов с искусственным интеллектом Google Bard (Gemini) еще не пользуется такой популярностью…

14.11.2024

Скрипт и программирование: что это такое простыми словами

Скрипт (англ. — сценарий), — это небольшая программа, как правило, для веб-интерфейса, выполняющая определенную задачу.…

12.11.2024

Дедлайн в разработке: что это такое простыми словами

Дедлайн (от англ. deadline — «крайний срок») — это конечная дата стачи проекта или задачи…

11.11.2024