-
Notifications
You must be signed in to change notification settings - Fork 15
/
AmqpConnectionFactory.php
116 lines (96 loc) · 3.69 KB
/
AmqpConnectionFactory.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
<?php
namespace Enqueue\AmqpExt;
use Enqueue\AmqpTools\ConnectionConfig;
use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
use Interop\Amqp\AmqpConnectionFactory as InteropAmqpConnectionFactory;
use Interop\Queue\Context;
/**
 * Connection factory backed by the PHP "amqp" extension.
 *
 * Parses the given configuration once in the constructor and hands out
 * AmqpContext instances that all share a single \AMQPConnection, which is
 * opened the first time a channel is actually required.
 */
class AmqpConnectionFactory implements InteropAmqpConnectionFactory, DelayStrategyAware
{
    use DelayStrategyAwareTrait;

    /**
     * Parsed connection settings.
     *
     * @var ConnectionConfig
     */
    private $config;

    /**
     * Shared extension connection; stays null until the first channel is requested.
     *
     * @var \AMQPConnection|null
     */
    private $connection;

    /**
     * @see \Enqueue\AmqpTools\ConnectionConfig for possible config formats and values
     *
     * @param array|string|null $config
     */
    public function __construct($config = 'amqp:')
    {
        $parsedConfig = (new ConnectionConfig($config))
            ->addSupportedScheme('amqp+ext')
            ->addSupportedScheme('amqps+ext')
            ->parse()
        ;

        $this->config = $parsedConfig;

        // A "rabbitmq" scheme extension switches on the DLX based delay strategy.
        if (in_array('rabbitmq', $parsedConfig->getSchemeExtensions(), true)) {
            $this->setDelayStrategy(new RabbitMqDlxDelayStrategy());
        }
    }

    /**
     * Creates a context that is either bound to a channel right away or, when
     * the config is lazy, to a closure that produces the channel on demand.
     *
     * @return AmqpContext
     */
    public function createContext(): Context
    {
        $isLazy = $this->config->isLazy();

        if ($isLazy) {
            // Nothing touches the broker here; the connection and channel are
            // only created once the closure is invoked.
            // NOTE(review): this path passes only prefetch size/count to qos(),
            // while the eager path below also forwards isQosGlobal() — confirm
            // whether that difference is intentional.
            $channel = function () {
                $lazyChannel = $this->createExtContext($this->establishConnection());
                $lazyChannel->qos(
                    $this->config->getQosPrefetchSize(),
                    $this->config->getQosPrefetchCount()
                );

                return $lazyChannel;
            };
        } else {
            $channel = $this->createExtContext($this->establishConnection());
        }

        $context = new AmqpContext($channel);
        $context->setDelayStrategy($this->delayStrategy);

        if (!$isLazy) {
            // The channel already exists, so QoS can be applied immediately.
            $context->setQos(
                $this->config->getQosPrefetchSize(),
                $this->config->getQosPrefetchCount(),
                $this->config->isQosGlobal()
            );
        }

        return $context;
    }

    /**
     * Returns the parsed connection configuration used by this factory.
     */
    public function getConfig(): ConnectionConfig
    {
        return $this->config;
    }

    /**
     * Opens a new channel on the given extension connection.
     */
    private function createExtContext(\AMQPConnection $extConnection): \AMQPChannel
    {
        return new \AMQPChannel($extConnection);
    }

    /**
     * Returns the shared connection, creating and connecting it on first use
     * and reconnecting it whenever it reports that it is not connected.
     */
    private function establishConnection(): \AMQPConnection
    {
        if (null === $this->connection) {
            $this->connection = new \AMQPConnection($this->buildExtConfig());

            if ($this->config->isPersisted()) {
                $this->connection->pconnect();
            } else {
                $this->connection->connect();
            }
        }

        if (!$this->connection->isConnected()) {
            // Use the reconnect variant that matches the configured connection type.
            if ($this->config->isPersisted()) {
                $this->connection->preconnect();
            } else {
                $this->connection->reconnect();
            }
        }

        return $this->connection;
    }

    /**
     * Translates the parsed config into the credentials array passed to the
     * \AMQPConnection constructor. TLS related keys are only present when
     * SSL is switched on.
     *
     * @return array
     */
    private function buildExtConfig(): array
    {
        $extConfig = [
            'host' => $this->config->getHost(),
            'port' => $this->config->getPort(),
            'vhost' => $this->config->getVHost(),
            'login' => $this->config->getUser(),
            'password' => $this->config->getPass(),
            'read_timeout' => $this->config->getReadTimeout(),
            'write_timeout' => $this->config->getWriteTimeout(),
            'connect_timeout' => $this->config->getConnectionTimeout(),
            'heartbeat' => $this->config->getHeartbeat(),
        ];

        if (!$this->config->isSslOn()) {
            return $extConfig;
        }

        $extConfig['verify'] = $this->config->isSslVerify();
        $extConfig['cacert'] = $this->config->getSslCaCert();
        $extConfig['cert'] = $this->config->getSslCert();
        $extConfig['key'] = $this->config->getSslKey();

        return $extConfig;
    }
}