RabbitMQ: Введение

26-09-2024 - 3 minutes, 50 seconds -
Web-разработка php очереди rabbitmq

RabbitMQ - это посредник сообщений: он принимает и пересылает сообщения.

Вы можете сравнивать его с почтовым отделением: когда вы кладете почту в почтовый ящик, вы можете быть уверены, что почтальон в конечном итоге доставит почту вашему получателю.

Давайте определимся с понятиями, используемыми в RabbitMQ:

  • Producer - программа, которая отправляет сообщения, так называемый "отправитель".
  • Queue - это имя почтового ящика, который находится внутри RabbitMQ. Хотя сообщения проходят через RabbitMQ и ваши приложения, они могут храниться только внутри очереди. Очередь ограничена только ограничениями памяти и диска хоста, и по сути, это большой буфер сообщений. Многие отправители могут отправлять сообщения, которые попадают в одну очередь, и многие потребители могут пытаться получать данные из одной очереди.
  • Consumer - это программа, которая в основном ожидает получения сообщений, так называемый "потребитель".

Обратите внимание, что отправитель, потребитель и посредник не обязательно должны находиться на одном хосте.

"Привет, мир"

В этой части руководства мы напишем две программы на PHP, которые взаимодействуют с помощью RabbitMQ с помощью клиентской библиотеки (php-amqplib), для которой требуется PHP 7.x или 8.x.

Первая программа будет производителем, который отправляет одно сообщение, а вторая - потребителем, который получает сообщения и распечатывает их. Мы остановимся на некоторых деталях API php-amqplib, сосредоточившись на этой очень простой вещи, чтобы начать работу. Это система обмена сообщениями "Привет, мир".

Клиентская библиотека php-amqplib

RabbitMQ поддерживает несколько протоколов. В этом руководстве рассматривается AMQP 0-9-1, который представляет собой открытый протокол общего назначения для обмена сообщениями. Существует множество клиентов для RabbitMQ на самых разных языках. В этом руководстве мы будем использовать php-amqplib и Composer для управления зависимостями.

Добавьте файл composer.json в свой проект:

{
    "require": {
        "php-amqplib/php-amqplib": ">=3.0"
    }
}

или выполните команду

composer require php-amqplib/php-amqplib

Теперь у нас установлена библиотека php-amqplib и мы можем продолжить.

Отправка

Мы назовем нашего отправителя send.php и нашего потребителя - receive.php . Отправитель подключится к RabbitMQ, отправит одно сообщение, а затем завершит работу.

В send.php, нам нужно включить библиотеку и использовать необходимые классы:

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

и затем подключится к серверу:

# Здесь создаётся и абстрагируется соединение с сокетом 
# (и делается большая работа за нас: соединение заботится о согласовании версии протокола, аутентификации и всем прочим)
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

# Затем мы создаем канал, в котором находится
# большая часть API для выполнения задач.
$channel = $connection->channel();

Для отправки сообщения нам необходимо:

#  1. Создать экземпляр сообщения
$msg = new AMQPMessage('Hello World!');

# 2. Объявить очередь для отправки
$channel->queue_declare('hello', false, false, false, false);

# 3. Опубликовать сообщение в очереди
$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent 'Hello World!'\n";

Объявление очереди является идемпотентным - она будет создана только в том случае, если она еще не существует.

Содержимое сообщения представляет собой массив байтов, поэтому вы можете хранить там все, что вам нравится.

Наконец, мы можем закрыть канал и соединение:

$channel->close();
$connection->close();

Здесь вы можете посмотреть весь код файла send.php.

Отправка не работает!

Если вы впервые используете RabbitMQ и не видите сообщения "Отправлено", то, возможно, вам придется ломать голову, гадая, что может быть не так. Возможно, RabbitMQ был запущен без достаточного количества свободного места на диске (по умолчанию ему требуется не менее 200 МБ свободного места) и поэтому отказывается принимать сообщения. Проверьте файл журнала брокера, чтобы подтвердить и при необходимости уменьшить лимит. Документация по файлу конфигурации покажет вам, как установить disk_free_limit.

Получение

Наш получатель прослушивает сообщения от RabbitMQ, поэтому, в отличие от отправителя, который публикует одно сообщение, мы будем поддерживать приемник включенным, чтобы прослушивать сообщения и выводить их.

Код (в receive.php) имеет почти те же классы, что и отправка:

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

Настройка такая же, как у отправителя: мы открываем соединение и канал, объявляем очередь, из которой мы собираемся читать сообщения.

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

Обратите внимание, что здесь мы также объявляем очередь. Поскольку мы можем запустить получателя раньше отправителя, мы хотим убедиться, что очередь существует, прежде чем попытаемся использовать сообщения из нее.

Мы собираемся сообщить серверу, чтобы он доставлял нам сообщения из очереди. Мы определим код, который будет получать сообщения, отправляемые сервером.

Имейте в виду, что сообщения отправляются асинхронно с сервера клиентам.

$callback = function ($msg) {
  echo ' [x] Received ', $msg->body, "\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

while ($channel->is_open()) {
    $channel->wait();
}

Наш код будет блокироваться, пока в нашем $channel есть вызовы. Всякий раз, когда мы получаем сообщение, в наш $callback будет передано полученное сообщение.

Здесь вы можете посмотреть весь код файла receive.php.

Список очередей

Если вы захотите посмотреть список очередей RabbitMQ и сколько в них сообщений, вы можете сделать это с помощью утилиты rabbitmqctl::

sudo rabbitmqctl list_queues

 


 

Оригинал статьи