« Programmation PHP/RabbitMQ » : différence entre les versions
Aucun résumé des modifications |
|||
(39 versions intermédiaires par 3 utilisateurs non affichées) | |||
Ligne 4 : | Ligne 4 : | ||
== Installation == |
== Installation == |
||
=== Client PHP === |
|||
< |
<syntaxhighlight lang=bash> |
||
composer require php-amqplib/php-amqplib |
composer require php-amqplib/php-amqplib |
||
</syntaxhighlight> |
|||
</source> |
|||
=== Serveur === |
|||
⚫ | |||
L'installation du serveur est multi-plateforme. Sous Linux<ref>https://www.rabbitmq.com/install-debian.html</ref> : |
|||
<syntaxhighlight lang=bash> |
|||
apt-get install rabbitmq-server |
|||
</syntaxhighlight> |
|||
Test de fonctionnement : |
|||
<syntaxhighlight lang=bash> |
|||
telnet localhost 5672 |
|||
</syntaxhighlight> |
|||
==== Site de gestion ==== |
|||
⚫ | |||
<syntaxhighlight lang=bash> |
|||
/usr/sbin/rabbitmq-plugins enable rabbitmq_management |
|||
</syntaxhighlight> |
|||
Test de fonctionnement depuis le serveur : |
|||
<syntaxhighlight lang=bash> |
|||
curl localhost:15672 |
|||
</syntaxhighlight> |
|||
Depuis le client : http://mon_serveur:15672 |
|||
On la trouve aussi sur Docker<ref>https://hub.docker.com/_/rabbitmq</ref>. |
|||
Pour trouver le fichier de configuration : |
|||
cat /usr/sbin/rabbitmq-server | grep RABBITMQ_ENV |
|||
== Connexion == |
== Connexion == |
||
Les identifiants par défaut de RabbitMQ dépendent des versions. On trouve soit le login / mot de passe "user / password", soit "guest / guest". |
Les identifiants par défaut de RabbitMQ dépendent des versions. On trouve soit le login / mot de passe "user / password", soit "guest / guest". Pour tester : |
||
< |
<syntaxhighlight lang=bash> |
||
curl -i -u guest:guest http://localhost:15672/api/whoami |
|||
</syntaxhighlight> |
|||
Si cela ne fonctionne pas, configurer le serveur avec <code>rabbitmqctl</code>. Exemple sous Linux : |
|||
<syntaxhighlight lang=bash> |
|||
/usr/sbin/rabbitmqctl add_user userDev mon_mot_de_passe |
|||
/usr/sbin/rabbitmqctl set_permissions -p / userDev '.*' '.*' '.*' |
|||
/usr/sbin/rabbitmqctl set_user_tags userDev management |
|||
/usr/sbin/rabbitmqctl list_users |
|||
</syntaxhighlight> |
|||
=== Connexion PHP === |
|||
⚫ | |||
$connection = new AMQPStreamConnection($host, $port, $login, $password); |
$connection = new AMQPStreamConnection($host, $port, $login, $password); |
||
... |
|||
</source> |
|||
$connection->close(); |
|||
</syntaxhighlight> |
|||
{{remarque|Sur le framework Symfony, on peut utiliser le composant Messenger à la place.}} |
|||
== Création de queue et routage == |
== Création de queue et routage == |
||
Pour créer une queue simple prête à recevoir des messages : |
|||
On distingue plusieurs types d’''exchange''<ref>https://i2.wp.com/blog.knoldus.com/wp-content/uploads/2018/12/exchanges-topic-fanout-direct.png?resize=1024%2C625&ssl=1</ref> : |
|||
<syntaxhighlight lang=php> |
|||
* ''fanout'' signifie que toutes les queues vont recevoir chaque message. |
|||
$this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue1', false, false, false, false); |
|||
* ''direct'' qu'une seule queue le recevra. |
|||
</syntaxhighlight> |
|||
=== ''Exchange'' === |
|||
⚫ | |||
{{Média externe |
|||
| image1 = [//i2.wp.com/blog.knoldus.com/wp-content/uploads/2018/12/exchanges-topic-fanout-direct.png?resize=1024%2C625&ssl=1 Schéma des différents types de routage RabbitMQ] sur le site : {{lien web|lang=en|url=https://dzone.com/articles/getting-started-with-rabbitmq-python-1|titre=Getting Started With RabbitMQ: Python|auteur=Jyoti Sachdeva|date=20/12/2018}} |
|||
}} |
|||
Une autre manière de poster des messages est en passant par un ''exchange''. On en distingue plusieurs types<ref>https://www.rabbitmq.com/tutorials/tutorial-three-php.html</ref> : |
|||
* ''direct'' : une seule queue recevra le message (patron de conception producteur/consommateur). |
|||
* ''fanout'' : toutes les queues liée à l’''exchange'' recevront le message (patron de conception producteur/abonné). |
|||
* ''topic'' : les queues de l’''exchange'' inscrites aux sujets concernés recevront le message (selon un motif dans la "routing key" où "*" représente un seul mot séparé par un point, et "#" au moins un)<ref>https://www.rabbitmq.com/tutorials/tutorial-five-php.html</ref>. |
|||
* ''headers'' : routage par en-tête de message plutôt que par "routing key". |
|||
Dans cet exemple, on rattache la queue à un ''exchange'' "Bus" : |
|||
<syntaxhighlight lang=php> |
|||
$this->rabbitMqConnection->getChannel()->exchange_declare('Bus', 'fanout', false, true, false); |
$this->rabbitMqConnection->getChannel()->exchange_declare('Bus', 'fanout', false, true, false); |
||
$this->rabbitMqConnection->getChannel()->queue_declare(' |
$this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue2', false, true, false, false); |
||
$this->rabbitMqConnection->getChannel()->queue_bind(' |
$this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue2', 'Bus'); |
||
$this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue3', false, true, false, false); |
|||
</source> |
|||
$this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue3', 'Bus'); |
|||
</syntaxhighlight> |
|||
Exemple de ''topic'' : on ne publie pas dans la queue mais dans l’''exchange'' qui leur routera ensuite le message. |
|||
<syntaxhighlight lang=php> |
|||
$this->rabbitMqConnection->getChannel()->exchange_declare('Topic_bus', 'topic', false, false, false); |
|||
$this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue4', false, true, false, false); |
|||
$this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue4', 'Topic_bus'); |
|||
$this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue5', false, true, false, false); |
|||
$this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue5', 'Topic_bus'); |
|||
</syntaxhighlight> |
|||
=== QoS === |
|||
Pour demander à RabbitMQ de ne pas surcharger les consommateurs d'une queue en leur répartissant les messages que s'ils ont terminé de traiter le précédent : |
|||
<syntaxhighlight lang=php> |
|||
$this->rabbitMqConnection->getChannel()->basic_qos(null, 1, null); |
|||
</syntaxhighlight> |
|||
=== DLX === |
|||
Le mode DLX (''{{lang|en|Dead Letter Exchanges}}'') permet de transférer un message d'une queue dans un autre après un certain temps<ref>https://www.rabbitmq.com/dlx.html</ref>. |
|||
== Production == |
== Production == |
||
< |
<syntaxhighlight lang=php> |
||
$amqpMessage = new AMQPMessage(json_encode('Hello World!'), |
$amqpMessage = new AMQPMessage(json_encode('Hello World!'), |
||
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT] |
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT] |
||
); |
); |
||
$this->rabbitMqConnection->getChannel()->basic_publish($amqpMessage, 'Bus', ' |
$this->rabbitMqConnection->getChannel()->basic_publish($amqpMessage, 'Bus', 'Wikibooks.Queue1'); |
||
</syntaxhighlight> |
|||
</source> |
|||
== Consommation == |
== Consommation == |
||
Par défaut on consomme un seul message de la queue. Pour tous les lire un par un, utiliser basic_ack() après basic_consume(). |
Par défaut on consomme un seul message de la queue. Pour tous les lire un par un, utiliser basic_ack() après basic_consume(). |
||
< |
<syntaxhighlight lang=php> |
||
$this->rabbitMqConnection->getChannel()->basic_consume( |
$this->rabbitMqConnection->getChannel()->basic_consume( |
||
' |
'Wikibooks.Queue1', |
||
gethostname() . '#' . rand(1, 9999), |
gethostname() . '#' . rand(1, 9999), |
||
false, |
false, |
||
Ligne 62 : | Ligne 138 : | ||
var_dump(json_decode($msg->getBody())); |
var_dump(json_decode($msg->getBody())); |
||
} |
} |
||
</syntaxhighlight> |
|||
</source> |
|||
En mode "topic", on peut remplacer 'Wikibooks.Queue1' par 'Wikibooks.*' pour récupérer toutes les queues. |
|||
== Références == |
== Références == |
Version actuelle du 18 janvier 2023 à 12:51
RabbitMQ est un logiciel de messages en protocole AMQP. Il permet donc à des processus de produire des messages JSON dans des files d'attente pour que d'autres les consomme ensuite[1].
Installation
[modifier | modifier le wikicode]Client PHP
[modifier | modifier le wikicode]composer require php-amqplib/php-amqplib
Serveur
[modifier | modifier le wikicode]L'installation du serveur est multi-plateforme. Sous Linux[2] :
apt-get install rabbitmq-server
Test de fonctionnement :
telnet localhost 5672
Site de gestion
[modifier | modifier le wikicode]Une interface graphique existe pour lire et manipuler les messages manuellement, c'est le management plugin[3]. Pour l'activer :
/usr/sbin/rabbitmq-plugins enable rabbitmq_management
Test de fonctionnement depuis le serveur :
curl localhost:15672
Depuis le client : http://mon_serveur:15672
On la trouve aussi sur Docker[4].
Pour trouver le fichier de configuration :
cat /usr/sbin/rabbitmq-server | grep RABBITMQ_ENV
Connexion
[modifier | modifier le wikicode]Les identifiants par défaut de RabbitMQ dépendent des versions. On trouve soit le login / mot de passe "user / password", soit "guest / guest". Pour tester :
curl -i -u guest:guest http://localhost:15672/api/whoami
Si cela ne fonctionne pas, configurer le serveur avec rabbitmqctl
. Exemple sous Linux :
/usr/sbin/rabbitmqctl add_user userDev mon_mot_de_passe
/usr/sbin/rabbitmqctl set_permissions -p / userDev '.*' '.*' '.*'
/usr/sbin/rabbitmqctl set_user_tags userDev management
/usr/sbin/rabbitmqctl list_users
Connexion PHP
[modifier | modifier le wikicode]$connection = new AMQPStreamConnection($host, $port, $login, $password);
...
$connection->close();
Création de queue et routage
[modifier | modifier le wikicode]Pour créer une queue simple prête à recevoir des messages :
$this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue1', false, false, false, false);
Exchange
[modifier | modifier le wikicode]Image externe | |
---|---|
Schéma des différents types de routage RabbitMQ sur le site : (en) Jyoti Sachdeva, « Getting Started With RabbitMQ: Python », |
Une autre manière de poster des messages est en passant par un exchange. On en distingue plusieurs types[5] :
- direct : une seule queue recevra le message (patron de conception producteur/consommateur).
- fanout : toutes les queues liée à l’exchange recevront le message (patron de conception producteur/abonné).
- topic : les queues de l’exchange inscrites aux sujets concernés recevront le message (selon un motif dans la "routing key" où "*" représente un seul mot séparé par un point, et "#" au moins un)[6].
- headers : routage par en-tête de message plutôt que par "routing key".
Dans cet exemple, on rattache la queue à un exchange "Bus" :
$this->rabbitMqConnection->getChannel()->exchange_declare('Bus', 'fanout', false, true, false);
$this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue2', false, true, false, false);
$this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue2', 'Bus');
$this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue3', false, true, false, false);
$this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue3', 'Bus');
Exemple de topic : on ne publie pas dans la queue mais dans l’exchange qui leur routera ensuite le message.
$this->rabbitMqConnection->getChannel()->exchange_declare('Topic_bus', 'topic', false, false, false);
$this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue4', false, true, false, false);
$this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue4', 'Topic_bus');
$this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue5', false, true, false, false);
$this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue5', 'Topic_bus');
QoS
[modifier | modifier le wikicode]Pour demander à RabbitMQ de ne pas surcharger les consommateurs d'une queue en leur répartissant les messages que s'ils ont terminé de traiter le précédent :
$this->rabbitMqConnection->getChannel()->basic_qos(null, 1, null);
DLX
[modifier | modifier le wikicode]Le mode DLX (Dead Letter Exchanges) permet de transférer un message d'une queue dans un autre après un certain temps[7].
Production
[modifier | modifier le wikicode] $amqpMessage = new AMQPMessage(json_encode('Hello World!'),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$this->rabbitMqConnection->getChannel()->basic_publish($amqpMessage, 'Bus', 'Wikibooks.Queue1');
Consommation
[modifier | modifier le wikicode]Par défaut on consomme un seul message de la queue. Pour tous les lire un par un, utiliser basic_ack() après basic_consume().
$this->rabbitMqConnection->getChannel()->basic_consume(
'Wikibooks.Queue1',
gethostname() . '#' . rand(1, 9999),
false,
false,
false,
false,
[$this, 'consumeCallback']
);
while (count($this->rabbitMqConnection->getChannel()->callbacks)) {
$this->rabbitMqConnection->getChannel()->wait();
}
public function consumeCallback(?AMQPMessage $msg)
{
if (empty($msg)) {
return null;
}
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
var_dump(json_decode($msg->getBody()));
}
En mode "topic", on peut remplacer 'Wikibooks.Queue1' par 'Wikibooks.*' pour récupérer toutes les queues.
Références
[modifier | modifier le wikicode]- ↑ https://www.rabbitmq.com/tutorials/tutorial-one-php.html
- ↑ https://www.rabbitmq.com/install-debian.html
- ↑ https://www.rabbitmq.com/management.html
- ↑ https://hub.docker.com/_/rabbitmq
- ↑ https://www.rabbitmq.com/tutorials/tutorial-three-php.html
- ↑ https://www.rabbitmq.com/tutorials/tutorial-five-php.html
- ↑ https://www.rabbitmq.com/dlx.html