From 40eafd956de9d2dc8452e0cba26f43c3bd0c8677 Mon Sep 17 00:00:00 2001 From: kobayu858 Date: Mon, 6 Oct 2025 17:45:51 +0900 Subject: [PATCH 1/4] fix: lost wakeup Signed-off-by: kobayu858 --- awkernel_async_lib/src/pubsub.rs | 73 +++++++++++++++++++++++++++----- 1 file changed, 62 insertions(+), 11 deletions(-) diff --git a/awkernel_async_lib/src/pubsub.rs b/awkernel_async_lib/src/pubsub.rs index d6d956537..7e807afe9 100644 --- a/awkernel_async_lib/src/pubsub.rs +++ b/awkernel_async_lib/src/pubsub.rs @@ -155,17 +155,7 @@ impl Future for Receiver<'_, T> { self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, ) -> core::task::Poll { - let data = self.subscriber.try_recv(); - - if let Some(data) = data { - Poll::Ready(data) - } else { - let mut node = MCSNode::new(); - let mut inner = self.subscriber.inner.lock(&mut node); - inner.waker_subscriber = Some(cx.waker().clone()); - - Poll::Pending - } + self.subscriber.poll_recv(cx) } } @@ -176,6 +166,27 @@ impl Subscriber { receiver.await } + pub(super) fn poll_recv(&self, cx: &mut core::task::Context<'_>) -> core::task::Poll> { + let mut node = MCSNode::new(); + let mut inner = self.inner.lock(&mut node); + + inner.garbage_collect(&self.subscribers.attribute.lifespan); + + if let Some(data) = inner.queue.pop() { + for _ in 0..inner.queue.queue_size() - inner.queue.len() { + if let Some(waker) = inner.waker_publishers.pop_front() { + waker.wake(); + } else { + break; + } + } + core::task::Poll::Ready(data) + } else { + inner.waker_subscriber = Some(cx.waker().clone()); + core::task::Poll::Pending + } + } + /// Non-blocking data receive. /// If there is no data, return `None`. 
pub fn try_recv(&self) -> Option> { @@ -787,6 +798,11 @@ impl_tuple_to_pub_sub!(); impl_tuple_to_pub_sub!(A); impl_tuple_to_pub_sub!(A, B); impl_tuple_to_pub_sub!(A, B, C); +impl_tuple_to_pub_sub!(T0, T1, T2, T3); +impl_tuple_to_pub_sub!(T0, T1, T2, T3, T4); +impl_tuple_to_pub_sub!(T0, T1, T2, T3, T4, T5); +impl_tuple_to_pub_sub!(T0, T1, T2, T3, T4, T5, T6); +impl_tuple_to_pub_sub!(T0, T1, T2, T3, T4, T5, T6, T7); macro_rules! impl_async_receiver_for_tuple { () => { @@ -838,6 +854,41 @@ impl_async_receiver_for_tuple!(); impl_async_receiver_for_tuple!((A, a, p)); impl_async_receiver_for_tuple!((A, a, p), (B, b, q)); impl_async_receiver_for_tuple!((A, a, p), (B, b, q), (C, c, r)); +impl_async_receiver_for_tuple!((T0, v0, p0), (T1, v1, p1), (T2, v2, p2), (T3, v3, p3)); +impl_async_receiver_for_tuple!( + (T0, v0, p0), + (T1, v1, p1), + (T2, v2, p2), + (T3, v3, p3), + (T4, v4, p4) +); +impl_async_receiver_for_tuple!( + (T0, v0, p0), + (T1, v1, p1), + (T2, v2, p2), + (T3, v3, p3), + (T4, v4, p4), + (T5, v5, p5) +); +impl_async_receiver_for_tuple!( + (T0, v0, p0), + (T1, v1, p1), + (T2, v2, p2), + (T3, v3, p3), + (T4, v4, p4), + (T5, v5, p5), + (T6, v6, p6) +); +impl_async_receiver_for_tuple!( + (T0, v0, p0), + (T1, v1, p1), + (T2, v2, p2), + (T3, v3, p3), + (T4, v4, p4), + (T5, v5, p5), + (T6, v6, p6), + (T7, v7, p7) +); #[cfg(test)] mod tests { From ff7f986eee252864ff92c9149106f4a829c8c898 Mon Sep 17 00:00:00 2001 From: kobayu858 Date: Tue, 7 Oct 2025 12:20:24 +0900 Subject: [PATCH 2/4] refactor: revert pubsub num Signed-off-by: kobayu858 --- awkernel_async_lib/src/pubsub.rs | 40 -------------------------------- 1 file changed, 40 deletions(-) diff --git a/awkernel_async_lib/src/pubsub.rs b/awkernel_async_lib/src/pubsub.rs index 7e807afe9..dfddb636e 100644 --- a/awkernel_async_lib/src/pubsub.rs +++ b/awkernel_async_lib/src/pubsub.rs @@ -798,11 +798,6 @@ impl_tuple_to_pub_sub!(); impl_tuple_to_pub_sub!(A); impl_tuple_to_pub_sub!(A, B); impl_tuple_to_pub_sub!(A, B, C); 
-impl_tuple_to_pub_sub!(T0, T1, T2, T3); -impl_tuple_to_pub_sub!(T0, T1, T2, T3, T4); -impl_tuple_to_pub_sub!(T0, T1, T2, T3, T4, T5); -impl_tuple_to_pub_sub!(T0, T1, T2, T3, T4, T5, T6); -impl_tuple_to_pub_sub!(T0, T1, T2, T3, T4, T5, T6, T7); macro_rules! impl_async_receiver_for_tuple { () => { @@ -854,41 +849,6 @@ impl_async_receiver_for_tuple!(); impl_async_receiver_for_tuple!((A, a, p)); impl_async_receiver_for_tuple!((A, a, p), (B, b, q)); impl_async_receiver_for_tuple!((A, a, p), (B, b, q), (C, c, r)); -impl_async_receiver_for_tuple!((T0, v0, p0), (T1, v1, p1), (T2, v2, p2), (T3, v3, p3)); -impl_async_receiver_for_tuple!( - (T0, v0, p0), - (T1, v1, p1), - (T2, v2, p2), - (T3, v3, p3), - (T4, v4, p4) -); -impl_async_receiver_for_tuple!( - (T0, v0, p0), - (T1, v1, p1), - (T2, v2, p2), - (T3, v3, p3), - (T4, v4, p4), - (T5, v5, p5) -); -impl_async_receiver_for_tuple!( - (T0, v0, p0), - (T1, v1, p1), - (T2, v2, p2), - (T3, v3, p3), - (T4, v4, p4), - (T5, v5, p5), - (T6, v6, p6) -); -impl_async_receiver_for_tuple!( - (T0, v0, p0), - (T1, v1, p1), - (T2, v2, p2), - (T3, v3, p3), - (T4, v4, p4), - (T5, v5, p5), - (T6, v6, p6), - (T7, v7, p7) -); #[cfg(test)] mod tests { From 0c66bbffb0cfa556922c5a8a466d44061f874ede Mon Sep 17 00:00:00 2001 From: kobayu858 Date: Tue, 7 Oct 2025 13:32:23 +0900 Subject: [PATCH 3/4] refactor: rename Signed-off-by: kobayu858 --- awkernel_async_lib/src/pubsub.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/awkernel_async_lib/src/pubsub.rs b/awkernel_async_lib/src/pubsub.rs index dfddb636e..c7a2a61e4 100644 --- a/awkernel_async_lib/src/pubsub.rs +++ b/awkernel_async_lib/src/pubsub.rs @@ -155,7 +155,7 @@ impl Future for Receiver<'_, T> { self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, ) -> core::task::Poll { - self.subscriber.poll_recv(cx) + self.subscriber.recv_or_register_waker(cx) } } @@ -166,7 +166,10 @@ impl Subscriber { receiver.await } - pub(super) fn poll_recv(&self, cx: &mut 
core::task::Context<'_>) -> core::task::Poll> { + pub(super) fn recv_or_register_waker( + &self, + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll> { let mut node = MCSNode::new(); let mut inner = self.inner.lock(&mut node); From 9127f37b1bf04475568b1d7b89be30a0c1c73f64 Mon Sep 17 00:00:00 2001 From: kobayu858 Date: Wed, 8 Oct 2025 14:25:34 +0900 Subject: [PATCH 4/4] docs: explain lost wakeup Signed-off-by: kobayu858 --- awkernel_async_lib/src/pubsub.rs | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/awkernel_async_lib/src/pubsub.rs b/awkernel_async_lib/src/pubsub.rs index c7a2a61e4..c216bae0b 100644 --- a/awkernel_async_lib/src/pubsub.rs +++ b/awkernel_async_lib/src/pubsub.rs @@ -166,6 +166,30 @@ impl Subscriber { receiver.await } + /// This function is designed to prevent a race condition known as the "lost wakeup". + /// + /// # The Lost Wakeup Problem + /// + /// An implementation that first checks for data with `try_recv()` and then acquires + /// a lock to register the waker is vulnerable to the following race condition: + /// + /// 1. **Receiver:** Calls `try_recv()` and finds that the queue is empty. + /// 2. **Sender:** Pushes data to the queue. It then checks for a waker, but finds + /// `None` because the receiver has not registered one yet. The sender proceeds + /// without waking anything up. + /// 3. **Receiver:** Acquires the lock, registers its waker, and then returns + /// `Poll::Pending` to go to sleep. + /// + /// As a result, the receiver is asleep while there is data in the queue. The wakeup + /// from the sender has been "lost," and the receiver will not be notified until + /// new data is sent. + /// + /// # Solution + /// + /// To prevent this, this function performs both the data check and the waker + /// registration within the same critical section. + /// This ensures that a sender cannot push data between the check and the + /// registration, guaranteeing that no wakeup is lost. 
pub(super) fn recv_or_register_waker( &self, cx: &mut core::task::Context<'_>,