Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,21 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]
### Added
- Added `BatchableJobQueueInterface` which defines a JobQueue capable of putting jobs to a queue in batches.
- Extends `JobQueueInterface`.
- Adds `putBatch()` method, which accepts an array of `JobInterface` and returns `self`.
- Added `BatchableSchedulerInterface` which defines a scheduler capable of retrieving and removing jobs in batches.
- Extends `SchedulerInterface`.
- Adds `retrieveBatch()` which will return a array of jobs or `false` if there are no jobs.
- Adds `removeBatch()` which accepts an array of job ids and returns a bool to indicate success.
### Changed
- `Phlib\JobQueue\AwsSqs\JobQueue` supports batching and implements `BatchableJobQueueInterface`.
- `Phlib\JobQueue\Scheduler\DbScheduler` supports batching and implements `BatchableSchedulerInterface`.
- `Phlib\JobQueue\Scheduler\DbScheduler` optionally accepts a `batchSize` argument to specify how many jobs should be fetched per query.
This defaults to `50` if not provided.
- `MonitorCommand` will fetch jobs in batches if the scheduler implements `BatchableSchedulerInterface`.
- `MonitorCommand` will put jobs in batches if the JobQueue implements `BatchableJobQueueInterface` and the scheduler support batching.

## [2.0.0] - 2022-09-14
### Added
Expand Down
37 changes: 35 additions & 2 deletions src/AwsSqs/JobQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

use Aws\Sqs\Exception\SqsException;
use Aws\Sqs\SqsClient;
use Phlib\JobQueue\BatchableJobQueueInterface;
use Phlib\JobQueue\Exception\InvalidArgumentException;
use Phlib\JobQueue\Exception\RuntimeException;
use Phlib\JobQueue\Job;
use Phlib\JobQueue\JobInterface;
use Phlib\JobQueue\JobQueueInterface;
use Phlib\JobQueue\Scheduler\SchedulerInterface;

class JobQueue implements JobQueueInterface
class JobQueue implements BatchableJobQueueInterface
{
private SqsClient $client;

Expand Down Expand Up @@ -69,6 +69,39 @@ public function put(JobInterface $job): self
}
}

public function putBatch(array $jobs): self
{
try {
$queues = [];

foreach ($jobs as $key => $job) {
if ($this->scheduler->shouldBeScheduled($job->getDelay())) {
$this->scheduler->store($job);
continue;
}

$queues[$job->getQueue()][] = [
'Id' => (string) $key,
'DelaySeconds' => $job->getDelay(),
'MessageBody' => JobFactory::serializeBody($job),
];
}

foreach ($queues as $queue => $jobs) {
foreach (array_chunk($jobs, 10) as $batch) {
$this->client->sendMessageBatch([
'QueueUrl' => $this->getQueueUrlWithPrefix($queue),
'Entries' => $batch,
]);
}
}

return $this;
} catch (SqsException $exception) {
throw new RuntimeException($exception->getMessage(), $exception->getCode(), $exception);
}
}

public function retrieve(string $queue): ?JobInterface
{
try {
Expand Down
13 changes: 13 additions & 0 deletions src/BatchableJobQueueInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

declare(strict_types=1);

namespace Phlib\JobQueue;

interface BatchableJobQueueInterface extends JobQueueInterface
{
/**
* @param JobInterface[] $jobs
*/
public function putBatch(array $jobs): self;
}
34 changes: 30 additions & 4 deletions src/Console/MonitorCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
namespace Phlib\JobQueue\Console;

use Phlib\ConsoleProcess\Command\DaemonCommand;
use Phlib\JobQueue\BatchableJobQueueInterface;
use Phlib\JobQueue\Exception\InvalidArgumentException;
use Phlib\JobQueue\JobInterface;
use Phlib\JobQueue\JobQueueInterface;
use Phlib\JobQueue\Scheduler\BatchableSchedulerInterface;
use Phlib\JobQueue\Scheduler\SchedulerInterface;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
Expand Down Expand Up @@ -50,10 +52,22 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$this->logFile = $logFile;
}

while ($jobData = $this->scheduler->retrieve()) {
$output->writeln("Job {$jobData['id']} added.");
$this->jobQueue->put($this->createJob($jobData));
$this->scheduler->remove($jobData['id']);
if ($this->scheduler instanceof BatchableSchedulerInterface) {
while ($jobsData = $this->scheduler->retrieveBatch()) {
$jobs = [];
foreach ($jobsData as $jobData) {
$output->writeln("Job {$jobData['id']} added.");
$jobs[] = $this->createJob($jobData);
}
$this->putJobBatch($jobs);
$this->scheduler->removeBatch(array_column($jobsData, 'id'));
}
} else {
while ($jobData = $this->scheduler->retrieve()) {
$output->writeln("Job {$jobData['id']} added.");
$this->jobQueue->put($this->createJob($jobData));
$this->scheduler->remove($jobData['id']);
}
}

return 0;
Expand All @@ -71,6 +85,18 @@ protected function createJob(array $schedulerJob): JobInterface
);
}

