Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,25 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]
### Added
- Added `BatchableJobQueueInterface` which defines a JobQueue capable of putting jobs to a queue in batches.
- Extends `JobQueueInterface`.
- Adds `putBatch()` method, which accepts an array of `JobInterface` and returns `self`.
- Added `BatchableSchedulerInterface` which defines a scheduler capable of retrieving and removing jobs in batches.
- Extends `SchedulerInterface`.
- Adds `retrieveBatch()` which will return a array of jobs or `false` if there are no jobs.
- Adds `removeBatch()` which accepts an array of job ids and returns a bool to indicate success.
### Changed
- `Phlib\JobQueue\AwsSqs\JobQueue` supports batching and implements `BatchableJobQueueInterface`.
- `Phlib\JobQueue\Scheduler\DbScheduler` supports batching and implements `BatchableSchedulerInterface`.
- `Phlib\JobQueue\Scheduler\DbScheduler` optionally accepts a `skipLocked` argument to indicate whether locked rows should be skipped when claiming jobs. Enabling this will reduce the risk of deadlocks occurring when running multiple instances of the monitor.
This defaults to `false` if not provided and should only be enabled if you are running MySQL 8.0.1 or higher.
- `Phlib\JobQueue\Scheduler\DbScheduler` optionally accepts a `batchSize` argument to specify how many jobs should be fetched per query.
This defaults to `50` if not provided.
- `Phlib\JobQueue\Scheduler\DbScheduler` optionally accepts a `backoff` argument which should be a `STS\Backoff\Backoff` object used to retry if a deadlock occurs.
This defaults to `null` and no retry will be attempted if not provided.
- `MonitorCommand` will fetch jobs in batches if the scheduler implements `BatchableSchedulerInterface`.
- `MonitorCommand` will put jobs in batches if the JobQueue implements `BatchableJobQueueInterface` and the scheduler support batching.

## [3.0.3] - 2025-07-31
### Fixed
Expand All @@ -28,6 +47,27 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- **BC break**: Removed support for PHP versions <= v8.0 as they are no longer
[actively supported](https://php.net/supported-versions.php) by the PHP project.

## [2.1.0] - 2025-08-19
### Added
- Added `BatchableJobQueueInterface` which defines a JobQueue capable of putting jobs to a queue in batches.
- Extends `JobQueueInterface`.
- Adds `putBatch()` method, which accepts an array of `JobInterface` and returns `self`.
- Added `BatchableSchedulerInterface` which defines a scheduler capable of retrieving and removing jobs in batches.
- Extends `SchedulerInterface`.
- Adds `retrieveBatch()` which will return a array of jobs or `false` if there are no jobs.
- Adds `removeBatch()` which accepts an array of job ids and returns a bool to indicate success.
### Changed
- `Phlib\JobQueue\AwsSqs\JobQueue` supports batching and implements `BatchableJobQueueInterface`.
- `Phlib\JobQueue\Scheduler\DbScheduler` supports batching and implements `BatchableSchedulerInterface`.
- `Phlib\JobQueue\Scheduler\DbScheduler` optionally accepts a `skipLocked` argument to indicate whether locked rows should be skipped when claiming jobs. Enabling this will reduce the risk of deadlocks occurring when running multiple instances of the monitor.
This defaults to `false` if not provided and should only be enabled if you are running MySQL 8.0.1 or higher.
- `Phlib\JobQueue\Scheduler\DbScheduler` optionally accepts a `batchSize` argument to specify how many jobs should be fetched per query.
This defaults to `50` if not provided.
- `Phlib\JobQueue\Scheduler\DbScheduler` optionally accepts a `backoff` argument which should be a `STS\Backoff\Backoff` object used to retry if a deadlock occurs.
This defaults to `null` and no retry will be attempted if not provided.
- `MonitorCommand` will fetch jobs in batches if the scheduler implements `BatchableSchedulerInterface`.
- `MonitorCommand` will put jobs in batches if the JobQueue implements `BatchableJobQueueInterface` and the scheduler support batching.

## [2.0.0] - 2022-09-14
### Added
- Add support for PHP v8
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Bootstrap
$beanstalk = (new \Phlib\Beanstalk\Factory())->create('localhost');
$db = new \Phlib\Db\Adapter(['host' => '127.0.0.1', 'dbname' => 'example']);

$scheduler = new \Phlib\JobQueue\DbScheduler($db, 300, 600);
$scheduler = new \Phlib\JobQueue\Scheduler\DbScheduler($db, 300, 600, true);
$jobQueue = new \Phlib\JobQueue\Beanstalk\Scheduled($beanstalk, $scheduler);
```

Expand All @@ -51,6 +51,8 @@ do {
} while (true);
```

See [examples](example/) for more advance usage.

## Jobqueue Script

The script has a dependency on two constructed objects. The Job Queue interface and the Scheduler interface. In order
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
"phlib/console-process": "^3 || ^4",

"psr/log": "^1 || ^2 || ^3",
"aws/aws-sdk-php": "^3.61"
"aws/aws-sdk-php": "^3.61",
"stechstudio/backoff": "^1.6"
},
"require-dev": {
"symplify/easy-coding-standard": "^12",
Expand Down
3 changes: 2 additions & 1 deletion example/bootstrap.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@
'host' => '127.0.0.1',
'dbname' => 'test',
]);
$scheduler = new \Phlib\JobQueue\Scheduler\DbScheduler($db, 60, 120);
$backoff = new \STS\Backoff\Backoff(5, 'constant', 2000, true);
$scheduler = new \Phlib\JobQueue\Scheduler\DbScheduler($db, 60, 120, true, 50, $backoff);
$jobQueue = new \Phlib\JobQueue\Beanstalk\JobQueue($beanstalk, $scheduler);
37 changes: 35 additions & 2 deletions src/AwsSqs/JobQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@

