RabbitMQ: Темы

26-09-2024 - 3 minutes, 57 seconds -
Web-разработка php очереди rabbitmq

В предыдущем уроке мы улучшили нашу систему ведения журнала. Вместо того, чтобы использовать разветвленный обменник (fanout), способный только к фиктивной трансляции, мы использовали прямой (direct) и получили возможность выборочного получения журналов.

Хотя использование прямого обмена улучшило нашу систему, у нее все еще есть ограничения - она не может выполнять маршрутизацию на основе нескольких критериев.

В нашей системе ведения журнала мы могли бы захотеть подписаться не только на журналы, основанные на серьезности, но и на источнике, который отправил журнал. Возможно, вы знакомы с этой концепцией из инструмента syslog unix, который маршрутизирует журналы на основе как серьезности (info/warn/crit...), так и удобства (auth/cron/kern...).

Это дало бы нам большую гибкость - мы можем захотеть прослушивать только критические ошибки, поступающие из "cron", но также и все журналы из "kern".

Чтобы реализовать это в нашей системе ведения журнала, нам нужно узнать о более сложном обменнике - тематическом (topic).

Тематический обменник

Сообщения, отправляемые на тематический обменник, не могут иметь произвольный routing_key - это должен быть список слов, разделенных точками. Слова могут быть любыми, но обычно они указывают на некоторые функции, связанные с сообщением. Несколько допустимых примеров ключей маршрутизации: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit". В ключе маршрутизации может быть столько слов, сколько вам нравится, до предела в 255 байт.

Ключ привязки также должен быть в той же форме. Логика тематического обменника аналогична прямому - сообщение, отправленное с определенным ключом маршрутизации, будет доставлено во все очереди, которые связаны соответствующим ключом привязки. Однако есть два важных особых случая для привязки ключей:

  • * (звездочка) может заменить ровно одно слово.
  • # (хэш) может заменить ноль или более слов.
  • Проще всего объяснить это на примере:

python-five

В этом примере мы собираемся отправлять сообщения, все из которых описывают животных. Сообщения будут отправляться с ключом маршрутизации, который состоит из трех слов (две точки). Первое слово в ключе маршрутизации будет описывать скорость, второе - цвет, а третье - вид: <speed>.<color>.<species>.

Мы создали три привязки: Q1 привязан с помощью ключа привязки *.orange.*, а Q2 - с помощью *.*.rabbit и lazy.#.

Эти привязки можно резюмировать следующим образом:

  • Q1 интересуется всеми оранжевыми животными.
  • Q2 хочет услышать все о кроликах и все о ленивых животных.

Сообщение с ключом маршрутизации, установленным на quick.orange.rabbit, будет доставлено в обе очереди. Сообщение lazy.orange.elephant также достанется им обоим. С другой стороны, quick.orange.fox попадет только в первую очередь, а lazy.brown.fox - только во вторую. lazy.pink.rabbit будет доставлен во вторую очередь только один раз, даже если он соответствует двум привязкам. quick.brown.fox не соответствует ни одной привязке, поэтому он будет отброшен.

Что произойдет, если мы разорвем наш контракт и отправим сообщение с одним или четырьмя словами, например "оранжевый" или "быстрый.оранжевый.самец.кролик"? Что ж, эти сообщения не будут соответствовать никаким привязкам и будут потеряны.

С другой стороны, lazy.orange.male.rabbit, даже если он состоит из четырех слов, будет соответствовать последней привязке и будет доставлен во вторую очередь.

Тематический обменник

Тематический обменник является мощным и может вести себя так же, как и другие обменники.

Когда очередь связана ключом привязки # (хэш) - она будет получать все сообщения, независимо от ключа маршрутизации - как в разветвленном обменнике.

Когда специальные символы * (звезда) и # (хэш) не используются в привязках, тематический обменник будет вести себя так же, как и прямой.

Собирая все это воедино

Мы собираемся использовать тематический обменник в нашей системе ведения журнала. Мы начнем с рабочего предположения, что ключи маршрутизации журналов будут содержать два слова: <facility>.<severity>.

Код почти такой же, как и в предыдущем руководстве.

Код для emit_log_topic.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
    $data = "Hello World!";
}

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'topic_logs', $routing_key);

echo ' [x] Sent ', $routing_key, ':', $data, "\n";

$channel->close();
$connection->close();

Код для receive_logs_topic.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$binding_keys = array_slice($argv, 1);
if (empty($binding_keys)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
    exit(1);
}

foreach ($binding_keys as $binding_key) {
    $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($channel->is_open()) {
    $channel->wait();
}

$channel->close();
$connection->close();

Что бы получить все логи:

php receive_logs_topic.php "#"

Что бы получить все логи с важностью "kern":

php receive_logs_topic.php "kern.*"

Или что бы получить все критичные логи:

php receive_logs_topic.php "*.critical"

Вы можете выбрать несколько вариантов

php receive_logs_topic.php "kern.*" "*.critical"

И что бы вызвать ошибку типа "kern.critical", выполните:

php emit_log_topic.php "kern.critical" "A critical kernel error"

Обратите внимание, что код не делает никаких предположений о ключах маршрутизации или привязки, вы можете захотеть поиграть с более чем двумя параметрами ключа маршрутизации.

 


 

Оригинал статьи