diff --git a/CHANGELOG.md b/CHANGELOG.md index 2db9076..2120285 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ The backward compatibility promise has the following exceptions: * Units of code that are annotated with `@internal`. ## [Unreleased] +### Added +- Replay tube selections when reconnecting a failed Pool connection. ## [3.0.2] - 2024-07-12 ### Fixed diff --git a/src/Pool/ManagedConnection.php b/src/Pool/ManagedConnection.php index 61e5d6c..02b20c6 100644 --- a/src/Pool/ManagedConnection.php +++ b/src/Pool/ManagedConnection.php @@ -4,6 +4,7 @@ namespace Phlib\Beanstalk\Pool; +use Phlib\Beanstalk\Connection; use Phlib\Beanstalk\ConnectionInterface; use Phlib\Beanstalk\Exception\RuntimeException; use Psr\Log\LoggerInterface; @@ -17,11 +18,19 @@ class ManagedConnection implements ConnectionInterface { private ConnectionInterface $connection; + private LoggerInterface $logger; + private int $retryDelay; private int $retryAt; - private LoggerInterface $logger; + private bool $hasFailed = false; + + private string $using = Connection::DEFAULT_TUBE; + + private array $watching = [Connection::DEFAULT_TUBE => true]; + + private array $ignoring = []; public function __construct( ConnectionInterface $connection, @@ -54,6 +63,8 @@ public function disconnect(): bool public function useTube(string $tube): void { + $this->using = $tube; + $this->tryCommand( fn() => $this->connection->useTube($tube) ); @@ -105,6 +116,9 @@ public function delete($id): void public function watch(string $tube): int { + unset($this->ignoring[$tube]); + $this->watching[$tube] = true; + return $this->tryCommand( fn() => $this->connection->watch($tube) ); @@ -112,6 +126,9 @@ public function watch(string $tube): int public function ignore(string $tube): int { + unset($this->watching[$tube]); + $this->ignoring[$tube] = true; + return $this->tryCommand( fn() => $this->connection->ignore($tube) ); @@ -200,6 +217,11 @@ public function listTubesWatched(): array private function tryCommand(\Closure $commandFn) { try { + if ($this->hasFailed) { + // Trying to reconnect; need to replay tube selections + $this->replayTubes(); + } + $result = $commandFn(); $this->reset(); @@ -217,13 +239,38 @@ public function isAvailable(): bool return !isset($this->retryAt) || $this->retryAt <= time(); } + private function replayTubes(): void + { + $this->logger->debug( + sprintf('Replay tube selections for connection \'%s\'', $this->getName()), + [ + 'connectionName' => $this->getName(), + 'using' => $this->using, + 'watching' => array_keys($this->watching), + 'ignoring' => array_keys($this->ignoring), + ] + ); + + $this->connection->useTube($this->using); + + foreach (array_keys($this->watching) as $watchTube) { + $this->connection->watch($watchTube); + } + + foreach (array_keys($this->ignoring) as $ignoreTube) { + $this->connection->ignore($ignoreTube); + } + } + private function reset(): void { unset($this->retryAt); + $this->hasFailed = false; } private function delay(): void { + $this->hasFailed = true; $this->retryAt = time() + $this->retryDelay; $this->logger->notice( sprintf('Connection \'%s\' failed; delay for %ds', $this->getName(), $this->retryDelay), diff --git a/tests/Pool/ManagedConnectionTest.php b/tests/Pool/ManagedConnectionTest.php index 75a6dab..c51d40a 100644 --- a/tests/Pool/ManagedConnectionTest.php +++ b/tests/Pool/ManagedConnectionTest.php @@ -5,6 +5,7 @@ namespace Phlib\Beanstalk\Pool; use ColinODell\PsrTestLogger\TestLogger; +use Phlib\Beanstalk\Connection; use Phlib\Beanstalk\ConnectionInterface; use Phlib\Beanstalk\Exception\RuntimeException; use phpmock\phpunit\PHPMock; @@ -208,4 +209,97 @@ public function dataCommandPassThrough(): iterable yield $command => [$command, $map]; } } + + public function testConnectionGetsRestored(): void + { + $useTube = sha1(uniqid('use')); + $watchTube = sha1(uniqid('watch')); + $ignoreTube = Connection::DEFAULT_TUBE; + $jobId = rand(); + + $releaseCalled = false; + + $this->connection->expects(static::exactly(2)) + ->method('useTube') + ->with($useTube) + ->willReturnCallback(function () use (&$releaseCalled) { + if ($releaseCalled) { + // This should be called its second time to re-select, before the primary command is called + self::fail('Command was called before connection was reinitialised'); + } + }); + + $this->connection->expects(static::exactly(2)) + ->method('watch') + ->with($watchTube) + ->willReturnCallback(function () use (&$releaseCalled) { + if ($releaseCalled) { + // This should be called its second time to re-select, before the primary command is called + self::fail('Command was called before connection was reinitialised'); + } + return 1; + }); + + $this->connection->expects(static::exactly(2)) + ->method('ignore') + ->with($ignoreTube) + ->willReturnCallback(function () use (&$releaseCalled) { + if ($releaseCalled) { + // This should be called its second time to re-select, before the primary command is called + self::fail('Command was called before connection was reinitialised'); + } + return 1; + }); + + // Force a connection error so it is treated as unavailable + $this->connection->expects(static::once()) + ->method('stats') + ->willThrowException(new RuntimeException('connection error')); + + // Command should only be called on the connection after the tube selections are replayed + $this->connection->expects(static::once()) + ->method('release') + ->willReturnCallback(function (int $withJobId) use (&$releaseCalled, $jobId): void { + $releaseCalled = true; + static::assertSame($jobId, $withJobId); + }); + + $logger = new TestLogger(); + + $managed = new ManagedConnection($this->connection, 0, $logger); + + // Make the tube selections that should be repeated after failure + $managed->useTube($useTube); + $managed->watch($watchTube); + $managed->ignore($ignoreTube); + + try { + // Trigger the connection error + $managed->stats(); + } catch (RuntimeException $e) { + // ignore + } + + // Call the command that should trigger the reinitialisation + $managed->release($jobId); + + // Expected retry log + $logMsg = sprintf( + 'Replay tube selections for connection \'%s\'', + $this->connectionName, + ); + $logCtxt = [ + 'connectionName' => $this->connectionName, + 'using' => $useTube, + 'watching' => [$watchTube], + 'ignoring' => [$ignoreTube], + ]; + static::assertCount(2, $logger->records); + $log = $logger->records[1]; + static::assertSame(LogLevel::DEBUG, $log['level']); + static::assertSame($logMsg, $log['message']); + foreach ($logCtxt as $key => $expectedCtxt) { + static::assertSame($expectedCtxt, $log['context'][$key]); + } + } }