[go: up one dir, main page]

Aller au contenu

« Programmation PHP/RabbitMQ » : différence entre les versions

Un livre de Wikilivres.
Contenu supprimé Contenu ajouté
Aucun résumé des modifications
 
(39 versions intermédiaires par 3 utilisateurs non affichées)
Ligne 4 : Ligne 4 :


== Installation ==
== Installation ==
=== Client PHP ===
<source lang=bash>
<syntaxhighlight lang=bash>
composer require php-amqplib/php-amqplib
composer require php-amqplib/php-amqplib
</syntaxhighlight>
</source>


=== Serveur ===
Une interface graphique existe pour lire et manipuler les messages manuellement. On la trouve par exemple sur Docker<ref>https://hub.docker.com/_/rabbitmq</ref>.
L'installation du serveur est multi-plateforme. Sous Linux<ref>https://www.rabbitmq.com/install-debian.html</ref> :
<syntaxhighlight lang=bash>
apt-get install rabbitmq-server
</syntaxhighlight>

Test de fonctionnement :
<syntaxhighlight lang=bash>
telnet localhost 5672
</syntaxhighlight>

==== Site de gestion ====
Une interface graphique existe pour lire et manipuler les messages manuellement, c'est le ''management plugin''<ref>https://www.rabbitmq.com/management.html</ref>. Pour l'activer :
<syntaxhighlight lang=bash>
/usr/sbin/rabbitmq-plugins enable rabbitmq_management
</syntaxhighlight>

Test de fonctionnement depuis le serveur :
<syntaxhighlight lang=bash>
curl localhost:15672
</syntaxhighlight>
Depuis le client : http://mon_serveur:15672

On la trouve aussi sur Docker<ref>https://hub.docker.com/_/rabbitmq</ref>.

Pour trouver le fichier de configuration :
cat /usr/sbin/rabbitmq-server | grep RABBITMQ_ENV


== Connexion ==
== Connexion ==
Les identifiants par défaut de RabbitMQ dépendent des versions. On trouve soit le login / mot de passe "user / password", soit "guest / guest".
Les identifiants par défaut de RabbitMQ dépendent des versions. On trouve soit le login / mot de passe "user / password", soit "guest / guest". Pour tester :
<source lang=php>
<syntaxhighlight lang=bash>
curl -i -u guest:guest http://localhost:15672/api/whoami
</syntaxhighlight>

Si cela ne fonctionne pas, configurer le serveur avec <code>rabbitmqctl</code>. Exemple sous Linux :
<syntaxhighlight lang=bash>
/usr/sbin/rabbitmqctl add_user userDev mon_mot_de_passe
/usr/sbin/rabbitmqctl set_permissions -p / userDev '.*' '.*' '.*'
/usr/sbin/rabbitmqctl set_user_tags userDev management
/usr/sbin/rabbitmqctl list_users
</syntaxhighlight>

=== Connexion PHP ===
<syntaxhighlight lang=php>
$connection = new AMQPStreamConnection($host, $port, $login, $password);
$connection = new AMQPStreamConnection($host, $port, $login, $password);
...
</source>
$connection->close();
</syntaxhighlight>

{{remarque|Sur le framework Symfony, on peut utiliser le composant Messenger à la place.}}


== Création de queue et routage ==
== Création de queue et routage ==
Pour créer une queue simple prête à recevoir des messages :
On distingue plusieurs types d’''exchange''<ref>https://i2.wp.com/blog.knoldus.com/wp-content/uploads/2018/12/exchanges-topic-fanout-direct.png?resize=1024%2C625&ssl=1</ref> :
<syntaxhighlight lang=php>
* ''fanout'' signifie que toutes les queues vont recevoir chaque message.
$this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue1', false, false, false, false);
* ''direct'' qu'une seule queue le recevra.
</syntaxhighlight>


