From 641859a1293e2eaf8197d7805c0760ea2c9a8984 Mon Sep 17 00:00:00 2001 From: Luke Richards Date: Fri, 23 Dec 2016 14:53:52 +0000 Subject: [PATCH 1/2] Replay tube selections when resetting a ManagedConnection --- CHANGELOG.md | 2 + src/Pool/ManagedConnection.php | 43 +++++++++++++++- tests/Pool/ManagedConnectionTest.php | 73 ++++++++++++++++++++++++++++ 3 files changed, 116 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 423d7de..5142ae0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ The backward compatibility promise has the following exceptions: * Units of code that are annotated with `@internal`. ## [Unreleased] +### Added +- Replay tube selections when reconnecting a failed Pool connection. ## [3.0.1] - 2023-10-14 ### Changed diff --git a/src/Pool/ManagedConnection.php b/src/Pool/ManagedConnection.php index 1599173..da6bb95 100644 --- a/src/Pool/ManagedConnection.php +++ b/src/Pool/ManagedConnection.php @@ -4,6 +4,7 @@ namespace Phlib\Beanstalk\Pool; +use Phlib\Beanstalk\Connection; use Phlib\Beanstalk\ConnectionInterface; use Phlib\Beanstalk\Exception\RuntimeException; use Psr\Log\LoggerInterface; @@ -17,11 +18,19 @@ class ManagedConnection implements ConnectionInterface { private ConnectionInterface $connection; + private LoggerInterface $logger; + private int $retryDelay; private int $retryAt; - private LoggerInterface $logger; + private bool $hasFailed = false; + + private string $using = Connection::DEFAULT_TUBE; + + private array $watching = [Connection::DEFAULT_TUBE => true]; + + private array $ignoring = []; public function __construct( ConnectionInterface $connection, @@ -54,7 +63,11 @@ public function disconnect(): bool public function useTube(string $tube): void { - $this->connection->useTube($tube); + $this->using = $tube; + + $this->tryCommand( + fn() => $this->connection->useTube($tube) + ); } public function put( @@ -103,6 +116,9 @@ public function delete($id): void public function watch(string $tube): int { + unset($this->ignoring[$tube]); + $this->watching[$tube] = true; + return $this->tryCommand( fn() => $this->connection->watch($tube) ); @@ -110,6 +126,9 @@ public function watch(string $tube): int public function ignore(string $tube): int { + unset($this->watching[$tube]); + $this->ignoring[$tube] = true; + return $this->tryCommand( fn() => $this->connection->ignore($tube) ); @@ -198,6 +217,11 @@ public function listTubesWatched(): array private function tryCommand(\Closure $commandFn) { try { + if ($this->hasFailed) { + // Trying to reconnect; need to replay tube selections + $this->replayTubes(); + } + $result = $commandFn(); $this->reset(); @@ -215,13 +239,28 @@ public function isAvailable(): bool return !isset($this->retryAt) || $this->retryAt <= time(); } + private function replayTubes(): void + { + $this->connection->useTube($this->using); + + foreach (array_keys($this->watching) as $watchTube) { + $this->connection->watch($watchTube); + } + + foreach (array_keys($this->ignoring) as $ignoreTube) { + $this->connection->ignore($ignoreTube); + } + } + private function reset(): void { unset($this->retryAt); + $this->hasFailed = false; } private function delay(): void { + $this->hasFailed = true; $this->retryAt = time() + $this->retryDelay; $this->logger->notice( sprintf('Connection \'%s\' failed; delay for %ds', $this->getName(), $this->retryDelay), diff --git a/tests/Pool/ManagedConnectionTest.php b/tests/Pool/ManagedConnectionTest.php index 75a6dab..1c6e979 100644 --- a/tests/Pool/ManagedConnectionTest.php +++ b/tests/Pool/ManagedConnectionTest.php @@ -5,6 +5,7 @@ namespace Phlib\Beanstalk\Pool; use ColinODell\PsrTestLogger\TestLogger; +use Phlib\Beanstalk\Connection; use Phlib\Beanstalk\ConnectionInterface; use Phlib\Beanstalk\Exception\RuntimeException; use phpmock\phpunit\PHPMock; @@ -208,4 +209,76 @@ public function dataCommandPassThrough(): iterable yield $command => [$command, $map]; } } + + public function testConnectionGetsRestored(): void + { + $useTube = sha1(uniqid('use')); + $watchTube = sha1(uniqid('watch')); + $ignoreTube = Connection::DEFAULT_TUBE; + $jobId = rand(); + + $releaseCalled = false; + + $this->connection->expects(static::exactly(2)) + ->method('useTube') + ->with($useTube) + ->willReturnCallback(function () use (&$releaseCalled) { + if ($releaseCalled) { + // This should be called its second time to re-select, before the primary command is called + self::fail('Command was called before connection was reinitialised'); + } + }); + + $this->connection->expects(static::exactly(2)) + ->method('watch') + ->with($watchTube) + ->willReturnCallback(function () use (&$releaseCalled) { + if ($releaseCalled) { + // This should be called its second time to re-select, before the primary command is called + self::fail('Command was called before connection was reinitialised'); + } + return 1; + }); + + $this->connection->expects(static::exactly(2)) + ->method('ignore') + ->with($ignoreTube) + ->willReturnCallback(function () use (&$releaseCalled) { + if ($releaseCalled) { + // This should be called its second time to re-select, before the primary command is called + self::fail('Command was called before connection was reinitialised'); + } + return 1; + }); + + // Force a connection error so it is treated as unavailable + $this->connection->expects(static::once()) + ->method('stats') + ->willThrowException(new RuntimeException('connection error')); + + // Command should only be called on the connection after the tube selections are replayed + $this->connection->expects(static::once()) + ->method('release') + ->willReturnCallback(function (int $withJobId) use (&$releaseCalled, $jobId): void { + $releaseCalled = true; + static::assertSame($jobId, $withJobId); + }); + + $managed = new ManagedConnection($this->connection, 0); + + // Make the tube selections that should be repeated after failure + $managed->useTube($useTube); + $managed->watch($watchTube); + $managed->ignore($ignoreTube); + + try { + // Trigger the connection error + $managed->stats(); + } catch (RuntimeException $e) { + // ignore + } + + // Call the command that should trigger the reinitialisation + $managed->release($jobId); + } } From 1561478fc7b881036cc1a62cd1b5df67842f8e9c Mon Sep 17 00:00:00 2001 From: Chris Minett <1084019+chrisminett@users.noreply.github.com> Date: Sun, 20 Aug 2023 09:05:17 +0100 Subject: [PATCH 2/2] Add debug log when failed Pool connection retries --- src/Pool/ManagedConnection.php | 10 ++++++++++ tests/Pool/ManagedConnectionTest.php | 23 ++++++++++++++++++++++- 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/src/Pool/ManagedConnection.php b/src/Pool/ManagedConnection.php index da6bb95..02b20c6 100644 --- a/src/Pool/ManagedConnection.php +++ b/src/Pool/ManagedConnection.php @@ -241,6 +241,16 @@ public function isAvailable(): bool private function replayTubes(): void { + $this->logger->debug( + sprintf('Replay tube selections for connection \'%s\'', $this->getName()), + [ + 'connectionName' => $this->getName(), + 'using' => $this->using, + 'watching' => array_keys($this->watching), + 'ignoring' => array_keys($this->ignoring), + ] + ); + $this->connection->useTube($this->using); foreach (array_keys($this->watching) as $watchTube) { diff --git a/tests/Pool/ManagedConnectionTest.php b/tests/Pool/ManagedConnectionTest.php index 1c6e979..c51d40a 100644 --- a/tests/Pool/ManagedConnectionTest.php +++ b/tests/Pool/ManagedConnectionTest.php @@ -264,7 +264,9 @@ public function testConnectionGetsRestored(): void static::assertSame($jobId, $withJobId); }); - $managed = new ManagedConnection($this->connection, 0); + $logger = new TestLogger(); + + $managed = new ManagedConnection($this->connection, 0, $logger); // Make the tube selections that should be repeated after failure $managed->useTube($useTube); @@ -280,5 +282,24 @@ public function testConnectionGetsRestored(): void // Call the command that should trigger the reinitialisation $managed->release($jobId); + + // Expected retry log + $logMsg = sprintf( + 'Replay tube selections for connection \'%s\'', + $this->connectionName, + ); + $logCtxt = [ + 'connectionName' => $this->connectionName, + 'using' => $useTube, + 'watching' => [$watchTube], + 'ignoring' => [$ignoreTube], + ]; + static::assertCount(2, $logger->records); + $log = $logger->records[1]; + static::assertSame(LogLevel::DEBUG, $log['level']); + static::assertSame($logMsg, $log['message']); + foreach ($logCtxt as $key => $expectedCtxt) { + static::assertSame($expectedCtxt, $log['context'][$key]); + } } }