Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/code-checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ jobs:
strategy:
matrix:
php:
- '7.4'
- '8.0'
- '8.1'
- '8.2'
- '8.3'
Expand Down
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]
### Added
- Type declarations for parameters and return types.
### Changed
- Upgraded `phlib/beanstalk` dependency to require v3.
- Upgraded `phlib/console-process` dependency to require v3 or v4.
- Allow `phlib/console-configuration` v3 dependency.
### Removed
- **BC break**: Removed support for PHP versions <= v8.0 as they are no longer
[actively supported](https://php.net/supported-versions.php) by the PHP project.

## [2.0.0] - 2022-09-14
### Added
Expand Down
8 changes: 4 additions & 4 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
}
],
"require": {
"php": "^7.4 || ^8.0",
"php": "^8.1",
"ext-json": "*",

"phlib/beanstalk": "^2",
"phlib/beanstalk": "^3",
"phlib/db": "^2",
"phlib/console-configuration": "^2",
"phlib/console-process": "^2",
"phlib/console-configuration": "^2 || ^3",
"phlib/console-process": "^3 || ^4",

"psr/log": "^1",
"aws/aws-sdk-php": "^3.61"
Expand Down
5 changes: 1 addition & 4 deletions src/AwsSqs/JobFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
*/
class JobFactory
{
/**
* @return string
*/
public static function serializeBody(JobInterface $job)
public static function serializeBody(JobInterface $job): string
{
return json_encode([
'queue' => $job->getQueue(),
Expand Down
31 changes: 9 additions & 22 deletions src/AwsSqs/JobQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,21 @@
*/
class JobQueue implements JobQueueInterface
{
private SqsClient $client;

private SchedulerInterface $scheduler;

private int $retrieveTimeout = 10;

private array $queues = [];

/**
* @var string
*/
private $queuePrefix;

public function __construct(SqsClient $client, SchedulerInterface $scheduler, $queuePrefix = '')
{
$this->client = $client;
$this->scheduler = $scheduler;
$this->queuePrefix = $queuePrefix;
public function __construct(
private readonly SqsClient $client,
private readonly SchedulerInterface $scheduler,
private string $queuePrefix = '',
) {
}

/**
* @param mixed $data
* @param int|string|null $id
*/
public function createJob(
string $queue,
$data,
$id = null,
mixed $data,
int|string|null $id,
$delay = Job::DEFAULT_DELAY,
$priority = Job::DEFAULT_PRIORITY,
$ttr = Job::DEFAULT_TTR
Expand Down Expand Up @@ -160,7 +147,7 @@ private function getQueueUrl($name)
return $this->queues[$name];
}

private function determineDeadletterQueue($queue)
private function determineDeadletterQueue(string $queue): string
{
$name = $this->queuePrefix . $queue;
try {
Expand All @@ -175,7 +162,7 @@ private function determineDeadletterQueue($queue)

$targetArn = json_decode($arnJson, true, 512, JSON_THROW_ON_ERROR)['deadLetterTargetArn'];
return substr($targetArn, strrpos($targetArn, ':') + 1);
} catch (SqsException $exception) {
} catch (SqsException) {
throw new RuntimeException("Specified queue '{$name}' does not have a Redrive Policy");
}
}
Expand Down
48 changes: 25 additions & 23 deletions src/Beanstalk/JobQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

namespace Phlib\JobQueue\Beanstalk;

use Phlib\Beanstalk\Connection\ConnectionInterface;
use Phlib\Beanstalk\ConnectionInterface;
use Phlib\Beanstalk\Exception\NotFoundException as BeanstalkNotFoundException;
use Phlib\JobQueue\Exception\InvalidArgumentException;
use Phlib\JobQueue\Job;
use Phlib\JobQueue\JobInterface;
Expand All @@ -16,24 +17,22 @@
*/
class JobQueue implements JobQueueInterface
{
protected ConnectionInterface $beanstalk;

protected SchedulerInterface $scheduler;

protected ?int $retrieveTimeout = 5;

public function __construct(ConnectionInterface $beanstalk, SchedulerInterface $scheduler)
{
$this->beanstalk = $beanstalk;
$this->scheduler = $scheduler;
public function __construct(
protected ConnectionInterface $beanstalk,
protected SchedulerInterface $scheduler,
) {
}

/**
* @param mixed $data
* @param int|string|null $id
*/
public function createJob(string $queue, $data, $id, int $delay, int $priority, int $ttr): JobInterface
{
public function createJob(
string $queue,
mixed $data,
int|string|null $id,
int $delay,
int $priority,
int $ttr,
): JobInterface {
return new Job($queue, $data, $id, $delay, $priority, $ttr);
}

Expand All @@ -44,9 +43,8 @@ public function put(JobInterface $job): self
return $this;
}

$this->beanstalk
->useTube($job->getQueue())
->put(JobFactory::serializeBody($job), $job->getPriority(), $job->getDelay(), $job->getTtr());
$this->beanstalk->useTube($job->getQueue());
$this->beanstalk->put(JobFactory::serializeBody($job), $job->getPriority(), $job->getDelay(), $job->getTtr());
return $this;
}

Expand Down Expand Up @@ -74,10 +72,15 @@ public function retrieve(string $queue): ?JobInterface
$this->beanstalk->watch($queue);
$this->beanstalk->ignore('default');

$data = $this->beanstalk->reserve($this->retrieveTimeout);
if ($data === null) {
try {
$data = $this->beanstalk->reserve($this->retrieveTimeout);
} catch (BeanstalkNotFoundException $e) {
if ($e->getCode() !== BeanstalkNotFoundException::RESERVE_NO_JOBS_AVAILABLE_CODE) {
throw $e;
}
return null;
}

return JobFactory::createFromRaw($data);
}

Expand All @@ -97,9 +100,8 @@ public function markAsIncomplete(JobInterface $job): self
return $this;
}

$this->beanstalk
->useTube($job->getQueue())
->release($job->getId(), $job->getPriority(), $job->getDelay());
$this->beanstalk->useTube($job->getQueue());
$this->beanstalk->release($job->getId(), $job->getPriority(), $job->getDelay());
return $this;
}

Expand Down
11 changes: 7 additions & 4 deletions src/Command/WorkerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class WorkerCommand extends DaemonCommand implements LoggerAwareInterface

protected bool $exitOnException = false;

protected bool $queueContinue = true;

protected function execute(InputInterface $input, OutputInterface $output): int
{
if ($this->queue === null) {
Expand All @@ -37,7 +39,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$jobQueue = $this->getJobQueue();
$logger = $this->getLogger($output);

while ($this->continue && ($job = $this->retrieve($jobQueue, $logger)) instanceof JobInterface) {
while ($this->queueContinue && ($job = $this->retrieve($jobQueue, $logger)) instanceof JobInterface) {
try {
$logger->info("Retrieved job {$job->getId()} for {$this->queue}");
$workStarted = microtime(true);
Expand Down Expand Up @@ -74,7 +76,8 @@ private function retrieve(JobQueueInterface $jobQueue, LoggerInterface $logger):
return $jobQueue->retrieve($this->queue);
} catch (\Exception $e) {
$this->logException($logger, "Failed to retrieve job due to error '{$e->getMessage()}'", $e);
$this->continue = false;
$this->queueContinue = false;
$this->shutdown();
throw $e;
}
}
Expand All @@ -97,10 +100,10 @@ protected function getLogger(OutputInterface $output): LoggerInterface
return $this->logger;
}

private function logException(LoggerInterface $logger, $message, \Exception $exception, $job = null): void
private function logException(LoggerInterface $logger, string $message, \Exception $exception, $job = null): void
{
$context = [
'qClass' => get_class($this->getJobQueue()),
'qClass' => $this->getJobQueue()::class,
'xMessage' => $exception->getMessage(),
'xFile' => $exception->getFile(),
'xLine' => $exception->getLine(),
Expand Down
6 changes: 3 additions & 3 deletions src/Console/MonitorCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ protected function createJob(array $schedulerJob): JobInterface
);
}

protected function createChildOutput(): OutputInterface
protected function createChildOutput(?string $childLogFilename): OutputInterface
{
if (empty($this->logFile)) {
return parent::createChildOutput();
if (!isset($this->logFile) || ($this->logFile === '' || $this->logFile === '0')) {
return parent::createChildOutput($childLogFilename);
}
return new StreamOutput(fopen($this->logFile, 'ab'));
}
Expand Down
12 changes: 4 additions & 8 deletions src/Console/MonitorDependencies.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,10 @@
*/
class MonitorDependencies
{
protected JobQueueInterface $jobQueue;

protected SchedulerInterface $scheduler;

public function __construct(JobQueueInterface $jobQueue, SchedulerInterface $scheduler)
{
$this->jobQueue = $jobQueue;
$this->scheduler = $scheduler;
public function __construct(
protected JobQueueInterface $jobQueue,
protected SchedulerInterface $scheduler,
) {
}

public function getJobQueue(): JobQueueInterface
Expand Down
16 changes: 7 additions & 9 deletions src/Exception/JobRuntimeException.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,21 @@
*/
class JobRuntimeException extends RuntimeException
{
/**
* @var JobInterface
*/
protected $job;

public function __construct(?JobInterface $job, string $message, int $code = 0, \Throwable $previous = null)
{
public function __construct(
protected JobInterface $job,
string $message,
int $code = 0,
\Throwable $previous = null,
) {
parent::__construct($message, $code, $previous);
$this->job = $job;
}

public function hasJob(): bool
{
return $this->job !== null;
}

public function getJob(): ?JobInterface
public function getJob(): JobInterface
{
return $this->job;
}
Expand Down
42 changes: 7 additions & 35 deletions src/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,48 +15,23 @@ class Job implements JobInterface

public const DEFAULT_TTR = 60;

protected string $queue;

/**
* @var int|string|null
*/
protected $id;

protected int $delay;

protected int $priority;

protected int $ttr;

/**
* @var mixed
*/
protected $body;

/**
* @param mixed $body
* @param int|string|null $id
*/
public function __construct(
string $queue,
$body,
$id = null,
int $delay = self::DEFAULT_DELAY,
protected string $queue,
protected mixed $body,
protected int|string|null $id = null,
protected int $delay = self::DEFAULT_DELAY,
int $priority = self::DEFAULT_PRIORITY,
int $ttr = self::DEFAULT_TTR
) {
$this->queue = $queue;
$this->body = $body;
$this->id = $id;
$this->delay = $delay;
$this->setPriority($priority);
$this->setTtr($ttr);
}

/**
* @return int|string|null
*/
public function getId()
public function getId(): int|string|null
{
return $this->id;
}
Expand All @@ -66,10 +41,7 @@ public function getQueue(): string
return $this->queue;
}

/**
* @return mixed
*/
public function getBody()
public function getBody(): mixed
{
return $this->body;
}
Expand All @@ -86,7 +58,7 @@ public function getDatetimeDelay(): \DateTimeImmutable

public function setDelay(int $value): self
{
$this->delay = (int)$value;
$this->delay = $value;
return $this;
}

Expand Down
Loading