=== ''Exchange'' ===
<source lang=php>
{{Média externe
| image1 = [//i2.wp.com/blog.knoldus.com/wp-content/uploads/2018/12/exchanges-topic-fanout-direct.png?resize=1024%2C625&ssl=1 Schéma des différents types de routage RabbitMQ] sur le site : {{lien web|lang=en|url=https://dzone.com/articles/getting-started-with-rabbitmq-python-1|titre=Getting Started With RabbitMQ: Python|auteur=Jyoti Sachdeva|date=20/12/2018}}
}}
Une autre manière de poster des messages est en passant par un ''exchange''. On en distingue plusieurs types<ref>https://www.rabbitmq.com/tutorials/tutorial-three-php.html</ref> :
* ''direct'' : une seule queue recevra le message (patron de conception producteur/consommateur).
* ''fanout'' : toutes les queues liée à l’''exchange'' recevront le message (patron de conception producteur/abonné).
* ''topic'' : les queues de l’''exchange'' inscrites aux sujets concernés recevront le message (selon un motif dans la "routing key" où "*" représente un seul mot séparé par un point, et "#" au moins un)<ref>https://www.rabbitmq.com/tutorials/tutorial-five-php.html</ref>.
* ''headers'' : routage par en-tête de message plutôt que par "routing key".

Dans cet exemple, on rattache la queue à un ''exchange'' "Bus" :
<syntaxhighlight lang=php>
$this->rabbitMqConnection->getChannel()->exchange_declare('Bus', 'fanout', false, true, false);
$this->rabbitMqConnection->getChannel()->exchange_declare('Bus', 'fanout', false, true, false);
$this->rabbitMqConnection->getChannel()->queue_declare('Test.Wikibooks.queue', false, true, false, false);
$this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue2', false, true, false, false);
$this->rabbitMqConnection->getChannel()->queue_bind('Test.Wikibooks.queue', 'Bus');
$this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue2', 'Bus');
$this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue3', false, true, false, false);
</source>
$this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue3', 'Bus');
</syntaxhighlight>

Exemple de ''topic'' : on ne publie pas dans la queue mais dans l’''exchange'' qui leur routera ensuite le message.
<syntaxhighlight lang=php>
$this->rabbitMqConnection->getChannel()->exchange_declare('Topic_bus', 'topic', false, false, false);
$this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue4', false, true, false, false);
$this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue4', 'Topic_bus');
$this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue5', false, true, false, false);
$this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue5', 'Topic_bus');
</syntaxhighlight>

=== QoS ===
Pour demander à RabbitMQ de ne pas surcharger les consommateurs d'une queue en leur répartissant les messages que s'ils ont terminé de traiter le précédent :
<syntaxhighlight lang=php>
$this->rabbitMqConnection->getChannel()->basic_qos(null, 1, null);
</syntaxhighlight>

=== DLX ===
Le mode DLX (''{{lang|en|Dead Letter Exchanges}}'') permet de transférer un message d'une queue dans un autre après un certain temps<ref>https://www.rabbitmq.com/dlx.html</ref>.


== Production ==
== Production ==
<source lang=php>
<syntaxhighlight lang=php>
$amqpMessage = new AMQPMessage(json_encode('Hello World!'),
$amqpMessage = new AMQPMessage(json_encode('Hello World!'),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
);
$this->rabbitMqConnection->getChannel()->basic_publish($amqpMessage, 'Bus', 'Test.Wikibooks.queue');
$this->rabbitMqConnection->getChannel()->basic_publish($amqpMessage, 'Bus', 'Wikibooks.Queue1');
</syntaxhighlight>
</source>


== Consommation ==
== Consommation ==
Par défaut on consomme un seul message de la queue. Pour tous les lire un par un, utiliser basic_ack() après basic_consume().
Par défaut on consomme un seul message de la queue. Pour tous les lire un par un, utiliser basic_ack() après basic_consume().
<source lang=php>
<syntaxhighlight lang=php>
$this->rabbitMqConnection->getChannel()->basic_consume(
$this->rabbitMqConnection->getChannel()->basic_consume(
'Test.Wikibooks.queue',
'Wikibooks.Queue1',
gethostname() . '#' . rand(1, 9999),
gethostname() . '#' . rand(1, 9999),
false,
false,
Ligne 62 : Ligne 138 :
var_dump(json_decode($msg->getBody()));
var_dump(json_decode($msg->getBody()));
}
}
</syntaxhighlight>
</source>

En mode "topic", on peut remplacer 'Wikibooks.Queue1' par 'Wikibooks.*' pour récupérer toutes les queues.


== Références ==
== Références ==

Version actuelle du 18 janvier 2023 à 12:51

RabbitMQ est un logiciel de messages en protocole AMQP. Il permet donc à des processus de produire des messages JSON dans des files d'attente pour que d'autres les consomme ensuite[1].

composer require php-amqplib/php-amqplib

L'installation du serveur est multi-plateforme. Sous Linux[2] :

apt-get install rabbitmq-server

Test de fonctionnement :

telnet localhost 5672

Site de gestion

[modifier | modifier le wikicode]

Une interface graphique existe pour lire et manipuler les messages manuellement, c'est le management plugin[3]. Pour l'activer :

/usr/sbin/rabbitmq-plugins enable rabbitmq_management

Test de fonctionnement depuis le serveur :

curl localhost:15672

Depuis le client : http://mon_serveur:15672

On la trouve aussi sur Docker[4].

Pour trouver le fichier de configuration :

cat  /usr/sbin/rabbitmq-server | grep RABBITMQ_ENV

Les identifiants par défaut de RabbitMQ dépendent des versions. On trouve soit le login / mot de passe "user / password", soit "guest / guest". Pour tester :

curl -i -u guest:guest http://localhost:15672/api/whoami

Si cela ne fonctionne pas, configurer le serveur avec rabbitmqctl. Exemple sous Linux :

/usr/sbin/rabbitmqctl add_user userDev mon_mot_de_passe
/usr/sbin/rabbitmqctl set_permissions -p / userDev '.*' '.*' '.*'
/usr/sbin/rabbitmqctl set_user_tags userDev management
/usr/sbin/rabbitmqctl list_users
$connection = new AMQPStreamConnection($host, $port, $login, $password);
...
$connection->close();
 Sur le framework Symfony, on peut utiliser le composant Messenger à la place.

Création de queue et routage

[modifier | modifier le wikicode]

Pour créer une queue simple prête à recevoir des messages :

        $this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue1', false, false, false, false);
icône image Image externe
Schéma des différents types de routage RabbitMQ sur le site : (en) Jyoti Sachdeva, « Getting Started With RabbitMQ: Python »,

Une autre manière de poster des messages est en passant par un exchange. On en distingue plusieurs types[5] :

  • direct : une seule queue recevra le message (patron de conception producteur/consommateur).
  • fanout : toutes les queues liée à l’exchange recevront le message (patron de conception producteur/abonné).
  • topic : les queues de l’exchange inscrites aux sujets concernés recevront le message (selon un motif dans la "routing key" où "*" représente un seul mot séparé par un point, et "#" au moins un)[6].
  • headers : routage par en-tête de message plutôt que par "routing key".

Dans cet exemple, on rattache la queue à un exchange "Bus" :

        $this->rabbitMqConnection->getChannel()->exchange_declare('Bus', 'fanout', false, true, false);
        $this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue2', false, true, false, false);
        $this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue2', 'Bus');
        $this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue3', false, true, false, false);
        $this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue3', 'Bus');

Exemple de topic : on ne publie pas dans la queue mais dans l’exchange qui leur routera ensuite le message.

        $this->rabbitMqConnection->getChannel()->exchange_declare('Topic_bus', 'topic', false, false, false);
        $this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue4', false, true, false, false);
        $this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue4', 'Topic_bus');
        $this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue5', false, true, false, false);
        $this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue5', 'Topic_bus');

Pour demander à RabbitMQ de ne pas surcharger les consommateurs d'une queue en leur répartissant les messages que s'ils ont terminé de traiter le précédent :

        $this->rabbitMqConnection->getChannel()->basic_qos(null, 1, null);

Le mode DLX (Dead Letter Exchanges) permet de transférer un message d'une queue dans un autre après un certain temps[7].

        $amqpMessage = new AMQPMessage(json_encode('Hello World!'),
            ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
        );
        $this->rabbitMqConnection->getChannel()->basic_publish($amqpMessage, 'Bus', 'Wikibooks.Queue1');

Par défaut on consomme un seul message de la queue. Pour tous les lire un par un, utiliser basic_ack() après basic_consume().

        $this->rabbitMqConnection->getChannel()->basic_consume(
            'Wikibooks.Queue1',
            gethostname() . '#' . rand(1, 9999),
            false,
            false,
            false,
            false,
            [$this, 'consumeCallback']
        );

        while (count($this->rabbitMqConnection->getChannel()->callbacks)) {
            $this->rabbitMqConnection->getChannel()->wait();
        }

    public function consumeCallback(?AMQPMessage $msg)
    {
        if (empty($msg)) {
            return null;
        }

        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

        var_dump(json_decode($msg->getBody()));
    }

En mode "topic", on peut remplacer 'Wikibooks.Queue1' par 'Wikibooks.*' pour récupérer toutes les queues.