From 85f37a1e7d9641493a3ebedd55983fecc3cca547 Mon Sep 17 00:00:00 2001 From: Xnopyt Date: Wed, 13 Aug 2025 11:27:08 +0100 Subject: [PATCH] Add support for batching --- CHANGELOG.md | 15 + src/AwsSqs/JobQueue.php | 37 +- src/BatchableJobQueueInterface.php | 13 + src/Console/MonitorCommand.php | 34 +- src/Scheduler/BatchableSchedulerInterface.php | 20 + src/Scheduler/DbScheduler.php | 77 +++- tests/AwsSqs/JobQueueTest.php | 357 ++++++++++++++++++ tests/Scheduler/DbSchedulerTest.php | 181 ++++++++- 8 files changed, 702 insertions(+), 32 deletions(-) create mode 100644 src/BatchableJobQueueInterface.php create mode 100644 src/Scheduler/BatchableSchedulerInterface.php diff --git a/CHANGELOG.md b/CHANGELOG.md index 885c91b..209c89f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,21 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +### Added +- Added `BatchableJobQueueInterface` which defines a JobQueue capable of putting jobs to a queue in batches. + - Extends `JobQueueInterface`. + - Adds `putBatch()` method, which accepts an array of `JobInterface` and returns `self`. +- Added `BatchableSchedulerInterface` which defines a scheduler capable of retrieving and removing jobs in batches. + - Extends `SchedulerInterface`. + - Adds `retrieveBatch()` which will return a array of jobs or `false` if there are no jobs. + - Adds `removeBatch()` which accepts an array of job ids and returns a bool to indicate success. +### Changed +- `Phlib\JobQueue\AwsSqs\JobQueue` supports batching and implements `BatchableJobQueueInterface`. +- `Phlib\JobQueue\Scheduler\DbScheduler` supports batching and implements `BatchableSchedulerInterface`. +- `Phlib\JobQueue\Scheduler\DbScheduler` optionally accepts a `batchSize` argument to specify how many jobs should be fetched per query. + This defaults to `50` if not provided. +- `MonitorCommand` will fetch jobs in batches if the scheduler implements `BatchableSchedulerInterface`. +- `MonitorCommand` will put jobs in batches if the JobQueue implements `BatchableJobQueueInterface` and the scheduler support batching. ## [2.0.0] - 2022-09-14 ### Added diff --git a/src/AwsSqs/JobQueue.php b/src/AwsSqs/JobQueue.php index 72dfc4b..7904b38 100644 --- a/src/AwsSqs/JobQueue.php +++ b/src/AwsSqs/JobQueue.php @@ -6,14 +6,14 @@ use Aws\Sqs\Exception\SqsException; use Aws\Sqs\SqsClient; +use Phlib\JobQueue\BatchableJobQueueInterface; use Phlib\JobQueue\Exception\InvalidArgumentException; use Phlib\JobQueue\Exception\RuntimeException; use Phlib\JobQueue\Job; use Phlib\JobQueue\JobInterface; -use Phlib\JobQueue\JobQueueInterface; use Phlib\JobQueue\Scheduler\SchedulerInterface; -class JobQueue implements JobQueueInterface +class JobQueue implements BatchableJobQueueInterface { private SqsClient $client; @@ -69,6 +69,39 @@ public function put(JobInterface $job): self } } + public function putBatch(array $jobs): self + { + try { + $queues = []; + + foreach ($jobs as $key => $job) { + if ($this->scheduler->shouldBeScheduled($job->getDelay())) { + $this->scheduler->store($job); + continue; + } + + $queues[$job->getQueue()][] = [ + 'Id' => (string) $key, + 'DelaySeconds' => $job->getDelay(), + 'MessageBody' => JobFactory::serializeBody($job), + ]; + } + + foreach ($queues as $queue => $jobs) { + foreach (array_chunk($jobs, 10) as $batch) { + $this->client->sendMessageBatch([ + 'QueueUrl' => $this->getQueueUrlWithPrefix($queue), + 'Entries' => $batch, + ]); + } + } + + return $this; + } catch (SqsException $exception) { + throw new RuntimeException($exception->getMessage(), $exception->getCode(), $exception); + } + } + public function retrieve(string $queue): ?JobInterface { try { diff --git a/src/BatchableJobQueueInterface.php b/src/BatchableJobQueueInterface.php new file mode 100644 index 0000000..873d4d3 --- /dev/null +++ b/src/BatchableJobQueueInterface.php @@ -0,0 +1,13 @@ +logFile = $logFile; } - while ($jobData = $this->scheduler->retrieve()) { - $output->writeln("Job {$jobData['id']} added."); - $this->jobQueue->put($this->createJob($jobData)); - $this->scheduler->remove($jobData['id']); + if ($this->scheduler instanceof BatchableSchedulerInterface) { + while ($jobsData = $this->scheduler->retrieveBatch()) { + $jobs = []; + foreach ($jobsData as $jobData) { + $output->writeln("Job {$jobData['id']} added."); + $jobs[] = $this->createJob($jobData); + } + $this->putJobBatch($jobs); + $this->scheduler->removeBatch(array_column($jobsData, 'id')); + } + } else { + while ($jobData = $this->scheduler->retrieve()) { + $output->writeln("Job {$jobData['id']} added."); + $this->jobQueue->put($this->createJob($jobData)); + $this->scheduler->remove($jobData['id']); + } } return 0; @@ -71,6 +85,18 @@ protected function createJob(array $schedulerJob): JobInterface ); } + protected function putJobBatch(array $jobs): void + { + if ($this->jobQueue instanceof BatchableJobQueueInterface) { + $this->jobQueue->putBatch($jobs); + return; + } + + foreach ($jobs as $job) { + $this->jobQueue->put($job); + } + } + protected function createChildOutput(): OutputInterface { if (empty($this->logFile)) { diff --git a/src/Scheduler/BatchableSchedulerInterface.php b/src/Scheduler/BatchableSchedulerInterface.php new file mode 100644 index 0000000..21d96af --- /dev/null +++ b/src/Scheduler/BatchableSchedulerInterface.php @@ -0,0 +1,20 @@ +adapter = $adapter; $this->maximumDelay = $maximumDelay; $this->minimumPickup = $minimumPickup; + $this->batchSize = $batchSize; } public function shouldBeScheduled($delay): bool @@ -53,7 +58,24 @@ public function store(JobInterface $job): bool */ public function retrieve() { - $sql = ' + $job = $this->queryJobs(1); + return $job ? $job[0] : false; + } + + /** + * @return array|false + */ + public function retrieveBatch() + { + return $this->queryJobs($this->batchSize); + } + + /** + * @return array|false + */ + private function queryJobs(int $batchSize) + { + $sql = " UPDATE scheduled_queue SET picked_by = CONNECTION_ID(), picked_ts = NOW() @@ -62,31 +84,38 @@ public function retrieve() picked_by IS NULL ORDER BY scheduled_ts DESC - LIMIT 1'; + LIMIT {$batchSize}"; $stmt = $this->adapter->query($sql, [ ':minimumPickup' => $this->minimumPickup, ]); + if ($stmt->rowCount() === 0) { return false; // no jobs } - $sql = 'SELECT * FROM `scheduled_queue` WHERE picked_by = CONNECTION_ID() LIMIT 1'; - $row = $this->adapter->query($sql)->fetch(\PDO::FETCH_ASSOC); - - $scheduledTime = strtotime($row['scheduled_ts']); - $delay = $scheduledTime - time(); - if ($delay < 0) { - $delay = 0; + $sql = "SELECT * FROM `scheduled_queue` WHERE picked_by = CONNECTION_ID() LIMIT {$batchSize}"; + $rows = $this->adapter->query($sql)->fetchAll(\PDO::FETCH_ASSOC); + + $jobs = []; + + foreach ($rows as $row) { + $scheduledTime = strtotime($row['scheduled_ts']); + $delay = $scheduledTime - time(); + if ($delay < 0) { + $delay = 0; + } + + $jobs[] = [ + 'id' => (int) $row['id'], + 'queue' => $row['tube'], + 'data' => unserialize($row['data']), + 'delay' => $delay, + 'priority' => (int) $row['priority'], + 'ttr' => (int) $row['ttr'], + ]; } - return [ - 'id' => $row['id'], - 'queue' => $row['tube'], - 'data' => unserialize($row['data']), - 'delay' => $delay, - 'priority' => $row['priority'], - 'ttr' => $row['ttr'], - ]; + return $jobs; } /** @@ -102,6 +131,16 @@ public function remove($jobId): bool ->rowCount(); } + public function removeBatch(array $jobIds): bool + { + $table = $this->adapter->quote()->identifier('scheduled_queue'); + $sql = "DELETE FROM {$table} WHERE id IN ( " . implode(',', array_fill(0, count($jobIds), '?')) . ' )'; + + return $this->adapter + ->query($sql, $jobIds) + ->rowCount() === count($jobIds); + } + protected function insert(array $data): int { $table = $this->adapter->quote()->identifier('scheduled_queue'); diff --git a/tests/AwsSqs/JobQueueTest.php b/tests/AwsSqs/JobQueueTest.php index 6f19fdc..94e999b 100644 --- a/tests/AwsSqs/JobQueueTest.php +++ b/tests/AwsSqs/JobQueueTest.php @@ -118,6 +118,363 @@ public function testMarkAsErrorWithPrefix(): void $jobQueue->markAsError($job); } + public function testPutBatchWithSingleQueue(): void + { + $sqsClient = $this->prophesize(SqsClient::class); + $scheduler = $this->createMock(SchedulerInterface::class); + + $queuePrefix = 'prefix-'; + $queue = 'test-queue'; + $queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/prefix-test-queue'; + + $scheduler->method('shouldBeScheduled')->willReturn(false); + + $sqsClient->getQueueUrl([ + 'QueueName' => $queuePrefix . $queue, + ]) + ->shouldBeCalledOnce() + ->willReturn($this->mockAwsResult([['QueueUrl', $queueUrl]])); + + $job1 = new Job($queue, [ + 'data' => 'test1', + ], null, 0, 1024, 60); + $job2 = new Job($queue, [ + 'data' => 'test2', + ], null, 5, 512, 30); + $jobs = [$job1, $job2]; + + $expectedEntries = [ + [ + 'Id' => '0', + 'DelaySeconds' => 0, + 'MessageBody' => json_encode( + [ + 'queue' => $queue, + 'body' => [ + 'data' => 'test1', + ], + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ] + ), + ], + [ + 'Id' => '1', + 'DelaySeconds' => 5, + 'MessageBody' => json_encode( + [ + 'queue' => $queue, + 'body' => [ + 'data' => 'test2', + ], + 'delay' => 5, + 'priority' => 512, + 'ttr' => 30, + ] + ), + ], + ]; + + $sqsClient->sendMessageBatch( + [ + 'QueueUrl' => $queueUrl, + 'Entries' => $expectedEntries, + ] + )->shouldBeCalledOnce(); + + $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler, $queuePrefix); + $result = $jobQueue->putBatch($jobs); + + $this->assertSame($jobQueue, $result); + } + + public function testPutBatchWithMultipleQueues(): void + { + $sqsClient = $this->prophesize(SqsClient::class); + $scheduler = $this->createMock(SchedulerInterface::class); + + $queuePrefix = 'prefix-'; + $queue1 = 'test-queue-1'; + $queue2 = 'test-queue-2'; + $queueUrl1 = 'https://sqs.us-east-1.amazonaws.com/123456789012/prefix-test-queue-1'; + $queueUrl2 = 'https://sqs.us-east-1.amazonaws.com/123456789012/prefix-test-queue-2'; + + $scheduler->method('shouldBeScheduled')->willReturn(false); + + $sqsClient->getQueueUrl([ + 'QueueName' => $queuePrefix . $queue1, + ]) + ->shouldBeCalledOnce() + ->willReturn($this->mockAwsResult([['QueueUrl', $queueUrl1]])); + + $sqsClient->getQueueUrl([ + 'QueueName' => $queuePrefix . $queue2, + ]) + ->shouldBeCalledOnce() + ->willReturn($this->mockAwsResult([['QueueUrl', $queueUrl2]])); + + $job1 = new Job($queue1, [ + 'data' => 'test1', + ]); + $job2 = new Job($queue2, [ + 'data' => 'test2', + ]); + $job3 = new Job($queue1, [ + 'data' => 'test3', + ]); + $jobs = [$job1, $job2, $job3]; + + $expectedEntries1 = [ + [ + 'Id' => '0', + 'DelaySeconds' => 0, + 'MessageBody' => json_encode( + [ + 'queue' => $queue1, + 'body' => [ + 'data' => 'test1', + ], + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ] + ), + ], + [ + 'Id' => '2', + 'DelaySeconds' => 0, + 'MessageBody' => json_encode( + [ + 'queue' => $queue1, + 'body' => [ + 'data' => 'test3', + ], + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ] + ), + ], + ]; + + $expectedEntries2 = [ + [ + 'Id' => '1', + 'DelaySeconds' => 0, + 'MessageBody' => json_encode( + [ + 'queue' => $queue2, + 'body' => [ + 'data' => 'test2', + ], + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ] + ), + ], + ]; + + $sqsClient->sendMessageBatch( + [ + 'QueueUrl' => $queueUrl1, + 'Entries' => $expectedEntries1, + ] + )->shouldBeCalledOnce(); + + $sqsClient->sendMessageBatch( + [ + 'QueueUrl' => $queueUrl2, + 'Entries' => $expectedEntries2, + ] + )->shouldBeCalledOnce(); + + $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler, $queuePrefix); + $result = $jobQueue->putBatch($jobs); + + $this->assertSame($jobQueue, $result); + } + + public function testPutBatchWithScheduledJobs(): void + { + $sqsClient = $this->prophesize(SqsClient::class); + $scheduler = $this->createMock(SchedulerInterface::class); + + $queuePrefix = 'prefix-'; + $queue = 'test-queue'; + $queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/prefix-test-queue'; + + $job1 = new Job($queue, [ + 'data' => 'test1', + ], null, 300); + $job2 = new Job($queue, [ + 'data' => 'test2', + ], null, 0); + $jobs = [$job1, $job2]; + + $scheduler->expects($this->exactly(2)) + ->method('shouldBeScheduled') + ->willReturnCallback( + function ($delay) { + if ($delay === 300) { + return true; + } elseif ($delay === 0) { + return false; + } + throw new \InvalidArgumentException("Unexpected delay: {$delay}"); + } + ); + + $scheduler->expects($this->once()) + ->method('store') + ->with($job1); + + $sqsClient->getQueueUrl([ + 'QueueName' => $queuePrefix . $queue, + ]) + ->shouldBeCalledOnce() + ->willReturn($this->mockAwsResult([['QueueUrl', $queueUrl]])); + + $expectedEntries = [ + [ + 'Id' => '1', + 'DelaySeconds' => 0, + 'MessageBody' => json_encode( + [ + 'queue' => $queue, + 'body' => [ + 'data' => 'test2', + ], + 'delay' => 0, + 'priority' => 1024, + 'ttr' => 60, + ] + ), + ], + ]; + + $sqsClient->sendMessageBatch( + [ + 'QueueUrl' => $queueUrl, + 'Entries' => $expectedEntries, + ] + ) + ->shouldBeCalledOnce(); + + $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler, $queuePrefix); + $result = $jobQueue->putBatch($jobs); + + $this->assertSame($jobQueue, $result); + } + + public function testPutBatchWithLargeNumberOfJobs(): void + { + $sqsClient = $this->prophesize(SqsClient::class); + $scheduler = $this->createMock(SchedulerInterface::class); + + $queuePrefix = 'prefix-'; + $queue = 'test-queue'; + $queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/prefix-test-queue'; + + $scheduler->method('shouldBeScheduled')->willReturn(false); + + $sqsClient->getQueueUrl([ + 'QueueName' => $queuePrefix . $queue, + ]) + ->shouldBeCalledOnce() + ->willReturn($this->mockAwsResult([['QueueUrl', $queueUrl]])); + + $jobs = []; + for ($i = 1; $i <= 25; $i++) { + $jobs[] = new Job($queue, [ + 'data' => "test{$i}", + ]); + } + + $sqsClient->sendMessageBatch( + Argument::that( + function ($args) use ($queueUrl) { + return $args['QueueUrl'] === $queueUrl && + is_array($args['Entries']) && + count($args['Entries']) === 10; + } + ) + ) + ->shouldBeCalledTimes(2); + + $sqsClient->sendMessageBatch( + Argument::that( + function ($args) use ($queueUrl) { + return $args['QueueUrl'] === $queueUrl && + is_array($args['Entries']) && + count($args['Entries']) === 5; + } + ) + ) + ->shouldBeCalledOnce(); + + $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler, $queuePrefix); + $result = $jobQueue->putBatch($jobs); + + $this->assertSame($jobQueue, $result); + } + + public function testPutBatchWithAllScheduledJobs(): void + { + $sqsClient = $this->prophesize(SqsClient::class); + $scheduler = $this->createMock(SchedulerInterface::class); + + $queuePrefix = 'prefix-'; + $queue = 'test-queue'; + + $job1 = new Job($queue, [ + 'data' => 'test1', + ], 300); + $job2 = new Job($queue, [ + 'data' => 'test2', + ], 600); + $jobs = [$job1, $job2]; + + $scheduler->method('shouldBeScheduled')->willReturn(true); + $scheduler->expects($this->exactly(2)) + ->method('store') + ->willReturnCallback( + function ($job) use ($job1, $job2) { + if ($job === $job1 || $job === $job2) { + return true; + } + throw new \InvalidArgumentException('Unexpected job'); + } + ); + + $sqsClient->getQueueUrl()->shouldNotBeCalled(); + $sqsClient->sendMessageBatch()->shouldNotBeCalled(); + + $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler, $queuePrefix); + $result = $jobQueue->putBatch($jobs); + + $this->assertSame($jobQueue, $result); + } + + public function testPutBatchWithEmptyArray(): void + { + $sqsClient = $this->prophesize(SqsClient::class); + $scheduler = $this->prophesize(SchedulerInterface::class); + + $queuePrefix = 'prefix-'; + $jobs = []; + + $sqsClient->getQueueUrl()->shouldNotBeCalled(); + $sqsClient->sendMessageBatch()->shouldNotBeCalled(); + $scheduler->shouldBeScheduled()->shouldNotBeCalled(); + + $jobQueue = new JobQueue($sqsClient->reveal(), $scheduler->reveal(), $queuePrefix); + $result = $jobQueue->putBatch($jobs); + + $this->assertSame($jobQueue, $result); + } + private function mockAwsResult(array $valueMap): MockObject { $result = $this->createMock(Result::class); diff --git a/tests/Scheduler/DbSchedulerTest.php b/tests/Scheduler/DbSchedulerTest.php index f15e063..58ec4fe 100644 --- a/tests/Scheduler/DbSchedulerTest.php +++ b/tests/Scheduler/DbSchedulerTest.php @@ -96,19 +96,21 @@ public function testRetrieveWithNoResults(): void public function testRetrieveCalculatesDelay(): void { $rowData = [ - 'id' => 1, - 'tube' => 'queue', - 'data' => serialize('someData'), - 'scheduled_ts' => date('Y-m-d H:i:s'), - 'priority' => 1024, - 'ttr' => 60, + [ + 'id' => 1, + 'tube' => 'queue', + 'data' => serialize('someData'), + 'scheduled_ts' => date('Y-m-d H:i:s'), + 'priority' => 1024, + 'ttr' => 60, + ], ]; $stmt = $this->createMock(\PDOStatement::class); $stmt->expects(static::once()) ->method('rowCount') ->willReturn(1); $stmt->expects(static::atLeastOnce()) - ->method('fetch') + ->method('fetchAll') ->willReturn($rowData); $this->adapter->expects(static::atLeastOnce()) ->method('query') @@ -131,4 +133,169 @@ public function testRemove(): void $scheduler = new DbScheduler($this->adapter); static::assertTrue($scheduler->remove(234)); } + + public function testRetrieveBatchWithNoResults(): void + { + $stmt = $this->createMock(\PDOStatement::class); + $stmt->expects(static::once()) + ->method('rowCount') + ->willReturn(0); + $this->adapter->expects(static::once()) + ->method('query') + ->willReturn($stmt); + $scheduler = new DbScheduler($this->adapter); + static::assertFalse($scheduler->retrieveBatch()); + } + + public function testRetrieveBatchWithResults(): void + { + $rowData = [ + [ + 'id' => 1, + 'tube' => 'queue1', + 'data' => serialize('data1'), + 'scheduled_ts' => date('Y-m-d H:i:s'), + 'priority' => 1024, + 'ttr' => 60, + ], + [ + 'id' => 2, + 'tube' => 'queue2', + 'data' => serialize('data2'), + 'scheduled_ts' => date('Y-m-d H:i:s'), + 'priority' => 512, + 'ttr' => 30, + ], + ]; + + $updateStmt = $this->createMock(\PDOStatement::class); + $updateStmt->expects(static::once()) + ->method('rowCount') + ->willReturn(2); + + $selectStmt = $this->createMock(\PDOStatement::class); + $selectStmt->expects(static::once()) + ->method('fetchAll') + ->willReturn($rowData); + + $this->adapter->expects(static::exactly(2)) + ->method('query') + ->willReturnOnConsecutiveCalls($updateStmt, $selectStmt); + + $scheduler = new DbScheduler($this->adapter); + $jobs = $scheduler->retrieveBatch(); + + static::assertIsArray($jobs); + static::assertCount(2, $jobs); + static::assertSame(1, $jobs[0]['id']); + static::assertSame('queue1', $jobs[0]['queue']); + static::assertSame('data1', $jobs[0]['data']); + static::assertSame(0, $jobs[0]['delay']); + static::assertSame(1024, $jobs[0]['priority']); + static::assertSame(60, $jobs[0]['ttr']); + } + + public function testRetrieveBatchCalculatesDelayCorrectly(): void + { + $futureTime = date('Y-m-d H:i:s', time() + 120); + $rowData = [ + [ + 'id' => 1, + 'tube' => 'queue', + 'data' => serialize('data'), + 'scheduled_ts' => $futureTime, + 'priority' => 1024, + 'ttr' => 60, + ], + ]; + + $updateStmt = $this->createMock(\PDOStatement::class); + $updateStmt->expects(static::once()) + ->method('rowCount') + ->willReturn(1); + + $selectStmt = $this->createMock(\PDOStatement::class); + $selectStmt->expects(static::once()) + ->method('fetchAll') + ->willReturn($rowData); + + $this->adapter->expects(static::exactly(2)) + ->method('query') + ->willReturnOnConsecutiveCalls($updateStmt, $selectStmt); + + $scheduler = new DbScheduler($this->adapter); + $jobs = $scheduler->retrieveBatch(); + + static::assertIsArray($jobs); + static::assertCount(1, $jobs); + static::assertSame(120, $jobs[0]['delay']); + } + + public function testRemoveBatchWithSingleJob(): void + { + $this->quote->expects(static::once()) + ->method('identifier') + ->with('scheduled_queue') + ->willReturn('`scheduled_queue`'); + + $pdoStatement = $this->createMock(\PDOStatement::class); + $pdoStatement->expects(static::once()) + ->method('rowCount') + ->willReturn(1); + + $this->adapter->expects(static::once()) + ->method('query') + ->with( + 'DELETE FROM `scheduled_queue` WHERE id IN ( ? )', + [123] + ) + ->willReturn($pdoStatement); + + $scheduler = new DbScheduler($this->adapter); + static::assertTrue($scheduler->removeBatch([123])); + } + + public function testRemoveBatchWithMultipleJobs(): void + { + $this->quote->expects(static::once()) + ->method('identifier') + ->with('scheduled_queue') + ->willReturn('`scheduled_queue`'); + + $pdoStatement = $this->createMock(\PDOStatement::class); + $pdoStatement->expects(static::once()) + ->method('rowCount') + ->willReturn(3); + + $this->adapter->expects(static::once()) + ->method('query') + ->with( + 'DELETE FROM `scheduled_queue` WHERE id IN ( ?,?,? )', + [123, 456, 789] + ) + ->willReturn($pdoStatement); + + $scheduler = new DbScheduler($this->adapter); + static::assertTrue($scheduler->removeBatch([123, 456, 789])); + } + + public function testRemoveBatchWithNoRowsAffected(): void + { + $this->quote->expects(static::once()) + ->method('identifier') + ->with('scheduled_queue') + ->willReturn('`scheduled_queue`'); + + $pdoStatement = $this->createMock(\PDOStatement::class); + $pdoStatement->expects(static::once()) + ->method('rowCount') + ->willReturn(0); + + $this->adapter->expects(static::once()) + ->method('query') + ->willReturn($pdoStatement); + + $scheduler = new DbScheduler($this->adapter); + static::assertFalse($scheduler->removeBatch([999])); + } }