Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ The backward compatibility promise has the following exceptions:
* Units of code that are annotated with `@internal`.

## [Unreleased]
### Added
- Replay tube selections when reconnecting a failed Pool connection.

## [3.0.2] - 2024-07-12
### Fixed
Expand Down
49 changes: 48 additions & 1 deletion src/Pool/ManagedConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Phlib\Beanstalk\Pool;

use Phlib\Beanstalk\Connection;
use Phlib\Beanstalk\ConnectionInterface;
use Phlib\Beanstalk\Exception\RuntimeException;
use Psr\Log\LoggerInterface;
Expand All @@ -17,11 +18,19 @@ class ManagedConnection implements ConnectionInterface
{
private ConnectionInterface $connection;

private LoggerInterface $logger;

private int $retryDelay;

private int $retryAt;

private LoggerInterface $logger;
private bool $hasFailed = false;

private string $using = Connection::DEFAULT_TUBE;

private array $watching = [Connection::DEFAULT_TUBE => true];

private array $ignoring = [];

public function __construct(
ConnectionInterface $connection,
Expand Down Expand Up @@ -54,6 +63,8 @@ public function disconnect(): bool

public function useTube(string $tube): void
{
$this->using = $tube;

$this->tryCommand(
fn() => $this->connection->useTube($tube)
);
Expand Down Expand Up @@ -105,13 +116,19 @@ public function delete($id): void

public function watch(string $tube): int
{
unset($this->ignoring[$tube]);
$this->watching[$tube] = true;

return $this->tryCommand(
fn() => $this->connection->watch($tube)
);
}

public function ignore(string $tube): int
{
unset($this->watching[$tube]);
$this->ignoring[$tube] = true;

return $this->tryCommand(
fn() => $this->connection->ignore($tube)
);
Expand Down Expand Up @@ -200,6 +217,11 @@ public function listTubesWatched(): array
private function tryCommand(\Closure $commandFn)
{
try {
if ($this->hasFailed) {
// Trying to reconnect; need to replay tube selections
$this->replayTubes();
}

$result = $commandFn();
$this->reset();

Expand All @@ -217,13 +239,38 @@ public function isAvailable(): bool
return !isset($this->retryAt) || $this->retryAt <= time();
}

private function replayTubes(): void
{
$this->logger->debug(
sprintf('Replay tube selections for connection \'%s\'', $this->getName()),
[
'connectionName' => $this->getName(),
'using' => $this->using,
'watching' => array_keys($this->watching),
'ignoring' => array_keys($this->ignoring),
]
);

$this->connection->useTube($this->using);

foreach (array_keys($this->watching) as $watchTube) {
$this->connection->watch($watchTube);
}

foreach (array_keys($this->ignoring) as $ignoreTube) {
$this->connection->ignore($ignoreTube);
}
}

private function reset(): void
{
unset($this->retryAt);
$this->hasFailed = false;
}

private function delay(): void
{
$this->hasFailed = true;
$this->retryAt = time() + $this->retryDelay;
$this->logger->notice(
sprintf('Connection \'%s\' failed; delay for %ds', $this->getName(), $this->retryDelay),
Expand Down
94 changes: 94 additions & 0 deletions tests/Pool/ManagedConnectionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Phlib\Beanstalk\Pool;

use ColinODell\PsrTestLogger\TestLogger;
use Phlib\Beanstalk\Connection;
use Phlib\Beanstalk\ConnectionInterface;
use Phlib\Beanstalk\Exception\RuntimeException;
use phpmock\phpunit\PHPMock;
Expand Down Expand Up @@ -208,4 +209,97 @@ public function dataCommandPassThrough(): iterable
yield $command => [$command, $map];
}
}

public function testConnectionGetsRestored(): void
{
$useTube = sha1(uniqid('use'));
$watchTube = sha1(uniqid('watch'));
$ignoreTube = Connection::DEFAULT_TUBE;
$jobId = rand();

$releaseCalled = false;

$this->connection->expects(static::exactly(2))
->method('useTube')
->with($useTube)
->willReturnCallback(function () use (&$releaseCalled) {
if ($releaseCalled) {
// This should be called its second time to re-select, before the primary command is called
self::fail('Command was called before connection was reinitialised');
}
});

$this->connection->expects(static::exactly(2))
->method('watch')
->with($watchTube)
->willReturnCallback(function () use (&$releaseCalled) {
if ($releaseCalled) {
// This should be called its second time to re-select, before the primary command is called
self::fail('Command was called before connection was reinitialised');
}
return 1;
});

$this->connection->expects(static::exactly(2))
->method('ignore')
->with($ignoreTube)
->willReturnCallback(function () use (&$releaseCalled) {
if ($releaseCalled) {
// This should be called its second time to re-select, before the primary command is called
self::fail('Command was called before connection was reinitialised');
}
return 1;
});

// Force a connection error so it is treated as unavailable
$this->connection->expects(static::once())
->method('stats')
->willThrowException(new RuntimeException('connection error'));

// Command should only be called on the connection after the tube selections are replayed
$this->connection->expects(static::once())
->method('release')
->willReturnCallback(function (int $withJobId) use (&$releaseCalled, $jobId): void {
$releaseCalled = true;
static::assertSame($jobId, $withJobId);
});

$logger = new TestLogger();

$managed = new ManagedConnection($this->connection, 0, $logger);

// Make the tube selections that should be repeated after failure
$managed->useTube($useTube);
$managed->watch($watchTube);
$managed->ignore($ignoreTube);

try {
// Trigger the connection error
$managed->stats();
} catch (RuntimeException $e) {
// ignore
}

// Call the command that should trigger the reinitialisation
$managed->release($jobId);

// Expected retry log
$logMsg = sprintf(
'Replay tube selections for connection \'%s\'',
$this->connectionName,
);
$logCtxt = [
'connectionName' => $this->connectionName,
'using' => $useTube,
'watching' => [$watchTube],
'ignoring' => [$ignoreTube],
];
static::assertCount(2, $logger->records);
$log = $logger->records[1];
static::assertSame(LogLevel::DEBUG, $log['level']);
static::assertSame($logMsg, $log['message']);
foreach ($logCtxt as $key => $expectedCtxt) {
static::assertSame($expectedCtxt, $log['context'][$key]);
}
}
}