From 5bdfa82faa48ff2d7eccc5441ebd0308f5e101a7 Mon Sep 17 00:00:00 2001 From: Xnopyt Date: Wed, 29 Oct 2025 21:21:23 +0000 Subject: [PATCH 1/3] Add support for MessageGroupId --- CHANGELOG.md | 2 + README.md | 12 + src/AwsSqs/JobQueue.php | 42 +++- tests/AwsSqs/JobQueueTest.php | 458 ++++++++++++++++++++++++++++++++++ 4 files changed, 509 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b9dcd12..faa1597 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +### Changed +- `Phlib\JobQueue\AwsSqs\JobQueue` optionally accepts a `$groupKey` parameter to set `MessageGroupId` on the message ## [2.1.0] - 2025-08-19 ### Added diff --git a/README.md b/README.md index 6ccc084..aed4b46 100644 --- a/README.md +++ b/README.md @@ -87,6 +87,18 @@ CREATE TABLE `scheduled_queue` ( ) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4; ``` +## SQS Fair Queues +JobQueue can automatically set `MessageGroupId` from a parameter on the job's body see [the AWS documentation](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-fair-queues.html) for more information. +``` php +$client = new \Aws\Sqs\SqsClient(['region' => 'eu-west-1', 'credentials' => ['key' => 'foo', 'secret' => 'bar']]); +$db = new \Phlib\Db\Adapter(['host' => '127.0.0.1', 'dbname' => 'example']); +$scheduler = new \Phlib\JobQueue\Scheduler\DbScheduler($db, 300, 600, true); + +$jobQueue = new \Phlib\JobQueue\AwsSqs\JobQueue($client, $scheduler, '', 'tenantId'); //tenantId from the job body will be used as the MessageGroupId +$job = new \Phlib\JobQueue\Job('my-queue', ['foo' => 'bar', 'tenantId' => 'tenant-26']); //MessageGroupId will get set to 'tenant-26' +$jobQueue->put($job); +``` + ## License This package is free software: you can redistribute it and/or modify diff --git a/src/AwsSqs/JobQueue.php b/src/AwsSqs/JobQueue.php index 7904b38..c657bc5 100644 --- a/src/AwsSqs/JobQueue.php +++ b/src/AwsSqs/JobQueue.php @@ -28,11 +28,18 @@ class JobQueue implements BatchableJobQueueInterface */ private $queuePrefix; - public function __construct(SqsClient $client, SchedulerInterface $scheduler, $queuePrefix = '') - { + private ?string $groupKey; + + public function __construct( + SqsClient $client, + SchedulerInterface $scheduler, + $queuePrefix = '', + ?string $groupKey = null + ) { $this->client = $client; $this->scheduler = $scheduler; $this->queuePrefix = $queuePrefix; + $this->groupKey = $groupKey; } /** @@ -58,11 +65,13 @@ public function put(JobInterface $job): self return $this; } - $this->client->sendMessage([ + $message = $this->getMessageWithGroupId($job, [ 'QueueUrl' => $this->getQueueUrlWithPrefix($job->getQueue()), 'DelaySeconds' => $job->getDelay(), 'MessageBody' => JobFactory::serializeBody($job), ]); + + $this->client->sendMessage($message); return $this; } catch (SqsException $exception) { throw new RuntimeException($exception->getMessage(), $exception->getCode(), $exception); @@ -80,11 +89,11 @@ public function putBatch(array $jobs): self continue; } - $queues[$job->getQueue()][] = [ + $queues[$job->getQueue()][] = $this->getMessageWithGroupId($job, [ 'Id' => (string) $key, 'DelaySeconds' => $job->getDelay(), 'MessageBody' => JobFactory::serializeBody($job), - ]; + ]); } foreach ($queues as $queue => $jobs) { @@ -209,4 +218,27 @@ private function determineDeadletterQueue($queue) throw new RuntimeException("Specified queue '{$name}' does not have a Redrive Policy"); } } + + private function getMessageWithGroupId(JobInterface $job, array $message): array + { + if (!$this->groupKey) { + return $message; + } + + $body = $job->getBody(); + + $groupId = null; + + if (is_array($body) && isset($body[$this->groupKey])) { + $groupId = $body[$this->groupKey]; + } elseif (is_object($body)) { + $groupId = $body->{$this->groupKey} ?? null; + } + + if ($groupId !== null) { + $message['MessageGroupId'] = (string) $groupId; + } + + return $message; + } } diff --git a/tests/AwsSqs/JobQueueTest.php b/tests/AwsSqs/JobQueueTest.php index 94e999b..db21691 100644 --- a/tests/AwsSqs/JobQueueTest.php +++ b/tests/AwsSqs/JobQueueTest.php @@ -475,6 +475,464 @@ public function testPutBatchWithEmptyArray(): void $this->assertSame($jobQueue, $result); } + public function testPutWithMessageGroupIdObjectProperty(): void + { + $sqsClient = $this->prophesize(SqsClient::class); + $scheduler = $this->createMock(SchedulerInterface::class); + + $queuePrefix = 'prefix-'; + $groupKey = 'userId'; + $queue = 'test-queue'; + $queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/prefix-test-queue'; + + $scheduler->method('shouldBeScheduled')->willReturn(false); + + $sqsClient->getQueueUrl([ + 'QueueName' => $queuePrefix . $queue, + ]) + ->shouldBeCalledOnce() + ->willReturn($this->mockAwsResult([['QueueUrl', $queueUrl]])); + + $jobData = (object) [ + 'userId' => 'user123', + 'action' => 'process', + ]; + $job = new Job($queue, $jobData); + + $expectedMessage = [ + 'QueueUrl' => $queueUrl, + 'DelaySeconds' => 0, + 'MessageBody' => json_encode([ + 'queue' => $queue, + 'body' => $jobData, + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ]), + 'MessageGroupId' => 'user123', + ]; + + $sqsClient->sendMessage($expectedMessage)->shouldBeCalledOnce(); + + $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler, $queuePrefix, $groupKey); + $result = $jobQueue->put($job); + + $this->assertSame($jobQueue, $result); + } + + public function testPutWithMessageGroupIdArrayProperty(): void + { + $sqsClient = $this->prophesize(SqsClient::class); + $scheduler = $this->createMock(SchedulerInterface::class); + + $queuePrefix = 'prefix-'; + $groupKey = 'tenantId'; + $queue = 'test-queue'; + $queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/prefix-test-queue'; + + $scheduler->method('shouldBeScheduled')->willReturn(false); + + $sqsClient->getQueueUrl([ + 'QueueName' => $queuePrefix . $queue, + ]) + ->shouldBeCalledOnce() + ->willReturn($this->mockAwsResult([['QueueUrl', $queueUrl]])); + + $jobData = [ + 'tenantId' => 456, + 'data' => 'process this', + ]; + $job = new Job($queue, $jobData); + + $expectedMessage = [ + 'QueueUrl' => $queueUrl, + 'DelaySeconds' => 0, + 'MessageBody' => json_encode([ + 'queue' => $queue, + 'body' => $jobData, + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ]), + 'MessageGroupId' => '456', + ]; + + $sqsClient->sendMessage($expectedMessage)->shouldBeCalledOnce(); + + $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler, $queuePrefix, $groupKey); + $result = $jobQueue->put($job); + + $this->assertSame($jobQueue, $result); + } + + public function testPutWithoutMessageGroupIdWhenGroupKeyMissing(): void + { + $sqsClient = $this->prophesize(SqsClient::class); + $scheduler = $this->createMock(SchedulerInterface::class); + + $queuePrefix = 'prefix-'; + $groupKey = 'missingKey'; + $queue = 'test-queue'; + $queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/prefix-test-queue'; + + $scheduler->method('shouldBeScheduled')->willReturn(false); + + $sqsClient->getQueueUrl([ + 'QueueName' => $queuePrefix . $queue, + ]) + ->shouldBeCalledOnce() + ->willReturn($this->mockAwsResult([['QueueUrl', $queueUrl]])); + + $jobData = [ + 'userId' => 'user123', + 'action' => 'process', + ]; + $job = new Job($queue, $jobData); + + $expectedMessage = [ + 'QueueUrl' => $queueUrl, + 'DelaySeconds' => 0, + 'MessageBody' => json_encode([ + 'queue' => $queue, + 'body' => $jobData, + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ]), + ]; + + $sqsClient->sendMessage($expectedMessage)->shouldBeCalledOnce(); + + // groupKey is set but doesn't exist in job data, so no MessageGroupId should be set + $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler, $queuePrefix, $groupKey); + $result = $jobQueue->put($job); + + $this->assertSame($jobQueue, $result); + } + + public function testPutWithMessageGroupIdNullValue(): void + { + $sqsClient = $this->prophesize(SqsClient::class); + $scheduler = $this->createMock(SchedulerInterface::class); + + $queuePrefix = 'prefix-'; + $groupKey = 'userId'; + $queue = 'test-queue'; + $queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/prefix-test-queue'; + + $scheduler->method('shouldBeScheduled')->willReturn(false); + + $sqsClient->getQueueUrl([ + 'QueueName' => $queuePrefix . $queue, + ]) + ->shouldBeCalledOnce() + ->willReturn($this->mockAwsResult([['QueueUrl', $queueUrl]])); + + $jobData = [ + 'userId' => null, + 'action' => 'process', + ]; + $job = new Job($queue, $jobData); + + $expectedMessage = [ + 'QueueUrl' => $queueUrl, + 'DelaySeconds' => 0, + 'MessageBody' => json_encode([ + 'queue' => $queue, + 'body' => $jobData, + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ]), + ]; + + $sqsClient->sendMessage($expectedMessage)->shouldBeCalledOnce(); + + // groupKey exists but value is null, so no MessageGroupId should be set + $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler, $queuePrefix, $groupKey); + $result = $jobQueue->put($job); + + $this->assertSame($jobQueue, $result); + } + + public function testPutBatchWithMessageGroupId(): void + { + $sqsClient = $this->prophesize(SqsClient::class); + $scheduler = $this->createMock(SchedulerInterface::class); + + $queuePrefix = 'prefix-'; + $groupKey = 'userId'; + $queue = 'test-queue'; + $queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/prefix-test-queue'; + + $scheduler->method('shouldBeScheduled')->willReturn(false); + + $sqsClient->getQueueUrl([ + 'QueueName' => $queuePrefix . $queue, + ]) + ->shouldBeCalledOnce() + ->willReturn($this->mockAwsResult([['QueueUrl', $queueUrl]])); + + $job1 = new Job($queue, [ + 'userId' => 'user123', + 'data' => 'test1', + ]); + $job2 = new Job($queue, (object) [ + 'userId' => 'user456', + 'data' => 'test2', + ]); + $job3 = new Job($queue, [ + 'data' => 'test3', + ]); // No userId - should not have MessageGroupId + $job4 = new Job($queue, (object) [ + 'data' => 'test4', + ]); // No userId - should not have MessageGroupId + $jobs = [$job1, $job2, $job3, $job4]; + + $expectedEntries = [ + [ + 'Id' => '0', + 'DelaySeconds' => 0, + 'MessageBody' => json_encode([ + 'queue' => $queue, + 'body' => [ + 'userId' => 'user123', + 'data' => 'test1', + ], + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ]), + 'MessageGroupId' => 'user123', + ], + [ + 'Id' => '1', + 'DelaySeconds' => 0, + 'MessageBody' => json_encode([ + 'queue' => $queue, + 'body' => (object) [ + 'userId' => 'user456', + 'data' => 'test2', + ], + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ]), + 'MessageGroupId' => 'user456', + ], + [ + 'Id' => '2', + 'DelaySeconds' => 0, + 'MessageBody' => json_encode([ + 'queue' => $queue, + 'body' => [ + 'data' => 'test3', + ], + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ]), + // No MessageGroupId for this entry + ], + [ + 'Id' => '3', + 'DelaySeconds' => 0, + 'MessageBody' => json_encode([ + 'queue' => $queue, + 'body' => (object) [ + 'data' => 'test4', + ], + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ]), + // No MessageGroupId for this entry + ], + ]; + + $sqsClient->sendMessageBatch([ + 'QueueUrl' => $queueUrl, + 'Entries' => $expectedEntries, + ])->shouldBeCalledOnce(); + + $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler, $queuePrefix, $groupKey); + $result = $jobQueue->putBatch($jobs); + + $this->assertSame($jobQueue, $result); + } + + public function testPutBatchWithMessageGroupIdMixedQueues(): void + { + $sqsClient = $this->prophesize(SqsClient::class); + $scheduler = $this->createMock(SchedulerInterface::class); + + $queuePrefix = 'prefix-'; + $groupKey = 'tenantId'; + $queue1 = 'test-queue-1'; + $queue2 = 'test-queue-2'; + $queueUrl1 = 'https://sqs.us-east-1.amazonaws.com/123456789012/prefix-test-queue-1'; + $queueUrl2 = 'https://sqs.us-east-1.amazonaws.com/123456789012/prefix-test-queue-2'; + + $scheduler->method('shouldBeScheduled')->willReturn(false); + + $sqsClient->getQueueUrl([ + 'QueueName' => $queuePrefix . $queue1, + ]) + ->shouldBeCalledOnce() + ->willReturn($this->mockAwsResult([['QueueUrl', $queueUrl1]])); + + $sqsClient->getQueueUrl([ + 'QueueName' => $queuePrefix . $queue2, + ]) + ->shouldBeCalledOnce() + ->willReturn($this->mockAwsResult([['QueueUrl', $queueUrl2]])); + + $job1 = new Job($queue1, [ + 'tenantId' => 100, + 'data' => 'test1', + ]); + $job2 = new Job($queue2, [ + 'tenantId' => 200, + 'data' => 'test2', + ]); + $job3 = new Job($queue1, [ + 'tenantId' => 100, + 'data' => 'test3', + ]); + $jobs = [$job1, $job2, $job3]; + + $expectedEntries1 = [ + [ + 'Id' => '0', + 'DelaySeconds' => 0, + 'MessageBody' => json_encode([ + 'queue' => $queue1, + 'body' => [ + 'tenantId' => 100, + 'data' => 'test1', + ], + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ]), + 'MessageGroupId' => '100', + ], + [ + 'Id' => '2', + 'DelaySeconds' => 0, + 'MessageBody' => json_encode([ + 'queue' => $queue1, + 'body' => [ + 'tenantId' => 100, + 'data' => 'test3', + ], + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ]), + 'MessageGroupId' => '100', + ], + ]; + + $expectedEntries2 = [ + [ + 'Id' => '1', + 'DelaySeconds' => 0, + 'MessageBody' => json_encode([ + 'queue' => $queue2, + 'body' => [ + 'tenantId' => 200, + 'data' => 'test2', + ], + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ]), + 'MessageGroupId' => '200', + ], + ]; + + $sqsClient->sendMessageBatch([ + 'QueueUrl' => $queueUrl1, + 'Entries' => $expectedEntries1, + ])->shouldBeCalledOnce(); + + $sqsClient->sendMessageBatch([ + 'QueueUrl' => $queueUrl2, + 'Entries' => $expectedEntries2, + ])->shouldBeCalledOnce(); + + $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler, $queuePrefix, $groupKey); + $result = $jobQueue->putBatch($jobs); + + $this->assertSame($jobQueue, $result); + } + + public function testPutBatchWithMessageGroupIdScheduledJobs(): void + { + $sqsClient = $this->prophesize(SqsClient::class); + $scheduler = $this->createMock(SchedulerInterface::class); + + $queuePrefix = 'prefix-'; + $groupKey = 'userId'; + $queue = 'test-queue'; + $queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/prefix-test-queue'; + + $job1 = new Job($queue, [ + 'userId' => 'user123', + 'data' => 'test1', + ], null, 300); + $job2 = new Job($queue, [ + 'userId' => 'user456', + 'data' => 'test2', + ], null, 0); + $jobs = [$job1, $job2]; + + $scheduler->expects($this->exactly(2)) + ->method('shouldBeScheduled') + ->willReturnCallback(function ($delay) { + return $delay === 300; + }); + + $scheduler->expects($this->once()) + ->method('store') + ->with($job1); + + $sqsClient->getQueueUrl([ + 'QueueName' => $queuePrefix . $queue, + ]) + ->shouldBeCalledOnce() + ->willReturn($this->mockAwsResult([['QueueUrl', $queueUrl]])); + + $expectedEntries = [ + [ + 'Id' => '1', + 'DelaySeconds' => 0, + 'MessageBody' => json_encode([ + 'queue' => $queue, + 'body' => [ + 'userId' => 'user456', + 'data' => 'test2', + ], + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ]), + 'MessageGroupId' => 'user456', + ], + ]; + + $sqsClient->sendMessageBatch([ + 'QueueUrl' => $queueUrl, + 'Entries' => $expectedEntries, + ])->shouldBeCalledOnce(); + + $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler, $queuePrefix, $groupKey); + $result = $jobQueue->putBatch($jobs); + + $this->assertSame($jobQueue, $result); + } + private function mockAwsResult(array $valueMap): MockObject { $result = $this->createMock(Result::class); From 1fec47784c71af7eb5cbdcb6ea31dd34dce89fa6 Mon Sep 17 00:00:00 2001 From: Xnopyt Date: Tue, 11 Nov 2025 13:24:25 +0000 Subject: [PATCH 2/3] Tag for 2.2.0 --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index faa1597..2e69bb7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] + +## [2.2.0] - 2025-11-11 ### Changed - `Phlib\JobQueue\AwsSqs\JobQueue` optionally accepts a `$groupKey` parameter to set `MessageGroupId` on the message From 96ba4af0a9b4fc24d614db42d56880cd2060146c Mon Sep 17 00:00:00 2001 From: Xnopyt Date: Tue, 11 Nov 2025 13:58:37 +0000 Subject: [PATCH 3/3] Tag for 3.2.0 --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f43cb7..107901f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] + +## [3.2.0] - 2025-11-11 ### Changed - `Phlib\JobQueue\AwsSqs\JobQueue` optionally accepts a `$groupKey` parameter to set `MessageGroupId` on the message