From 75bf2bb0898a2207b4d309f3aaa071f4884133fe Mon Sep 17 00:00:00 2001 From: Xnopyt Date: Wed, 13 Aug 2025 09:43:40 +0100 Subject: [PATCH 1/6] Fix actions --- .github/workflows/code-checks.yml | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/.github/workflows/code-checks.yml b/.github/workflows/code-checks.yml index 0771941..5c757f4 100644 --- a/.github/workflows/code-checks.yml +++ b/.github/workflows/code-checks.yml @@ -19,7 +19,7 @@ jobs: steps: - name: Checkout - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Set up PHP uses: shivammathur/setup-php@v2 @@ -29,7 +29,7 @@ jobs: - name: Cache Composer packages id: composer-cache - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: vendor key: ${{ runner.os }}-php${{ matrix.php }}-${{ hashFiles('**/composer.lock') }} @@ -46,4 +46,6 @@ jobs: run: vendor/bin/phpunit --coverage-clover=coverage.xml - name: "Upload coverage to Codecov" - uses: "codecov/codecov-action@v3" + uses: codecov/codecov-action@v5 + with: + token: ${{ secrets.CODECOV_TOKEN }} From 85f37a1e7d9641493a3ebedd55983fecc3cca547 Mon Sep 17 00:00:00 2001 From: Xnopyt Date: Wed, 13 Aug 2025 11:27:08 +0100 Subject: [PATCH 2/6] Add support for batching --- CHANGELOG.md | 15 + src/AwsSqs/JobQueue.php | 37 +- src/BatchableJobQueueInterface.php | 13 + src/Console/MonitorCommand.php | 34 +- src/Scheduler/BatchableSchedulerInterface.php | 20 + src/Scheduler/DbScheduler.php | 77 +++- tests/AwsSqs/JobQueueTest.php | 357 ++++++++++++++++++ tests/Scheduler/DbSchedulerTest.php | 181 ++++++++- 8 files changed, 702 insertions(+), 32 deletions(-) create mode 100644 src/BatchableJobQueueInterface.php create mode 100644 src/Scheduler/BatchableSchedulerInterface.php diff --git a/CHANGELOG.md b/CHANGELOG.md index 885c91b..209c89f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,21 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +### Added +- Added `BatchableJobQueueInterface` which defines a JobQueue capable of putting jobs to a queue in batches. + - Extends `JobQueueInterface`. + - Adds `putBatch()` method, which accepts an array of `JobInterface` and returns `self`. +- Added `BatchableSchedulerInterface` which defines a scheduler capable of retrieving and removing jobs in batches. + - Extends `SchedulerInterface`. + - Adds `retrieveBatch()` which will return a array of jobs or `false` if there are no jobs. + - Adds `removeBatch()` which accepts an array of job ids and returns a bool to indicate success. +### Changed +- `Phlib\JobQueue\AwsSqs\JobQueue` supports batching and implements `BatchableJobQueueInterface`. +- `Phlib\JobQueue\Scheduler\DbScheduler` supports batching and implements `BatchableSchedulerInterface`. +- `Phlib\JobQueue\Scheduler\DbScheduler` optionally accepts a `batchSize` argument to specify how many jobs should be fetched per query. + This defaults to `50` if not provided. +- `MonitorCommand` will fetch jobs in batches if the scheduler implements `BatchableSchedulerInterface`. +- `MonitorCommand` will put jobs in batches if the JobQueue implements `BatchableJobQueueInterface` and the scheduler support batching. ## [2.0.0] - 2022-09-14 ### Added diff --git a/src/AwsSqs/JobQueue.php b/src/AwsSqs/JobQueue.php index 72dfc4b..7904b38 100644 --- a/src/AwsSqs/JobQueue.php +++ b/src/AwsSqs/JobQueue.php @@ -6,14 +6,14 @@ use Aws\Sqs\Exception\SqsException; use Aws\Sqs\SqsClient; +use Phlib\JobQueue\BatchableJobQueueInterface; use Phlib\JobQueue\Exception\InvalidArgumentException; use Phlib\JobQueue\Exception\RuntimeException; use Phlib\JobQueue\Job; use Phlib\JobQueue\JobInterface; -use Phlib\JobQueue\JobQueueInterface; use Phlib\JobQueue\Scheduler\SchedulerInterface; -class JobQueue implements JobQueueInterface +class JobQueue implements BatchableJobQueueInterface { private SqsClient $client; @@ -69,6 +69,39 @@ public function put(JobInterface $job): self } } + public function putBatch(array $jobs): self + { + try { + $queues = []; + + foreach ($jobs as $key => $job) { + if ($this->scheduler->shouldBeScheduled($job->getDelay())) { + $this->scheduler->store($job); + continue; + } + + $queues[$job->getQueue()][] = [ + 'Id' => (string) $key, + 'DelaySeconds' => $job->getDelay(), + 'MessageBody' => JobFactory::serializeBody($job), + ]; + } + + foreach ($queues as $queue => $jobs) { + foreach (array_chunk($jobs, 10) as $batch) { + $this->client->sendMessageBatch([ + 'QueueUrl' => $this->getQueueUrlWithPrefix($queue), + 'Entries' => $batch, + ]); + } + } + + return $this; + } catch (SqsException $exception) { + throw new RuntimeException($exception->getMessage(), $exception->getCode(), $exception); + } + } + public function retrieve(string $queue): ?JobInterface { try { diff --git a/src/BatchableJobQueueInterface.php b/src/BatchableJobQueueInterface.php new file mode 100644 index 0000000..873d4d3 --- /dev/null +++ b/src/BatchableJobQueueInterface.php @@ -0,0 +1,13 @@ +logFile = $logFile; } - while ($jobData = $this->scheduler->retrieve()) { - $output->writeln("Job {$jobData['id']} added."); - $this->jobQueue->put($this->createJob($jobData)); - $this->scheduler->remove($jobData['id']); + if ($this->scheduler instanceof BatchableSchedulerInterface) { + while ($jobsData = $this->scheduler->retrieveBatch()) { + $jobs = []; + foreach ($jobsData as $jobData) { + $output->writeln("Job {$jobData['id']} added."); + $jobs[] = $this->createJob($jobData); + } + $this->putJobBatch($jobs); + $this->scheduler->removeBatch(array_column($jobsData, 'id')); + } + } else { + while ($jobData = $this->scheduler->retrieve()) { + $output->writeln("Job {$jobData['id']} added."); + $this->jobQueue->put($this->createJob($jobData)); + $this->scheduler->remove($jobData['id']); + } } return 0; @@ -71,6 +85,18 @@ protected function createJob(array $schedulerJob): JobInterface ); } + protected function putJobBatch(array $jobs): void + { + if ($this->jobQueue instanceof BatchableJobQueueInterface) { + $this->jobQueue->putBatch($jobs); + return; + } + + foreach ($jobs as $job) { + $this->jobQueue->put($job); + } + } + protected function createChildOutput(): OutputInterface { if (empty($this->logFile)) { diff --git a/src/Scheduler/BatchableSchedulerInterface.php b/src/Scheduler/BatchableSchedulerInterface.php new file mode 100644 index 0000000..21d96af --- /dev/null +++ b/src/Scheduler/BatchableSchedulerInterface.php @@ -0,0 +1,20 @@ +adapter = $adapter; $this->maximumDelay = $maximumDelay; $this->minimumPickup = $minimumPickup; + $this->batchSize = $batchSize; } public function shouldBeScheduled($delay): bool @@ -53,7 +58,24 @@ public function store(JobInterface $job): bool */ public function retrieve() { - $sql = ' + $job = $this->queryJobs(1); + return $job ? $job[0] : false; + } + + /** + * @return array|false + */ + public function retrieveBatch() + { + return $this->queryJobs($this->batchSize); + } + + /** + * @return array|false + */ + private function queryJobs(int $batchSize) + { + $sql = " UPDATE scheduled_queue SET picked_by = CONNECTION_ID(), picked_ts = NOW() @@ -62,31 +84,38 @@ public function retrieve() picked_by IS NULL ORDER BY scheduled_ts DESC - LIMIT 1'; + LIMIT {$batchSize}"; $stmt = $this->adapter->query($sql, [ ':minimumPickup' => $this->minimumPickup, ]); + if ($stmt->rowCount() === 0) { return false; // no jobs } - $sql = 'SELECT * FROM `scheduled_queue` WHERE picked_by = CONNECTION_ID() LIMIT 1'; - $row = $this->adapter->query($sql)->fetch(\PDO::FETCH_ASSOC); - - $scheduledTime = strtotime($row['scheduled_ts']); - $delay = $scheduledTime - time(); - if ($delay < 0) { - $delay = 0; + $sql = "SELECT * FROM `scheduled_queue` WHERE picked_by = CONNECTION_ID() LIMIT {$batchSize}"; + $rows = $this->adapter->query($sql)->fetchAll(\PDO::FETCH_ASSOC); + + $jobs = []; + + foreach ($rows as $row) { + $scheduledTime = strtotime($row['scheduled_ts']); + $delay = $scheduledTime - time(); + if ($delay < 0) { + $delay = 0; + } + + $jobs[] = [ + 'id' => (int) $row['id'], + 'queue' => $row['tube'], + 'data' => unserialize($row['data']), + 'delay' => $delay, + 'priority' => (int) $row['priority'], + 'ttr' => (int) $row['ttr'], + ]; } - return [ - 'id' => $row['id'], - 'queue' => $row['tube'], - 'data' => unserialize($row['data']), - 'delay' => $delay, - 'priority' => $row['priority'], - 'ttr' => $row['ttr'], - ]; + return $jobs; } /** @@ -102,6 +131,16 @@ public function remove($jobId): bool ->rowCount(); } + public function removeBatch(array $jobIds): bool + { + $table = $this->adapter->quote()->identifier('scheduled_queue'); + $sql = "DELETE FROM {$table} WHERE id IN ( " . implode(',', array_fill(0, count($jobIds), '?')) . ' )'; + + return $this->adapter + ->query($sql, $jobIds) + ->rowCount() === count($jobIds); + } + protected function insert(array $data): int { $table = $this->adapter->quote()->identifier('scheduled_queue'); diff --git a/tests/AwsSqs/JobQueueTest.php b/tests/AwsSqs/JobQueueTest.php index 6f19fdc..94e999b 100644 --- a/tests/AwsSqs/JobQueueTest.php +++ b/tests/AwsSqs/JobQueueTest.php @@ -118,6 +118,363 @@ public function testMarkAsErrorWithPrefix(): void $jobQueue->markAsError($job); } + public function testPutBatchWithSingleQueue(): void + { + $sqsClient = $this->prophesize(SqsClient::class); + $scheduler = $this->createMock(SchedulerInterface::class); + + $queuePrefix = 'prefix-'; + $queue = 'test-queue'; + $queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/prefix-test-queue'; + + $scheduler->method('shouldBeScheduled')->willReturn(false); + + $sqsClient->getQueueUrl([ + 'QueueName' => $queuePrefix . $queue, + ]) + ->shouldBeCalledOnce() + ->willReturn($this->mockAwsResult([['QueueUrl', $queueUrl]])); + + $job1 = new Job($queue, [ + 'data' => 'test1', + ], null, 0, 1024, 60); + $job2 = new Job($queue, [ + 'data' => 'test2', + ], null, 5, 512, 30); + $jobs = [$job1, $job2]; + + $expectedEntries = [ + [ + 'Id' => '0', + 'DelaySeconds' => 0, + 'MessageBody' => json_encode( + [ + 'queue' => $queue, + 'body' => [ + 'data' => 'test1', + ], + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ] + ), + ], + [ + 'Id' => '1', + 'DelaySeconds' => 5, + 'MessageBody' => json_encode( + [ + 'queue' => $queue, + 'body' => [ + 'data' => 'test2', + ], + 'delay' => 5, + 'priority' => 512, + 'ttr' => 30, + ] + ), + ], + ]; + + $sqsClient->sendMessageBatch( + [ + 'QueueUrl' => $queueUrl, + 'Entries' => $expectedEntries, + ] + )->shouldBeCalledOnce(); + + $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler, $queuePrefix); + $result = $jobQueue->putBatch($jobs); + + $this->assertSame($jobQueue, $result); + } + + public function testPutBatchWithMultipleQueues(): void + { + $sqsClient = $this->prophesize(SqsClient::class); + $scheduler = $this->createMock(SchedulerInterface::class); + + $queuePrefix = 'prefix-'; + $queue1 = 'test-queue-1'; + $queue2 = 'test-queue-2'; + $queueUrl1 = 'https://sqs.us-east-1.amazonaws.com/123456789012/prefix-test-queue-1'; + $queueUrl2 = 'https://sqs.us-east-1.amazonaws.com/123456789012/prefix-test-queue-2'; + + $scheduler->method('shouldBeScheduled')->willReturn(false); + + $sqsClient->getQueueUrl([ + 'QueueName' => $queuePrefix . $queue1, + ]) + ->shouldBeCalledOnce() + ->willReturn($this->mockAwsResult([['QueueUrl', $queueUrl1]])); + + $sqsClient->getQueueUrl([ + 'QueueName' => $queuePrefix . $queue2, + ]) + ->shouldBeCalledOnce() + ->willReturn($this->mockAwsResult([['QueueUrl', $queueUrl2]])); + + $job1 = new Job($queue1, [ + 'data' => 'test1', + ]); + $job2 = new Job($queue2, [ + 'data' => 'test2', + ]); + $job3 = new Job($queue1, [ + 'data' => 'test3', + ]); + $jobs = [$job1, $job2, $job3]; + + $expectedEntries1 = [ + [ + 'Id' => '0', + 'DelaySeconds' => 0, + 'MessageBody' => json_encode( + [ + 'queue' => $queue1, + 'body' => [ + 'data' => 'test1', + ], + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ] + ), + ], + [ + 'Id' => '2', + 'DelaySeconds' => 0, + 'MessageBody' => json_encode( + [ + 'queue' => $queue1, + 'body' => [ + 'data' => 'test3', + ], + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ] + ), + ], + ]; + + $expectedEntries2 = [ + [ + 'Id' => '1', + 'DelaySeconds' => 0, + 'MessageBody' => json_encode( + [ + 'queue' => $queue2, + 'body' => [ + 'data' => 'test2', + ], + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ] + ), + ], + ]; + + $sqsClient->sendMessageBatch( + [ + 'QueueUrl' => $queueUrl1, + 'Entries' => $expectedEntries1, + ] + )->shouldBeCalledOnce(); + + $sqsClient->sendMessageBatch( + [ + 'QueueUrl' => $queueUrl2, + 'Entries' => $expectedEntries2, + ] + )->shouldBeCalledOnce(); + + $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler, $queuePrefix); + $result = $jobQueue->putBatch($jobs); + + $this->assertSame($jobQueue, $result); + } + + public function testPutBatchWithScheduledJobs(): void + { + $sqsClient = $this->prophesize(SqsClient::class); + $scheduler = $this->createMock(SchedulerInterface::class); + + $queuePrefix = 'prefix-'; + $queue = 'test-queue'; + $queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/prefix-test-queue'; + + $job1 = new Job($queue, [ + 'data' => 'test1', + ], null, 300); + $job2 = new Job($queue, [ + 'data' => 'test2', + ], null, 0); + $jobs = [$job1, $job2]; + + $scheduler->expects($this->exactly(2)) + ->method('shouldBeScheduled') + ->willReturnCallback( + function ($delay) { + if ($delay === 300) { + return true; + } elseif ($delay === 0) { + return false; + } + throw new \InvalidArgumentException("Unexpected delay: {$delay}"); + } + ); + + $scheduler->expects($this->once()) + ->method('store') + ->with($job1); + + $sqsClient->getQueueUrl([ + 'QueueName' => $queuePrefix . $queue, + ]) + ->shouldBeCalledOnce() + ->willReturn($this->mockAwsResult([['QueueUrl', $queueUrl]])); + + $expectedEntries = [ + [ + 'Id' => '1', + 'DelaySeconds' => 0, + 'MessageBody' => json_encode( + [ + 'queue' => $queue, + 'body' => [ + 'data' => 'test2', + ], + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ] + ), + ], + ]; + + $sqsClient->sendMessageBatch( + [ + 'QueueUrl' => $queueUrl, + 'Entries' => $expectedEntries, + ] + ) + ->shouldBeCalledOnce(); + + $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler, $queuePrefix); + $result = $jobQueue->putBatch($jobs); + + $this->assertSame($jobQueue, $result); + } + + public function testPutBatchWithLargeNumberOfJobs(): void + { + $sqsClient = $this->prophesize(SqsClient::class); + $scheduler = $this->createMock(SchedulerInterface::class); + + $queuePrefix = 'prefix-'; + $queue = 'test-queue'; + $queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/prefix-test-queue'; + + $scheduler->method('shouldBeScheduled')->willReturn(false); + + $sqsClient->getQueueUrl([ + 'QueueName' => $queuePrefix . $queue, + ]) + ->shouldBeCalledOnce() + ->willReturn($this->mockAwsResult([['QueueUrl', $queueUrl]])); + + $jobs = []; + for ($i = 1; $i <= 25; $i++) { + $jobs[] = new Job($queue, [ + 'data' => "test{$i}", + ]); + } + + $sqsClient->sendMessageBatch( + Argument::that( + function ($args) use ($queueUrl) { + return $args['QueueUrl'] === $queueUrl && + is_array($args['Entries']) && + count($args['Entries']) === 10; + } + ) + ) + ->shouldBeCalledTimes(2); + + $sqsClient->sendMessageBatch( + Argument::that( + function ($args) use ($queueUrl) { + return $args['QueueUrl'] === $queueUrl && + is_array($args['Entries']) && + count($args['Entries']) === 5; + } + ) + ) + ->shouldBeCalledOnce(); + + $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler, $queuePrefix); + $result = $jobQueue->putBatch($jobs); + + $this->assertSame($jobQueue, $result); + } + + public function testPutBatchWithAllScheduledJobs(): void + { + $sqsClient = $this->prophesize(SqsClient::class); + $scheduler = $this->createMock(SchedulerInterface::class); + + $queuePrefix = 'prefix-'; + $queue = 'test-queue'; + + $job1 = new Job($queue, [ + 'data' => 'test1', + ], 300); + $job2 = new Job($queue, [ + 'data' => 'test2', + ], 600); + $jobs = [$job1, $job2]; + + $scheduler->method('shouldBeScheduled')->willReturn(true); + $scheduler->expects($this->exactly(2)) + ->method('store') + ->willReturnCallback( + function ($job) use ($job1, $job2) { + if ($job === $job1 || $job === $job2) { + return true; + } + throw new \InvalidArgumentException('Unexpected job'); + } + ); + + $sqsClient->getQueueUrl()->shouldNotBeCalled(); + $sqsClient->sendMessageBatch()->shouldNotBeCalled(); + + $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler, $queuePrefix); + $result = $jobQueue->putBatch($jobs); + + $this->assertSame($jobQueue, $result); + } + + public function testPutBatchWithEmptyArray(): void + { + $sqsClient = $this->prophesize(SqsClient::class); + $scheduler = $this->prophesize(SchedulerInterface::class); + + $queuePrefix = 'prefix-'; + $jobs = []; + + $sqsClient->getQueueUrl()->shouldNotBeCalled(); + $sqsClient->sendMessageBatch()->shouldNotBeCalled(); + $scheduler->shouldBeScheduled()->shouldNotBeCalled(); + + $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler->reveal(), $queuePrefix); + $result = $jobQueue->putBatch($jobs); + + $this->assertSame($jobQueue, $result); + } + private function mockAwsResult(array $valueMap): MockObject { $result = $this->createMock(Result::class); diff --git a/tests/Scheduler/DbSchedulerTest.php b/tests/Scheduler/DbSchedulerTest.php index f15e063..58ec4fe 100644 --- a/tests/Scheduler/DbSchedulerTest.php +++ b/tests/Scheduler/DbSchedulerTest.php @@ -96,19 +96,21 @@ public function testRetrieveWithNoResults(): void public function testRetrieveCalculatesDelay(): void { $rowData = [ - 'id' => 1, - 'tube' => 'queue', - 'data' => serialize('someData'), - 'scheduled_ts' => date('Y-m-d H:i:s'), - 'priority' => 1024, - 'ttr' => 60, + [ + 'id' => 1, + 'tube' => 'queue', + 'data' => serialize('someData'), + 'scheduled_ts' => date('Y-m-d H:i:s'), + 'priority' => 1024, + 'ttr' => 60, + ], ]; $stmt = $this->createMock(\PDOStatement::class); $stmt->expects(static::once()) ->method('rowCount') ->willReturn(1); $stmt->expects(static::atLeastOnce()) - ->method('fetch') + ->method('fetchAll') ->willReturn($rowData); $this->adapter->expects(static::atLeastOnce()) ->method('query') @@ -131,4 +133,169 @@ public function testRemove(): void $scheduler = new DbScheduler($this->adapter); static::assertTrue($scheduler->remove(234)); } + + public function testRetrieveBatchWithNoResults(): void + { + $stmt = $this->createMock(\PDOStatement::class); + $stmt->expects(static::once()) + ->method('rowCount') + ->willReturn(0); + $this->adapter->expects(static::once()) + ->method('query') + ->willReturn($stmt); + $scheduler = new DbScheduler($this->adapter); + static::assertFalse($scheduler->retrieveBatch()); + } + + public function testRetrieveBatchWithResults(): void + { + $rowData = [ + [ + 'id' => 1, + 'tube' => 'queue1', + 'data' => serialize('data1'), + 'scheduled_ts' => date('Y-m-d H:i:s'), + 'priority' => 1024, + 'ttr' => 60, + ], + [ + 'id' => 2, + 'tube' => 'queue2', + 'data' => serialize('data2'), + 'scheduled_ts' => date('Y-m-d H:i:s'), + 'priority' => 512, + 'ttr' => 30, + ], + ]; + + $updateStmt = $this->createMock(\PDOStatement::class); + $updateStmt->expects(static::once()) + ->method('rowCount') + ->willReturn(2); + + $selectStmt = $this->createMock(\PDOStatement::class); + $selectStmt->expects(static::once()) + ->method('fetchAll') + ->willReturn($rowData); + + $this->adapter->expects(static::exactly(2)) + ->method('query') + ->willReturnOnConsecutiveCalls($updateStmt, $selectStmt); + + $scheduler = new DbScheduler($this->adapter); + $jobs = $scheduler->retrieveBatch(); + + static::assertIsArray($jobs); + static::assertCount(2, $jobs); + static::assertSame(1, $jobs[0]['id']); + static::assertSame('queue1', $jobs[0]['queue']); + static::assertSame('data1', $jobs[0]['data']); + static::assertSame(0, $jobs[0]['delay']); + static::assertSame(1024, $jobs[0]['priority']); + static::assertSame(60, $jobs[0]['ttr']); + } + + public function testRetrieveBatchCalculatesDelayCorrectly(): void + { + $futureTime = date('Y-m-d H:i:s', time() + 120); + $rowData = [ + [ + 'id' => 1, + 'tube' => 'queue', + 'data' => serialize('data'), + 'scheduled_ts' => $futureTime, + 'priority' => 1024, + 'ttr' => 60, + ], + ]; + + $updateStmt = $this->createMock(\PDOStatement::class); + $updateStmt->expects(static::once()) + ->method('rowCount') + ->willReturn(1); + + $selectStmt = $this->createMock(\PDOStatement::class); + $selectStmt->expects(static::once()) + ->method('fetchAll') + ->willReturn($rowData); + + $this->adapter->expects(static::exactly(2)) + ->method('query') + ->willReturnOnConsecutiveCalls($updateStmt, $selectStmt); + + $scheduler = new DbScheduler($this->adapter); + $jobs = $scheduler->retrieveBatch(); + + static::assertIsArray($jobs); + static::assertCount(1, $jobs); + static::assertSame(120, $jobs[0]['delay']); + } + + public function testRemoveBatchWithSingleJob(): void + { + $this->quote->expects(static::once()) + ->method('identifier') + ->with('scheduled_queue') + ->willReturn('`scheduled_queue`'); + + $pdoStatement = $this->createMock(\PDOStatement::class); + $pdoStatement->expects(static::once()) + ->method('rowCount') + ->willReturn(1); + + $this->adapter->expects(static::once()) + ->method('query') + ->with( + 'DELETE FROM `scheduled_queue` WHERE id IN ( ? )', + [123] + ) + ->willReturn($pdoStatement); + + $scheduler = new DbScheduler($this->adapter); + static::assertTrue($scheduler->removeBatch([123])); + } + + public function testRemoveBatchWithMultipleJobs(): void + { + $this->quote->expects(static::once()) + ->method('identifier') + ->with('scheduled_queue') + ->willReturn('`scheduled_queue`'); + + $pdoStatement = $this->createMock(\PDOStatement::class); + $pdoStatement->expects(static::once()) + ->method('rowCount') + ->willReturn(3); + + $this->adapter->expects(static::once()) + ->method('query') + ->with( + 'DELETE FROM `scheduled_queue` WHERE id IN ( ?,?,? )', + [123, 456, 789] + ) + ->willReturn($pdoStatement); + + $scheduler = new DbScheduler($this->adapter); + static::assertTrue($scheduler->removeBatch([123, 456, 789])); + } + + public function testRemoveBatchWithNoRowsAffected(): void + { + $this->quote->expects(static::once()) + ->method('identifier') + ->with('scheduled_queue') + ->willReturn('`scheduled_queue`'); + + $pdoStatement = $this->createMock(\PDOStatement::class); + $pdoStatement->expects(static::once()) + ->method('rowCount') + ->willReturn(0); + + $this->adapter->expects(static::once()) + ->method('query') + ->willReturn($pdoStatement); + + $scheduler = new DbScheduler($this->adapter); + static::assertFalse($scheduler->removeBatch([999])); + } } From eac171d49e3b3d66192d1cf902fe6443a7393c2c Mon Sep 17 00:00:00 2001 From: Xnopyt Date: Tue, 12 Aug 2025 13:36:10 +0100 Subject: [PATCH 3/6] Update query to reduce deadlocks --- CHANGELOG.md | 2 ++ src/Scheduler/DbScheduler.php | 43 +++++++++++++++++++++++------ tests/Scheduler/DbSchedulerTest.php | 24 ++++++++-------- 3 files changed, 48 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 209c89f..261b020 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Changed - `Phlib\JobQueue\AwsSqs\JobQueue` supports batching and implements `BatchableJobQueueInterface`. - `Phlib\JobQueue\Scheduler\DbScheduler` supports batching and implements `BatchableSchedulerInterface`. +- `Phlib\JobQueue\Scheduler\DbScheduler` optionally accepts a `skipLocked` argument to indicate whether locked rows should be skipped when claiming jobs. Enabling this will reduce the risk of deadlocks occurring when running multiple instances of the monitor. + This defaults to `false` if not provided and should only be enabled if you are running MySQL 8.0.1 or higher. - `Phlib\JobQueue\Scheduler\DbScheduler` optionally accepts a `batchSize` argument to specify how many jobs should be fetched per query. This defaults to `50` if not provided. - `MonitorCommand` will fetch jobs in batches if the scheduler implements `BatchableSchedulerInterface`. diff --git a/src/Scheduler/DbScheduler.php b/src/Scheduler/DbScheduler.php index 587f0f9..e5d2ebd 100644 --- a/src/Scheduler/DbScheduler.php +++ b/src/Scheduler/DbScheduler.php @@ -20,18 +20,22 @@ class DbScheduler implements BatchableSchedulerInterface private int $minimumPickup; + private bool $skipLocked; + private int $batchSize; /** * @param integer $maximumDelay * @param integer $minimumPickup + * @param boolean $skipLocked * @param integer $batchSize */ - public function __construct(Adapter $adapter, $maximumDelay = 300, $minimumPickup = 600, $batchSize = 50) + public function __construct(Adapter $adapter, $maximumDelay = 300, $minimumPickup = 600, $skipLocked = false, $batchSize = 50) { $this->adapter = $adapter; $this->maximumDelay = $maximumDelay; $this->minimumPickup = $minimumPickup; + $this->skipLocked = $skipLocked; $this->batchSize = $batchSize; } @@ -75,26 +79,47 @@ public function retrieveBatch() */ private function queryJobs(int $batchSize) { - $sql = " - UPDATE scheduled_queue SET - picked_by = CONNECTION_ID(), - picked_ts = NOW() + $this->adapter->beginTransaction(); + + $sql = <<skipLocked) { + $sql .= ' SKIP LOCKED'; + } + $stmt = $this->adapter->query($sql, [ ':minimumPickup' => $this->minimumPickup, ]); - if ($stmt->rowCount() === 0) { + $rowCount = $stmt->rowCount(); + + if ($rowCount === 0) { + $this->adapter->rollBack(); return false; // no jobs } - $sql = "SELECT * FROM `scheduled_queue` WHERE picked_by = CONNECTION_ID() LIMIT {$batchSize}"; - $rows = $this->adapter->query($sql)->fetchAll(\PDO::FETCH_ASSOC); + $rows = $stmt->fetchAll(\PDO::FETCH_ASSOC); + + $placeholders = implode(',', array_fill(0, $rowCount, '?')); + $sql = <<adapter->query($sql, array_column($rows, 'id')); + + $this->adapter->commit(); $jobs = []; diff --git a/tests/Scheduler/DbSchedulerTest.php b/tests/Scheduler/DbSchedulerTest.php index 58ec4fe..eba1692 100644 --- a/tests/Scheduler/DbSchedulerTest.php +++ b/tests/Scheduler/DbSchedulerTest.php @@ -168,19 +168,19 @@ public function testRetrieveBatchWithResults(): void ], ]; - $updateStmt = $this->createMock(\PDOStatement::class); - $updateStmt->expects(static::once()) - ->method('rowCount') - ->willReturn(2); - $selectStmt = $this->createMock(\PDOStatement::class); $selectStmt->expects(static::once()) ->method('fetchAll') ->willReturn($rowData); + $selectStmt->expects(static::once()) + ->method('rowCount') + ->willReturn(2); + + $updateStmt = $this->createMock(\PDOStatement::class); $this->adapter->expects(static::exactly(2)) ->method('query') - ->willReturnOnConsecutiveCalls($updateStmt, $selectStmt); + ->willReturnOnConsecutiveCalls($selectStmt, $updateStmt); $scheduler = new DbScheduler($this->adapter); $jobs = $scheduler->retrieveBatch(); @@ -209,19 +209,19 @@ public function testRetrieveBatchCalculatesDelayCorrectly(): void ], ]; - $updateStmt = $this->createMock(\PDOStatement::class); - $updateStmt->expects(static::once()) - ->method('rowCount') - ->willReturn(1); - $selectStmt = $this->createMock(\PDOStatement::class); $selectStmt->expects(static::once()) ->method('fetchAll') ->willReturn($rowData); + $selectStmt->expects(static::once()) + ->method('rowCount') + ->willReturn(1); + + $updateStmt = $this->createMock(\PDOStatement::class); $this->adapter->expects(static::exactly(2)) ->method('query') - ->willReturnOnConsecutiveCalls($updateStmt, $selectStmt); + ->willReturnOnConsecutiveCalls($selectStmt, $updateStmt); $scheduler = new DbScheduler($this->adapter); $jobs = $scheduler->retrieveBatch(); From 51b5aaaf9a8e5a6052ed38757e2263ead61ade0d Mon Sep 17 00:00:00 2001 From: Xnopyt Date: Thu, 14 Aug 2025 17:49:55 +0100 Subject: [PATCH 4/6] Add retry on deadlock support to DbScheduler --- CHANGELOG.md | 2 + README.md | 4 +- composer.json | 3 +- example/bootstrap.php | 3 +- src/Scheduler/DbScheduler.php | 68 ++++++++++- tests/Scheduler/DbSchedulerTest.php | 173 ++++++++++++++++++++++++++++ 6 files changed, 246 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 261b020..82f6a8e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/). This defaults to `false` if not provided and should only be enabled if you are running MySQL 8.0.1 or higher. - `Phlib\JobQueue\Scheduler\DbScheduler` optionally accepts a `batchSize` argument to specify how many jobs should be fetched per query. This defaults to `50` if not provided. +- `Phlib\JobQueue\Scheduler\DbScheduler` optionally accepts a `backoff` argument which should be a `STS\Backoff\Backoff` object used to retry if a deadlock occurs. + This defaults to `null` and no retry will be attempted if not provided. - `MonitorCommand` will fetch jobs in batches if the scheduler implements `BatchableSchedulerInterface`. - `MonitorCommand` will put jobs in batches if the JobQueue implements `BatchableJobQueueInterface` and the scheduler support batching. diff --git a/README.md b/README.md index 8ded5e3..6ccc084 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ Bootstrap $beanstalk = (new \Phlib\Beanstalk\Factory())->create('localhost'); $db = new \Phlib\Db\Adapter(['host' => '127.0.0.1', 'dbname' => 'example']); -$scheduler = new \Phlib\JobQueue\DbScheduler($db, 300, 600); +$scheduler = new \Phlib\JobQueue\Scheduler\DbScheduler($db, 300, 600, true); $jobQueue = new \Phlib\JobQueue\Beanstalk\Scheduled($beanstalk, $scheduler); ``` @@ -51,6 +51,8 @@ do { } while (true); ``` +See [examples](example/) for more advance usage. + ## Jobqueue Script The script has a dependency on two constructed objects. The Job Queue interface and the Scheduler interface. In order diff --git a/composer.json b/composer.json index a7c7655..5dca8c4 100644 --- a/composer.json +++ b/composer.json @@ -23,7 +23,8 @@ "phlib/console-process": "^2", "psr/log": "^1", - "aws/aws-sdk-php": "^3.61" + "aws/aws-sdk-php": "^3.61", + "stechstudio/backoff": "^1.6" }, "require-dev": { "symplify/easy-coding-standard": "^11", diff --git a/example/bootstrap.php b/example/bootstrap.php index 68dcd63..3644c10 100644 --- a/example/bootstrap.php +++ b/example/bootstrap.php @@ -5,5 +5,6 @@ $queue = 'my-test-tube'; $beanstalk = (new \Phlib\Beanstalk\Factory())->create('localhost'); $db = new \Phlib\Db\Adapter(['host' => '127.0.0.1', 'dbname' => 'test']); -$scheduler = new \Phlib\JobQueue\Scheduler\DbScheduler($db, 60, 120); +$backoff = new \STS\Backoff\Backoff(5, 'constant', 2000, true); +$scheduler = new \Phlib\JobQueue\Scheduler\DbScheduler($db, 60, 120, true, 50, $backoff); $jobQueue = new \Phlib\JobQueue\Beanstalk\JobQueue($beanstalk, $scheduler); diff --git a/src/Scheduler/DbScheduler.php b/src/Scheduler/DbScheduler.php index e5d2ebd..8a0f648 100644 --- a/src/Scheduler/DbScheduler.php +++ b/src/Scheduler/DbScheduler.php @@ -6,6 +6,7 @@ use Phlib\Db\Adapter; use Phlib\JobQueue\JobInterface; +use STS\Backoff\Backoff; /** * Class DbScheduler @@ -24,19 +25,49 @@ class DbScheduler implements BatchableSchedulerInterface private int $batchSize; + private ?Backoff $backoff; + + private const MYSQL_SERIALIZATION_FAILURE = '40001'; + + private const MYSQL_DEADLOCK = '1213'; + /** * @param integer $maximumDelay * @param integer $minimumPickup * @param boolean $skipLocked * @param integer $batchSize + * @param ?Backoff $backoff */ - public function __construct(Adapter $adapter, $maximumDelay = 300, $minimumPickup = 600, $skipLocked = false, $batchSize = 50) - { + public function __construct( + Adapter $adapter, + $maximumDelay = 300, + $minimumPickup = 600, + $skipLocked = false, + $batchSize = 50, + $backoff = null + ) { $this->adapter = $adapter; $this->maximumDelay = $maximumDelay; $this->minimumPickup = $minimumPickup; $this->skipLocked = $skipLocked; $this->batchSize = $batchSize; + $this->backoff = $backoff; + + if ($this->backoff) { + $this->backoff->setDecider( + function (int $attempt, int $maxAttempts, $result, ?\Throwable $e = null) { + if ($e === null) { + return false; + } + + if ($e instanceof \PDOException && $this->isDeadlock($e) && $attempt < $maxAttempts) { + return true; + } + + throw $e; + } + ); + } } public function shouldBeScheduled($delay): bool @@ -62,7 +93,7 @@ public function store(JobInterface $job): bool */ public function retrieve() { - $job = $this->queryJobs(1); + $job = $this->queryJobsWithRetry(1); return $job ? $job[0] : false; } @@ -71,7 +102,36 @@ public function retrieve() */ public function retrieveBatch() { - return $this->queryJobs($this->batchSize); + return $this->queryJobsWithRetry($this->batchSize); + } + + private function isDeadlock(\PDOException $exception): bool + { + $regex = '/SQLSTATE\[' . self::MYSQL_SERIALIZATION_FAILURE . '\].*\s' . self::MYSQL_DEADLOCK . '\s/'; + + return (string) $exception->getCode() === self::MYSQL_SERIALIZATION_FAILURE + && preg_match($regex, $exception->getMessage()); + } + + /** + * @return array|false + */ + private function queryJobsWithRetry(int $batchSize) + { + if ($this->backoff) { + return $this->backoff->run(function () use ($batchSize) { + try { + return $this->queryJobs($batchSize); + } catch (\PDOException $e) { + if ($this->adapter->getConnection()->inTransaction()) { + $this->adapter->rollBack(); + } + throw $e; + } + }); + } + + return $this->queryJobs($batchSize); } /** diff --git a/tests/Scheduler/DbSchedulerTest.php b/tests/Scheduler/DbSchedulerTest.php index eba1692..eb545b4 100644 --- a/tests/Scheduler/DbSchedulerTest.php +++ b/tests/Scheduler/DbSchedulerTest.php @@ -10,6 +10,7 @@ use Phlib\JobQueue\Scheduler\SchedulerInterface; use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; +use STS\Backoff\Backoff; class DbSchedulerTest extends TestCase { @@ -298,4 +299,176 @@ public function testRemoveBatchWithNoRowsAffected(): void $scheduler = new DbScheduler($this->adapter); static::assertFalse($scheduler->removeBatch([999])); } + + public function testQueryJobsWithRetrySuccessAfterDeadlock(): void + { + $rowData = [ + [ + 'id' => 1, + 'tube' => 'queue', + 'data' => serialize('data'), + 'scheduled_ts' => date('Y-m-d H:i:s'), + 'priority' => 1024, + 'ttr' => 60, + ], + ]; + + $deadlockException = new \PDOException( + 'SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get lock; try restarting transaction', + 40001 + ); + + $selectStmt = $this->createMock(\PDOStatement::class); + $selectStmt->expects(static::once()) + ->method('fetchAll') + ->willReturn($rowData); + $selectStmt->expects(static::once()) + ->method('rowCount') + ->willReturn(1); + + $updateStmt = $this->createMock(\PDOStatement::class); + + $this->adapter->expects(static::exactly(3)) + ->method('query') + ->willReturnCallback(function () use ($deadlockException, $selectStmt, $updateStmt) { + static $callCount = 0; + $callCount++; + if ($callCount === 1) { + throw $deadlockException; + } + return $callCount === 2 ? $selectStmt : $updateStmt; + }); + + $scheduler = new DbScheduler($this->adapter, 300, 600, false, 50, new Backoff(2, 'constant', 0)); + $result = $scheduler->retrieve(); + + static::assertIsArray($result); + static::assertSame(1, $result['id']); + } + + public function testQueryJobsWithRetryThrowsExceptionAfterMaxAttempts(): void + { + $deadlockException = new \PDOException( + 'SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get lock; try restarting transaction', + 40001 + ); + + $this->adapter->expects(static::exactly(5)) + ->method('query') + ->willThrowException($deadlockException); + + $scheduler = new DbScheduler($this->adapter, 300, 600, false, 50, new Backoff(5, 'constant', 0)); + + $this->expectException(\PDOException::class); + $this->expectExceptionMessage( + 'SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get lock; try restarting transaction' + ); + + $scheduler->retrieve(); + } + + public function testQueryJobsWithRetryThrowsNonDeadlockExceptionImmediately(): void + { + $nonDeadlockException = new \PDOException('Table does not exist'); + $nonDeadlockException->errorInfo = ['42S02', 1146, 'Table does not exist']; + + $this->adapter->expects(static::once()) + ->method('query') + ->willThrowException($nonDeadlockException); + + $scheduler = new DbScheduler($this->adapter, 300, 600, false, 50, new Backoff(5, 'constant', 0)); + + $this->expectException(\PDOException::class); + $this->expectExceptionMessage('Table does not exist'); + + $scheduler->retrieve(); + } + + public function testQueryJobsWithRetryBatchSuccessAfterDeadlock(): void + { + $rowData = [ + [ + 'id' => 1, + 'tube' => 'queue1', + 'data' => serialize('data1'), + 'scheduled_ts' => date('Y-m-d H:i:s'), + 'priority' => 1024, + 'ttr' => 60, + ], + [ + 'id' => 2, + 'tube' => 'queue2', + 'data' => serialize('data2'), + 'scheduled_ts' => date('Y-m-d H:i:s'), + 'priority' => 512, + 'ttr' => 30, + ], + ]; + + $deadlockException = new \PDOException( + 'SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get lock; try restarting transaction', + 40001 + ); + + $selectStmt = $this->createMock(\PDOStatement::class); + $selectStmt->expects(static::once()) + ->method('fetchAll') + ->willReturn($rowData); + $selectStmt->expects(static::once()) + ->method('rowCount') + ->willReturn(2); + + $updateStmt = $this->createMock(\PDOStatement::class); + + $this->adapter->expects(static::exactly(3)) + ->method('query') + ->willReturnCallback(function () use ($deadlockException, $selectStmt, $updateStmt) { + static $callCount = 0; + $callCount++; + if ($callCount === 1) { + throw $deadlockException; + } + return $callCount === 2 ? $selectStmt : $updateStmt; + }); + + $scheduler = new DbScheduler($this->adapter, 300, 600, false, 50, new Backoff(2, 'constant', 0)); + $result = $scheduler->retrieveBatch(); + + static::assertIsArray($result); + static::assertCount(2, $result); + static::assertSame(1, $result[0]['id']); + static::assertSame(2, $result[1]['id']); + } + + public function testQueryJobsWithRetryRollsBackTransactionOnDeadlock(): void + { + $deadlockException = new \PDOException( + 'SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get lock; try restarting transaction', + 40001 + ); + + $connection = $this->createMock(\PDO::class); + $connection->expects(static::once()) + ->method('inTransaction') + ->willReturn(true); + + $this->adapter->expects(static::once()) + ->method('getConnection') + ->willReturn($connection); + + $this->adapter->expects(static::once()) + ->method('beginTransaction'); + + $this->adapter->expects(static::once()) + ->method('rollBack'); + + $this->adapter->expects(static::once()) + ->method('query') + ->willThrowException($deadlockException); + + $scheduler = new DbScheduler($this->adapter, 300, 600, false, 50, new Backoff(1)); + + $this->expectException(\PDOException::class); + $scheduler->retrieve(); + } } From 7db4bc93d9657d49568896153053069b5af57c28 Mon Sep 17 00:00:00 2001 From: Xnopyt Date: Tue, 19 Aug 2025 16:13:18 +0100 Subject: [PATCH 5/6] Tag for 2.1.0 --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 82f6a8e..b9dcd12 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] + +## [2.1.0] - 2025-08-19 ### Added - Added `BatchableJobQueueInterface` which defines a JobQueue capable of putting jobs to a queue in batches. - Extends `JobQueueInterface`. From 3e9de602380dc778cf8c52b81f4d228d46c84113 Mon Sep 17 00:00:00 2001 From: Xnopyt Date: Wed, 20 Aug 2025 11:28:19 +0100 Subject: [PATCH 6/6] Update for PHP 8.1 and run ECS fix --- example/bootstrap.php | 4 +-- src/AwsSqs/JobQueue.php | 2 +- src/Scheduler/BatchableSchedulerInterface.php | 7 +--- src/Scheduler/DbScheduler.php | 31 +++++++----------- tests/AwsSqs/JobQueueTest.php | 32 +++++++++---------- tests/Scheduler/DbSchedulerTest.php | 14 ++++---- 6 files changed, 38 insertions(+), 52 deletions(-) diff --git a/example/bootstrap.php b/example/bootstrap.php index cc52129..7671b72 100644 --- a/example/bootstrap.php +++ b/example/bootstrap.php @@ -10,6 +10,6 @@ 'host' => '127.0.0.1', 'dbname' => 'test', ]); -$backoff = new \STS\Backoff\Backoff(5, 'constant', 2000, true); +$backoff = new \STS\Backoff\Backoff(5, 'constant', 2000, true); $scheduler = new \Phlib\JobQueue\Scheduler\DbScheduler($db, 60, 120, true, 50, $backoff); -$jobQueue = new \Phlib\JobQueue\Beanstalk\JobQueue($beanstalk, $scheduler); +$jobQueue = new \Phlib\JobQueue\Beanstalk\JobQueue($beanstalk, $scheduler); diff --git a/src/AwsSqs/JobQueue.php b/src/AwsSqs/JobQueue.php index 0e0ccf4..fd0bca2 100644 --- a/src/AwsSqs/JobQueue.php +++ b/src/AwsSqs/JobQueue.php @@ -71,7 +71,7 @@ public function putBatch(array $jobs): self } $queues[$job->getQueue()][] = [ - 'Id' => (string) $key, + 'Id' => (string)$key, 'DelaySeconds' => $job->getDelay(), 'MessageBody' => JobFactory::serializeBody($job), ]; diff --git a/src/Scheduler/BatchableSchedulerInterface.php b/src/Scheduler/BatchableSchedulerInterface.php index 21d96af..eaaeb7b 100644 --- a/src/Scheduler/BatchableSchedulerInterface.php +++ b/src/Scheduler/BatchableSchedulerInterface.php @@ -5,16 +5,11 @@ namespace Phlib\JobQueue\Scheduler; /** - * Interface BatchableSchedulerInterface - * * @package Phlib\JobQueue */ interface BatchableSchedulerInterface extends SchedulerInterface { - /** - * @return array|false - */ - public function retrieveBatch(); + public function retrieveBatch(): array|false; public function removeBatch(array $jobId): bool; } diff --git a/src/Scheduler/DbScheduler.php b/src/Scheduler/DbScheduler.php index 848d208..f4f505e 100644 --- a/src/Scheduler/DbScheduler.php +++ b/src/Scheduler/DbScheduler.php @@ -23,7 +23,7 @@ public function __construct( private readonly int $minimumPickup = 600, private readonly bool $skipLocked = false, private readonly int $batchSize = 50, - private ?Backoff $backoff = null + private ?Backoff $backoff = null, ) { if ($this->backoff) { $this->backoff->setDecider( @@ -37,7 +37,7 @@ function (int $attempt, int $maxAttempts, $result, ?\Throwable $e = null) { } throw $e; - } + }, ); } } @@ -50,7 +50,7 @@ public function shouldBeScheduled($delay): bool public function store(JobInterface $job): bool { $dbTimestampFormat = 'Y-m-d H:i:s'; - return (bool) $this->insert([ + return (bool)$this->insert([ 'tube' => $job->getQueue(), 'data' => serialize($job->getBody()), 'scheduled_ts' => $job->getDatetimeDelay()->format($dbTimestampFormat), @@ -66,10 +66,7 @@ public function retrieve(): array|false return $job ? $job[0] : false; } - /** - * @return array|false - */ - public function retrieveBatch() + public function retrieveBatch(): array|false { return $this->queryJobsWithRetry($this->batchSize); } @@ -78,14 +75,11 @@ private function isDeadlock(\PDOException $exception): bool { $regex = '/SQLSTATE\[' . self::MYSQL_SERIALIZATION_FAILURE . '\].*\s' . self::MYSQL_DEADLOCK . '\s/'; - return (string) $exception->getCode() === self::MYSQL_SERIALIZATION_FAILURE + return (string)$exception->getCode() === self::MYSQL_SERIALIZATION_FAILURE && preg_match($regex, $exception->getMessage()); } - /** - * @return array|false - */ - private function queryJobsWithRetry(int $batchSize) + private function queryJobsWithRetry(int $batchSize): array|false { if ($this->backoff) { return $this->backoff->run(function () use ($batchSize) { @@ -103,10 +97,7 @@ private function queryJobsWithRetry(int $batchSize) return $this->queryJobs($batchSize); } - /** - * @return array|false - */ - private function queryJobs(int $batchSize) + private function queryJobs(int $batchSize): array|false { $this->adapter->beginTransaction(); @@ -160,12 +151,12 @@ private function queryJobs(int $batchSize) } $jobs[] = [ - 'id' => (int) $row['id'], + 'id' => (int)$row['id'], 'queue' => $row['tube'], 'data' => unserialize($row['data']), 'delay' => $delay, - 'priority' => (int) $row['priority'], - 'ttr' => (int) $row['ttr'], + 'priority' => (int)$row['priority'], + 'ttr' => (int)$row['ttr'], ]; } @@ -177,7 +168,7 @@ public function remove(int|string $jobId): bool $table = $this->adapter->quote()->identifier('scheduled_queue'); $sql = "DELETE FROM {$table} WHERE id = ?"; - return (bool) $this->adapter + return (bool)$this->adapter ->query($sql, [$jobId]) ->rowCount(); } diff --git a/tests/AwsSqs/JobQueueTest.php b/tests/AwsSqs/JobQueueTest.php index 2818067..0093943 100644 --- a/tests/AwsSqs/JobQueueTest.php +++ b/tests/AwsSqs/JobQueueTest.php @@ -158,7 +158,7 @@ public function testPutBatchWithSingleQueue(): void 'delay' => 0, 'priority' => 1024, 'ttr' => 60, - ] + ], ), ], [ @@ -173,7 +173,7 @@ public function testPutBatchWithSingleQueue(): void 'delay' => 5, 'priority' => 512, 'ttr' => 30, - ] + ], ), ], ]; @@ -182,7 +182,7 @@ public function testPutBatchWithSingleQueue(): void [ 'QueueUrl' => $queueUrl, 'Entries' => $expectedEntries, - ] + ], )->shouldBeCalledOnce(); $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler, $queuePrefix); @@ -240,7 +240,7 @@ public function testPutBatchWithMultipleQueues(): void 'delay' => 0, 'priority' => 1024, 'ttr' => 60, - ] + ], ), ], [ @@ -255,7 +255,7 @@ public function testPutBatchWithMultipleQueues(): void 'delay' => 0, 'priority' => 1024, 'ttr' => 60, - ] + ], ), ], ]; @@ -273,7 +273,7 @@ public function testPutBatchWithMultipleQueues(): void 'delay' => 0, 'priority' => 1024, 'ttr' => 60, - ] + ], ), ], ]; @@ -282,14 +282,14 @@ public function testPutBatchWithMultipleQueues(): void [ 'QueueUrl' => $queueUrl1, 'Entries' => $expectedEntries1, - ] + ], )->shouldBeCalledOnce(); $sqsClient->sendMessageBatch( [ 'QueueUrl' => $queueUrl2, 'Entries' => $expectedEntries2, - ] + ], )->shouldBeCalledOnce(); $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler, $queuePrefix); @@ -325,7 +325,7 @@ function ($delay) { return false; } throw new \InvalidArgumentException("Unexpected delay: {$delay}"); - } + }, ); $scheduler->expects($this->once()) @@ -351,7 +351,7 @@ function ($delay) { 'delay' => 0, 'priority' => 1024, 'ttr' => 60, - ] + ], ), ], ]; @@ -360,7 +360,7 @@ function ($delay) { [ 'QueueUrl' => $queueUrl, 'Entries' => $expectedEntries, - ] + ], ) ->shouldBeCalledOnce(); @@ -400,8 +400,8 @@ function ($args) use ($queueUrl) { return $args['QueueUrl'] === $queueUrl && is_array($args['Entries']) && count($args['Entries']) === 10; - } - ) + }, + ), ) ->shouldBeCalledTimes(2); @@ -411,8 +411,8 @@ function ($args) use ($queueUrl) { return $args['QueueUrl'] === $queueUrl && is_array($args['Entries']) && count($args['Entries']) === 5; - } - ) + }, + ), ) ->shouldBeCalledOnce(); @@ -447,7 +447,7 @@ function ($job) use ($job1, $job2) { return true; } throw new \InvalidArgumentException('Unexpected job'); - } + }, ); $sqsClient->getQueueUrl()->shouldNotBeCalled(); diff --git a/tests/Scheduler/DbSchedulerTest.php b/tests/Scheduler/DbSchedulerTest.php index f82a8b4..1e73015 100644 --- a/tests/Scheduler/DbSchedulerTest.php +++ b/tests/Scheduler/DbSchedulerTest.php @@ -243,7 +243,7 @@ public function testRemoveBatchWithSingleJob(): void ->method('query') ->with( 'DELETE FROM `scheduled_queue` WHERE id IN ( ? )', - [123] + [123], ) ->willReturn($pdoStatement); @@ -267,7 +267,7 @@ public function testRemoveBatchWithMultipleJobs(): void ->method('query') ->with( 'DELETE FROM `scheduled_queue` WHERE id IN ( ?,?,? )', - [123, 456, 789] + [123, 456, 789], ) ->willReturn($pdoStatement); @@ -310,7 +310,7 @@ public function testQueryJobsWithRetrySuccessAfterDeadlock(): void $deadlockException = new \PDOException( 'SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get lock; try restarting transaction', - 40001 + 40001, ); $selectStmt = $this->createMock(\PDOStatement::class); @@ -345,7 +345,7 @@ public function testQueryJobsWithRetryThrowsExceptionAfterMaxAttempts(): void { $deadlockException = new \PDOException( 'SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get lock; try restarting transaction', - 40001 + 40001, ); $this->adapter->expects(static::exactly(5)) @@ -356,7 +356,7 @@ public function testQueryJobsWithRetryThrowsExceptionAfterMaxAttempts(): void $this->expectException(\PDOException::class); $this->expectExceptionMessage( - 'SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get lock; try restarting transaction' + 'SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get lock; try restarting transaction', ); $scheduler->retrieve(); @@ -402,7 +402,7 @@ public function testQueryJobsWithRetryBatchSuccessAfterDeadlock(): void $deadlockException = new \PDOException( 'SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get lock; try restarting transaction', - 40001 + 40001, ); $selectStmt = $this->createMock(\PDOStatement::class); @@ -439,7 +439,7 @@ public function testQueryJobsWithRetryRollsBackTransactionOnDeadlock(): void { $deadlockException = new \PDOException( 'SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get lock; try restarting transaction', - 40001 + 40001, ); $connection = $this->createMock(\PDO::class);