Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
### Changed
- `Phlib\JobQueue\AwsSqs\JobQueue` supports batching and implements `BatchableJobQueueInterface`.
- `Phlib\JobQueue\Scheduler\DbScheduler` supports batching and implements `BatchableSchedulerInterface`.
- `Phlib\JobQueue\Scheduler\DbScheduler` optionally accepts a `skipLocked` argument to indicate whether locked rows should be skipped when claiming jobs. Enabling this will reduce the risk of deadlocks occurring when running multiple instances of the monitor.
This defaults to `false` if not provided and should only be enabled if you are running MySQL 8.0.1 or higher.
- `Phlib\JobQueue\Scheduler\DbScheduler` optionally accepts a `batchSize` argument to specify how many jobs should be fetched per query.
This defaults to `50` if not provided.
- `Phlib\JobQueue\Scheduler\DbScheduler` optionally accepts a `backoff` argument which should be a `STS\Backoff\Backoff` object used to retry if a deadlock occurs.
This defaults to `null` and no retry will be attempted if not provided.
- `MonitorCommand` will fetch jobs in batches if the scheduler implements `BatchableSchedulerInterface`.
- `MonitorCommand` will put jobs in batches if the JobQueue implements `BatchableJobQueueInterface` and the scheduler support batching.

Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Bootstrap
$beanstalk = (new \Phlib\Beanstalk\Factory())->create('localhost');
$db = new \Phlib\Db\Adapter(['host' => '127.0.0.1', 'dbname' => 'example']);

$scheduler = new \Phlib\JobQueue\DbScheduler($db, 300, 600);
$scheduler = new \Phlib\JobQueue\Scheduler\DbScheduler($db, 300, 600, true);
$jobQueue = new \Phlib\JobQueue\Beanstalk\Scheduled($beanstalk, $scheduler);
```

Expand All @@ -51,6 +51,8 @@ do {
} while (true);
```

See [examples](example/) for more advance usage.

## Jobqueue Script

The script has a dependency on two constructed objects. The Job Queue interface and the Scheduler interface. In order
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
"phlib/console-process": "^2",

