По своей сути Beanstalkd — упрощенная и легкая система очередей, которая разрабатывалась под нужны Causes. Представляется как менеджер заданий распределенного приложения, который собирает отложенные задачи (отправка почты, различные запросы рода).
Определенные процессы размещают задачи в очередь, а воркеры получают и выполняют задачи из очереди.
Среди основных возможностей beanstalkd:
Терминология инструмента отличается от привычных серверов сообщений, того же Gearman, к примеру:
Основных команд у beanstalkd тоже немного:
Больше команд и опций на официальной странице системы.
Beanstalkd входит в систему пакетов aptitude:
sudo aptitude install beanstalkd
# Также собирается из исходников с GitHub
А для запуска достаточно выполнить:
beanstalkd
# По умолчанию выполняется локально используя порт 11300
После установки демоном beanstalkd также можно управлять как сервисом ОС. Он выполняется в RAM, а для обеспечения fault tolerance запускается с дополнительной опцией:
beanstalkd -b ~/beanstore &
# Пишет данные очереди в каталог beanstore, выполнение в фоне
Если перезапустить beanstalkd с такими же параметрами, то он в первую очередь проверит лог и продолжит выполнение с места остановки.
Демона beanstalkd можно запускать с опциями:
-b DIR директория для валидации
-f MS fsync каждое значение миллисекунд ( -f0 для "always fsync")
-F без fsync (по умолчанию)
-l ADDR слушать адрес (0.0.0.0 по умолчанию)
-p PORT слушать порт (11300 по умолчанию)
-u USER стать пользователем и группой
-z BYTES максимальный размер задания в байтах (65535 по умолчанию)
-s BYTES размер каждого файла валидации (10485760 по умолчанию)
(будут округлены до значения, кратного 512 байт)
-c уменьшить binlog (по умолчанию)
-n не уменьшать binlog
-v показать версию
-V улучшенный вывод
-h показать справку # Опции можно комбинировать
Ко всему прочему beanstalkd поддерживает клиенты для всех популярных языков и обладает API для создания проприетарных клиентов.
Для первого примера используем клиент Beaneater для Ruby. Размещение задачи (сообщения) в очередь будет выглядеть так:
require 'beaneater'
require 'json'
beanstalk = Beaneater::Pool.new(['localhost:11300'])
tube = beanstalkd.tubes['my-tube']
job = {some: 'key', value: 'object'}.to_json
tube.put job # Задает адрес и порт, имя очереди и само сообщение, в формате JSON
После этого нужно создать скрипт для вылавливания заданий из очереди:
beanstalkd.tubes.watch!('my-tube')
loop do
job = beanstalk.tubes.reserve
begin
# обработка задачи
job.delete
rescue Exception => e
job.bury
end
end # Дожидается готовности задачи, обрабатывает ее и повторяет весь процесс
Для управления приложениями-потребителями логично использовать контроллеры процессов, как supervisor.
Если обрабатывается большая, тяжелая задача, то верхний пример можно представить как:
beanstalkd.jobs.register('my-tube') do |job|
# ... обработка задачи
end
beanstalkd.jobs.process! # “Оборачивает” скрипт, резервируя, обрабатывая, а затем удаляя или откладывая задачу, основываясь на результате
Во втором примере реализована отправка почты при помощи Mandrill. Для удобства установки и использования клиента Pheanstalk для PHP рекомендуется использовать Composer:
composer require mandrill/mandrill pda/pheanstalk
# Установка зависимостей
Потребуется всего два скрипта: поставщика задачи в очередь и потребителя, который будет обрабатывать задания.
Код поставщика будет иметь вид:
<?php
require_once __DIR__ . '/vendor/autoload.php';
$email = array(
'to' => 'xyz@example.com',
'from' => 'abc@example.com',
'subject' => 'Subject',
'body' => 'Some text'
);
$pheanstalk = new \Pheanstalk\Pheanstalk('127.0.0.1');
# Добавляет JSON для задачи "email_queue"
$pheanstalk
->useTube('email_queue')
->put(json_encode($email)); # Создает локальную задачу email_queue — строку JSON массива с данными
А воркер будет выглядеть так:
<?php
require_once __DIR__ . '/vendor/autoload.php';
$pheanstalk = new \Pheanstalk\Pheanstalk('127.0.0.1');
# Чтение очереди beanstalkd
while (true) {
# Проверка соединения
if (!$pheanstalk->getConnection()->isServiceListening()) {
echo "Ошибка соединения, подождите... \n";
# Ждет 5 с
sleep(5);
# Запуск следующей итерации
continue;
}
# Получить задачу из очереди, если она готова
$job = $pheanstalk
->watch('email_queue')
->ignore('default')
->reserve();
$email = json_decode($job->getData(), true);
try {
# Отправка почты с использованием Mandrill API
$mandrill = new Mandrill('[MANDRILL-API-KEY');
$message = array(
'html' => $email['body'],
'subject' => $email['subject'],
'from_email' => $email['from'],
'to' => array(
array(
'email' => $email['to'],
)
)
);
# Непосредственная отправка письма
$result = $mandrill->messages->send($message);
# Удалить задачу из очереди
$pheanstalk->delete($job);
echo "Письмо отправлено \n";
} catch (Exception $e) {
echo "Ошибка отправки - {$e->getMessage()} \n";
}
} # Не забудьте использовать свой уникальный API
Затем можно запускать команду:
nohup php email_worker.php > path/to/logfile.txt &
# Игнорирует сигнал SIGHUP, записывает данные в текстовый файл, выполняется в фоне
Beanstalkd подходит для реализации системы очередей любой сложности на любом популярном языке программирования. А распределенное выполнение позволит использовать инструмент в высоко-нагруженных приложениях.
На фоне роста спроса на ликвидность в бычьем рынке 2025 года, криптозаймы снова выходят на…
Прокси (proxy), или прокси-сервер — это программа-посредник, которая обеспечивает соединение между пользователем и интернет-ресурсом. Принцип…
Согласитесь, было бы неплохо соединить в одно сайт и приложение для смартфона. Если вы еще…
Повсеместное распространение смартфонов привело к огромному спросу на мобильные игры и приложения. Миллиарды пользователей гаджетов…
В перечне популярных чат-ботов с искусственным интеллектом Google Bard (Gemini) еще не пользуется такой популярностью…
Скрипт (англ. — сценарий), — это небольшая программа, как правило, для веб-интерфейса, выполняющая определенную задачу.…