Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

## [3.2.0] - 2025-11-11
### Changed
- `Phlib\JobQueue\AwsSqs\JobQueue` optionally accepts a `$groupKey` parameter to set `MessageGroupId` on the message

## [3.1.0] - 2025-08-20
### Added
- Added `BatchableJobQueueInterface` which defines a JobQueue capable of putting jobs to a queue in batches.
Expand Down Expand Up @@ -49,6 +53,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- **BC break**: Removed support for PHP versions <= v8.0 as they are no longer
[actively supported](https://php.net/supported-versions.php) by the PHP project.

## [2.2.0] - 2025-11-11
### Changed
- `Phlib\JobQueue\AwsSqs\JobQueue` optionally accepts a `$groupKey` parameter to set `MessageGroupId` on the message

## [2.1.0] - 2025-08-19
### Added
- Added `BatchableJobQueueInterface` which defines a JobQueue capable of putting jobs to a queue in batches.
Expand Down
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,18 @@ CREATE TABLE `scheduled_queue` (
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4;
```

## SQS Fair Queues
JobQueue can automatically set `MessageGroupId` from a parameter on the job's body see [the AWS documentation](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-fair-queues.html) for more information.
``` php
$client = new \Aws\Sqs\SqsClient(['region' => 'eu-west-1', 'credentials' => ['key' => 'foo', 'secret' => 'bar']]);
$db = new \Phlib\Db\Adapter(['host' => '127.0.0.1', 'dbname' => 'example']);
$scheduler = new \Phlib\JobQueue\Scheduler\DbScheduler($db, 300, 600, true);

$jobQueue = new \Phlib\JobQueue\AwsSqs\JobQueue($client, $scheduler, '', 'tenantId'); //tenantId from the job body will be used as the MessageGroupId
$job = new \Phlib\JobQueue\Job('my-queue', ['foo' => 'bar', 'tenantId' => 'tenant-26']); //MessageGroupId will get set to 'tenant-26'
$jobQueue->put($job);
```

## License

This package is free software: you can redistribute it and/or modify
Expand Down
32 changes: 29 additions & 3 deletions src/AwsSqs/JobQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public function __construct(
private readonly SqsClient $client,
private readonly SchedulerInterface $scheduler,
private string $queuePrefix = '',
private ?string $groupKey = null,
) {
}

Expand All @@ -48,11 +49,13 @@ public function put(JobInterface $job): self
return $this;
}

$this->client->sendMessage([
$message = $this->getMessageWithGroupId($job, [
'QueueUrl' => $this->getQueueUrlWithPrefix($job->getQueue()),
'DelaySeconds' => $job->getDelay(),
'MessageBody' => JobFactory::serializeBody($job),
]);

$this->client->sendMessage($message);
return $this;
} catch (SqsException $exception) {
throw new RuntimeException($exception->getMessage(), $exception->getCode(), $exception);
Expand All @@ -70,11 +73,11 @@ public function putBatch(array $jobs): self
continue;
}

$queues[$job->getQueue()][] = [
$queues[$job->getQueue()][] = $this->getMessageWithGroupId($job, [
'Id' => (string)$key,
'DelaySeconds' => $job->getDelay(),
'MessageBody' => JobFactory::serializeBody($job),
];
]);
}

foreach ($queues as $queue => $jobs) {
Expand Down Expand Up @@ -199,4 +202,27 @@ private function determineDeadletterQueue(string $queue): string
throw new RuntimeException("Specified queue '{$name}' does not have a Redrive Policy");
}
}

private function getMessageWithGroupId(JobInterface $job, array $message): array
{
if (!$this->groupKey) {
return $message;
}

$body = $job->getBody();

$groupId = null;

if (is_array($body) && isset($body[$this->groupKey])) {
$groupId = $body[$this->groupKey];
} elseif (is_object($body)) {
$groupId = $body->{$this->groupKey} ?? null;
}

if ($groupId !== null) {
$message['MessageGroupId'] = (string)$groupId;
}

return $message;
}
}
Loading