"psr/log": "^1",
"aws/aws-sdk-php": "^3.61"
"aws/aws-sdk-php": "^3.61",
"stechstudio/backoff": "^1.6"
},
"require-dev": {
"symplify/easy-coding-standard": "^11",
Expand Down
3 changes: 2 additions & 1 deletion example/bootstrap.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@
$queue = 'my-test-tube';
$beanstalk = (new \Phlib\Beanstalk\Factory())->create('localhost');
$db = new \Phlib\Db\Adapter(['host' => '127.0.0.1', 'dbname' => 'test']);
$scheduler = new \Phlib\JobQueue\Scheduler\DbScheduler($db, 60, 120);
$backoff = new \STS\Backoff\Backoff(5, 'constant', 2000, true);
$scheduler = new \Phlib\JobQueue\Scheduler\DbScheduler($db, 60, 120, true, 50, $backoff);
$jobQueue = new \Phlib\JobQueue\Beanstalk\JobQueue($beanstalk, $scheduler);
109 changes: 97 additions & 12 deletions src/Scheduler/DbScheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Phlib\Db\Adapter;
use Phlib\JobQueue\JobInterface;
use STS\Backoff\Backoff;

/**
* Class DbScheduler
Expand All @@ -20,19 +21,53 @@ class DbScheduler implements BatchableSchedulerInterface

private int $minimumPickup;

private bool $skipLocked;

private int $batchSize;

private ?Backoff $backoff;

private const MYSQL_SERIALIZATION_FAILURE = '40001';

private const MYSQL_DEADLOCK = '1213';

/**
* @param integer $maximumDelay
* @param integer $minimumPickup
* @param boolean $skipLocked
* @param integer $batchSize
* @param ?Backoff $backoff
*/
public function __construct(Adapter $adapter, $maximumDelay = 300, $minimumPickup = 600, $batchSize = 50)
{
public function __construct(
Adapter $adapter,
$maximumDelay = 300,
$minimumPickup = 600,
$skipLocked = false,
$batchSize = 50,
$backoff = null
) {
$this->adapter = $adapter;
$this->maximumDelay = $maximumDelay;
$this->minimumPickup = $minimumPickup;
$this->skipLocked = $skipLocked;
$this->batchSize = $batchSize;
$this->backoff = $backoff;

if ($this->backoff) {
$this->backoff->setDecider(
function (int $attempt, int $maxAttempts, $result, ?\Throwable $e = null) {
if ($e === null) {
return false;
}

if ($e instanceof \PDOException && $this->isDeadlock($e) && $attempt < $maxAttempts) {
return true;
}

throw $e;
}
);
}
}

public function shouldBeScheduled($delay): bool
Expand All @@ -58,7 +93,7 @@ public function store(JobInterface $job): bool
*/
public function retrieve()
{
$job = $this->queryJobs(1);
$job = $this->queryJobsWithRetry(1);
return $job ? $job[0] : false;
}

Expand All @@ -67,34 +102,84 @@ public function retrieve()
*/
public function retrieveBatch()
{
return $this->queryJobs($this->batchSize);
return $this->queryJobsWithRetry($this->batchSize);
}

private function isDeadlock(\PDOException $exception): bool
{
$regex = '/SQLSTATE\[' . self::MYSQL_SERIALIZATION_FAILURE . '\].*\s' . self::MYSQL_DEADLOCK . '\s/';

return (string) $exception->getCode() === self::MYSQL_SERIALIZATION_FAILURE
&& preg_match($regex, $exception->getMessage());
}

/**
* @return array|false
*/
private function queryJobsWithRetry(int $batchSize)
{
if ($this->backoff) {
return $this->backoff->run(function () use ($batchSize) {
try {
return $this->queryJobs($batchSize);
} catch (\PDOException $e) {
if ($this->adapter->getConnection()->inTransaction()) {
$this->adapter->rollBack();
}
throw $e;
}
});
}

return $this->queryJobs($batchSize);
}

/**
* @return array|false
*/
private function queryJobs(int $batchSize)
{
$sql = "
UPDATE scheduled_queue SET
picked_by = CONNECTION_ID(),
picked_ts = NOW()
$this->adapter->beginTransaction();

$sql = <<<SQL
SELECT * FROM scheduled_queue
WHERE
scheduled_ts <= CURRENT_TIMESTAMP + INTERVAL :minimumPickup SECOND AND
picked_by IS NULL
ORDER BY
scheduled_ts DESC
LIMIT {$batchSize}";
LIMIT {$batchSize}
Copy link
Collaborator

@amaanhassim amaanhassim Aug 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we be binding batchSize just to be safe?

Limit :batchSize
and then add the batchSize through the stuff below where you are doing minimumPickup

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not quite that simple. The default type PDO uses for binding is string (which would be a syntax error here as it would be quoted) and phlib/db does not support setting the type of a bind.
This should be safe as-is, we are using strict types and $batchSize is type hinted as an int, so it shouldn't be possible to pass an unsafe value here. The :minimumPickup binding was added back in the PHP 5.4 days before strict typing was in the language.

FOR UPDATE
SQL;

if ($this->skipLocked) {
$sql .= ' SKIP LOCKED';
}

$stmt = $this->adapter->query($sql, [
':minimumPickup' => $this->minimumPickup,
]);

if ($stmt->rowCount() === 0) {
$rowCount = $stmt->rowCount();

if ($rowCount === 0) {
$this->adapter->rollBack();
return false; // no jobs
}

$sql = "SELECT * FROM `scheduled_queue` WHERE picked_by = CONNECTION_ID() LIMIT {$batchSize}";
$rows = $this->adapter->query($sql)->fetchAll(\PDO::FETCH_ASSOC);
$rows = $stmt->fetchAll(\PDO::FETCH_ASSOC);

$placeholders = implode(',', array_fill(0, $rowCount, '?'));
$sql = <<<SQL
UPDATE scheduled_queue SET
picked_by = CONNECTION_ID(),
picked_ts = NOW()
WHERE id IN ( {$placeholders} )
SQL;

$this->adapter->query($sql, array_column($rows, 'id'));

$this->adapter->commit();

$jobs = [];

Expand Down
Loading