diff --git a/CHANGELOG.md b/CHANGELOG.md index ff92901..d692e70 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,25 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +### Added +- Added `BatchableJobQueueInterface` which defines a JobQueue capable of putting jobs to a queue in batches. + - Extends `JobQueueInterface`. + - Adds `putBatch()` method, which accepts an array of `JobInterface` and returns `self`. +- Added `BatchableSchedulerInterface` which defines a scheduler capable of retrieving and removing jobs in batches. + - Extends `SchedulerInterface`. + - Adds `retrieveBatch()` which will return a array of jobs or `false` if there are no jobs. + - Adds `removeBatch()` which accepts an array of job ids and returns a bool to indicate success. +### Changed +- `Phlib\JobQueue\AwsSqs\JobQueue` supports batching and implements `BatchableJobQueueInterface`. +- `Phlib\JobQueue\Scheduler\DbScheduler` supports batching and implements `BatchableSchedulerInterface`. +- `Phlib\JobQueue\Scheduler\DbScheduler` optionally accepts a `skipLocked` argument to indicate whether locked rows should be skipped when claiming jobs. Enabling this will reduce the risk of deadlocks occurring when running multiple instances of the monitor. + This defaults to `false` if not provided and should only be enabled if you are running MySQL 8.0.1 or higher. +- `Phlib\JobQueue\Scheduler\DbScheduler` optionally accepts a `batchSize` argument to specify how many jobs should be fetched per query. + This defaults to `50` if not provided. +- `Phlib\JobQueue\Scheduler\DbScheduler` optionally accepts a `backoff` argument which should be a `STS\Backoff\Backoff` object used to retry if a deadlock occurs. + This defaults to `null` and no retry will be attempted if not provided. +- `MonitorCommand` will fetch jobs in batches if the scheduler implements `BatchableSchedulerInterface`. +- `MonitorCommand` will put jobs in batches if the JobQueue implements `BatchableJobQueueInterface` and the scheduler support batching. ## [3.0.3] - 2025-07-31 ### Fixed @@ -28,6 +47,27 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - **BC break**: Removed support for PHP versions <= v8.0 as they are no longer [actively supported](https://php.net/supported-versions.php) by the PHP project. +## [2.1.0] - 2025-08-19 +### Added +- Added `BatchableJobQueueInterface` which defines a JobQueue capable of putting jobs to a queue in batches. + - Extends `JobQueueInterface`. + - Adds `putBatch()` method, which accepts an array of `JobInterface` and returns `self`. +- Added `BatchableSchedulerInterface` which defines a scheduler capable of retrieving and removing jobs in batches. + - Extends `SchedulerInterface`. + - Adds `retrieveBatch()` which will return a array of jobs or `false` if there are no jobs. + - Adds `removeBatch()` which accepts an array of job ids and returns a bool to indicate success. +### Changed +- `Phlib\JobQueue\AwsSqs\JobQueue` supports batching and implements `BatchableJobQueueInterface`. +- `Phlib\JobQueue\Scheduler\DbScheduler` supports batching and implements `BatchableSchedulerInterface`. +- `Phlib\JobQueue\Scheduler\DbScheduler` optionally accepts a `skipLocked` argument to indicate whether locked rows should be skipped when claiming jobs. Enabling this will reduce the risk of deadlocks occurring when running multiple instances of the monitor. + This defaults to `false` if not provided and should only be enabled if you are running MySQL 8.0.1 or higher. +- `Phlib\JobQueue\Scheduler\DbScheduler` optionally accepts a `batchSize` argument to specify how many jobs should be fetched per query. + This defaults to `50` if not provided. +- `Phlib\JobQueue\Scheduler\DbScheduler` optionally accepts a `backoff` argument which should be a `STS\Backoff\Backoff` object used to retry if a deadlock occurs. + This defaults to `null` and no retry will be attempted if not provided. +- `MonitorCommand` will fetch jobs in batches if the scheduler implements `BatchableSchedulerInterface`. +- `MonitorCommand` will put jobs in batches if the JobQueue implements `BatchableJobQueueInterface` and the scheduler support batching. + ## [2.0.0] - 2022-09-14 ### Added - Add support for PHP v8 diff --git a/README.md b/README.md index 5531882..fb1bf18 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ Bootstrap $beanstalk = (new \Phlib\Beanstalk\Factory())->create('localhost'); $db = new \Phlib\Db\Adapter(['host' => '127.0.0.1', 'dbname' => 'example']); -$scheduler = new \Phlib\JobQueue\DbScheduler($db, 300, 600); +$scheduler = new \Phlib\JobQueue\Scheduler\DbScheduler($db, 300, 600, true); $jobQueue = new \Phlib\JobQueue\Beanstalk\Scheduled($beanstalk, $scheduler); ``` @@ -51,6 +51,8 @@ do { } while (true); ``` +See [examples](example/) for more advance usage. + ## Jobqueue Script The script has a dependency on two constructed objects. The Job Queue interface and the Scheduler interface. In order diff --git a/composer.json b/composer.json index 91cf7fb..0751765 100644 --- a/composer.json +++ b/composer.json @@ -23,7 +23,8 @@ "phlib/console-process": "^3 || ^4", "psr/log": "^1 || ^2 || ^3", - "aws/aws-sdk-php": "^3.61" + "aws/aws-sdk-php": "^3.61", + "stechstudio/backoff": "^1.6" }, "require-dev": { "symplify/easy-coding-standard": "^12", diff --git a/example/bootstrap.php b/example/bootstrap.php index 092f9a7..7671b72 100644 --- a/example/bootstrap.php +++ b/example/bootstrap.php @@ -10,5 +10,6 @@ 'host' => '127.0.0.1', 'dbname' => 'test', ]); -$scheduler = new \Phlib\JobQueue\Scheduler\DbScheduler($db, 60, 120); +$backoff = new \STS\Backoff\Backoff(5, 'constant', 2000, true); +$scheduler = new \Phlib\JobQueue\Scheduler\DbScheduler($db, 60, 120, true, 50, $backoff); $jobQueue = new \Phlib\JobQueue\Beanstalk\JobQueue($beanstalk, $scheduler); diff --git a/src/AwsSqs/JobQueue.php b/src/AwsSqs/JobQueue.php index 31e5c5d..fd0bca2 100644 --- a/src/AwsSqs/JobQueue.php +++ b/src/AwsSqs/JobQueue.php @@ -6,17 +6,17 @@ use Aws\Sqs\Exception\SqsException; use Aws\Sqs\SqsClient; +use Phlib\JobQueue\BatchableJobQueueInterface; use Phlib\JobQueue\Exception\InvalidArgumentException; use Phlib\JobQueue\Exception\RuntimeException; use Phlib\JobQueue\Job; use Phlib\JobQueue\JobInterface; -use Phlib\JobQueue\JobQueueInterface; use Phlib\JobQueue\Scheduler\SchedulerInterface; /** * @package Phlib\JobQueue */ -class JobQueue implements JobQueueInterface +class JobQueue implements BatchableJobQueueInterface { private int $retrieveTimeout = 10; @@ -59,6 +59,39 @@ public function put(JobInterface $job): self } } + public function putBatch(array $jobs): self + { + try { + $queues = []; + + foreach ($jobs as $key => $job) { + if ($this->scheduler->shouldBeScheduled($job->getDelay())) { + $this->scheduler->store($job); + continue; + } + + $queues[$job->getQueue()][] = [ + 'Id' => (string)$key, + 'DelaySeconds' => $job->getDelay(), + 'MessageBody' => JobFactory::serializeBody($job), + ]; + } + + foreach ($queues as $queue => $jobs) { + foreach (array_chunk($jobs, 10) as $batch) { + $this->client->sendMessageBatch([ + 'QueueUrl' => $this->getQueueUrlWithPrefix($queue), + 'Entries' => $batch, + ]); + } + } + + return $this; + } catch (SqsException $exception) { + throw new RuntimeException($exception->getMessage(), $exception->getCode(), $exception); + } + } + public function retrieve(string $queue): ?JobInterface { try { diff --git a/src/BatchableJobQueueInterface.php b/src/BatchableJobQueueInterface.php new file mode 100644 index 0000000..873d4d3 --- /dev/null +++ b/src/BatchableJobQueueInterface.php @@ -0,0 +1,13 @@ +logFile = $logFile; } - while ($jobData = $this->scheduler->retrieve()) { - $output->writeln("Job {$jobData['id']} added."); - $this->jobQueue->put($this->createJob($jobData)); - $this->scheduler->remove($jobData['id']); + if ($this->scheduler instanceof BatchableSchedulerInterface) { + while ($jobsData = $this->scheduler->retrieveBatch()) { + $jobs = []; + foreach ($jobsData as $jobData) { + $output->writeln("Job {$jobData['id']} added."); + $jobs[] = $this->createJob($jobData); + } + $this->putJobBatch($jobs); + $this->scheduler->removeBatch(array_column($jobsData, 'id')); + } + } else { + while ($jobData = $this->scheduler->retrieve()) { + $output->writeln("Job {$jobData['id']} added."); + $this->jobQueue->put($this->createJob($jobData)); + $this->scheduler->remove($jobData['id']); + } } return 0; @@ -70,6 +84,18 @@ protected function createJob(array $schedulerJob): JobInterface ); } + protected function putJobBatch(array $jobs): void + { + if ($this->jobQueue instanceof BatchableJobQueueInterface) { + $this->jobQueue->putBatch($jobs); + return; + } + + foreach ($jobs as $job) { + $this->jobQueue->put($job); + } + } + protected function createChildOutput(?string $childLogFilename): OutputInterface { if (!isset($this->logFile) || ($this->logFile === '' || $this->logFile === '0')) { diff --git a/src/Scheduler/BatchableSchedulerInterface.php b/src/Scheduler/BatchableSchedulerInterface.php new file mode 100644 index 0000000..eaaeb7b --- /dev/null +++ b/src/Scheduler/BatchableSchedulerInterface.php @@ -0,0 +1,15 @@ +backoff) { + $this->backoff->setDecider( + function (int $attempt, int $maxAttempts, $result, ?\Throwable $e = null) { + if ($e === null) { + return false; + } + + if ($e instanceof \PDOException && $this->isDeadlock($e) && $attempt < $maxAttempts) { + return true; + } + + throw $e; + }, + ); + } } public function shouldBeScheduled($delay): bool @@ -39,40 +62,105 @@ public function store(JobInterface $job): bool public function retrieve(): array|false { - $sql = ' - UPDATE scheduled_queue SET - picked_by = CONNECTION_ID(), - picked_ts = NOW() + $job = $this->queryJobsWithRetry(1); + return $job ? $job[0] : false; + } + + public function retrieveBatch(): array|false + { + return $this->queryJobsWithRetry($this->batchSize); + } + + private function isDeadlock(\PDOException $exception): bool + { + $regex = '/SQLSTATE\[' . self::MYSQL_SERIALIZATION_FAILURE . '\].*\s' . self::MYSQL_DEADLOCK . '\s/'; + + return (string)$exception->getCode() === self::MYSQL_SERIALIZATION_FAILURE + && preg_match($regex, $exception->getMessage()); + } + + private function queryJobsWithRetry(int $batchSize): array|false + { + if ($this->backoff) { + return $this->backoff->run(function () use ($batchSize) { + try { + return $this->queryJobs($batchSize); + } catch (\PDOException $e) { + if ($this->adapter->getConnection()->inTransaction()) { + $this->adapter->rollBack(); + } + throw $e; + } + }); + } + + return $this->queryJobs($batchSize); + } + + private function queryJobs(int $batchSize): array|false + { + $this->adapter->beginTransaction(); + + $sql = <<skipLocked) { + $sql .= ' SKIP LOCKED'; + } + $stmt = $this->adapter->query($sql, [ ':minimumPickup' => $this->minimumPickup, ]); - if ($stmt->rowCount() === 0) { + + $rowCount = $stmt->rowCount(); + + if ($rowCount === 0) { + $this->adapter->rollBack(); return false; // no jobs } - $sql = 'SELECT * FROM `scheduled_queue` WHERE picked_by = CONNECTION_ID() LIMIT 1'; - $row = $this->adapter->query($sql)->fetch(\PDO::FETCH_ASSOC); + $rows = $stmt->fetchAll(\PDO::FETCH_ASSOC); + + $placeholders = implode(',', array_fill(0, $rowCount, '?')); + $sql = <<adapter->query($sql, array_column($rows, 'id')); + + $this->adapter->commit(); + + $jobs = []; + + foreach ($rows as $row) { + $scheduledTime = strtotime($row['scheduled_ts']); + $delay = $scheduledTime - time(); + if ($delay < 0) { + $delay = 0; + } + + $jobs[] = [ + 'id' => (int)$row['id'], + 'queue' => $row['tube'], + 'data' => unserialize($row['data']), + 'delay' => $delay, + 'priority' => (int)$row['priority'], + 'ttr' => (int)$row['ttr'], + ]; } - return [ - 'id' => $row['id'], - 'queue' => $row['tube'], - 'data' => unserialize($row['data']), - 'delay' => $delay, - 'priority' => $row['priority'], - 'ttr' => $row['ttr'], - ]; + return $jobs; } public function remove(int|string $jobId): bool @@ -85,6 +173,16 @@ public function remove(int|string $jobId): bool ->rowCount(); } + public function removeBatch(array $jobIds): bool + { + $table = $this->adapter->quote()->identifier('scheduled_queue'); + $sql = "DELETE FROM {$table} WHERE id IN ( " . implode(',', array_fill(0, count($jobIds), '?')) . ' )'; + + return $this->adapter + ->query($sql, $jobIds) + ->rowCount() === count($jobIds); + } + protected function insert(array $data): int { $table = $this->adapter->quote()->identifier('scheduled_queue'); diff --git a/tests/AwsSqs/JobQueueTest.php b/tests/AwsSqs/JobQueueTest.php index d0d5b32..0093943 100644 --- a/tests/AwsSqs/JobQueueTest.php +++ b/tests/AwsSqs/JobQueueTest.php @@ -120,6 +120,363 @@ public function testMarkAsErrorWithPrefix(): void $jobQueue->markAsError($job); } + public function testPutBatchWithSingleQueue(): void + { + $sqsClient = $this->prophesize(SqsClient::class); + $scheduler = $this->createMock(SchedulerInterface::class); + + $queuePrefix = 'prefix-'; + $queue = 'test-queue'; + $queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/prefix-test-queue'; + + $scheduler->method('shouldBeScheduled')->willReturn(false); + + $sqsClient->getQueueUrl([ + 'QueueName' => $queuePrefix . $queue, + ]) + ->shouldBeCalledOnce() + ->willReturn($this->mockAwsResult([['QueueUrl', $queueUrl]])); + + $job1 = new Job($queue, [ + 'data' => 'test1', + ], null, 0, 1024, 60); + $job2 = new Job($queue, [ + 'data' => 'test2', + ], null, 5, 512, 30); + $jobs = [$job1, $job2]; + + $expectedEntries = [ + [ + 'Id' => '0', + 'DelaySeconds' => 0, + 'MessageBody' => json_encode( + [ + 'queue' => $queue, + 'body' => [ + 'data' => 'test1', + ], + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ], + ), + ], + [ + 'Id' => '1', + 'DelaySeconds' => 5, + 'MessageBody' => json_encode( + [ + 'queue' => $queue, + 'body' => [ + 'data' => 'test2', + ], + 'delay' => 5, + 'priority' => 512, + 'ttr' => 30, + ], + ), + ], + ]; + + $sqsClient->sendMessageBatch( + [ + 'QueueUrl' => $queueUrl, + 'Entries' => $expectedEntries, + ], + )->shouldBeCalledOnce(); + + $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler, $queuePrefix); + $result = $jobQueue->putBatch($jobs); + + $this->assertSame($jobQueue, $result); + } + + public function testPutBatchWithMultipleQueues(): void + { + $sqsClient = $this->prophesize(SqsClient::class); + $scheduler = $this->createMock(SchedulerInterface::class); + + $queuePrefix = 'prefix-'; + $queue1 = 'test-queue-1'; + $queue2 = 'test-queue-2'; + $queueUrl1 = 'https://sqs.us-east-1.amazonaws.com/123456789012/prefix-test-queue-1'; + $queueUrl2 = 'https://sqs.us-east-1.amazonaws.com/123456789012/prefix-test-queue-2'; + + $scheduler->method('shouldBeScheduled')->willReturn(false); + + $sqsClient->getQueueUrl([ + 'QueueName' => $queuePrefix . $queue1, + ]) + ->shouldBeCalledOnce() + ->willReturn($this->mockAwsResult([['QueueUrl', $queueUrl1]])); + + $sqsClient->getQueueUrl([ + 'QueueName' => $queuePrefix . $queue2, + ]) + ->shouldBeCalledOnce() + ->willReturn($this->mockAwsResult([['QueueUrl', $queueUrl2]])); + + $job1 = new Job($queue1, [ + 'data' => 'test1', + ]); + $job2 = new Job($queue2, [ + 'data' => 'test2', + ]); + $job3 = new Job($queue1, [ + 'data' => 'test3', + ]); + $jobs = [$job1, $job2, $job3]; + + $expectedEntries1 = [ + [ + 'Id' => '0', + 'DelaySeconds' => 0, + 'MessageBody' => json_encode( + [ + 'queue' => $queue1, + 'body' => [ + 'data' => 'test1', + ], + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ], + ), + ], + [ + 'Id' => '2', + 'DelaySeconds' => 0, + 'MessageBody' => json_encode( + [ + 'queue' => $queue1, + 'body' => [ + 'data' => 'test3', + ], + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ], + ), + ], + ]; + + $expectedEntries2 = [ + [ + 'Id' => '1', + 'DelaySeconds' => 0, + 'MessageBody' => json_encode( + [ + 'queue' => $queue2, + 'body' => [ + 'data' => 'test2', + ], + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ], + ), + ], + ]; + + $sqsClient->sendMessageBatch( + [ + 'QueueUrl' => $queueUrl1, + 'Entries' => $expectedEntries1, + ], + )->shouldBeCalledOnce(); + + $sqsClient->sendMessageBatch( + [ + 'QueueUrl' => $queueUrl2, + 'Entries' => $expectedEntries2, + ], + )->shouldBeCalledOnce(); + + $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler, $queuePrefix); + $result = $jobQueue->putBatch($jobs); + + $this->assertSame($jobQueue, $result); + } + + public function testPutBatchWithScheduledJobs(): void + { + $sqsClient = $this->prophesize(SqsClient::class); + $scheduler = $this->createMock(SchedulerInterface::class); + + $queuePrefix = 'prefix-'; + $queue = 'test-queue'; + $queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/prefix-test-queue'; + + $job1 = new Job($queue, [ + 'data' => 'test1', + ], null, 300); + $job2 = new Job($queue, [ + 'data' => 'test2', + ], null, 0); + $jobs = [$job1, $job2]; + + $scheduler->expects($this->exactly(2)) + ->method('shouldBeScheduled') + ->willReturnCallback( + function ($delay) { + if ($delay === 300) { + return true; + } elseif ($delay === 0) { + return false; + } + throw new \InvalidArgumentException("Unexpected delay: {$delay}"); + }, + ); + + $scheduler->expects($this->once()) + ->method('store') + ->with($job1); + + $sqsClient->getQueueUrl([ + 'QueueName' => $queuePrefix . $queue, + ]) + ->shouldBeCalledOnce() + ->willReturn($this->mockAwsResult([['QueueUrl', $queueUrl]])); + + $expectedEntries = [ + [ + 'Id' => '1', + 'DelaySeconds' => 0, + 'MessageBody' => json_encode( + [ + 'queue' => $queue, + 'body' => [ + 'data' => 'test2', + ], + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ], + ), + ], + ]; + + $sqsClient->sendMessageBatch( + [ + 'QueueUrl' => $queueUrl, + 'Entries' => $expectedEntries, + ], + ) + ->shouldBeCalledOnce(); + + $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler, $queuePrefix); + $result = $jobQueue->putBatch($jobs); + + $this->assertSame($jobQueue, $result); + } + + public function testPutBatchWithLargeNumberOfJobs(): void + { + $sqsClient = $this->prophesize(SqsClient::class); + $scheduler = $this->createMock(SchedulerInterface::class); + + $queuePrefix = 'prefix-'; + $queue = 'test-queue'; + $queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/prefix-test-queue'; + + $scheduler->method('shouldBeScheduled')->willReturn(false); + + $sqsClient->getQueueUrl([ + 'QueueName' => $queuePrefix . $queue, + ]) + ->shouldBeCalledOnce() + ->willReturn($this->mockAwsResult([['QueueUrl', $queueUrl]])); + + $jobs = []; + for ($i = 1; $i <= 25; $i++) { + $jobs[] = new Job($queue, [ + 'data' => "test{$i}", + ]); + } + + $sqsClient->sendMessageBatch( + Argument::that( + function ($args) use ($queueUrl) { + return $args['QueueUrl'] === $queueUrl && + is_array($args['Entries']) && + count($args['Entries']) === 10; + }, + ), + ) + ->shouldBeCalledTimes(2); + + $sqsClient->sendMessageBatch( + Argument::that( + function ($args) use ($queueUrl) { + return $args['QueueUrl'] === $queueUrl && + is_array($args['Entries']) && + count($args['Entries']) === 5; + }, + ), + ) + ->shouldBeCalledOnce(); + + $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler, $queuePrefix); + $result = $jobQueue->putBatch($jobs); + + $this->assertSame($jobQueue, $result); + } + + public function testPutBatchWithAllScheduledJobs(): void + { + $sqsClient = $this->prophesize(SqsClient::class); + $scheduler = $this->createMock(SchedulerInterface::class); + + $queuePrefix = 'prefix-'; + $queue = 'test-queue'; + + $job1 = new Job($queue, [ + 'data' => 'test1', + ], 300); + $job2 = new Job($queue, [ + 'data' => 'test2', + ], 600); + $jobs = [$job1, $job2]; + + $scheduler->method('shouldBeScheduled')->willReturn(true); + $scheduler->expects($this->exactly(2)) + ->method('store') + ->willReturnCallback( + function ($job) use ($job1, $job2) { + if ($job === $job1 || $job === $job2) { + return true; + } + throw new \InvalidArgumentException('Unexpected job'); + }, + ); + + $sqsClient->getQueueUrl()->shouldNotBeCalled(); + $sqsClient->sendMessageBatch()->shouldNotBeCalled(); + + $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler, $queuePrefix); + $result = $jobQueue->putBatch($jobs); + + $this->assertSame($jobQueue, $result); + } + + public function testPutBatchWithEmptyArray(): void + { + $sqsClient = $this->prophesize(SqsClient::class); + $scheduler = $this->prophesize(SchedulerInterface::class); + + $queuePrefix = 'prefix-'; + $jobs = []; + + $sqsClient->getQueueUrl()->shouldNotBeCalled(); + $sqsClient->sendMessageBatch()->shouldNotBeCalled(); + $scheduler->shouldBeScheduled()->shouldNotBeCalled(); + + $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler->reveal(), $queuePrefix); + $result = $jobQueue->putBatch($jobs); + + $this->assertSame($jobQueue, $result); + } + private function mockAwsResult(array $valueMap): MockObject { $result = $this->createMock(Result::class); diff --git a/tests/Scheduler/DbSchedulerTest.php b/tests/Scheduler/DbSchedulerTest.php index 1f0257d..1e73015 100644 --- a/tests/Scheduler/DbSchedulerTest.php +++ b/tests/Scheduler/DbSchedulerTest.php @@ -8,6 +8,7 @@ use Phlib\JobQueue\JobInterface; use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; +use STS\Backoff\Backoff; /** * @package Phlib\JobQueue @@ -91,19 +92,21 @@ public function testRetrieveWithNoResults(): void public function testRetrieveCalculatesDelay(): void { $rowData = [ - 'id' => 1, - 'tube' => 'queue', - 'data' => serialize('someData'), - 'scheduled_ts' => date('Y-m-d H:i:s'), - 'priority' => 1024, - 'ttr' => 60, + [ + 'id' => 1, + 'tube' => 'queue', + 'data' => serialize('someData'), + 'scheduled_ts' => date('Y-m-d H:i:s'), + 'priority' => 1024, + 'ttr' => 60, + ], ]; $stmt = $this->createMock(\PDOStatement::class); $stmt->expects(static::once()) ->method('rowCount') ->willReturn(1); $stmt->expects(static::atLeastOnce()) - ->method('fetch') + ->method('fetchAll') ->willReturn($rowData); $this->adapter->expects(static::atLeastOnce()) ->method('query') @@ -126,4 +129,341 @@ public function testRemove(): void $scheduler = new DbScheduler($this->adapter); static::assertTrue($scheduler->remove(234)); } + + public function testRetrieveBatchWithNoResults(): void + { + $stmt = $this->createMock(\PDOStatement::class); + $stmt->expects(static::once()) + ->method('rowCount') + ->willReturn(0); + $this->adapter->expects(static::once()) + ->method('query') + ->willReturn($stmt); + $scheduler = new DbScheduler($this->adapter); + static::assertFalse($scheduler->retrieveBatch()); + } + + public function testRetrieveBatchWithResults(): void + { + $rowData = [ + [ + 'id' => 1, + 'tube' => 'queue1', + 'data' => serialize('data1'), + 'scheduled_ts' => date('Y-m-d H:i:s'), + 'priority' => 1024, + 'ttr' => 60, + ], + [ + 'id' => 2, + 'tube' => 'queue2', + 'data' => serialize('data2'), + 'scheduled_ts' => date('Y-m-d H:i:s'), + 'priority' => 512, + 'ttr' => 30, + ], + ]; + + $selectStmt = $this->createMock(\PDOStatement::class); + $selectStmt->expects(static::once()) + ->method('fetchAll') + ->willReturn($rowData); + $selectStmt->expects(static::once()) + ->method('rowCount') + ->willReturn(2); + + $updateStmt = $this->createMock(\PDOStatement::class); + + $this->adapter->expects(static::exactly(2)) + ->method('query') + ->willReturnOnConsecutiveCalls($selectStmt, $updateStmt); + + $scheduler = new DbScheduler($this->adapter); + $jobs = $scheduler->retrieveBatch(); + + static::assertIsArray($jobs); + static::assertCount(2, $jobs); + static::assertSame(1, $jobs[0]['id']); + static::assertSame('queue1', $jobs[0]['queue']); + static::assertSame('data1', $jobs[0]['data']); + static::assertSame(0, $jobs[0]['delay']); + static::assertSame(1024, $jobs[0]['priority']); + static::assertSame(60, $jobs[0]['ttr']); + } + + public function testRetrieveBatchCalculatesDelayCorrectly(): void + { + $futureTime = date('Y-m-d H:i:s', time() + 120); + $rowData = [ + [ + 'id' => 1, + 'tube' => 'queue', + 'data' => serialize('data'), + 'scheduled_ts' => $futureTime, + 'priority' => 1024, + 'ttr' => 60, + ], + ]; + + $selectStmt = $this->createMock(\PDOStatement::class); + $selectStmt->expects(static::once()) + ->method('fetchAll') + ->willReturn($rowData); + $selectStmt->expects(static::once()) + ->method('rowCount') + ->willReturn(1); + + $updateStmt = $this->createMock(\PDOStatement::class); + + $this->adapter->expects(static::exactly(2)) + ->method('query') + ->willReturnOnConsecutiveCalls($selectStmt, $updateStmt); + + $scheduler = new DbScheduler($this->adapter); + $jobs = $scheduler->retrieveBatch(); + + static::assertIsArray($jobs); + static::assertCount(1, $jobs); + static::assertSame(120, $jobs[0]['delay']); + } + + public function testRemoveBatchWithSingleJob(): void + { + $this->quote->expects(static::once()) + ->method('identifier') + ->with('scheduled_queue') + ->willReturn('`scheduled_queue`'); + + $pdoStatement = $this->createMock(\PDOStatement::class); + $pdoStatement->expects(static::once()) + ->method('rowCount') + ->willReturn(1); + + $this->adapter->expects(static::once()) + ->method('query') + ->with( + 'DELETE FROM `scheduled_queue` WHERE id IN ( ? )', + [123], + ) + ->willReturn($pdoStatement); + + $scheduler = new DbScheduler($this->adapter); + static::assertTrue($scheduler->removeBatch([123])); + } + + public function testRemoveBatchWithMultipleJobs(): void + { + $this->quote->expects(static::once()) + ->method('identifier') + ->with('scheduled_queue') + ->willReturn('`scheduled_queue`'); + + $pdoStatement = $this->createMock(\PDOStatement::class); + $pdoStatement->expects(static::once()) + ->method('rowCount') + ->willReturn(3); + + $this->adapter->expects(static::once()) + ->method('query') + ->with( + 'DELETE FROM `scheduled_queue` WHERE id IN ( ?,?,? )', + [123, 456, 789], + ) + ->willReturn($pdoStatement); + + $scheduler = new DbScheduler($this->adapter); + static::assertTrue($scheduler->removeBatch([123, 456, 789])); + } + + public function testRemoveBatchWithNoRowsAffected(): void + { + $this->quote->expects(static::once()) + ->method('identifier') + ->with('scheduled_queue') + ->willReturn('`scheduled_queue`'); + + $pdoStatement = $this->createMock(\PDOStatement::class); + $pdoStatement->expects(static::once()) + ->method('rowCount') + ->willReturn(0); + + $this->adapter->expects(static::once()) + ->method('query') + ->willReturn($pdoStatement); + + $scheduler = new DbScheduler($this->adapter); + static::assertFalse($scheduler->removeBatch([999])); + } + + public function testQueryJobsWithRetrySuccessAfterDeadlock(): void + { + $rowData = [ + [ + 'id' => 1, + 'tube' => 'queue', + 'data' => serialize('data'), + 'scheduled_ts' => date('Y-m-d H:i:s'), + 'priority' => 1024, + 'ttr' => 60, + ], + ]; + + $deadlockException = new \PDOException( + 'SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get lock; try restarting transaction', + 40001, + ); + + $selectStmt = $this->createMock(\PDOStatement::class); + $selectStmt->expects(static::once()) + ->method('fetchAll') + ->willReturn($rowData); + $selectStmt->expects(static::once()) + ->method('rowCount') + ->willReturn(1); + + $updateStmt = $this->createMock(\PDOStatement::class); + + $this->adapter->expects(static::exactly(3)) + ->method('query') + ->willReturnCallback(function () use ($deadlockException, $selectStmt, $updateStmt) { + static $callCount = 0; + $callCount++; + if ($callCount === 1) { + throw $deadlockException; + } + return $callCount === 2 ? $selectStmt : $updateStmt; + }); + + $scheduler = new DbScheduler($this->adapter, 300, 600, false, 50, new Backoff(2, 'constant', 0)); + $result = $scheduler->retrieve(); + + static::assertIsArray($result); + static::assertSame(1, $result['id']); + } + + public function testQueryJobsWithRetryThrowsExceptionAfterMaxAttempts(): void + { + $deadlockException = new \PDOException( + 'SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get lock; try restarting transaction', + 40001, + ); + + $this->adapter->expects(static::exactly(5)) + ->method('query') + ->willThrowException($deadlockException); + + $scheduler = new DbScheduler($this->adapter, 300, 600, false, 50, new Backoff(5, 'constant', 0)); + + $this->expectException(\PDOException::class); + $this->expectExceptionMessage( + 'SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get lock; try restarting transaction', + ); + + $scheduler->retrieve(); + } + + public function testQueryJobsWithRetryThrowsNonDeadlockExceptionImmediately(): void + { + $nonDeadlockException = new \PDOException('Table does not exist'); + $nonDeadlockException->errorInfo = ['42S02', 1146, 'Table does not exist']; + + $this->adapter->expects(static::once()) + ->method('query') + ->willThrowException($nonDeadlockException); + + $scheduler = new DbScheduler($this->adapter, 300, 600, false, 50, new Backoff(5, 'constant', 0)); + + $this->expectException(\PDOException::class); + $this->expectExceptionMessage('Table does not exist'); + + $scheduler->retrieve(); + } + + public function testQueryJobsWithRetryBatchSuccessAfterDeadlock(): void + { + $rowData = [ + [ + 'id' => 1, + 'tube' => 'queue1', + 'data' => serialize('data1'), + 'scheduled_ts' => date('Y-m-d H:i:s'), + 'priority' => 1024, + 'ttr' => 60, + ], + [ + 'id' => 2, + 'tube' => 'queue2', + 'data' => serialize('data2'), + 'scheduled_ts' => date('Y-m-d H:i:s'), + 'priority' => 512, + 'ttr' => 30, + ], + ]; + + $deadlockException = new \PDOException( + 'SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get lock; try restarting transaction', + 40001, + ); + + $selectStmt = $this->createMock(\PDOStatement::class); + $selectStmt->expects(static::once()) + ->method('fetchAll') + ->willReturn($rowData); + $selectStmt->expects(static::once()) + ->method('rowCount') + ->willReturn(2); + + $updateStmt = $this->createMock(\PDOStatement::class); + + $this->adapter->expects(static::exactly(3)) + ->method('query') + ->willReturnCallback(function () use ($deadlockException, $selectStmt, $updateStmt) { + static $callCount = 0; + $callCount++; + if ($callCount === 1) { + throw $deadlockException; + } + return $callCount === 2 ? $selectStmt : $updateStmt; + }); + + $scheduler = new DbScheduler($this->adapter, 300, 600, false, 50, new Backoff(2, 'constant', 0)); + $result = $scheduler->retrieveBatch(); + + static::assertIsArray($result); + static::assertCount(2, $result); + static::assertSame(1, $result[0]['id']); + static::assertSame(2, $result[1]['id']); + } + + public function testQueryJobsWithRetryRollsBackTransactionOnDeadlock(): void + { + $deadlockException = new \PDOException( + 'SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get lock; try restarting transaction', + 40001, + ); + + $connection = $this->createMock(\PDO::class); + $connection->expects(static::once()) + ->method('inTransaction') + ->willReturn(true); + + $this->adapter->expects(static::once()) + ->method('getConnection') + ->willReturn($connection); + + $this->adapter->expects(static::once()) + ->method('beginTransaction'); + + $this->adapter->expects(static::once()) + ->method('rollBack'); + + $this->adapter->expects(static::once()) + ->method('query') + ->willThrowException($deadlockException); + + $scheduler = new DbScheduler($this->adapter, 300, 600, false, 50, new Backoff(1)); + + $this->expectException(\PDOException::class); + $scheduler->retrieve(); + } }