RabbitMQ: Очереди задач

26-09-2024 - 8 minutes, 13 seconds -
Web-разработка php очереди rabbitmq

В первом уроке мы написали программы для отправки и получения сообщений из именованной очереди. В этом - мы создадим очередь задач, которая будет использоваться для распределения трудоемких задач между несколькими "работниками".

Основная идея очередей задач заключается в том, чтобы избежать немедленного выполнения ресурсоемкой задачи и необходимости ожидания ее завершения. Вместо этого мы планируем выполнение задачи позже. Мы инкапсулируем задачу в виде сообщения и отправляем ее в очередь. Рабочий процесс, работающий в фоновом режиме, будет выводить задачи и в конечном итоге выполнять задание. Когда вы запускаете много "работников", задачи будут распределены между ними.

Эта концепция особенно полезна в веб-приложениях, где невозможно выполнить сложную задачу в течение короткого временного промежутка HTTP-запроса.

Подготовка

В предыдущем уроке мы отправили сообщение, содержащее "Привет, мир!". Теперь мы будем отправлять строки, обозначающие сложные задачи. У нас нет реальной задачи, такой как изменение размера изображений или рендеринг pdf-файлов, поэтому давайте подделаем это, просто притворившись, что мы заняты - с помощью функции sleep(). Мы примем количество точек в строке за ее сложность; каждая точка будет составлять одну секунду "работы". Например, поддельная задача, описанная Hello... это займет три секунды.

Мы немного изменим send.php код из нашего предыдущего примера, позволяющий отправлять произвольные сообщения из командной строки. Эта программа будет планировать задачи в нашу рабочую очередь, так что давайте назовем ее _newtask.php:

$data = implode(' ', array_slice($argv, 1));

if (empty($data)) {
    $data = "Hello World!";
}

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, '', 'hello');

echo ' [x] Sent ', $data, "\n";

Наш старый receive.php сценарий также требует некоторых изменений: для каждой точки в теле сообщения требуется по секунде работы. Он будет извлекать сообщения из очереди и выполнять задачу, так что давайте назовем его worker.php:

$callback = function ($msg) {
  echo ' [x] Received ', $msg->body, "\n";
  sleep(substr_count($msg->body, '.'));
  echo " [x] Done\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

Обратите внимание, что наша поддельная задача имитирует время выполнения.

Запустите их как в первом уроке:

# shell 1
php worker.php
# shell 2
php new_task.php "A very hard task which takes two seconds.."

Циклический перебор

Одним из преимуществ использования очереди задач является возможность легкого распараллелить работу. Если мы накапливаем накопившуюся работу, мы можем просто добавить больше работников и таким образом легко масштабироваться.

Во-первых, давайте попробуем запустить два worker.php сценарии одновременно. Они оба будут получать сообщения из очереди, но как именно? Давайте посмотрим.

Вам нужно открыть три консоли. Две будут управлять сценарием worker.php. Этими консолями будут два наших потребителя - C1 и C2.

php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C

В третьей мы опубликуем новые задачи. После того, как вы запустили приложение, вы можете опубликовать несколько сообщений:

# shell 3
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....

Давайте посмотрим, что получили наши работники:

# shell 1
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

По умолчанию RabbitMQ будет последовательно отправлять каждое сообщение следующему получателю. В среднем каждый получатель получит одинаковое количество сообщений. Такой способ распространения сообщений называется циклическим перебором. Попробуйте проделать это с тремя или более работниками.

Подтверждение сообщения

Выполнение задачи может занять несколько секунд. Вы можете задаться вопросом, что произойдет, если один из получателей начнет выполнять длинную задачу и умрет, выполнив ее лишь частично. С нашим текущим кодом, как только RabbitMQ доставляет сообщение получателю, он немедленно помечает его для удаления. В этом случае, если вы убьете работника, мы потеряем сообщение, которое он только что обрабатывал. Мы также потеряем все сообщения, которые были отправлены этому конкретному работнику, но еще не были обработаны.

Но мы не хотим терять ни одной задачи. Если работник умирает, мы бы хотели, чтобы задача была передана другому работнику.

Чтобы убедиться, что сообщение никогда не будет потеряно, RabbitMQ поддерживает подтверждения сообщений. Подтверждение отправляется обратно получателем, чтобы сообщить RabbitMQ, что конкретное сообщение было получено, обработано и что RabbitMQ может удалить его.

Если получатель умирает (его канал закрыт, соединение закрыто или TCP-соединение потеряно), не отправив сообщение, RabbitMQ поймет, что сообщение не было обработано полностью, и повторно поставит его в очередь. Если в то же время в Сети есть другие получатели, он быстро перенаправит его другому получателю. Таким образом, вы можете быть уверены, что ни одно сообщение не будет потеряно, даже если работники иногда умирают.

При подтверждении доставки получателем устанавливается тайм-аут (по умолчанию 30 минут). Это помогает обнаруживать глючных (застрявших) получателей, которые никогда не подтверждают доставку. Вы можете увеличить это время ожидания.

Подтверждения сообщений ранее были отключены нами самими. Пришло время включить их, установив для четвертого параметра basic_consume значение false (true означает отсутствие подтверждения) и отправив соответствующее подтверждение от работника, как только мы закончим с задачей.

$callback = function ($msg) {
  echo ' [x] Received ', $msg->body, "\n";
  sleep(substr_count($msg->body, '.'));
  echo " [x] Done\n";
  $msg->ack();
};

# - - - - - - - - - - - - - - - - - - - - - - - -  V 
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

Используя этот код, мы можем быть уверены, что даже если вы убьете работника с помощью CTRL+C во время обработки сообщения, ничего не будет потеряно. Вскоре после смерти работника все неподтвержденные сообщения будут доставлены повторно.

Подтверждение должно быть отправлено по тому же каналу, по которому была получена доставка. Попытки подтвердить использование другого канала приведут к выбросу исключения.

Забытое подтверждение

Это распространенная ошибка - пропустить ack. Это простая ошибка, но последствия серьезны. Сообщения будут повторно доставлены, когда ваш клиент завершит работу (что может выглядеть как случайная повторная доставка), но RabbitMQ будет потреблять все больше и больше памяти, поскольку он не сможет освободить какие-либо нераспакованные сообщения.

Чтобы отладить такого рода ошибки, вы можете использовать rabbitmqctl для печати поля messages_unacknowledged:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

Долговечные сообщения

Мы научились делать так, чтобы даже в случае смерти получателя задача не была потеряна. Но наши задачи все равно будут потеряны, если сервер RabbitMQ остановится.

Когда RabbitMQ завершит работу или выйдет из строя, он забудет об очередях и сообщениях, если вы не скажете ему этого не делать. Чтобы убедиться, что сообщения не будут потеряны, необходимы две вещи: нам нужно пометить долговечными как очередь, так и сообщения.

Во-первых, нам нужно убедиться, что очередь переживет перезапуск узла RabbitMQ. Для этого нам нужно объявить его долговечным. Для этого мы передаем третий параметр queue_declare как true:

$channel->queue_declare('hello', false, true, false, false);

Хотя эта команда верна сама по себе, она не будет работать в нашей текущей настройке. Это потому, что мы уже определили очередь с именем hello, которая не является долговременной. RabbitMQ не позволяет вам переопределять существующую очередь с другими параметрами и вернет ошибку любой программе, которая попытается это сделать.

Но есть быстрый обходной путь - давайте объявим очередь с другим именем, например task_queue:

$channel->queue_declare('task_queue', false, true, false, false);

Этот флаг, установленный в значение true, должен применяться как к коду отправителя, так и к коду получателя.

На данный момент мы уверены, что очередь task_queue не будет потеряна, даже если RabbitMQ перезапустится. Теперь нам нужно пометить наши сообщения как постоянные - установив свойство сообщения delivery_mode = 2, которое AMQPMessage принимает как часть массива свойств.

$msg = new AMQPMessage(
    $data,
    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

Примечание о постоянстве сообщений

Пометка сообщений как постоянных не дает полной гарантии того, что сообщение не будет потеряно. Хотя она сообщает RabbitMQ сохранить сообщение на диск, все еще существует кратковременное окно, когда RabbitMQ принял сообщение и еще не сохранил его. Кроме того, RabbitMQ не выполняет fsync(2) для каждого сообщения - оно может быть просто сохранено в кэше и на самом деле не записано на диск. Гарантии сохраняемости невелики, но этого более чем достаточно для нашей простой очереди задач. Если вам нужна более надежная гарантия, вы можете использовать publisher confirms.

Справедливая отправка

Возможно, вы заметили, что циклический перебор по-прежнему работает не совсем так, как мы хотим. Например, в ситуации с двумя работниками, когда все нечетные сообщения тяжелые, а четные сообщения легкие, один работник будет постоянно занят, а другой почти не будет выполнять никакой работы. Что ж, RabbitMQ ничего об этом не знает и по-прежнему будет равномерно отправлять сообщения.

Это происходит потому, что RabbitMQ просто отправляет сообщение, когда сообщение попадает в очередь. Он не учитывает количество неподтвержденных сообщений для получателя. Он просто вслепую отправляет каждое n-е сообщение n-му получателю.

Чтобы победить это, мы можем использовать метод basic_qos с настройкой prefetch_count = 1. Это говорит RabbitMQ не передавать более одного сообщения рабочему за раз. Или, другими словами, не отправляйте новое сообщение рабочему до тех пор, пока он не обработает и не подтвердит предыдущее сообщение. Вместо этого он отправит его следующему работнику, который еще не занят.

$channel->basic_qos(null, 1, null);

Примечание о размере очереди

Если все работники заняты, ваша очередь может заполниться. Вы захотите следить за этим и, возможно, добавить больше работников или использовать какую-то другую стратегию.

Собирая все это воедино

Окончательный код нашего _newtask.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
    $data = "Hello World!";
}
$msg = new AMQPMessage(
    $data,
    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

$channel->basic_publish($msg, '', 'task_queue');

echo ' [x] Sent ', $data, "\n";

$channel->close();
$connection->close();

И нашего worker.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
    sleep(substr_count($msg->body, '.'));
    echo " [x] Done\n";
    $msg->ack();
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while ($channel->is_open()) {
    $channel->wait();
}

$channel->close();
$connection->close();

Используя подтверждения сообщений и предварительную выборку, вы можете настроить рабочую очередь. Параметры долговечности позволяют задачам сохраняться даже при перезапуске RabbitMQ.

 


 

Оригинал статьи