protected function putJobBatch(array $jobs): void
{
if ($this->jobQueue instanceof BatchableJobQueueInterface) {
$this->jobQueue->putBatch($jobs);
return;
}

foreach ($jobs as $job) {
$this->jobQueue->put($job);
}
}

protected function createChildOutput(): OutputInterface
{
if (empty($this->logFile)) {
Expand Down
20 changes: 20 additions & 0 deletions src/Scheduler/BatchableSchedulerInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

declare(strict_types=1);

namespace Phlib\JobQueue\Scheduler;

/**
* Interface BatchableSchedulerInterface
*
* @package Phlib\JobQueue
*/
interface BatchableSchedulerInterface extends SchedulerInterface
{
/**
* @return array|false
*/
public function retrieveBatch();

public function removeBatch(array $jobId): bool;
}
77 changes: 58 additions & 19 deletions src/Scheduler/DbScheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,30 @@

/**
* Class DbScheduler
*
* @package Phlib\JobQueue
*/
class DbScheduler implements SchedulerInterface
class DbScheduler implements BatchableSchedulerInterface
{
protected Adapter $adapter;

protected int $maximumDelay;

private int $minimumPickup;

private int $batchSize;

/**
* @param integer $maximumDelay
* @param integer $minimumPickup
* @param integer $batchSize
*/
public function __construct(Adapter $adapter, $maximumDelay = 300, $minimumPickup = 600)
public function __construct(Adapter $adapter, $maximumDelay = 300, $minimumPickup = 600, $batchSize = 50)
{
$this->adapter = $adapter;
$this->maximumDelay = $maximumDelay;
$this->minimumPickup = $minimumPickup;
$this->batchSize = $batchSize;
}

public function shouldBeScheduled($delay): bool
Expand All @@ -53,7 +58,24 @@ public function store(JobInterface $job): bool
*/
public function retrieve()
{
$sql = '
$job = $this->queryJobs(1);
return $job ? $job[0] : false;
}

/**
* @return array|false
*/
public function retrieveBatch()
{
return $this->queryJobs($this->batchSize);
}

/**
* @return array|false
*/
private function queryJobs(int $batchSize)
{
$sql = "
UPDATE scheduled_queue SET
picked_by = CONNECTION_ID(),
picked_ts = NOW()
Expand All @@ -62,31 +84,38 @@ public function retrieve()
picked_by IS NULL
ORDER BY
scheduled_ts DESC
LIMIT 1';
LIMIT {$batchSize}";
$stmt = $this->adapter->query($sql, [
':minimumPickup' => $this->minimumPickup,
]);

if ($stmt->rowCount() === 0) {
return false; // no jobs
}

$sql = 'SELECT * FROM `scheduled_queue` WHERE picked_by = CONNECTION_ID() LIMIT 1';
$row = $this->adapter->query($sql)->fetch(\PDO::FETCH_ASSOC);

$scheduledTime = strtotime($row['scheduled_ts']);
$delay = $scheduledTime - time();
if ($delay < 0) {
$delay = 0;
$sql = "SELECT * FROM `scheduled_queue` WHERE picked_by = CONNECTION_ID() LIMIT {$batchSize}";
$rows = $this->adapter->query($sql)->fetchAll(\PDO::FETCH_ASSOC);

$jobs = [];

foreach ($rows as $row) {
$scheduledTime = strtotime($row['scheduled_ts']);
$delay = $scheduledTime - time();
if ($delay < 0) {
$delay = 0;
}

$jobs[] = [
'id' => (int) $row['id'],
'queue' => $row['tube'],
'data' => unserialize($row['data']),
'delay' => $delay,
'priority' => (int) $row['priority'],
'ttr' => (int) $row['ttr'],
];
}

return [
'id' => $row['id'],
'queue' => $row['tube'],
'data' => unserialize($row['data']),
'delay' => $delay,
'priority' => $row['priority'],
'ttr' => $row['ttr'],
];
return $jobs;
}

/**
Expand All @@ -102,6 +131,16 @@ public function remove($jobId): bool
->rowCount();
}

public function removeBatch(array $jobIds): bool
{
$table = $this->adapter->quote()->identifier('scheduled_queue');
$sql = "DELETE FROM {$table} WHERE id IN ( " . implode(',', array_fill(0, count($jobIds), '?')) . ' )';

return $this->adapter
->query($sql, $jobIds)
->rowCount() === count($jobIds);
}

protected function insert(array $data): int
{
$table = $this->adapter->quote()->identifier('scheduled_queue');
Expand Down
Loading