RabbitMQ: Маршрутизация

26-09-2024 - 3 minutes, 45 seconds -
Web-разработка php rabbitmq очереди

В предыдущем уроке мы создали простую систему ведения журнала. Мы смогли транслировать сообщения журнала многим получателям.

В этом руководстве мы собираемся добавить к нему функцию - мы собираемся сделать возможной подписку только на подмножество сообщений. Например, мы сможем направлять в файл журнала только сообщения о критических ошибках (для экономии места на диске), сохраняя при этом возможность печатать все сообщения журнала на консоли.

Привязки

В предыдущих примерах мы уже создавали привязки. Вы можете вспомнить код, подобный:

$channel->queue_bind($queue_name, 'logs');

Привязка - это связь между обменником и очередью. Это можно просто прочитать как: очередь интересуется сообщениями с этого обменника.

Привязки могут принимать дополнительный параметр routing_key. Чтобы избежать путаницы с параметром $channel::basic_publish, мы будем называть его ключом привязки. Вот как мы могли бы создать привязку с помощью ключа:

$binding_key = 'black';
$channel->queue_bind($queue_name, $exchange_name, $binding_key);

Значение ключа привязки зависит от типа обмена. Разветвленные обменники, которые мы использовали ранее, просто игнорировали его значение.

Прямой обмен

Наша система ведения журнала из предыдущего руководства транслирует все сообщения всем получателям. Мы хотим расширить это, чтобы разрешить фильтрацию сообщений в зависимости от их серьезности. Например, мы можем захотеть, чтобы скрипт, который записывает сообщения журнала на диск, получал только критические ошибки и не тратил дисковое пространство на предупреждения или информационные сообщения журнала.

Мы использовали fanout (разветвленный) обменник, который не дает нам большой гибкости - он способен только на бессмысленную трансляцию.

Вместо этого мы будем использовать обменник direct (прямой). Алгоритм маршрутизации, лежащий в основе прямого обмена, прост - сообщение отправляется в очереди, ключ привязки которых точно совпадает с ключом маршрутизации сообщения.

Чтобы проиллюстрировать это, рассмотрим следующую настройку:

direct-exchange

В этой настройке мы можем видеть прямой обмен X с привязанными к нему двумя очередями. Первая очередь связана ключом привязки orange, а вторая имеет две привязки, одну с ключом привязки black, а другую с green.

При такой настройке сообщение, опубликованное на бирже с ключом маршрутизации orange, будет перенаправлено в очередь Q1. Сообщения с ключом маршрутизации black или green будут отправляться в Q2. Все остальные сообщения будут отброшены.

Множественные привязки

direct-exchange-multiple

Совершенно законно связывать несколько очередей одним и тем же ключом привязки. В нашем примере мы могли бы добавить привязку между X и Q1 с помощью ключа привязки black. В этом случае прямой обмен будет вести себя как разветвление и будет транслировать сообщение во все соответствующие очереди. Сообщение с ключом black маршрутизации будет доставлено как в Q1, так и в Q2.

Излучающие журналы

Мы будем использовать эту модель для нашей системы ведения журнала. Вместо обменника fanout мы будем отправлять сообщения на обменник direct. Мы будем указывать серьезность журнала в качестве ключа маршрутизации. Таким образом, принимающий скрипт сможет выбрать серьезность, которую он хочет получить. Давайте сначала сосредоточимся на выпуске журналов.

Как всегда, сначала нам нужно создать обменник:

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

И мы готовы отправить сообщение:

$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$channel->basic_publish($msg, 'direct_logs', $severity);

Чтобы упростить ситуацию, мы предположим, что $severity может быть одной из 'info', 'warning', 'error'..

Подписка

Получение сообщений будет работать так же, как в предыдущем руководстве, за одним исключением - мы собираемся создать новый обменник для каждой интересующей нас серьезности.

foreach ($severities as $severity) {
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}

Собирая всё воедино

python-four

Код для emit_log_direct.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
    $data = "Hello World!";
}

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'direct_logs', $severity);

echo ' [x] Sent ', $severity, ':', $data, "\n";

$channel->close();
$connection->close();

Код для receive_logs_direct.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$severities = array_slice($argv, 1);
if (empty($severities)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
    exit(1);
}

foreach ($severities as $severity) {
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($channel->is_open()) {
    $channel->wait();
}

$channel->close();
$connection->close();

Если вы хотите сохранять 'warning' и 'error' (не 'info') сообщения в файл, просто выполните в консоли:

php receive_logs_direct.php warning error > logs_from_rabbit.log

Если вы хотите увидеть все сообщения на экране, тогда выполните это:

php receive_logs_direct.php info warning error
# => [*] Waiting for logs. To exit press CTRL+C

И для примера, что бы передать error ошибку, просто наберите

php emit_log_direct.php error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

 


 

Оригинал статьи