From da893256ae9e430f0a3db7820250bfea49b34584 Mon Sep 17 00:00:00 2001 From: kafkiansky Date: Thu, 18 Dec 2025 19:35:16 +0300 Subject: [PATCH 1/3] Implement batch visibility timeout --- src/ConsumeController.php | 20 +++++++----------- src/Queue.php | 7 +++++-- src/pgmq.php | 7 ++++--- tests/PgmqTest.php | 43 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 59 insertions(+), 18 deletions(-) diff --git a/src/ConsumeController.php b/src/ConsumeController.php index 1828177..aa2eb68 100644 --- a/src/ConsumeController.php +++ b/src/ConsumeController.php @@ -4,10 +4,8 @@ namespace Thesis\Pgmq; -use Amp\Future; use Amp\Postgres\PostgresTransaction; use Thesis\Time\TimeSpan; -use function Amp\async; /** * @api @@ -41,17 +39,13 @@ public function ack(array $messages): void */ public function nack(array $messages, TimeSpan $delay): void { - $futures = []; - - foreach ($messages as $message) { - $futures[] = async( - $this->queue->setVisibilityTimeout(...), - $message->id, - $delay, - ); - } - - Future\awaitAll($futures); + $this->queue->setVisibilityTimeout( + array_map( + static fn(Message $message): int => $message->id, + $messages, + ), + $delay, + ); } /** diff --git a/src/Queue.php b/src/Queue.php index a52751a..ad464f0 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -180,12 +180,15 @@ public function deleteBatch(array $messageIds): array ); } - public function setVisibilityTimeout(int $messageId, TimeSpan $visibilityTimeout): ?Message + /** + * @param list $messageIds + */ + public function setVisibilityTimeout(array $messageIds, TimeSpan $visibilityTimeout): ?Message { return setVisibilityTimeout( pg: $this->pg, queue: $this->name, - messageId: $messageId, + messageIds: $messageIds, visibilityTimeout: $visibilityTimeout, ); } diff --git a/src/pgmq.php b/src/pgmq.php index 9d1720c..dded62b 100644 --- a/src/pgmq.php +++ b/src/pgmq.php @@ -438,17 +438,18 @@ function deleteBatch( /** * @api * @param non-empty-string $queue + * @param list $messageIds */ function setVisibilityTimeout( PostgresLink $pg, string $queue, - int $messageId, + array $messageIds, TimeSpan $visibilityTimeout, ): ?Message { $row = $pg - ->execute('SELECT * FROM pgmq.set_vt(:queue_name, :msg_id::bigint, :vt::int)', [ + ->execute('SELECT * FROM pgmq.set_vt(:queue_name, :msg_ids::bigint[], :vt::int)', [ 'queue_name' => $queue, - 'msg_id' => $messageId, + 'msg_ids' => $messageIds, 'vt' => $visibilityTimeout->toSeconds(), ]) ->fetchRow(); diff --git a/tests/PgmqTest.php b/tests/PgmqTest.php index e303cd8..8af2b8d 100644 --- a/tests/PgmqTest.php +++ b/tests/PgmqTest.php @@ -324,6 +324,49 @@ static function (array $messages, ConsumeController $ctrl) use (&$consumed): voi self::assertSame(0, $queue->metrics()->length); } + public function testNackBatch(): void + { + $queue = createQueue($this->pg, $this->randomQueueName()); + $messageIds = $queue->sendBatch([ + new SendMessage(self::TESTING_MESSAGE), + new SendMessage(self::TESTING_MESSAGE), + ]); + + self::assertCount(2, $messageIds); + self::assertSame(2, $queue->metrics()->length); + + $count = 0; + + /** @var array> $consumed */ + $consumed = []; + + $consumer = createConsumer($this->pg); + $context = $consumer->consume( + static function (array $messages, ConsumeController $ctrl) use (&$consumed, &$count): void { + /** @var Message $message */ + foreach ($messages as $message) { + $consumed[$message->id][] = $message->value; + ++$count; + } + + if ($count === 2) { + $ctrl->nack($messages, TimeSpan::fromSeconds(1)); + } elseif ($count > 2) { + $ctrl->ack($messages); + $ctrl->stop(); + } + }, + new ConsumeConfig($queue->name, pollInterval: TimeSpan::fromMilliseconds(500)), + ); + + $context->awaitCompletion(); + + self::assertCount(2, $consumed); + self::assertEquals($messageIds, array_keys($consumed)); + self::assertEquals([[self::TESTING_MESSAGE, self::TESTING_MESSAGE], [self::TESTING_MESSAGE, self::TESTING_MESSAGE]], array_values($consumed)); + self::assertSame(0, $queue->metrics()->length); + } + public function testStopConsumeOnUnhandledException(): void { $queue = createQueue($this->pg, $this->randomQueueName()); From c6de86207365814d6d816d905c29868236065d1b Mon Sep 17 00:00:00 2001 From: kafkiansky Date: Thu, 18 Dec 2025 19:37:16 +0300 Subject: [PATCH 2/3] chore: fix github workflow --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 8e2cef3..691373c 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -19,7 +19,7 @@ jobs: --name pgmq-postgres \ -p 5432:5432 \ -e POSTGRES_PASSWORD=postgres \ - ghcr.io/pgmq/pg18-pgmq:v1.7.0 + ghcr.io/pgmq/pg18-pgmq:v1.8.0 - uses: shivammathur/setup-php@v2 with: php-version: ${{ matrix.php }} From 28b4f9e46ceacf487c2e51a16cf524a5ab2198a1 Mon Sep 17 00:00:00 2001 From: kafkiansky Date: Thu, 18 Dec 2025 19:37:20 +0300 Subject: [PATCH 3/3] chore: fix README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 3148599..fbbf4cc 100644 --- a/README.md +++ b/README.md @@ -621,7 +621,7 @@ trapSignal([\SIGINT, \SIGTERM]) $consumer->stop(); $context->awaitCompletion(); ``` -\ + ## License The MIT License (MIT). Please see [License File](LICENSE) for more information.