use Aws\Sqs\Exception\SqsException;
use Aws\Sqs\SqsClient;
use Phlib\JobQueue\BatchableJobQueueInterface;
use Phlib\JobQueue\Exception\InvalidArgumentException;
use Phlib\JobQueue\Exception\RuntimeException;
use Phlib\JobQueue\Job;
use Phlib\JobQueue\JobInterface;
use Phlib\JobQueue\JobQueueInterface;
use Phlib\JobQueue\Scheduler\SchedulerInterface;

/**
* @package Phlib\JobQueue
*/
class JobQueue implements JobQueueInterface
class JobQueue implements BatchableJobQueueInterface
{
private int $retrieveTimeout = 10;

Expand Down Expand Up @@ -59,6 +59,39 @@ public function put(JobInterface $job): self
}
}

public function putBatch(array $jobs): self
{
try {
$queues = [];

foreach ($jobs as $key => $job) {
if ($this->scheduler->shouldBeScheduled($job->getDelay())) {
$this->scheduler->store($job);
continue;
}

$queues[$job->getQueue()][] = [
'Id' => (string)$key,
'DelaySeconds' => $job->getDelay(),
'MessageBody' => JobFactory::serializeBody($job),
];
}

foreach ($queues as $queue => $jobs) {
foreach (array_chunk($jobs, 10) as $batch) {
$this->client->sendMessageBatch([
'QueueUrl' => $this->getQueueUrlWithPrefix($queue),
'Entries' => $batch,
]);
}
}

return $this;
} catch (SqsException $exception) {
throw new RuntimeException($exception->getMessage(), $exception->getCode(), $exception);
}
}

public function retrieve(string $queue): ?JobInterface
{
try {
Expand Down
13 changes: 13 additions & 0 deletions src/BatchableJobQueueInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

declare(strict_types=1);

namespace Phlib\JobQueue;

interface BatchableJobQueueInterface extends JobQueueInterface
{
/**
* @param JobInterface[] $jobs
*/
public function putBatch(array $jobs): self;
}
34 changes: 30 additions & 4 deletions src/Console/MonitorCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
namespace Phlib\JobQueue\Console;

use Phlib\ConsoleProcess\Command\DaemonCommand;
use Phlib\JobQueue\BatchableJobQueueInterface;
use Phlib\JobQueue\Exception\InvalidArgumentException;
use Phlib\JobQueue\JobInterface;
use Phlib\JobQueue\JobQueueInterface;
use Phlib\JobQueue\Scheduler\BatchableSchedulerInterface;
use Phlib\JobQueue\Scheduler\SchedulerInterface;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
Expand Down Expand Up @@ -49,10 +51,22 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$this->logFile = $logFile;
}

while ($jobData = $this->scheduler->retrieve()) {
$output->writeln("Job {$jobData['id']} added.");
$this->jobQueue->put($this->createJob($jobData));
$this->scheduler->remove($jobData['id']);
if ($this->scheduler instanceof BatchableSchedulerInterface) {
while ($jobsData = $this->scheduler->retrieveBatch()) {
$jobs = [];
foreach ($jobsData as $jobData) {
$output->writeln("Job {$jobData['id']} added.");
$jobs[] = $this->createJob($jobData);
}
$this->putJobBatch($jobs);
$this->scheduler->removeBatch(array_column($jobsData, 'id'));
}
} else {
while ($jobData = $this->scheduler->retrieve()) {
$output->writeln("Job {$jobData['id']} added.");
$this->jobQueue->put($this->createJob($jobData));
$this->scheduler->remove($jobData['id']);
}
}

return 0;
Expand All @@ -70,6 +84,18 @@ protected function createJob(array $schedulerJob): JobInterface
);
}

protected function putJobBatch(array $jobs): void
{
if ($this->jobQueue instanceof BatchableJobQueueInterface) {
$this->jobQueue->putBatch($jobs);
return;
}

foreach ($jobs as $job) {
$this->jobQueue->put($job);
}
}

protected function createChildOutput(?string $childLogFilename): OutputInterface
{
if (!isset($this->logFile) || ($this->logFile === '' || $this->logFile === '0')) {
Expand Down
15 changes: 15 additions & 0 deletions src/Scheduler/BatchableSchedulerInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Phlib\JobQueue\Scheduler;

/**
* @package Phlib\JobQueue
*/
interface BatchableSchedulerInterface extends SchedulerInterface
{
public function retrieveBatch(): array|false;

public function removeBatch(array $jobId): bool;
}
Loading