diff --git a/awkernel_async_lib/src/pubsub.rs b/awkernel_async_lib/src/pubsub.rs index d6d956537..c216bae0b 100644 --- a/awkernel_async_lib/src/pubsub.rs +++ b/awkernel_async_lib/src/pubsub.rs @@ -155,17 +155,7 @@ impl Future for Receiver<'_, T> { self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, ) -> core::task::Poll { - let data = self.subscriber.try_recv(); - - if let Some(data) = data { - Poll::Ready(data) - } else { - let mut node = MCSNode::new(); - let mut inner = self.subscriber.inner.lock(&mut node); - inner.waker_subscriber = Some(cx.waker().clone()); - - Poll::Pending - } + self.subscriber.recv_or_register_waker(cx) } } @@ -176,6 +166,54 @@ impl Subscriber { receiver.await } + /// This function is designed to prevent a race condition known as the "lost wakeup". + /// + /// # The Lost Wakeup Problem + /// + /// An implementation that first checks for data with `try_recv()` and then acquires + /// a lock to register the waker is vulnerable to the following race condition: + /// + /// 1. **Receiver:** Calls `try_recv()` and finds that the queue is empty. + /// 2. **Sender:** Pushes data to the queue. It then checks for a waker, but finds + /// `None` because the receiver has not registered one yet. The sender proceeds + /// without waking anything up. + /// 3. **Receiver:** Acquires the lock, registers its waker, and then returns + /// `Poll::Pending` to go to sleep. + /// + /// As a result, the receiver is asleep while there is data in the queue. The wakeup + /// from the sender has been "lost," and the receiver will not be notified until + /// new data is sent. + /// + /// # Solution + /// + /// To prevent this, this function performs both the data check and the waker + /// registration within the same critical section. + /// This ensures that a sender cannot push data between the check and the + /// registration, guaranteeing that no wakeup is lost. 
+    pub(super) fn recv_or_register_waker(
+        &self,
+        cx: &mut core::task::Context<'_>,
+    ) -> core::task::Poll> { // NOTE(review): `Poll>` is unbalanced; the generic argument looks stripped by extraction, so restore it from the real file
+        let mut node = MCSNode::new(); // node handed to `lock()` on the next line
+        let mut inner = self.inner.lock(&mut node); // `inner` is not released before this function returns
+
+        inner.garbage_collect(&self.subscribers.attribute.lifespan); // presumably discards messages older than the configured lifespan; confirm in `garbage_collect`
+
+        if let Some(data) = inner.queue.pop() { // a message was queued: hand it to the caller
+            for _ in 0..inner.queue.queue_size() - inner.queue.len() { // wakes at most `queue_size() - len()` publishers; assumes `len() <= queue_size()` (TODO confirm, the subtraction would underflow otherwise)
+                if let Some(waker) = inner.waker_publishers.pop_front() {
+                    waker.wake(); // NOTE(review): invoked while `inner` is still held
+                } else {
+                    break; // no publisher wakers left in the deque
+                }
+            }
+            core::task::Poll::Ready(data)
+        } else {
+            inner.waker_subscriber = Some(cx.waker().clone()); // queue was empty: store the waker under the same lock acquisition as the `pop()` above
+            core::task::Poll::Pending
+        }
+    }
+
     /// Non-blocking data receive. /// If there is no data, return `None`. pub fn try_recv(&self) -> Option> {