[go: up one dir, main page]

Aller au contenu

Programmation PHP/RabbitMQ

Un livre de Wikilivres.

RabbitMQ est un logiciel de messages en protocole AMQP. Il permet donc à des processus de produire des messages JSON dans des files d'attente pour que d'autres les consomme ensuite[1].

Installation

composer require php-amqplib/php-amqplib

Une interface graphique existe pour lire et manipuler les messages manuellement. On la trouve par exemple sur Docker[2].

Connexion

Les identifiants par défaut de RabbitMQ dépendent des versions. On trouve soit le login / mot de passe "user / password", soit "guest / guest".

$connection = new AMQPStreamConnection($host, $port, $login, $password);

Création de queue et routage

On distingue plusieurs types d’exchange[3] :

  • fanout signifie que toutes les queues vont recevoir chaque message.
  • direct qu'une seule queue le recevra.
        $this->rabbitMqConnection->getChannel()->exchange_declare('Bus', 'fanout', false, true, false);
        $this->rabbitMqConnection->getChannel()->queue_declare('Test.Wikibooks.queue', false, true, false, false);
        $this->rabbitMqConnection->getChannel()->queue_bind('Test.Wikibooks.queue', 'Bus');

Production

        $amqpMessage = new AMQPMessage(json_encode('Hello World!'),
            ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
        );
        $this->rabbitMqConnection->getChannel()->basic_publish($amqpMessage, 'Bus', 'Test.Wikibooks.queue');

Consommation

Par défaut on consomme un seul message de la queue. Pour tous les lire un par un, utiliser basic_ack() après basic_consume().

        $this->rabbitMqConnection->getChannel()->basic_consume(
            'Test.Wikibooks.queue',
            gethostname() . '#' . rand(1, 9999),
            false,
            false,
            false,
            false,
            [$this, 'consumeCallback']
        );

        while (count($this->rabbitMqConnection->getChannel()->callbacks)) {
            $this->rabbitMqConnection->getChannel()->wait();
        }

    public function consumeCallback(?AMQPMessage $msg)
    {
        if (empty($msg)) {
            return null;
        }

        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

        var_dump(json_decode($msg->getBody()));
    }

Références