diff --git a/.docs/README.md b/.docs/README.md index 5d78ae8..47fe411 100644 --- a/.docs/README.md +++ b/.docs/README.md @@ -1,36 +1,75 @@ # Contributte Messenger +Integration of [Symfony Messenger](https://symfony.com/doc/current/messenger.html) into [Nette Framework](https://nette.org). + ## Content -- [Setup](#usage) -- [Relying](#relying) -- [Configuration](#configuration) -- [Integrations](#integrations) -- [Limitations](#limitations) +- [Setup](#setup) +- [Minimal configuration](#minimal-configuration) +- [Full configuration](#full-configuration) +- [Messages](#messages) +- [Handlers](#handlers) + - [Using attributes](#using-attributes) + - [Using Neon tags](#using-neon-tags) + - [Multiple handlers in one class](#multiple-handlers-in-one-class) + - [Handler options](#handler-options) +- [Buses](#buses) + - [Default buses](#default-buses) + - [MessageBus](#messagebus) + - [CommandBus](#commandbus) + - [QueryBus](#querybus) + - [Custom bus](#custom-bus) + - [Bus registry](#bus-registry) +- [Transports](#transports) + - [Sync transport](#sync-transport) + - [In-memory transport](#in-memory-transport) + - [Redis transport](#redis-transport) + - [Doctrine transport](#doctrine-transport) + - [AMQP transport](#amqp-transport) + - [Custom transport factory](#custom-transport-factory) +- [Routing](#routing) + - [Basic routing](#basic-routing) + - [Interface routing](#interface-routing) + - [Wildcard routing](#wildcard-routing) +- [Console commands](#console-commands) + - [messenger:consume](#messengerconsume) + - [messenger:debug](#messengerdebug) + - [messenger:setup-transports](#messengersetup-transports) + - [messenger:stats](#messengerstats) + - [messenger:failed:show](#messengerfailedshow) + - [messenger:failed:retry](#messengerfailedretry) + - [messenger:failed:remove](#messengerfailedremove) +- [Advanced](#advanced) + - [Retry strategy](#retry-strategy) + - [Failure transport](#failure-transport) + - [Middleware](#middleware) + - [Batch handlers](#batch-handlers) + - [Event listeners](#event-listeners) + - [Serializers](#serializers) + - [Logging](#logging) +- [Testing](#testing) - [Examples](#examples) +--- + ## Setup +Install the package using [Composer](https://getcomposer.org): + ```bash composer require contributte/messenger ``` +Register the extension in your Neon configuration: + ```neon extensions: - messenger: Contributte\Messenger\DI\MessengerExtension + messenger: Contributte\Messenger\DI\MessengerExtension ``` -## Relying - -Take advantage of enpowering this package with 4 extra packages: - -- `symfony/console` via [contributte/console](https://github.com/contributte/console) -- `symfony/event-dispatcher` via [contributte/event-dispatcher](https://github.com/contributte/event-dispatcher) +This package works best with these additional Contributte packages: -### `symfony/console` - -This package relies on `symfony/console`, use prepared [contributte/console](https://github.com/contributte/console) -integration. +**Symfony Console** - provides console commands for consuming messages and managing transports: ```bash composer require contributte/console @@ -38,17 +77,10 @@ composer require contributte/console ```neon extensions: - console: Contributte\Console\DI\ConsoleExtension(%consoleMode%) + console: Contributte\Console\DI\ConsoleExtension(%consoleMode%) ``` -Since this moment when you type `bin/console`, there'll be registered commands from Doctrine DBAL. - -![Console Commands](https://raw.githubusercontent.com/contributte/messenger/master/.docs/assets/console.png) - -### `symfony/event-dispatcher` - -This package relies on `symfony/event-dispatcher`, use -prepared [contributte/event-dispatcher](https://github.com/contributte/event-dispatcher) integration. +**Symfony EventDispatcher** - provides lifecycle events: ```bash composer require contributte/event-dispatcher @@ -56,336 +88,1075 @@ composer require contributte/event-dispatcher ```neon extensions: - events: Contributte\EventDispatcher\DI\EventDispatcherExtension + events: Contributte\EventDispatcher\DI\EventDispatcherExtension ``` -## Configuration +--- -> At first please take a look at official documentation. -> https://symfony.com/doc/current/components/messenger.html -> https://symfony.com/doc/current/messenger.html - -Minimal configuration example: +## Minimal configuration ```neon -# Just register the handler as a service to DIC +extensions: + messenger: Contributte\Messenger\DI\MessengerExtension + +messenger: + transport: + sync: + dsn: sync:// + + routing: + App\Message\SendEmail: [sync] + services: - - App\Domain\SimpleMessageHandler + - App\Handler\SendEmailHandler ``` -Full configuration example: +--- + +## Full configuration ```neon +extensions: + messenger: Contributte\Messenger\DI\MessengerExtension + console: Contributte\Console\DI\ConsoleExtension(%consoleMode%) + events: Contributte\EventDispatcher\DI\EventDispatcherExtension + messenger: - # Enable or disable Tracy debug panel + # Debug panel (requires Tracy) debug: panel: %debugMode% - # Defines buses, default one are messageBus, queryBus and commandBus. + # Message buses bus: messageBus: - # To disable default middlewares stack (see https://symfony.com/doc/current/messenger.html#middleware) - defaultMiddlewares: false - - # Define middlewares just for this bus. - middlewares: - #- LoggerMiddleware() - #- @loggerMiddleware autowired: true + defaultMiddlewares: true allowNoHandlers: false allowNoSenders: true + middlewares: [] - # Defined class must implement MessageBusInterface - class: App\Model\Bus\MyMessageBus - - # Define wrapper class for easy autowiring between multiple buses (eventBus, messageBus, commandBus, ...) - wrapper: App\Model\Bus\CommandBus + commandBus: + wrapper: Contributte\Messenger\Bus\CommandBus queryBus: - autowired: false + wrapper: Contributte\Messenger\Bus\QueryBus - # Defines serializers. + # Serializers serializer: default: Symfony\Component\Messenger\Transport\Serialization\PhpSerializer - # custom: @customSerializer - # Defines loggers. - # - httpLogger is used in your presenters/controllers - # - consoleLogger is used in symfony/console commands + # Transport factories + transportFactory: + sync: Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory + inMemory: Symfony\Component\Messenger\Transport\InMemory\InMemoryTransportFactory + redis: Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory + + # Loggers logger: httpLogger: Psr\Log\NullLogger - # httpLogger: @specialLogger consoleLogger: Symfony\Component\Console\Logger\ConsoleLogger - # consoleLogger: @specialLogger - # Defines transport factories. - transportFactory: - # redis: Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory - # sync: Symfony\Component\Messenger\Transport\Sync\SyncTransportFactorya - # amqp: Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpTransportFactory - # doctrine: Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransportFactory - # inMemory: Symfony\Component\Messenger\Transport\InMemoryTransportFactory - # inMemory: @customMemoryTransportFactory - - # Defines global failure transport. Default is none. - # - After retrying, messages will be sent to the "failed" transport. - # - By default if no "failureTransport" is configured inside a transport global will be used. + # Global failure transport failureTransport: failed - # Define transports (async or sync) + # Transports transport: - - # Redis (async) transport - redis: - dsn: "redis://localhost?dbIndex=1" - options: [] - serializer: default - failureTransport: db - - # Doctrine (async) transport - db: - dsn: doctrine://postgres:password@localhost:5432 - # to disable retry - retryStrategy: null - - # Sync transport sync: dsn: sync:// - # In memory (sync) transport - memory: - dsn: in-memory:// - serializer: @customSerializer - - # Since no failed transport is configured, the one used will be the global "failureTransport" set - # failureTransport: db - - # Retry configuration. + async: + dsn: redis://localhost:6379/messages retryStrategy: maxRetries: 3 - # milliseconds delay delay: 1000 - # causes the delay to be higher before each retry - # e.g. 1 second delay, 2 seconds, 4 seconds multiplier: 2 - maxDelay: 0 - # override all of this with a service that - # implements Symfony\Component\Messenger\Retry\RetryStrategyInterface - # service: @App\RetryStrategy\CustomRetryStrategy + maxDelay: 60000 + failureTransport: failed - - # Doctrine (async) transport failed: - dsn: doctrine://postgres:password@localhost:5432?queue_name=failed + dsn: doctrine://default?queue_name=failed - # Defines routing (message -> transport) - # If the routing for message is missing, the message will be handled by handler immediately when dispatched + # Routing routing: - App\Domain\NewUserEmail: [redis] - App\Domain\ForgotPasswordEmail: [db, redis] - App\Domain\LogText: [db] - - # Route interface - App\Domain\SomeMessageInterface: [db] - - # Route wildcard - *: [sync] + App\Message\SendEmail: [async] + App\Message\SendSms: [async] + App\Message\LogEntry: [sync] + "*": [sync] services: - - App\Domain\LogTextHandler - - App\Domain\NewUserEmailHandler - - App\Domain\ForgotPasswordEmailHandler - - App\RetryStrategy\CustomRetryStrategy + - App\Handler\SendEmailHandler + - App\Handler\SendSmsHandler + - App\Handler\LogEntryHandler ``` -### Message +--- + +## Messages -All messages are just simple [POJO](https://stackoverflow.com/questions/41188002/what-does-the-term-plain-old-php-object-popo-exactly-mean). +Messages are simple PHP objects (POPOs) that carry data. They don't need to extend any class or implement any interface. ```php - [!NOTE] +> Messages should be immutable and contain only the data needed for the handler to process them. - public string $text; +--- + +## Handlers + +Handlers are services that process messages. Each handler must be registered in the DI container and marked as a message handler. + +### Using attributes + +The recommended way to define handlers is using the `#[AsMessageHandler]` attribute: + +```php +text = $text; - } +namespace App\Handler; +use App\Message\SendEmail; +use Symfony\Component\Messenger\Attribute\AsMessageHandler; + +#[AsMessageHandler] +final class SendEmailHandler +{ + public function __invoke(SendEmail $message): void + { + // Send the email... + } } ``` -### Handlers +Register the handler as a service: + +```neon +services: + - App\Handler\SendEmailHandler +``` + +> See [AsMessageHandler](https://symfony.com/doc/current/messenger.html#creating-a-message-handler) in Symfony documentation. + +### Using Neon tags + +Alternatively, you can use the `contributte.messenger.handler` tag: -All handlers must be registered to your [DIC container](https://doc.nette.org/en/dependency-injection) via [Neon files](https://doc.nette.org/en/neon/format).
-All handlers must also be marked as message handlers to handle messages. -There are 2 different ways to mark your handlers: -1. with the neon tag [`contributte.messenger.handler`]: ```neon services: - - class: App\SimpleMessageHandler + class: App\Handler\SendEmailHandler tags: - contributte.messenger.handler: # the configuration below is optional - bus: event - alias: simple + contributte.messenger.handler: +``` + +With additional options: + +```neon +services: + - + class: App\Handler\SendEmailHandler + tags: + contributte.messenger.handler: + bus: messageBus + alias: sendEmail method: __invoke - handles: App\SimpleMessage - priority: 0 - from_transport: sync + handles: App\Message\SendEmail + priority: 10 + from_transport: async ``` -2. with the attribute [`#[AsMessageHandler]`] (https://github.com/symfony/messenger/blob/6e749550d539f787023878fad675b744411db003/Attribute/AsMessageHandler.php). +### Multiple handlers in one class + +You can handle multiple message types in a single class: + +**Using attributes:** + ```php -messageBus->dispatch( + new SendEmail('john@example.com', 'Hello', 'World') + ); + } +} ``` + +### CommandBus + +The command bus is for fire-and-forget operations that don't return a result: + +```php +commandBus->handle( + new CreateUser('john@example.com', 'John Doe') + ); + } +} +``` + +### QueryBus + +The query bus is for operations that return a result: + ```php -queryBus->query(new GetUser($id)); + } +} +``` + +The handler must return a value via the `HandledStamp`: + +```php +userRepository->find($query->id); + } } ``` -### Errors +### Custom bus -Handling errors in async environments is little bit tricky. You need to setup logger to display errors in CLI environments. +You can configure custom buses with specific middleware and options: -## Integrations +```neon +messenger: + bus: + eventBus: + autowired: true + allowNoHandlers: true + allowNoSenders: true + middlewares: + - App\Middleware\LoggingMiddleware() -### Doctrine + customBus: + class: App\Bus\CustomMessageBus + wrapper: App\Bus\CustomBusWrapper + defaultMiddlewares: false + middlewares: + - @validationMiddleware +``` + +| Option | Description | +|--------|-------------| +| `autowired` | Enable autowiring for this bus (default: true for first bus) | +| `allowNoHandlers` | Don't throw exception if no handler found (default: false) | +| `allowNoSenders` | Don't throw exception if no sender configured (default: true) | +| `defaultMiddlewares` | Include default middleware stack (default: true) | +| `middlewares` | Custom middleware to add | +| `class` | Custom bus class implementing `MessageBusInterface` | +| `wrapper` | Wrapper class for easy autowiring | -Take advantage of enpowering this package with 6 extra packages: +### Bus registry + +Access any bus by name using the `BusRegistry`: + +```php +busRegistry->get($busName); + $bus->dispatch($message); + } +} ``` +--- + +## Transports + +Transports define how messages are sent and received. Messages can be processed synchronously or asynchronously. + +> See [Transports](https://symfony.com/doc/current/messenger.html#transports-async-queued-messages) in Symfony documentation. + +### Sync transport + +Process messages immediately (synchronously): + ```neon -# Extension > Nettrine -# => order is crucial -# -extensions: - # Common - nettrine.annotations: Nettrine\Annotations\DI\AnnotationsExtension - nettrine.cache: Nettrine\Cache\DI\CacheExtension - nettrine.migrations: Nettrine\Migrations\DI\MigrationsExtension - nettrine.fixtures: Nettrine\Fixtures\DI\FixturesExtension +messenger: + transport: + sync: + dsn: sync:// +``` + +### In-memory transport - # DBAL - nettrine.dbal: Nettrine\DBAL\DI\DbalExtension - nettrine.dbal.console: Nettrine\DBAL\DI\DbalConsoleExtension +Store messages in memory (useful for testing): - # ORM - nettrine.orm: Nettrine\ORM\DI\OrmExtension - nettrine.orm.cache: Nettrine\ORM\DI\OrmCacheExtension - nettrine.orm.console: Nettrine\ORM\DI\OrmConsoleExtension - nettrine.orm.annotations: Nettrine\ORM\DI\OrmAnnotationsExtension +```neon +messenger: + transport: + memory: + dsn: in-memory:// ``` -## Limitations +### Redis transport -**Roadmap** +Process messages asynchronously via Redis: -- No fallbackBus in RoutableMessageBus. -- No debug console commands. +```bash +composer require symfony/redis-messenger +``` -## Examples +```neon +messenger: + transportFactory: + redis: Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory -### 1. Manual example + transport: + redis: + dsn: redis://localhost:6379/messages + options: + stream: messenger + group: default + consumer: consumer-1 + serializer: default +``` -```sh -composer require contributte/messenger +### Doctrine transport + +Process messages asynchronously via database: + +```bash +composer require symfony/doctrine-messenger ``` ```neon -# Extension > Messenger -# -extensions: - messenger: Contributte\Messenger\DI\MessengerExtension +messenger: + transportFactory: + doctrine: Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransportFactory + + transport: + database: + dsn: doctrine://default + options: + table_name: messenger_messages + queue_name: default + auto_setup: true +``` + +> [!IMPORTANT] +> The Doctrine transport requires a configured Doctrine DBAL connection. Use [nettrine/dbal](https://github.com/nettrine/dbal) for Nette integration. + +### AMQP transport +Process messages asynchronously via RabbitMQ: + +```bash +composer require symfony/amqp-messenger +``` + +```neon messenger: + transportFactory: + amqp: Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpTransportFactory + transport: - sync: - dsn: "sync://" + rabbitmq: + dsn: amqp://guest:guest@localhost:5672/%2f/messages +``` + +### Custom transport factory + +Register a custom transport factory: + +```neon +messenger: + transportFactory: + custom: App\Transport\CustomTransportFactory +``` +```php + See [Routing](https://symfony.com/doc/current/messenger.html#routing-messages-to-a-transport) in Symfony documentation. + +### Basic routing + +Route messages to specific transports: + +```neon +messenger: routing: - App\Domain\LogText: [sync] + App\Message\SendEmail: [async] + App\Message\SendSms: [async, sync] + App\Message\LogEntry: [sync] +``` + +> [!NOTE] +> If no route is defined for a message, it will be handled synchronously by dispatching it directly to handlers. +### Interface routing + +Route all messages implementing an interface: + +```php + See [messenger:consume](https://symfony.com/doc/current/messenger.html#consuming-messages-running-the-worker) in Symfony documentation. + +### messenger:debug + +Debug message routing and handlers: + +```bash +bin/console messenger:debug +``` + +Output shows all registered messages, their handlers, and routing configuration. + +### messenger:setup-transports + +Create/update transport infrastructure (database tables, Redis streams, etc.): + +```bash +# Setup all transports +bin/console messenger:setup-transports + +# Setup specific transport +bin/console messenger:setup-transports async +``` + +### messenger:stats + +Display transport statistics: + +```bash +bin/console messenger:stats +``` + +### messenger:failed:show + +Show failed messages: + +```bash +# Show all failed messages +bin/console messenger:failed:show + +# Show specific message details +bin/console messenger:failed:show 42 +``` + +### messenger:failed:retry + +Retry failed messages: + +```bash +# Retry all failed messages +bin/console messenger:failed:retry + +# Retry specific message +bin/console messenger:failed:retry 42 + +# Force retry without confirmation +bin/console messenger:failed:retry --force +``` + +### messenger:failed:remove + +Remove failed messages: + +```bash +# Remove specific message +bin/console messenger:failed:remove 42 + +# Remove all failed messages +bin/console messenger:failed:remove --all + +# Force removal without confirmation +bin/console messenger:failed:remove 42 --force +``` + +--- + +## Advanced + +### Retry strategy + +Configure automatic retries for failed messages: + +```neon +messenger: + transport: + async: + dsn: redis://localhost:6379/messages + retryStrategy: + maxRetries: 3 + delay: 1000 + multiplier: 2 + maxDelay: 60000 +``` + +| Option | Description | +|--------|-------------| +| `maxRetries` | Maximum number of retry attempts (default: 3) | +| `delay` | Initial delay in milliseconds (default: 1000) | +| `multiplier` | Delay multiplier for exponential backoff (default: 1) | +| `maxDelay` | Maximum delay in milliseconds (0 = unlimited) | +| `service` | Custom retry strategy service | + +With `multiplier: 2` and `delay: 1000`, the delays will be: 1s, 2s, 4s, 8s, etc. + +**Custom retry strategy:** + +```php + [!IMPORTANT] +> Always configure a failure transport to prevent losing messages. Use `messenger:failed:show` and `messenger:failed:retry` to manage failed messages. + +### Middleware + +Middleware allows you to hook into the message handling process: + +```php +logger->info('Handling message', [ + 'class' => get_class($envelope->getMessage()), + ]); + + $envelope = $stack->next()->handle($envelope, $stack); + + $this->logger->info('Message handled', [ + 'class' => get_class($envelope->getMessage()), + ]); + + return $envelope; + } +} +``` + +```neon +messenger: + bus: + messageBus: + middlewares: + - App\Middleware\LoggingMiddleware() +``` + +> See [Middleware](https://symfony.com/doc/current/messenger.html#middleware) in Symfony documentation. + +### Batch handlers + +Handle multiple messages in a single batch: + +```php +sendEmail($message); + $ack->ack($message); + } catch (\Throwable $e) { + $ack->nack($e); + } + } + } + + private function shouldFlush(): bool + { + return $this->getBatchSize() >= 10; + } +} +``` + +> See [Extracting Results](https://symfony.com/doc/current/messenger.html#extracting-results) in Symfony documentation. + +### Event listeners + +Hook into the message lifecycle using Symfony EventDispatcher: + +```php + 'onMessageHandled', + WorkerMessageFailedEvent::class => 'onMessageFailed', + ]; + } + + public function onMessageHandled(WorkerMessageHandledEvent $event): void + { + // Log successful handling... + } + + public function onMessageFailed(WorkerMessageFailedEvent $event): void + { + // Alert on failure... + } +} +``` + +```neon services: - - App\Domain\LogTextHandler + - + class: App\Listener\MessengerEventSubscriber + tags: [contributte.event_dispatcher.subscriber] +``` + +Available events: + +| Event | Description | +|-------|-------------| +| `WorkerStartedEvent` | Worker has started | +| `WorkerRunningEvent` | Worker is running (each loop) | +| `WorkerStoppedEvent` | Worker has stopped | +| `WorkerMessageReceivedEvent` | Message received from transport | +| `WorkerMessageHandledEvent` | Message was handled successfully | +| `WorkerMessageFailedEvent` | Message handling failed | +| `WorkerMessageRetriedEvent` | Message is being retried | +| `SendMessageToTransportsEvent` | Message is being sent to transports | + +### Serializers + +Configure how messages are serialized for transport: + +```neon +messenger: + serializer: + default: Symfony\Component\Messenger\Transport\Serialization\PhpSerializer + json: Symfony\Component\Messenger\Transport\Serialization\Serializer + + transport: + async: + dsn: redis://localhost:6379/messages + serializer: json +``` + +> [!NOTE] +> The default `PhpSerializer` uses PHP's native serialization. For interoperability with other systems, use the Symfony Serializer. + +### Logging + +Configure separate loggers for HTTP and CLI contexts: + +```neon +messenger: + logger: + # Logger for web requests + httpLogger: Psr\Log\NullLogger + + # Logger for console commands + consoleLogger: Symfony\Component\Console\Logger\ConsoleLogger +``` + +The package automatically switches between loggers based on the execution context. + +--- + +## Testing + +Test message dispatch using the in-memory transport: + +```neon +# config/test.neon +messenger: + transport: + async: + dsn: in-memory:// +``` + +```php +container->getByType(MessageBus::class); + $bus->dispatch(new SendEmail('test@example.com', 'Subject', 'Body')); + + /** @var InMemoryTransport $transport */ + $transport = $this->container->getService('messenger.transport.async'); + + $messages = $transport->getSent(); + Assert::count(1, $messages); + Assert::type(SendEmail::class, $messages[0]->getMessage()); + } +} ``` -### 2. Example projects +Use the `BufferLogger` for testing log output: -We've made a few skeletons with preconfigured Symfony Messenger nad Contributte packages. +```php +obtain(); +Assert::count(2, $logs); +Assert::contains('Message handled', $logs[0]['message']); +``` + +--- + +## Examples + +- [contributte/messenger-skeleton](https://github.com/contributte/messenger-skeleton) - Minimal working example +- [contributte/examples](https://contributte.org/examples.html) - More examples + +--- + +## Limitations -- https://github.com/contributte/messenger-skeleton +**Roadmap:** -### 3. Example playground +- No `fallbackBus` in `RoutableMessageBus` +- Debug Tracy panel (TODO) -- https://contributte.org/examples.html (more examples) +--- -## Other +## Credits -This repository is inspired by these packages. +This package is inspired by: -- https://github.com/fmasa/messenger -- https://gitlab.com/symfony/messenger -- https://gitlab.com/symfony/redis-messenger +- [fmasa/messenger](https://github.com/fmasa/messenger) +- [symfony/messenger](https://github.com/symfony/messenger) +- [symfony/redis-messenger](https://github.com/symfony/redis-messenger) -Thank you folks. +Thank you to all contributors!