diff --git a/CHANGELOG.md b/CHANGELOG.md index 209c89f..82f6a8e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,8 +16,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Changed - `Phlib\JobQueue\AwsSqs\JobQueue` supports batching and implements `BatchableJobQueueInterface`. - `Phlib\JobQueue\Scheduler\DbScheduler` supports batching and implements `BatchableSchedulerInterface`. +- `Phlib\JobQueue\Scheduler\DbScheduler` optionally accepts a `skipLocked` argument to indicate whether locked rows should be skipped when claiming jobs. Enabling this will reduce the risk of deadlocks occurring when running multiple instances of the monitor. + This defaults to `false` if not provided and should only be enabled if you are running MySQL 8.0.1 or higher. - `Phlib\JobQueue\Scheduler\DbScheduler` optionally accepts a `batchSize` argument to specify how many jobs should be fetched per query. This defaults to `50` if not provided. +- `Phlib\JobQueue\Scheduler\DbScheduler` optionally accepts a `backoff` argument which should be a `STS\Backoff\Backoff` object used to retry if a deadlock occurs. + This defaults to `null` and no retry will be attempted if not provided. - `MonitorCommand` will fetch jobs in batches if the scheduler implements `BatchableSchedulerInterface`. - `MonitorCommand` will put jobs in batches if the JobQueue implements `BatchableJobQueueInterface` and the scheduler support batching. 
diff --git a/README.md b/README.md index 8ded5e3..6ccc084 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ Bootstrap $beanstalk = (new \Phlib\Beanstalk\Factory())->create('localhost'); $db = new \Phlib\Db\Adapter(['host' => '127.0.0.1', 'dbname' => 'example']); -$scheduler = new \Phlib\JobQueue\DbScheduler($db, 300, 600); +$scheduler = new \Phlib\JobQueue\Scheduler\DbScheduler($db, 300, 600, true); $jobQueue = new \Phlib\JobQueue\Beanstalk\Scheduled($beanstalk, $scheduler); ``` @@ -51,6 +51,8 @@ do { } while (true); ``` +See [examples](example/) for more advanced usage. + ## Jobqueue Script The script has a dependency on two constructed objects. The Job Queue interface and the Scheduler interface. In order diff --git a/composer.json b/composer.json index a7c7655..5dca8c4 100644 --- a/composer.json +++ b/composer.json @@ -23,7 +23,8 @@ "phlib/console-process": "^2", "psr/log": "^1", - "aws/aws-sdk-php": "^3.61" + "aws/aws-sdk-php": "^3.61", + "stechstudio/backoff": "^1.6" }, "require-dev": { "symplify/easy-coding-standard": "^11", diff --git a/example/bootstrap.php b/example/bootstrap.php index 68dcd63..3644c10 100644 --- a/example/bootstrap.php +++ b/example/bootstrap.php @@ -5,5 +5,6 @@ $queue = 'my-test-tube'; $beanstalk = (new \Phlib\Beanstalk\Factory())->create('localhost'); $db = new \Phlib\Db\Adapter(['host' => '127.0.0.1', 'dbname' => 'test']); -$scheduler = new \Phlib\JobQueue\Scheduler\DbScheduler($db, 60, 120); +$backoff = new \STS\Backoff\Backoff(5, 'constant', 2000, true); +$scheduler = new \Phlib\JobQueue\Scheduler\DbScheduler($db, 60, 120, true, 50, $backoff); $jobQueue = new \Phlib\JobQueue\Beanstalk\JobQueue($beanstalk, $scheduler); diff --git a/src/Scheduler/DbScheduler.php b/src/Scheduler/DbScheduler.php index 587f0f9..8a0f648 100644 --- a/src/Scheduler/DbScheduler.php +++ b/src/Scheduler/DbScheduler.php @@ -6,6 +6,7 @@ use Phlib\Db\Adapter; use Phlib\JobQueue\JobInterface; +use STS\Backoff\Backoff; /** * Class DbScheduler @@ -20,19 +21,53 
@@ class DbScheduler implements BatchableSchedulerInterface private int $minimumPickup; + private bool $skipLocked; + private int $batchSize; + private ?Backoff $backoff; + + private const MYSQL_SERIALIZATION_FAILURE = '40001'; + + private const MYSQL_DEADLOCK = '1213'; + /** * @param integer $maximumDelay * @param integer $minimumPickup + * @param boolean $skipLocked * @param integer $batchSize + * @param ?Backoff $backoff */ - public function __construct(Adapter $adapter, $maximumDelay = 300, $minimumPickup = 600, $batchSize = 50) - { + public function __construct( + Adapter $adapter, + $maximumDelay = 300, + $minimumPickup = 600, + $skipLocked = false, + $batchSize = 50, + $backoff = null + ) { $this->adapter = $adapter; $this->maximumDelay = $maximumDelay; $this->minimumPickup = $minimumPickup; + $this->skipLocked = $skipLocked; $this->batchSize = $batchSize; + $this->backoff = $backoff; + + if ($this->backoff) { + $this->backoff->setDecider( + function (int $attempt, int $maxAttempts, $result, ?\Throwable $e = null) { + if ($e === null) { + return false; + } + + if ($e instanceof \PDOException && $this->isDeadlock($e) && $attempt < $maxAttempts) { + return true; + } + + throw $e; + } + ); + } } public function shouldBeScheduled($delay): bool @@ -58,7 +93,7 @@ public function store(JobInterface $job): bool */ public function retrieve() { - $job = $this->queryJobs(1); + $job = $this->queryJobsWithRetry(1); return $job ? $job[0] : false; } @@ -67,7 +102,36 @@ public function retrieve() */ public function retrieveBatch() { - return $this->queryJobs($this->batchSize); + return $this->queryJobsWithRetry($this->batchSize); + } + + private function isDeadlock(\PDOException $exception): bool + { + $regex = '/SQLSTATE\[' . self::MYSQL_SERIALIZATION_FAILURE . '\].*\s' . self::MYSQL_DEADLOCK . 
'\s/'; + + return (string) $exception->getCode() === self::MYSQL_SERIALIZATION_FAILURE + && preg_match($regex, $exception->getMessage()); + } + + /** + * @return array|false + */ + private function queryJobsWithRetry(int $batchSize) + { + if ($this->backoff) { + return $this->backoff->run(function () use ($batchSize) { + try { + return $this->queryJobs($batchSize); + } catch (\PDOException $e) { + if ($this->adapter->getConnection()->inTransaction()) { + $this->adapter->rollBack(); + } + throw $e; + } + }); + } + + return $this->queryJobs($batchSize); } /** @@ -75,26 +139,47 @@ public function retrieveBatch() */ private function queryJobs(int $batchSize) { - $sql = " - UPDATE scheduled_queue SET - picked_by = CONNECTION_ID(), - picked_ts = NOW() + $this->adapter->beginTransaction(); + + $sql = <<skipLocked) { + $sql .= ' SKIP LOCKED'; + } + $stmt = $this->adapter->query($sql, [ ':minimumPickup' => $this->minimumPickup, ]); - if ($stmt->rowCount() === 0) { + $rowCount = $stmt->rowCount(); + + if ($rowCount === 0) { + $this->adapter->rollBack(); return false; // no jobs } - $sql = "SELECT * FROM `scheduled_queue` WHERE picked_by = CONNECTION_ID() LIMIT {$batchSize}"; - $rows = $this->adapter->query($sql)->fetchAll(\PDO::FETCH_ASSOC); + $rows = $stmt->fetchAll(\PDO::FETCH_ASSOC); + + $placeholders = implode(',', array_fill(0, $rowCount, '?')); + $sql = <<adapter->query($sql, array_column($rows, 'id')); + + $this->adapter->commit(); $jobs = []; diff --git a/tests/Scheduler/DbSchedulerTest.php b/tests/Scheduler/DbSchedulerTest.php index 58ec4fe..eb545b4 100644 --- a/tests/Scheduler/DbSchedulerTest.php +++ b/tests/Scheduler/DbSchedulerTest.php @@ -10,6 +10,7 @@ use Phlib\JobQueue\Scheduler\SchedulerInterface; use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; +use STS\Backoff\Backoff; class DbSchedulerTest extends TestCase { @@ -168,19 +169,19 @@ public function testRetrieveBatchWithResults(): void ], ]; - $updateStmt = 
$this->createMock(\PDOStatement::class); - $updateStmt->expects(static::once()) - ->method('rowCount') - ->willReturn(2); - $selectStmt = $this->createMock(\PDOStatement::class); $selectStmt->expects(static::once()) ->method('fetchAll') ->willReturn($rowData); + $selectStmt->expects(static::once()) + ->method('rowCount') + ->willReturn(2); + + $updateStmt = $this->createMock(\PDOStatement::class); $this->adapter->expects(static::exactly(2)) ->method('query') - ->willReturnOnConsecutiveCalls($updateStmt, $selectStmt); + ->willReturnOnConsecutiveCalls($selectStmt, $updateStmt); $scheduler = new DbScheduler($this->adapter); $jobs = $scheduler->retrieveBatch(); @@ -209,19 +210,19 @@ public function testRetrieveBatchCalculatesDelayCorrectly(): void ], ]; - $updateStmt = $this->createMock(\PDOStatement::class); - $updateStmt->expects(static::once()) - ->method('rowCount') - ->willReturn(1); - $selectStmt = $this->createMock(\PDOStatement::class); $selectStmt->expects(static::once()) ->method('fetchAll') ->willReturn($rowData); + $selectStmt->expects(static::once()) + ->method('rowCount') + ->willReturn(1); + + $updateStmt = $this->createMock(\PDOStatement::class); $this->adapter->expects(static::exactly(2)) ->method('query') - ->willReturnOnConsecutiveCalls($updateStmt, $selectStmt); + ->willReturnOnConsecutiveCalls($selectStmt, $updateStmt); $scheduler = new DbScheduler($this->adapter); $jobs = $scheduler->retrieveBatch(); @@ -298,4 +299,176 @@ public function testRemoveBatchWithNoRowsAffected(): void $scheduler = new DbScheduler($this->adapter); static::assertFalse($scheduler->removeBatch([999])); } + + public function testQueryJobsWithRetrySuccessAfterDeadlock(): void + { + $rowData = [ + [ + 'id' => 1, + 'tube' => 'queue', + 'data' => serialize('data'), + 'scheduled_ts' => date('Y-m-d H:i:s'), + 'priority' => 1024, + 'ttr' => 60, + ], + ]; + + $deadlockException = new \PDOException( + 'SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get 
lock; try restarting transaction', + 40001 + ); + + $selectStmt = $this->createMock(\PDOStatement::class); + $selectStmt->expects(static::once()) + ->method('fetchAll') + ->willReturn($rowData); + $selectStmt->expects(static::once()) + ->method('rowCount') + ->willReturn(1); + + $updateStmt = $this->createMock(\PDOStatement::class); + + $this->adapter->expects(static::exactly(3)) + ->method('query') + ->willReturnCallback(function () use ($deadlockException, $selectStmt, $updateStmt) { + static $callCount = 0; + $callCount++; + if ($callCount === 1) { + throw $deadlockException; + } + return $callCount === 2 ? $selectStmt : $updateStmt; + }); + + $scheduler = new DbScheduler($this->adapter, 300, 600, false, 50, new Backoff(2, 'constant', 0)); + $result = $scheduler->retrieve(); + + static::assertIsArray($result); + static::assertSame(1, $result['id']); + } + + public function testQueryJobsWithRetryThrowsExceptionAfterMaxAttempts(): void + { + $deadlockException = new \PDOException( + 'SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get lock; try restarting transaction', + 40001 + ); + + $this->adapter->expects(static::exactly(5)) + ->method('query') + ->willThrowException($deadlockException); + + $scheduler = new DbScheduler($this->adapter, 300, 600, false, 50, new Backoff(5, 'constant', 0)); + + $this->expectException(\PDOException::class); + $this->expectExceptionMessage( + 'SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get lock; try restarting transaction' + ); + + $scheduler->retrieve(); + } + + public function testQueryJobsWithRetryThrowsNonDeadlockExceptionImmediately(): void + { + $nonDeadlockException = new \PDOException('Table does not exist'); + $nonDeadlockException->errorInfo = ['42S02', 1146, 'Table does not exist']; + + $this->adapter->expects(static::once()) + ->method('query') + ->willThrowException($nonDeadlockException); + + $scheduler = new DbScheduler($this->adapter, 300, 600, false, 50, 
new Backoff(5, 'constant', 0)); + + $this->expectException(\PDOException::class); + $this->expectExceptionMessage('Table does not exist'); + + $scheduler->retrieve(); + } + + public function testQueryJobsWithRetryBatchSuccessAfterDeadlock(): void + { + $rowData = [ + [ + 'id' => 1, + 'tube' => 'queue1', + 'data' => serialize('data1'), + 'scheduled_ts' => date('Y-m-d H:i:s'), + 'priority' => 1024, + 'ttr' => 60, + ], + [ + 'id' => 2, + 'tube' => 'queue2', + 'data' => serialize('data2'), + 'scheduled_ts' => date('Y-m-d H:i:s'), + 'priority' => 512, + 'ttr' => 30, + ], + ]; + + $deadlockException = new \PDOException( + 'SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get lock; try restarting transaction', + 40001 + ); + + $selectStmt = $this->createMock(\PDOStatement::class); + $selectStmt->expects(static::once()) + ->method('fetchAll') + ->willReturn($rowData); + $selectStmt->expects(static::once()) + ->method('rowCount') + ->willReturn(2); + + $updateStmt = $this->createMock(\PDOStatement::class); + + $this->adapter->expects(static::exactly(3)) + ->method('query') + ->willReturnCallback(function () use ($deadlockException, $selectStmt, $updateStmt) { + static $callCount = 0; + $callCount++; + if ($callCount === 1) { + throw $deadlockException; + } + return $callCount === 2 ? 
$selectStmt : $updateStmt; + }); + + $scheduler = new DbScheduler($this->adapter, 300, 600, false, 50, new Backoff(2, 'constant', 0)); + $result = $scheduler->retrieveBatch(); + + static::assertIsArray($result); + static::assertCount(2, $result); + static::assertSame(1, $result[0]['id']); + static::assertSame(2, $result[1]['id']); + } + + public function testQueryJobsWithRetryRollsBackTransactionOnDeadlock(): void + { + $deadlockException = new \PDOException( + 'SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get lock; try restarting transaction', + 40001 + ); + + $connection = $this->createMock(\PDO::class); + $connection->expects(static::once()) + ->method('inTransaction') + ->willReturn(true); + + $this->adapter->expects(static::once()) + ->method('getConnection') + ->willReturn($connection); + + $this->adapter->expects(static::once()) + ->method('beginTransaction'); + + $this->adapter->expects(static::once()) + ->method('rollBack'); + + $this->adapter->expects(static::once()) + ->method('query') + ->willThrowException($deadlockException); + + $scheduler = new DbScheduler($this->adapter, 300, 600, false, 50, new Backoff(1)); + + $this->expectException(\PDOException::class); + $scheduler->retrieve(); + } }