diff --git a/composer.json b/composer.json index bd7c59f4..3fbcab70 100755 --- a/composer.json +++ b/composer.json @@ -34,13 +34,14 @@ "cakedc/cakephp-phpstan": "^4.0.0", "cakephp/bake": "^3.0.1", "cakephp/migrations": "^4.5.1", + "dereuromark/cakephp-dto": "^2.1.0", "dereuromark/cakephp-ide-helper": "^2.0.0", "dereuromark/cakephp-templating": "^0.2.7", "dereuromark/cakephp-tools": "^3.0.0", - "dereuromark/cakephp-dto": "^2.1.0", "fig-r/psr2r-sniffer": "dev-master", "friendsofcake/search": "^7.0.0", - "phpunit/phpunit": "^10.5 || ^11.5 || ^12.1" + "phpunit/phpunit": "^10.5 || ^11.5 || ^12.1", + "sentry/sentry": "^4.18" }, "suggest": { "dereuromark/cakephp-ide-helper": "For maximum IDE support, especially around createJob() usage.", diff --git a/docs/sections/misc.md b/docs/sections/misc.md index 585c6775..bd5d3d21 100644 --- a/docs/sections/misc.md +++ b/docs/sections/misc.md @@ -51,6 +51,64 @@ This includes also failed ones if not filtered further using `where()` condition ## Events The Queue plugin dispatches events to allow you to hook into the queue processing lifecycle. +These events are useful for monitoring, logging, and integrating with external services like Sentry. + +### Queue.Job.created +This event is triggered when a new job is added to the queue (producer side). + +```php +use Cake\Event\EventInterface; +use Cake\Event\EventManager; + +EventManager::instance()->on('Queue.Job.created', function (EventInterface $event) { + $job = $event->getData('job'); + // Track job creation for monitoring +}); +``` + +Event data: +- `job`: The `QueuedJob` entity that was created + +### Queue.Job.started +This event is triggered when a worker begins processing a job (consumer side). + +```php +EventManager::instance()->on('Queue.Job.started', function (EventInterface $event) { + $job = $event->getData('job'); + // Start tracing/monitoring span +}); +``` + +Event data: +- `job`: The `QueuedJob` entity being processed + +### Queue.Job.completed +This event is triggered when a job finishes successfully. + +```php +EventManager::instance()->on('Queue.Job.completed', function (EventInterface $event) { + $job = $event->getData('job'); + // Mark trace as successful +}); +``` + +Event data: +- `job`: The `QueuedJob` entity that completed + +### Queue.Job.failed +This event is triggered when a job fails (on every failure attempt). + +```php +EventManager::instance()->on('Queue.Job.failed', function (EventInterface $event) { + $job = $event->getData('job'); + $failureMessage = $event->getData('failureMessage'); + // Mark trace as failed, log error +}); +``` + +Event data: +- `job`: The `QueuedJob` entity that failed +- `failureMessage`: The error message from the failure ### Queue.Job.maxAttemptsExhausted This event is triggered when a job has failed and exhausted all of its configured retry attempts. @@ -81,10 +139,52 @@ EventManager::instance()->on('Queue.Job.maxAttemptsExhausted', function (EventIn }); ``` -The event data contains: +Event data: - `job`: The `QueuedJob` entity that failed - `failureMessage`: The error message from the last failure +### Sentry Integration + +The plugin provides built-in support for [Sentry's queue monitoring](https://docs.sentry.io/platforms/php/tracing/instrumentation/queues-module/) feature. +When enabled, it automatically creates producer and consumer spans for queue jobs. + +To enable Sentry integration, add to your configuration: + +```php +// In config/app.php or config/app_local.php +'Queue' => [ + 'sentry' => true, + // ... other queue config +], +``` + +Requirements: +- The `sentry/sentry` package must be installed +- Sentry must be initialized in your application (e.g., via `lordsimal/cakephp-sentry`) + +The integration automatically: +- Creates `queue.publish` spans when jobs are created +- Creates `queue.process` transactions when jobs are processed +- Propagates trace context between producer and consumer via job data +- Sets appropriate status (success/error) based on job outcome +- Includes all standard messaging attributes: + - `messaging.message.id` - Job ID + - `messaging.destination.name` - Task name + - `messaging.message.body.size` - Payload size in bytes + - `messaging.message.retry.count` - Attempt count + - `messaging.message.receive.latency` - Time from scheduled to fetched (ms) + +### Using Events for Custom Monitoring + +If you prefer to implement your own monitoring integration, you can use the events directly. +The job entity provides all necessary data for tracing: + +- `$job->id` - Message identifier (`messaging.message.id`) +- `$job->job_task` - Queue/topic name (`messaging.destination.name`) +- `$job->data` - Payload for calculating message size (`messaging.message.body.size`) +- `$job->attempts` - Retry count (`messaging.message.retry.count`) +- `$job->created`, `$job->notbefore`, `$job->fetched` - For calculating receive latency (`messaging.message.receive.latency`) + ## Notes `` is the complete class name without the Task suffix (e.g. Example or PluginName.Example). diff --git a/src/Model/Table/QueuedJobsTable.php b/src/Model/Table/QueuedJobsTable.php index e65704c0..de9c325c 100644 --- a/src/Model/Table/QueuedJobsTable.php +++ b/src/Model/Table/QueuedJobsTable.php @@ -17,6 +17,7 @@ use Queue\Model\Entity\QueuedJob; use Queue\Model\Filter\QueuedJobsCollection; use Queue\Queue\Config; +use Queue\Queue\SentryIntegration; use Queue\Queue\TaskFinder; use Queue\Utility\Memory; use RuntimeException; @@ -205,6 +206,14 @@ public function createJob(string $jobTask, array|object|null $data = null, array throw new InvalidArgumentException('Data must be `array|null`, implement `' . FromArrayToArrayInterface::class . '` or provide a `toArray()` method'); } + // Add Sentry trace headers for trace propagation if available + $traceHeaders = SentryIntegration::getTraceHeaders(); + if ($traceHeaders && is_array($data)) { + $data = $traceHeaders + $data; + } elseif ($traceHeaders) { + $data = $traceHeaders; + } + $queuedJob = [ 'job_task' => $this->jobTask($jobTask), 'data' => $data, @@ -216,8 +225,14 @@ public function createJob(string $jobTask, array|object|null $data = null, array } $queuedJob = $this->newEntity($queuedJob); + $queuedJob = $this->saveOrFail($queuedJob); + + $this->dispatchEvent('Queue.Job.created', [ + 'job' => $queuedJob, + ]); + SentryIntegration::startProducerSpan($queuedJob); - return $this->saveOrFail($queuedJob); + return $queuedJob; } /** diff --git a/src/Queue/Processor.php b/src/Queue/Processor.php index 36a632db..27dca797 100644 --- a/src/Queue/Processor.php +++ b/src/Queue/Processor.php @@ -213,6 +213,12 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void { $this->log('job ' . $queuedJob->job_task . ', id ' . $queuedJob->id, $pid, false); $taskName = $queuedJob->job_task; + $event = new Event('Queue.Job.started', $this, [ + 'job' => $queuedJob, + ]); + EventManager::instance()->dispatch($event); + SentryIntegration::startConsumerTransaction($queuedJob); + $return = $failureMessage = null; try { $this->time = time(); @@ -242,6 +248,13 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void { $this->log('job ' . $queuedJob->job_task . ', id ' . $queuedJob->id . ' failed and ' . $failedStatus, $pid); $this->io->out('Job did not finish, ' . $failedStatus . ' after try ' . $queuedJob->attempts . '.'); + $event = new Event('Queue.Job.failed', $this, [ + 'job' => $queuedJob, + 'failureMessage' => $failureMessage, + ]); + EventManager::instance()->dispatch($event); + SentryIntegration::finishConsumerFailure($queuedJob, $failureMessage); + // Dispatch event when job has exhausted all retries if ($failedStatus === 'aborted') { $event = new Event('Queue.Job.maxAttemptsExhausted', $this, [ @@ -255,6 +268,13 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void { } $this->QueuedJobs->markJobDone($queuedJob); + + $event = new Event('Queue.Job.completed', $this, [ + 'job' => $queuedJob, + ]); + EventManager::instance()->dispatch($event); + SentryIntegration::finishConsumerSuccess($queuedJob); + $this->io->out('Job Finished.'); $this->currentJob = null; } diff --git a/src/Queue/SentryIntegration.php b/src/Queue/SentryIntegration.php new file mode 100644 index 00000000..6b1175e7 --- /dev/null +++ b/src/Queue/SentryIntegration.php @@ -0,0 +1,252 @@ +getSpan(); + + if ($parentSpan === null) { + return; + } + + $context = SpanContext::make() + ->setOp('queue.publish') + ->setDescription($job->job_task); + + $span = $parentSpan->startChild($context); + $hub->setSpan($span); + + $span->setData([ + 'messaging.message.id' => (string)$job->id, + 'messaging.destination.name' => $job->job_task, + 'messaging.message.body.size' => static::getPayloadSize($job), + ]); + + $span->finish(); + $hub->setSpan($parentSpan); + } catch (Throwable $e) { + // Silently ignore Sentry errors to not disrupt queue operations + } + } + + /** + * Start a consumer transaction when a job begins processing. + * + * @param \Queue\Model\Entity\QueuedJob $job The job being processed + * + * @return void + */ + public static function startConsumerTransaction(QueuedJob $job): void { + if (!static::isAvailable()) { + return; + } + + try { + static::$jobStartTime = microtime(true); + + // Try to continue trace from job data if available + $sentryTrace = null; + $baggage = null; + if (is_array($job->data)) { + $sentryTrace = $job->data['_sentry_trace'] ?? null; + $baggage = $job->data['_sentry_baggage'] ?? null; + } + + if ($sentryTrace !== null) { + $context = \Sentry\continueTrace($sentryTrace, $baggage ?? ''); + } else { + $context = TransactionContext::make(); + } + + $context->setOp('queue.process'); + $context->setName($job->job_task); + + $transaction = \Sentry\startTransaction($context); + SentrySdk::getCurrentHub()->setSpan($transaction); + + static::$currentTransaction = $transaction; + } catch (Throwable $e) { + // Silently ignore Sentry errors + static::$currentTransaction = null; + } + } + + /** + * Finish the consumer transaction when a job completes successfully. + * + * @param \Queue\Model\Entity\QueuedJob $job The completed job + * + * @return void + */ + public static function finishConsumerSuccess(QueuedJob $job): void { + static::finishConsumerTransaction($job, true); + } + + /** + * Finish the consumer transaction when a job fails. + * + * @param \Queue\Model\Entity\QueuedJob $job The failed job + * @param string|null $failureMessage The error message + * + * @return void + */ + public static function finishConsumerFailure(QueuedJob $job, ?string $failureMessage = null): void { + static::finishConsumerTransaction($job, false, $failureMessage); + } + + /** + * Finish the consumer transaction. + * + * @param \Queue\Model\Entity\QueuedJob $job The job + * @param bool $success Whether the job succeeded + * @param string|null $failureMessage Optional failure message + * + * @return void + */ + protected static function finishConsumerTransaction( + QueuedJob $job, + bool $success, + ?string $failureMessage = null, + ): void { + if (!static::isAvailable() || static::$currentTransaction === null) { + return; + } + + try { + $transaction = static::$currentTransaction; + + if (!$success && class_exists('\Sentry\Tracing\SpanStatus')) { + $transaction->setStatus(SpanStatus::internalError()); + } + + $receiveLatency = null; + if ($job->fetched !== null && $job->created !== null) { + $scheduledTime = $job->notbefore ?? $job->created; + $receiveLatency = (float)$job->fetched->getTimestamp() - (float)$scheduledTime->getTimestamp(); + } + + $data = [ + 'messaging.message.id' => (string)$job->id, + 'messaging.destination.name' => $job->job_task, + 'messaging.message.body.size' => static::getPayloadSize($job), + 'messaging.message.retry.count' => $job->attempts, + ]; + + if ($receiveLatency !== null) { + $data['messaging.message.receive.latency'] = $receiveLatency * 1000; // milliseconds + } + + $transaction->setData($data); + $transaction->finish(); + } catch (Throwable $e) { + // Silently ignore Sentry errors + } finally { + static::$currentTransaction = null; + static::$jobStartTime = null; + } + } + + /** + * Get trace headers to include in job data for trace propagation. + * + * @return array + */ + public static function getTraceHeaders(): array { + if (!static::isAvailable()) { + return []; + } + + try { + $hub = SentrySdk::getCurrentHub(); + $span = $hub->getSpan(); + + if ($span === null) { + return []; + } + + return [ + '_sentry_trace' => \Sentry\getTraceparent(), + '_sentry_baggage' => \Sentry\getBaggage(), + ]; + } catch (Throwable $e) { + return []; + } + } + + /** + * Calculate payload size in bytes. + * + * @param \Queue\Model\Entity\QueuedJob $job + * + * @return int + */ + protected static function getPayloadSize(QueuedJob $job): int { + if ($job->data === null) { + return 0; + } + + $encoded = json_encode($job->data); + + return $encoded !== false ? strlen($encoded) : 0; + } + +} diff --git a/tests/TestCase/Model/Table/QueuedJobsTableTest.php b/tests/TestCase/Model/Table/QueuedJobsTableTest.php index 739a5c2b..39989bb7 100644 --- a/tests/TestCase/Model/Table/QueuedJobsTableTest.php +++ b/tests/TestCase/Model/Table/QueuedJobsTableTest.php @@ -11,6 +11,8 @@ use Cake\Core\Configure; use Cake\Datasource\ConnectionManager; +use Cake\Event\EventList; +use Cake\Event\EventManager; use Cake\I18n\DateTime; use Cake\ORM\TableRegistry; use Cake\TestSuite\TestCase; @@ -752,6 +754,23 @@ public function testGetStats() { $this->assertWithinRange(7200, (int)$queuedJob->fetchdelay, 1); } + /** + * Test that Queue.Job.created event is fired when a job is created + * + * @return void + */ + public function testJobCreatedEvent() { + // Set up event tracking + $eventList = new EventList(); + EventManager::instance()->setEventList($eventList); + + // Create a job + $job = $this->QueuedJobs->createJob('Queue.Example', ['test' => 'data']); + + // Check that the created event was dispatched + $this->assertEventFired('Queue.Job.created'); + } + /** * Helper method for skipping tests that need a real connection. * diff --git a/tests/TestCase/Queue/ProcessorTest.php b/tests/TestCase/Queue/ProcessorTest.php index 63ecd9c1..06806d0e 100644 --- a/tests/TestCase/Queue/ProcessorTest.php +++ b/tests/TestCase/Queue/ProcessorTest.php @@ -15,6 +15,7 @@ use Queue\Model\Entity\QueuedJob; use Queue\Model\Table\QueuedJobsTable; use Queue\Queue\Processor; +use Queue\Queue\Task\ExampleTask; use Queue\Queue\Task\RetryExampleTask; use ReflectionClass; use RuntimeException; @@ -286,6 +287,117 @@ public function testWorkerTimeoutHandlingIntegration() { } } + /** + * Test that Queue.Job.started event is fired when job begins processing + * + * @return void + */ + public function testJobStartedEvent() { + // Set up event tracking + $eventList = new EventList(); + EventManager::instance()->setEventList($eventList); + + // Create a job + $QueuedJobs = $this->getTableLocator()->get('Queue.QueuedJobs'); + $job = $QueuedJobs->createJob('Queue.Example', [], ['priority' => 1]); + + // Create processor with mock task + $out = new ConsoleOutput(); + $err = new ConsoleOutput(); + $processor = $this->getMockBuilder(Processor::class) + ->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()]) + ->onlyMethods(['loadTask']) + ->getMock(); + + // Create a mock task that succeeds (run method is void, so no return) + $mockTask = $this->getMockBuilder(ExampleTask::class) + ->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()]) + ->onlyMethods(['run']) + ->getMock(); + + $processor->method('loadTask')->willReturn($mockTask); + + // Run the job + $this->invokeMethod($processor, 'runJob', [$job, 'test-pid']); + + // Check that the started event was dispatched + $this->assertEventFired('Queue.Job.started'); + } + + /** + * Test that Queue.Job.completed event is fired when job finishes successfully + * + * @return void + */ + public function testJobCompletedEvent() { + // Set up event tracking + $eventList = new EventList(); + EventManager::instance()->setEventList($eventList); + + // Create a job + $QueuedJobs = $this->getTableLocator()->get('Queue.QueuedJobs'); + $job = $QueuedJobs->createJob('Queue.Example', [], ['priority' => 1]); + + // Create processor with mock task + $out = new ConsoleOutput(); + $err = new ConsoleOutput(); + $processor = $this->getMockBuilder(Processor::class) + ->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()]) + ->onlyMethods(['loadTask']) + ->getMock(); + + // Create a mock task that succeeds (run method is void, so no return) + $mockTask = $this->getMockBuilder(ExampleTask::class) + ->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()]) + ->onlyMethods(['run']) + ->getMock(); + + $processor->method('loadTask')->willReturn($mockTask); + + // Run the job + $this->invokeMethod($processor, 'runJob', [$job, 'test-pid']); + + // Check that the completed event was dispatched + $this->assertEventFired('Queue.Job.completed'); + } + + /** + * Test that Queue.Job.failed event is fired when job fails + * + * @return void + */ + public function testJobFailedEvent() { + // Set up event tracking + $eventList = new EventList(); + EventManager::instance()->setEventList($eventList); + + // Create a job that will fail + $QueuedJobs = $this->getTableLocator()->get('Queue.QueuedJobs'); + $job = $QueuedJobs->createJob('Queue.RetryExample', [], ['priority' => 1]); + + // Create processor with mock task that fails + $out = new ConsoleOutput(); + $err = new ConsoleOutput(); + $processor = $this->getMockBuilder(Processor::class) + ->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()]) + ->onlyMethods(['loadTask']) + ->getMock(); + + $mockTask = $this->getMockBuilder(RetryExampleTask::class) + ->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()]) + ->onlyMethods(['run']) + ->getMock(); + $mockTask->method('run')->willThrowException(new RuntimeException('Task failed')); + + $processor->method('loadTask')->willReturn($mockTask); + + // Run the job (it will fail) + $this->invokeMethod($processor, 'runJob', [$job, 'test-pid']); + + // Check that the failed event was dispatched + $this->assertEventFired('Queue.Job.failed'); + } + /** * Test setPhpTimeout with new config names *