Skip to content

Conversation

@klimick
Copy link
Contributor

@klimick klimick commented Dec 4, 2025

Closes #80

$consumedMessages[$delivery->exchange][] = $delivery->message->body;
$delivery->ack();
if (\count($consumedMessages[$delivery->exchange]) === $messageCount) {
if (\count($consumedMessages[$delivery->exchange]) === $messageCount && !$deferred->isComplete()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the changes in the Consumer, a race condition appeared here.

Here is a minimized race condition.
/** @var DeferredFuture<null> */
$deferred = new DeferredFuture();

/** @var \ArrayObject<int, string> */
$consumed = new \ArrayObject();

$callback = static function () use ($consumed, $deferred) {
    $consumed[] = 'elem';

    // Simulates an I/O operation (e.g., $delivery->ack())
    delay(0.1);

    // After resuming, both fibers will see that \count($consumed) === 2
    if (\count($consumed) === 2) {
        // Both fibers attempt to complete the same DeferredFuture
        // Error: the operation is no longer pending
        $deferred->complete();
    }
};

await([
    async($callback),
    async($callback),
    $deferred->getFuture(),
]);

@kafkiansky kafkiansky merged commit 1bb9a73 into 1.0.x Dec 8, 2025
14 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Allow handle deliveries concurrently

3 participants