Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@
"cakedc/cakephp-phpstan": "^4.0.0",
"cakephp/bake": "^3.0.1",
"cakephp/migrations": "^4.5.1",
"dereuromark/cakephp-dto": "^2.1.0",
"dereuromark/cakephp-ide-helper": "^2.0.0",
"dereuromark/cakephp-templating": "^0.2.7",
"dereuromark/cakephp-tools": "^3.0.0",
"dereuromark/cakephp-dto": "^2.1.0",
"fig-r/psr2r-sniffer": "dev-master",
"friendsofcake/search": "^7.0.0",
"phpunit/phpunit": "^10.5 || ^11.5 || ^12.1"
"phpunit/phpunit": "^10.5 || ^11.5 || ^12.1",
"sentry/sentry": "^4.18"
},
"suggest": {
"dereuromark/cakephp-ide-helper": "For maximum IDE support, especially around createJob() usage.",
Expand Down
102 changes: 101 additions & 1 deletion docs/sections/misc.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,64 @@ This includes also failed ones if not filtered further using `where()` condition

## Events
The Queue plugin dispatches events to allow you to hook into the queue processing lifecycle.
These events are useful for monitoring, logging, and integrating with external services like Sentry.

### Queue.Job.created
This event is triggered when a new job is added to the queue (producer side).

```php
use Cake\Event\EventInterface;
use Cake\Event\EventManager;

EventManager::instance()->on('Queue.Job.created', function (EventInterface $event) {
$job = $event->getData('job');
// Track job creation for monitoring
});
```

Event data:
- `job`: The `QueuedJob` entity that was created

### Queue.Job.started
This event is triggered when a worker begins processing a job (consumer side).

```php
EventManager::instance()->on('Queue.Job.started', function (EventInterface $event) {
$job = $event->getData('job');
// Start tracing/monitoring span
});
```

Event data:
- `job`: The `QueuedJob` entity being processed

### Queue.Job.completed
This event is triggered when a job finishes successfully.

```php
EventManager::instance()->on('Queue.Job.completed', function (EventInterface $event) {
$job = $event->getData('job');
// Mark trace as successful
});
```

Event data:
- `job`: The `QueuedJob` entity that completed

### Queue.Job.failed
This event is triggered when a job fails (on every failure attempt).

```php
EventManager::instance()->on('Queue.Job.failed', function (EventInterface $event) {
$job = $event->getData('job');
$failureMessage = $event->getData('failureMessage');
// Mark trace as failed, log error
});
```

Event data:
- `job`: The `QueuedJob` entity that failed
- `failureMessage`: The error message from the failure

### Queue.Job.maxAttemptsExhausted
This event is triggered when a job has failed and exhausted all of its configured retry attempts.
Expand Down Expand Up @@ -81,10 +139,52 @@ EventManager::instance()->on('Queue.Job.maxAttemptsExhausted', function (EventIn
});
```

The event data contains:
Event data:
- `job`: The `QueuedJob` entity that failed
- `failureMessage`: The error message from the last failure

### Sentry Integration

The plugin provides built-in support for [Sentry's queue monitoring](https://docs.sentry.io/platforms/php/tracing/instrumentation/queues-module/) feature.
When enabled, it automatically creates producer and consumer spans for queue jobs.

To enable Sentry integration, add to your configuration:

```php
// In config/app.php or config/app_local.php
'Queue' => [
'sentry' => true,
// ... other queue config
],
```

Requirements:
- The `sentry/sentry` package must be installed
- Sentry must be initialized in your application (e.g., via `lordsimal/cakephp-sentry`)

The integration automatically:
- Creates `queue.publish` spans when jobs are created
- Creates `queue.process` transactions when jobs are processed
- Propagates trace context between producer and consumer via job data
- Sets appropriate status (success/error) based on job outcome
- Includes all standard messaging attributes:
- `messaging.message.id` - Job ID
- `messaging.destination.name` - Task name
- `messaging.message.body.size` - Payload size in bytes
- `messaging.message.retry.count` - Attempt count
- `messaging.message.receive.latency` - Time from scheduled to fetched (ms)

### Using Events for Custom Monitoring

If you prefer to implement your own monitoring integration, you can use the events directly.
The job entity provides all necessary data for tracing:

- `$job->id` - Message identifier (`messaging.message.id`)
- `$job->job_task` - Queue/topic name (`messaging.destination.name`)
- `$job->data` - Payload for calculating message size (`messaging.message.body.size`)
- `$job->attempts` - Retry count (`messaging.message.retry.count`)
- `$job->created`, `$job->notbefore`, `$job->fetched` - For calculating receive latency (`messaging.message.receive.latency`)

## Notes

`<TaskName>` is the complete class name without the Task suffix (e.g. Example or PluginName.Example).
Expand Down
17 changes: 16 additions & 1 deletion src/Model/Table/QueuedJobsTable.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use Queue\Model\Entity\QueuedJob;
use Queue\Model\Filter\QueuedJobsCollection;
use Queue\Queue\Config;
use Queue\Queue\SentryIntegration;
use Queue\Queue\TaskFinder;
use Queue\Utility\Memory;
use RuntimeException;
Expand Down Expand Up @@ -205,6 +206,14 @@ public function createJob(string $jobTask, array|object|null $data = null, array
throw new InvalidArgumentException('Data must be `array|null`, implement `' . FromArrayToArrayInterface::class . '` or provide a `toArray()` method');
}

// Add Sentry trace headers for trace propagation if available
$traceHeaders = SentryIntegration::getTraceHeaders();
if ($traceHeaders && is_array($data)) {
$data = $traceHeaders + $data;
} elseif ($traceHeaders) {
$data = $traceHeaders;
}

$queuedJob = [
'job_task' => $this->jobTask($jobTask),
'data' => $data,
Expand All @@ -216,8 +225,14 @@ public function createJob(string $jobTask, array|object|null $data = null, array
}

$queuedJob = $this->newEntity($queuedJob);
$queuedJob = $this->saveOrFail($queuedJob);

$this->dispatchEvent('Queue.Job.created', [
'job' => $queuedJob,
]);
SentryIntegration::startProducerSpan($queuedJob);

return $this->saveOrFail($queuedJob);
return $queuedJob;
}

/**
Expand Down
20 changes: 20 additions & 0 deletions src/Queue/Processor.php
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,12 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void {
$this->log('job ' . $queuedJob->job_task . ', id ' . $queuedJob->id, $pid, false);
$taskName = $queuedJob->job_task;

$event = new Event('Queue.Job.started', $this, [
'job' => $queuedJob,
]);
EventManager::instance()->dispatch($event);
SentryIntegration::startConsumerTransaction($queuedJob);

$return = $failureMessage = null;
try {
$this->time = time();
Expand Down Expand Up @@ -242,6 +248,13 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void {
$this->log('job ' . $queuedJob->job_task . ', id ' . $queuedJob->id . ' failed and ' . $failedStatus, $pid);
$this->io->out('Job did not finish, ' . $failedStatus . ' after try ' . $queuedJob->attempts . '.');

$event = new Event('Queue.Job.failed', $this, [
'job' => $queuedJob,
'failureMessage' => $failureMessage,
]);
EventManager::instance()->dispatch($event);
SentryIntegration::finishConsumerFailure($queuedJob, $failureMessage);

// Dispatch event when job has exhausted all retries
if ($failedStatus === 'aborted') {
$event = new Event('Queue.Job.maxAttemptsExhausted', $this, [
Expand All @@ -255,6 +268,13 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void {
}

$this->QueuedJobs->markJobDone($queuedJob);

$event = new Event('Queue.Job.completed', $this, [
'job' => $queuedJob,
]);
EventManager::instance()->dispatch($event);
SentryIntegration::finishConsumerSuccess($queuedJob);

$this->io->out('Job Finished.');
$this->currentJob = null;
}
Expand Down
Loading