diff --git a/CHANGELOG.md b/CHANGELOG.md index 1935d4a..0a3916a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,6 @@ ## [Unreleased] +### Added +- `RouterPublisher` added for using `direct` and `topic` RabbitMQ exchanges with routing key. ## [2.1.0] - 2021-05-06 ### Added diff --git a/Command/UpdateDefinitionCommand.php b/Command/UpdateDefinitionCommand.php index 675ff08..9117293 100644 --- a/Command/UpdateDefinitionCommand.php +++ b/Command/UpdateDefinitionCommand.php @@ -12,6 +12,8 @@ use Symfony\Component\Console\Output\OutputInterface; use Wakeapp\Bundle\RabbitQueueBundle\Definition\DefinitionInterface; use Wakeapp\Bundle\RabbitQueueBundle\Enum\ExchangeEnum; +use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum; +use Wakeapp\Bundle\RabbitQueueBundle\Exception\RouteStructureException; class UpdateDefinitionCommand extends Command { @@ -54,10 +56,49 @@ protected function configure(): void /** * {@inheritDoc} + * + * @throws RouteStructureException */ protected function execute(InputInterface $input, OutputInterface $output): int { + $routersToInit = []; + $initializedRouters = []; + foreach ($this->definitionList as $definition) { + if ($definition->getQueueType() & QueueTypeEnum::ROUTER) { + if (method_exists($definition, 'dependsOn') && !empty($definition->dependsOn())) { + $routersToInit[$definition::getQueueName()] = $definition; + } else { + $definition->init($this->connection); + $initializedRouters[] = $definition::getQueueName(); + } + } + } + + $successLoop = true; + while ($successLoop && !empty($routersToInit)) { + $successLoop = false; + + foreach ($routersToInit as $router) { + if (empty(array_diff($router->dependsOn(), $initializedRouters))) { + $successLoop = true; + $router->init($this->connection); + unset($routersToInit[$router::getQueueName()]); + $initializedRouters[] = $router::getQueueName(); + } + } + } + + if (!$successLoop) { + throw new RouteStructureException('Router definitions have cyclic dependencies'); + } + + + foreach ($this->definitionList as $definition) { + if ($definition->getQueueType() & QueueTypeEnum::ROUTER) { + continue; + } + $definition->init($this->connection); $this->bindRetryExchange($definition); diff --git a/Enum/QueueTypeEnum.php b/Enum/QueueTypeEnum.php index 4f19255..ee04396 100644 --- a/Enum/QueueTypeEnum.php +++ b/Enum/QueueTypeEnum.php @@ -10,4 +10,5 @@ class QueueTypeEnum public const DELAY = 2; public const REPLACE = 4; public const DEDUPLICATE = 8; + public const ROUTER = 16; } diff --git a/Exception/RouteStructureException.php b/Exception/RouteStructureException.php new file mode 100644 index 0000000..db5e0bf --- /dev/null +++ b/Exception/RouteStructureException.php @@ -0,0 +1,9 @@ +hydratorRegistry->getHydrator($this->hydratorName)->dehydrate($data); @@ -44,6 +44,6 @@ public function put(string $queueName, $data, array $options = []): void $publisher = $this->publisherRegistry->getPublisher($queueType); - $publisher->publish($definition, $dataString, $options); + $publisher->publish($definition, $dataString, $options, $routingKey); } } diff --git a/Producer/RabbitMqProducerInterface.php b/Producer/RabbitMqProducerInterface.php index 92d57b6..267dd44 100644 --- a/Producer/RabbitMqProducerInterface.php +++ b/Producer/RabbitMqProducerInterface.php @@ -6,5 +6,5 @@ interface RabbitMqProducerInterface { - public function put(string $queueName, $data, array $options = []); + public function put(string $queueName, $data, array $options = [], string $routingKey = ''); } diff --git a/Publisher/AbstractPublisher.php b/Publisher/AbstractPublisher.php index 3d2da4f..e805a57 100644 --- a/Publisher/AbstractPublisher.php +++ b/Publisher/AbstractPublisher.php @@ -29,10 +29,10 @@ public function __construct(RabbitMqClient $client, HydratorRegistry $hydratorRe abstract protected function prepareOptions(DefinitionInterface $definition, array $options): array; - public function publish(DefinitionInterface $definition, string $dataString, array $options = []): void + public function publish(DefinitionInterface $definition, string $dataString, array $options = [], string $routingKey = ''): void { $exchangeName = $this->getDefinitionExchangeName($definition); - $queueName = $this->getDefinitionQueueName($definition); + $route = $routingKey !== '' ? $routingKey : $this->getDefinitionQueueName($definition); $message = new AMQPMessage($dataString, [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, @@ -45,32 +45,24 @@ public function publish(DefinitionInterface $definition, string $dataString, arr $message->set('application_headers', new AMQPTable($amqpTableOptions)); } - $this->client->publish($message, $exchangeName, $queueName); + $this->client->publish($message, $exchangeName, $route); } abstract public static function getQueueType(): string; protected function getDefinitionExchangeName(DefinitionInterface $definition): string { - if ($definition->getQueueType() === (QueueTypeEnum::FIFO | QueueTypeEnum::DEDUPLICATE)) { - return self::DEFAULT_NAME; - } - - return $definition->getQueueType() === QueueTypeEnum::FIFO - ? self::DEFAULT_NAME - : $definition->getEntryPointName() + return $definition->getQueueType() & (QueueTypeEnum::ROUTER | QueueTypeEnum::DELAY) + ? $definition->getEntryPointName() + : self::DEFAULT_NAME ; } protected function getDefinitionQueueName(DefinitionInterface $definition): string { - if ($definition->getQueueType() === (QueueTypeEnum::FIFO | QueueTypeEnum::DEDUPLICATE)) { - return $definition::getQueueName(); - } - - return $definition->getQueueType() === QueueTypeEnum::FIFO - ? $definition::getQueueName() - : self::DEFAULT_NAME + return $definition->getQueueType() & QueueTypeEnum::DELAY + ? self::DEFAULT_NAME + : $definition::getQueueName() ; } } diff --git a/Publisher/PublisherInterface.php b/Publisher/PublisherInterface.php index bf6cb6a..bc46a32 100644 --- a/Publisher/PublisherInterface.php +++ b/Publisher/PublisherInterface.php @@ -8,7 +8,7 @@ interface PublisherInterface { public const TAG = 'wakeapp_rabbit_queue.publisher'; - public function publish(DefinitionInterface $definition, string $dataString, array $options = []): void; + public function publish(DefinitionInterface $definition, string $dataString, array $options = [], string $routingKey = ''): void; public static function getQueueType(): string; } diff --git a/Publisher/RouterPublisher.php b/Publisher/RouterPublisher.php new file mode 100644 index 0000000..da5db60 --- /dev/null +++ b/Publisher/RouterPublisher.php @@ -0,0 +1,37 @@ +put('queue_name', $data, $options); Соответственно на каждый новый тип очереди требуется свой класс `Publisher` с кастомной логикой обработки/валидации и публикации сообщений в канал. -Бандл поддерживает следующие типы очередей: +Бандл поддерживает следующие типы очередей и обменников: - FIFO - Delay - Deduplicate - Deduplicate + Delay + - Router + +Router используется для создания разветвленной топологии как описано [тут](https://www.rabbitmq.com/tutorials/tutorial-four-php.html) и [тут](https://www.rabbitmq.com/tutorials/tutorial-five-php.html) При желании добавить собственный тип очереди, необходимо создать класс `Publisher` наследующий [AbstractPublisher](Publisher/AbstractPublisher.php) или реализующий [PublisherInterface](Publisher/PublisherInterface.php). @@ -495,6 +499,158 @@ php bin/console rabbit:consumer:run example Для просмотра списка всех зарегистрированных `consumer`'ов достаточно выполнить команду `rabbit:consumer:list`. +Использование `RouterPublisher` +-------- + +`RouterPublisher` следует использовать в случаях, когда нужно множество очередей, а каждое сообщение должно попадать +сразу в некоторое их подмножество, определяемое по `routingKey` сообщения. Для таких целей нужно создать `Definition`, +в котором будет определена только `exchange` типа `direct`, `topic` или `fanout`. Эта `Definition` будет использоваться +в качестве точки входя для сообщений. После этого нужно создать по одной `Definition` на каждую очередь, и все их +биндить на первую `Definition`. Можно создать сложную маршрутизацию, если вместо очередей создавать и биндить +`Definition` типа первой. + +### Пример `Definition` с `exchange`: +```php +channel(); + + $channel->exchange_declare( + self::QUEUE_NAME, + 'topic', + false, + true, + ); + } + + /** + * {@inheritDoc} + */ + public function getEntryPointName(): string + { + return self::ENTRY_POINT; + } + + /** + * {@inheritDoc} + */ + public function getQueueType(): int + { + return QueueTypeEnum::ROUTER; + } + + /** + * {@inheritDoc} + */ + public static function getQueueName(): string + { + return self::QUEUE_NAME; + } +} +``` + +### Пример `Definition` для очереди +```php +channel(); + + $channel->queue_declare( + self::QUEUE_NAME, + false, + true, + false, + false + ); + + foreach (self::ROUTING as $route) { + $channel->queue_bind(self::QUEUE_NAME, self::ENTRY_POINT, $route); // биндим на exchange из первой Definition + } + } + + /** + * {@inheritDoc} + */ + public function getEntryPointName(): string + { + return self::ENTRY_POINT; + } + + /** + * {@inheritDoc} + */ + public function getQueueType(): int + { + return QueueTypeEnum::FIFO; + } + + /** + * {@inheritDoc} + */ + public static function getQueueName(): string + { + return self::QUEUE_NAME; + } +} +``` + +После определения биржи и очередей отправка сообщений будет выглядеть как и раньше, но сообщения будут попадать в +очереди только при подходящем routingKey (четвертый параметр в методе put()). + +```php + 'example']; # Сообщение +$options = []; + +/** @var \Wakeapp\Bundle\RabbitQueueBundle\Producer\RabbitMqProducer $producer */ +$producer->put('queue_name', $data, $options, 'small.orange.bicycle'); // попадет в очередь по роуту '*.orange.*' +$producer->put('queue_name', $data, $options, 'big.aaa.bbb.and.more.words'); // попадет в очередь по роуту 'big.#' +$producer->put('queue_name', $data, $options, 'small.black.bicycle'); // НЕ попадет в очередь из примера +``` + +**Важно!!! Длина routeKey не должна превышать 255 символов** + Лицензия -------- diff --git a/Registry/PublisherRegistry.php b/Registry/PublisherRegistry.php index 52bce17..68c884a 100644 --- a/Registry/PublisherRegistry.php +++ b/Registry/PublisherRegistry.php @@ -24,8 +24,8 @@ public function __construct(ServiceProviderInterface $publisherRegistry) */ public function getPublisher(int $queueType): AbstractPublisher { - if ($this->publisherRegistry->has($queueType)) { - return $this->publisherRegistry->get($queueType); + if ($this->publisherRegistry->has((string) $queueType)) { + return $this->publisherRegistry->get((string) $queueType); } throw new PublisherNotFoundException(sprintf('Publisher for queue type "%s" not found', $queueType)); diff --git a/Resources/config/services.yaml b/Resources/config/services.yaml index 79e80cb..c52972d 100644 --- a/Resources/config/services.yaml +++ b/Resources/config/services.yaml @@ -66,6 +66,11 @@ services: tags: - { name: !php/const Wakeapp\Bundle\RabbitQueueBundle\Publisher\PublisherInterface::TAG, default_index_method: 'getQueueType' } + Wakeapp\Bundle\RabbitQueueBundle\Publisher\RouterPublisher: + parent: 'Wakeapp\Bundle\RabbitQueueBundle\Publisher\AbstractPublisher' + tags: + - { name: !php/const Wakeapp\Bundle\RabbitQueueBundle\Publisher\PublisherInterface::TAG, default_index_method: 'getQueueType' } + Wakeapp\Bundle\RabbitQueueBundle\Producer\RabbitMqProducer: arguments: - '@Wakeapp\Bundle\RabbitQueueBundle\Registry\DefinitionRegistry' diff --git a/Tests/Publisher/DeduplicateDelayPublisherTest.php b/Tests/Publisher/DeduplicateDelayPublisherTest.php index 8c6acb3..8eb170a 100644 --- a/Tests/Publisher/DeduplicateDelayPublisherTest.php +++ b/Tests/Publisher/DeduplicateDelayPublisherTest.php @@ -35,6 +35,24 @@ public function testPublish(): void self::assertTrue(true); } + public function testPublishWithRouting(): void + { + $definition = $this->createDefinitionMock(self::TEST_QUEUE_NAME, self::TEST_EXCHANGE, self::QUEUE_TYPE); + $hydratorRegistry = $this->createHydratorRegistryMock(); + + $client = $this->createMock(RabbitMqClient::class); + $client->expects(self::once()) + ->method('publish') + ->with(self::isInstanceOf(AMQPMessage::class), self::TEST_EXCHANGE, '') + ; + + $publisher = new DeduplicateDelayPublisher($client, $hydratorRegistry, JsonHydrator::KEY); + + $publisher->publish($definition, self::TEST_MESSAGE, self::TEST_OPTIONS); + + self::assertTrue(true); + } + /** * @dataProvider invalidOptionsProvider */ diff --git a/Tests/Publisher/DeduplicatePublisherTest.php b/Tests/Publisher/DeduplicatePublisherTest.php index 301571b..3b6498c 100644 --- a/Tests/Publisher/DeduplicatePublisherTest.php +++ b/Tests/Publisher/DeduplicatePublisherTest.php @@ -35,27 +35,21 @@ public function testPublish(): void self::assertTrue(true); } - /** - * @dataProvider invalidOptionsProvider - */ - public function testPublishInvalidOptions(array $options): void + public function testPublishWithRouting(): void { - $this->expectException(RabbitQueueException::class); - - $definition = $this->createDefinitionMock(self::TEST_QUEUE_NAME, self::TEST_QUEUE_NAME, self::QUEUE_TYPE); + $definition = $this->createDefinitionMock(self::TEST_QUEUE_NAME, self::TEST_EXCHANGE, self::QUEUE_TYPE); $hydratorRegistry = $this->createHydratorRegistryMock(); + $client = $this->createMock(RabbitMqClient::class); + $client->expects(self::once()) + ->method('publish') + ->with(self::isInstanceOf(AMQPMessage::class), '', self::TEST_ROUTING) + ; $publisher = new DeduplicatePublisher($client, $hydratorRegistry, JsonHydrator::KEY); - $publisher->publish($definition, self::TEST_MESSAGE, $options); - } + $publisher->publish($definition, self::TEST_MESSAGE, self::TEST_OPTIONS, self::TEST_ROUTING); - public function invalidOptionsProvider(): array - { - return [ - 'empty options' => [[]], - 'invalid key option' => [['key' => 1]], - ]; + self::assertTrue(true); } } diff --git a/Tests/Publisher/DelayPublisherTest.php b/Tests/Publisher/DelayPublisherTest.php index a3439f0..972f985 100644 --- a/Tests/Publisher/DelayPublisherTest.php +++ b/Tests/Publisher/DelayPublisherTest.php @@ -35,6 +35,24 @@ public function testPublish(): void self::assertTrue(true); } + public function testPublishWithRouting(): void + { + $definition = $this->createDefinitionMock(self::TEST_QUEUE_NAME, self::TEST_EXCHANGE, self::QUEUE_TYPE); + $hydratorRegistry = $this->createHydratorRegistryMock(); + + $client = $this->createMock(RabbitMqClient::class); + $client->expects(self::once()) + ->method('publish') + ->with(self::isInstanceOf(AMQPMessage::class), self::TEST_EXCHANGE, self::TEST_ROUTING) + ; + + $publisher = new DelayPublisher($client, $hydratorRegistry, JsonHydrator::KEY); + + $publisher->publish($definition, self::TEST_MESSAGE, self::TEST_OPTIONS, self::TEST_ROUTING); + + self::assertTrue(true); + } + /** * @dataProvider invalidOptionsProvider */ diff --git a/Tests/Publisher/FifoPublisherTest.php b/Tests/Publisher/FifoPublisherTest.php index acf0a96..a852a13 100644 --- a/Tests/Publisher/FifoPublisherTest.php +++ b/Tests/Publisher/FifoPublisherTest.php @@ -32,4 +32,22 @@ public function testPublish(): void self::assertTrue(true); } + + public function testPublishWithRouting(): void + { + $definition = $this->createDefinitionMock(self::TEST_QUEUE_NAME, self::TEST_EXCHANGE, self::QUEUE_TYPE); + $hydratorRegistry = $this->createHydratorRegistryMock(); + + $client = $this->createMock(RabbitMqClient::class); + $client->expects(self::once()) + ->method('publish') + ->with(self::isInstanceOf(AMQPMessage::class), '', self::TEST_ROUTING) + ; + + $publisher = new FifoPublisher($client, $hydratorRegistry, JsonHydrator::KEY); + + $publisher->publish($definition, self::TEST_MESSAGE, [], self::TEST_ROUTING); + + self::assertTrue(true); + } } diff --git a/Tests/Publisher/RouterPublisherTest.php b/Tests/Publisher/RouterPublisherTest.php new file mode 100644 index 0000000..23510b8 --- /dev/null +++ b/Tests/Publisher/RouterPublisherTest.php @@ -0,0 +1,54 @@ +createDefinitionMock(self::TEST_QUEUE_NAME, self::TEST_EXCHANGE, self::QUEUE_TYPE); + $hydratorRegistry = $this->createHydratorRegistryMock(); + + $client = $this->createMock(RabbitMqClient::class); + $client->expects(self::once()) + ->method('publish') + ->with(self::isInstanceOf(AMQPMessage::class), self::TEST_EXCHANGE, self::TEST_QUEUE_NAME) + ; + + $publisher = new RouterPublisher($client, $hydratorRegistry, JsonHydrator::KEY); + + $publisher->publish($definition, self::TEST_MESSAGE); + + self::assertTrue(true); + } + + public function testPublishWithRouting(): void + { + $definition = $this->createDefinitionMock(self::TEST_QUEUE_NAME, self::TEST_EXCHANGE, self::QUEUE_TYPE); + $hydratorRegistry = $this->createHydratorRegistryMock(); + + $client = $this->createMock(RabbitMqClient::class); + $client->expects(self::once()) + ->method('publish') + ->with(self::isInstanceOf(AMQPMessage::class), self::TEST_EXCHANGE, self::TEST_ROUTING) + ; + + $publisher = new RouterPublisher($client, $hydratorRegistry, JsonHydrator::KEY); + + $publisher->publish($definition, self::TEST_MESSAGE, [], self::TEST_ROUTING); + + self::assertTrue(true); + } +} diff --git a/Tests/TestCase/AbstractTestCase.php b/Tests/TestCase/AbstractTestCase.php index 1779384..b04325d 100644 --- a/Tests/TestCase/AbstractTestCase.php +++ b/Tests/TestCase/AbstractTestCase.php @@ -15,6 +15,7 @@ class AbstractTestCase extends TestCase protected const TEST_MESSAGE = '{"test": "test"}'; protected const TEST_EXCHANGE = 'test_exchange'; protected const TEST_QUEUE_NAME = 'test_queue'; + protected const TEST_ROUTING = 'test.routing'; public function createDefinitionMock(string $queueName, string $entryPointName, int $queueType): DefinitionInterface {