RabbitMQ: Публикация и Подписка

26-09-2024 - 4 minutes, 44 seconds -
Web-разработка php очереди rabbitmq

В предыдущем уроке мы создали рабочую очередь. Предположение, лежащее в основе рабочей очереди, заключается в том, что каждая задача доставляется ровно одному работнику. В этой части мы сделаем нечто совершенно другое - мы передадим сообщение нескольким получателям.Этот шаблон известен как "публикация/подписка".

Чтобы проиллюстрировать эту закономерность, мы собираемся создать простую систему ведения журнала. Он будет состоять из двух программ: первая будет выдавать сообщения журнала, а вторая будет получать и печатать их.

В нашей системе ведения журнала каждая запущенная копия программы-получателя будет получать сообщения. Таким образом, мы сможем запустить один приемник и направить журналы на диск; и в то же время мы сможем запустить другой приемник и увидеть журналы на экране.

По сути, опубликованные сообщения журнала будут транслироваться всем получателям.

Обменники

В предыдущих частях руководства мы отправляли и получали сообщения в очередь и из нее. Теперь пришло время представить полную модель обмена сообщениями в Rabbit.

Давайте быстро пройдемся по тому, что мы рассмотрели в предыдущих уроках:

  • Отправитель - это пользовательское приложение, которое отправляет сообщения.
  • Очередь - это буфер, в котором хранятся сообщения.
  • Получатель - это пользовательское приложение, которое получает сообщения.

Основная идея модели обмена сообщениями в RabbitMQ заключается в том, что отправитель никогда не отправляет никаких сообщений непосредственно в очередь. На самом деле, довольно часто отправитель даже не знает, будет ли сообщение вообще доставлено в какую-либо очередь.

Вместо этого отправитель может отправлять сообщения только на "обменник". Обменник - это очень простая вещь. С одной стороны, он получает сообщения от отправителей, а с другой стороны, помещает их в очереди. Обменник должен точно знать, что делать с полученным сообщением. Должен ли он быть добавлен к определенной очереди? Должен ли он быть добавлен ко многим очередям? Или его следует выбросить. Правила для этого определяются типом обмена.

Доступно несколько типов обменников: direct, topic, headers и fanout. Мы сосредоточимся на последнем - fanout. Давайте создадим обменник такого типа и назовем его logs:

$channel->exchange_declare('logs', 'fanout', false, false, false);

Обменик fanout (разветвления) очень прост. Как вы, вероятно, можете догадаться из названия, он просто передает все сообщения, которые он получает, во все известные ему очереди. И это именно то, что нам нужно для нашего скрипта.

Список обменников

Чтобы посмотреть обменники на сервере, вы можете запустить всегда полезный rabbitmqctl:

sudo rabbitmqctl list_exchanges

В этом списке будут некоторые обменники amq.* и обменник по умолчанию (безымянный). Они создаются по умолчанию, но вряд ли вам понадобится использовать их в данный момент.

Обменник по умолчанию

В предыдущих частях руководства мы ничего не знали об обменниках, но все же могли отправлять сообщения в очереди. Это было возможно, потому что мы использовали обменник по умолчанию, который мы идентифицируем по пустой строке ("").

Вспомните, как мы опубликовали сообщение ранее:

$channel->basic_publish($msg, ", "hello");

Здесь мы используем обменник по умолчанию или безымянный обменник: сообщения направляются в очередь с именем, указанным routing_key, если оно существует. Ключ маршрутизации является третьим аргументом для basic_publish

Теперь вместо этого мы можем публиковать на нашем именованном обменнике:

$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');

Временные очереди

Как вы, возможно, помните, ранее мы использовали очереди с определенными именами (помните hello и task_queue?). Возможность назвать очередь имела для нас решающее значение - нам нужно было указать работникам на одну и ту же очередь. Присвоение имени очереди важно, если вы хотите разделить очередь между отправителями и получателями.

Но это не относится к нашему регистратору. Мы хотим услышать обо всех сообщениях журнала, а не только о их подмножестве. Нас также интересуют только текущие сообщения, а не старые. Чтобы решить эту проблему, нам нужны две вещи.

Во-первых, всякий раз, когда мы подключаемся к Rabbit, нам нужна свежая, пустая очередь. Для этого мы могли бы создать очередь со случайным именем или, что еще лучше, позволить серверу выбрать для нас случайное имя очереди.

Во-вторых, как только мы отключим потребителя, очередь должна быть автоматически удалена.

В клиенте php-amqplib, когда мы указываем имя очереди в виде пустой строки, мы создаем недолговечную очередь с сгенерированным именем:

[$queue_name, ,] = $channel->queue_declare("");

Когда метод возвращается, переменная $queue_name содержит случайное имя очереди, сгенерированное RabbitMQ. Например, это может выглядеть как amq.gen-JzTY20BRgKO-HjmUJj0wLg.

Когда соединение, объявившее его, закроется, очередь будет удалена, поскольку она объявлена как исключительная. Вы можете узнать больше об исключительном флаге и других свойствах очереди в руководстве по очередям.

Привязка

Мы уже создали обменник fanout и очередь. Теперь нам нужно сообщить обменнику, чтобы он отправлял сообщения в нашу очередь. Эта связь между обменником и очередью называется привязкой.

$channel->queue_bind($queue_name, 'logs');

Отныне скрипт logs будет добавлять сообщения в нашу очередь.

Список привязок

Вы можете перечислить существующие привязки, используя, как вы уже догадались,

rabbitmqctl list_bindings

Собирая всё воедино

Программа-отправитель, которая выдает сообщения журнала, внешне не сильно отличается от предыдущего руководства. Самое важное изменение заключается в том, что теперь мы хотим публиковать сообщения на нашем сервере обмена логами вместо безымянного.

Вот код для emit_log.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('logs', 'fanout', false, false, false);

$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
    $data = "info: Hello World!";
}
$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'logs');

echo ' [x] Sent ', $data, "\n";

$channel->close();
$connection->close();

Как вы видите, после установления соединения мы объявили обменник. Этот шаг необходим, так как публикация на несуществующий обменник запрещена.

Сообщения будут потеряны, если ни одна очередь еще не привязана к обменнику, но для нас это нормально; если ни один получатель еще не прослушивает, мы можем безопасно отбросить сообщение.

Код для receive_logs.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('logs', 'fanout', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$channel->queue_bind($queue_name, 'logs');

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($channel->is_open()) {
    $channel->wait();
}

$channel->close();
$connection->close();

 


 

Оригинал статьи