Skip to content

Conversation

@klimick
Copy link
Contributor

@klimick klimick commented Dec 21, 2025

Current consumer behaviour with listenForInserts: true gives the impression that polling is no longer required.
Actually, consumer may get stuck and not process messages that are already in the queue.

/** @var PostgresConnectionPool $pg */

$queue = createQueue($pg, 'test_queue');
$queue->send(new SendMessage(valueJson: '"begin"'));
$queue->send(new SendMessage(valueJson: '"stop"'));

$config = new ConsumeConfig(
    queue: $queue->name,
    batch: 1,
    pollInterval: TimeSpan::fromSeconds(0),
    listenForInserts: true,
);

$consumer = createConsumer($pg);

// Due to `batch: 1`, only the "begin" message will be consumed.
// The "stop" message will remain in the queue until a new INSERT occurs
// or the consumer is restarted.
$ctx = $consumer->consume(
    handler: static function (array $messages, ConsumeController $ctx) {
        foreach ($messages as $message) {
            if ($message->value === '"stop"') {
                $ctx->stop();
            }
        }

        $ctx->ack($messages);
    },
    config: $config,
);

$ctx->awaitCompletion();

@klimick klimick requested a review from kafkiansky December 21, 2025 09:24
@klimick klimick changed the title Make pooling required Make polling required Dec 21, 2025
@kafkiansky kafkiansky merged commit 9b45f8b into 0.1.x Dec 21, 2025
18 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants