From 9349eab3bedc0359c5f0c9b2f0fbb9473f8f330d Mon Sep 17 00:00:00 2001 From: Steven Malis Date: Wed, 17 Dec 2025 16:25:18 -0500 Subject: [PATCH 1/2] EXPERIMENT: mesh/pal: Use async_channel instead of futures::channel --- Cargo.lock | 4 +- Cargo.toml | 1 - support/mesh/mesh_node/Cargo.toml | 2 +- support/mesh/mesh_node/src/local_node.rs | 23 ++++----- support/mesh/mesh_remote/Cargo.toml | 1 + support/mesh/mesh_remote/src/alpc_node.rs | 21 ++++---- support/mesh/mesh_remote/src/unix_node.rs | 56 ++++++++++----------- support/pal/pal_async/src/executor_tests.rs | 24 ++++----- support/pal/pal_uring/Cargo.toml | 1 + support/pal/pal_uring/src/threadpool.rs | 17 +++---- 10 files changed, 69 insertions(+), 81 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0c38e924e6..3ba64dd719 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3990,9 +3990,9 @@ dependencies = [ name = "mesh_node" version = "0.0.0" dependencies = [ + "async-channel", "bitfield-struct 0.11.0", "futures", - "futures-channel", "getrandom 0.3.3", "mesh_derive", "mesh_protobuf", @@ -4047,6 +4047,7 @@ dependencies = [ name = "mesh_remote" version = "0.0.0" dependencies = [ + "async-channel", "event-listener", "futures", "futures-concurrency", @@ -5432,6 +5433,7 @@ dependencies = [ name = "pal_uring" version = "0.0.0" dependencies = [ + "async-channel", "futures", "inspect", "io-uring", diff --git a/Cargo.toml b/Cargo.toml index fc8f7488b9..e5f813ef15 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -441,7 +441,6 @@ futures = "0.3.31" futures-concurrency = "7.6.3" futures-executor = "0.3" futures-core = "0.3" -futures-channel = "0.3" futures-io = "0.3" gdbstub = "0.6" gdbstub_arch = "0.2" diff --git a/support/mesh/mesh_node/Cargo.toml b/support/mesh/mesh_node/Cargo.toml index 5bf3bc8114..083f68b892 100644 --- a/support/mesh/mesh_node/Cargo.toml +++ b/support/mesh/mesh_node/Cargo.toml @@ -11,8 +11,8 @@ mesh_derive.workspace = true mesh_protobuf.workspace = true open_enum.workspace = true +async-channel.workspace = true bitfield-struct.workspace = true -futures-channel.workspace = true getrandom.workspace = true parking_lot.workspace = true thiserror.workspace = true diff --git a/support/mesh/mesh_node/src/local_node.rs b/support/mesh/mesh_node/src/local_node.rs index 93338220df..1de9841d54 100644 --- a/support/mesh/mesh_node/src/local_node.rs +++ b/support/mesh/mesh_node/src/local_node.rs @@ -10,7 +10,6 @@ use crate::message::Message; use crate::message::OwnedMessage; use crate::resource::OsResource; use crate::resource::Resource; -use futures_channel::oneshot; use mesh_protobuf::DefaultEncoding; use mesh_protobuf::buffer::Buf; use mesh_protobuf::buffer::Buffer; @@ -1479,8 +1478,9 @@ impl PortInner { let shutdown = state.shutdown.take(); drop(state); // Trace outside the lock to avoid deadlocks. - if shutdown.is_some() { + if let Some(shutdown) = shutdown { tracing::trace!(node = ?local_node.id, "waking shutdown waiter"); + shutdown.close(); } } } @@ -1627,7 +1627,7 @@ impl Clone for RemoteNodeHandle { struct LocalNodeState { ports: HashMap>, nodes: HashMap>, - shutdown: Option>, + shutdown: Option>, } /// The deserialized event for processing by a local port. @@ -1891,8 +1891,7 @@ impl LocalNode { /// process of being sent to another node. pub async fn wait_for_ports(&self, all_ports: bool) { loop { - #[allow(clippy::disallowed_methods)] // TODO - let (send, recv) = oneshot::channel::<()>(); + let (send, recv) = async_channel::bounded::<()>(1); let ports: Vec<_> = { let mut state = self.inner.state.lock(); state.shutdown = Some(send); @@ -1921,7 +1920,7 @@ impl LocalNode { return; } tracing::trace!(node = ?self.id(), count = left, "waiting for ports"); - let _ = recv.await; + let _ = recv.recv().await; } } @@ -2474,7 +2473,7 @@ pub mod tests { struct RemoteLocalNode { _task: Task<()>, node: Arc, - send: futures_channel::mpsc::UnboundedSender, + send: async_channel::Sender, } struct RemoteEvent { @@ -2492,11 +2491,7 @@ pub mod tests { impl RemoteLocalNode { fn new(driver: &impl Spawn) -> Self { - #[expect( - clippy::disallowed_methods, - reason = "can't use mesh channels from mesh_node" - )] - let (send, recv) = futures_channel::mpsc::unbounded::(); + let (send, recv) = async_channel::unbounded::(); let node = Arc::new(LocalNode::with_id(NodeId::new(), Box::new(NullConnect))); let task = driver.spawn("test", { let node = node.clone(); @@ -2526,7 +2521,7 @@ pub mod tests { struct EventsFrom { node_id: NodeId, - send: futures_channel::mpsc::UnboundedSender, + send: async_channel::Sender, } impl SendEvent for EventsFrom { @@ -2535,7 +2530,7 @@ pub mod tests { let mut os_resources = Vec::new(); event.write_to(&mut buffer, &mut os_resources); self.send - .unbounded_send(RemoteEvent { + .send_blocking(RemoteEvent { node_id: self.node_id, data: buffer, resources: os_resources, diff --git a/support/mesh/mesh_remote/Cargo.toml b/support/mesh/mesh_remote/Cargo.toml index 7619b8409c..036cb73490 100644 --- a/support/mesh/mesh_remote/Cargo.toml +++ b/support/mesh/mesh_remote/Cargo.toml @@ -16,6 +16,7 @@ pal_async.workspace = true tracing_helpers.workspace = true unix_socket = { workspace = true, features = ["mesh"] } +async-channel.workspace = true futures.workspace = true futures-concurrency.workspace = true parking_lot.workspace = true diff --git a/support/mesh/mesh_remote/src/alpc_node.rs b/support/mesh/mesh_remote/src/alpc_node.rs index 3c4bc60804..7d2db2ec03 100644 --- a/support/mesh/mesh_remote/src/alpc_node.rs +++ b/support/mesh/mesh_remote/src/alpc_node.rs @@ -11,7 +11,6 @@ use crate::common::InvitationAddress; use crate::protocol; use futures::FutureExt; use futures::StreamExt; -use futures::channel::mpsc; use futures::future::AbortHandle; use futures::future::abortable; use mesh_node::common::Address; @@ -88,7 +87,7 @@ pub struct AlpcNode { recv_abort: AbortHandle, recv_task: Task<()>, connect_task: Task<()>, - connect_send: mpsc::UnboundedSender<(NodeId, RemoteNodeHandle)>, + connect_send: async_channel::Sender<(NodeId, RemoteNodeHandle)>, } trait InvitationDriver: Spawn + Send {} @@ -179,8 +178,7 @@ impl AlpcNode { let port = PolledWait::new(&driver, port)?; let invitations = Default::default(); - #[expect(clippy::disallowed_methods)] // TODO - let (connect_send, connect_recv) = mpsc::unbounded(); + let (connect_send, connect_recv) = async_channel::unbounded(); let local_node = Arc::new(LocalNode::with_id( local_id, Box::new(AlpcConnector { @@ -223,15 +221,16 @@ impl AlpcNode { driver: &(impl ?Sized + Driver), local_id: NodeId, directory: Arc, - mut connect_recv: mpsc::UnboundedReceiver<(NodeId, RemoteNodeHandle)>, + connect_recv: async_channel::Receiver<(NodeId, RemoteNodeHandle)>, ) { let teardowns: Mutex>> = Default::default(); let mut connect_tasks = FuturesUnordered::new(); + let mut recv = std::pin::pin!(connect_recv); loop { // Receive a new request or drive any in-progress connection tasks // forward. let (remote_id, handle) = futures::select! { // merge semantics - msg = connect_recv.next().fuse() => { + msg = recv.next().fuse() => { match msg { Some(msg) => msg, None => break, @@ -498,7 +497,7 @@ impl AlpcNode { /// returns, data loss could occur for other mesh nodes. pub async fn shutdown(self) { self.local_node.wait_for_ports(false).await; - self.connect_send.close_channel(); + self.connect_send.close(); self.connect_task.await; self.recv_abort.abort(); self.recv_task.await; @@ -510,7 +509,7 @@ impl AlpcNode { local_node: Arc, mut port: PolledWait, invitations: InvitationMap, - connect_send: mpsc::UnboundedSender<(NodeId, RemoteNodeHandle)>, + connect_send: async_channel::Sender<(NodeId, RemoteNodeHandle)>, ) -> io::Result<()> { struct Connection { comm: alpc::Port, @@ -595,7 +594,7 @@ impl AlpcNode { // If this is a connection from a node that was invited, // then it's now safe to connect to the node. Send the // deferred connection to the connection task. - let _ = connect_send.unbounded_send((node_id, handle.clone())); + let _ = connect_send.send_blocking((node_id, handle.clone())); handle } else { // Otherwise, establish a connection now to get a @@ -658,13 +657,13 @@ impl AlpcNode { /// Connector for connecting to remote ALPC nodes. #[derive(Debug)] struct AlpcConnector { - connect_send: mpsc::UnboundedSender<(NodeId, RemoteNodeHandle)>, + connect_send: async_channel::Sender<(NodeId, RemoteNodeHandle)>, } impl Connect for AlpcConnector { fn connect(&self, remote_id: NodeId, handle: RemoteNodeHandle) { // Send the handle to the connect task. - let _ = self.connect_send.unbounded_send((remote_id, handle)); + let _ = self.connect_send.send_blocking((remote_id, handle)); } } diff --git a/support/mesh/mesh_remote/src/unix_node.rs b/support/mesh/mesh_remote/src/unix_node.rs index 389f5b75c2..6df8aa3b29 100644 --- a/support/mesh/mesh_remote/src/unix_node.rs +++ b/support/mesh/mesh_remote/src/unix_node.rs @@ -23,7 +23,6 @@ use crate::common::InvitationAddress; use crate::protocol; use futures::FutureExt; use futures::StreamExt; -use futures::channel::mpsc; use futures::future; use futures::future::BoxFuture; use io::ErrorKind; @@ -398,7 +397,7 @@ enum SenderCommand { #[derive(Clone)] struct PacketSender { - send: mpsc::UnboundedSender, + send: async_channel::Sender, socket: Arc, } @@ -435,9 +434,7 @@ impl SendEvent for PacketSender { ) .is_err() { - let _ = self - .send - .unbounded_send(SenderCommand::Send { packet, fds }); + let _ = self.send.send_blocking(SenderCommand::Send { packet, fds }); } } } @@ -494,7 +491,7 @@ impl Drop for PacketSender { fn drop(&mut self) { // Explicitly close the send channel so that the send task returns, even // though the send channel is also in use by the receive task. - self.send.close_channel(); + self.send.close(); } } @@ -506,8 +503,7 @@ fn start_connection( handle: RemoteNodeHandle, socket: UnixSocket, ) { - #[expect(clippy::disallowed_methods)] // TODO - let (send, recv) = mpsc::unbounded(); + let (send, recv) = async_channel::unbounded(); let socket = Arc::new(socket); let sender = PacketSender { send: send.clone(), @@ -533,8 +529,8 @@ fn start_connection( async fn run_connection( local_node: Arc, remote_id: NodeId, - send_send: mpsc::UnboundedSender, - send_recv: mpsc::UnboundedReceiver, + send_send: async_channel::Sender, + send_recv: async_channel::Receiver, socket: Arc, handle: RemoteNodeHandle, ) { @@ -620,7 +616,7 @@ async fn run_receive( local_node: &LocalNode, remote_id: &NodeId, socket: &UnixSocket, - send: &mpsc::UnboundedSender, + send: &async_channel::Sender, ) -> Result<(), ReceiveError> { let mut buf = vec![0; MAX_PACKET_SIZE]; let mut fds = Vec::new(); @@ -631,18 +627,20 @@ async fn run_receive( } if cfg!(target_os = "macos") && !fds.is_empty() { // Tell the opposite endpoint to release the fds it sent. - let _ = send.unbounded_send(SenderCommand::Send { - packet: protocol::ReleaseFds { - header: protocol::PacketHeader { - packet_type: protocol::PacketType::RELEASE_FDS, - ..FromZeros::new_zeroed() - }, - count: fds.len() as u64, - } - .as_bytes() - .to_vec(), - fds: Vec::new(), - }); + let _ = send + .send(SenderCommand::Send { + packet: protocol::ReleaseFds { + header: protocol::PacketHeader { + packet_type: protocol::PacketType::RELEASE_FDS, + ..FromZeros::new_zeroed() + }, + count: fds.len() as u64, + } + .as_bytes() + .to_vec(), + fds: Vec::new(), + }) + .await; } let buf = &buf[..len]; @@ -658,9 +656,11 @@ async fn run_receive( let release_fds = protocol::ReleaseFds::read_from_prefix(buf) .map_err(|_| ReceiveError::BadReleaseFds)? .0; // TODO: zerocopy: map_err (https://github.com/microsoft/openvmm/issues/759) - let _ = send.unbounded_send(SenderCommand::ReleaseFds { - count: release_fds.count as usize, - }); + let _ = send + .send(SenderCommand::ReleaseFds { + count: release_fds.count as usize, + }) + .await; } #[cfg(target_os = "linux")] protocol::PacketType::LARGE_EVENT => { @@ -688,11 +688,11 @@ enum ProtocolError { /// Handles send processing for the socket. async fn run_send( - mut recv: mpsc::UnboundedReceiver, + recv: async_channel::Receiver, socket: &UnixSocket, retained_fds: &mut VecDeque, ) -> io::Result<()> { - while let Some(command) = recv.next().await { + while let Ok(command) = recv.recv().await { match command { SenderCommand::Send { packet, fds } => { match socket.send(&packet, &fds).await { diff --git a/support/pal/pal_async/src/executor_tests.rs b/support/pal/pal_async/src/executor_tests.rs index 5dd9eb277b..0c16424594 100644 --- a/support/pal/pal_async/src/executor_tests.rs +++ b/support/pal/pal_async/src/executor_tests.rs @@ -3,9 +3,6 @@ //! Tests common to every executor. -// Uses futures channels, but is only test code. -#![expect(clippy::disallowed_methods)] - use crate::driver::Driver; use crate::socket::PolledSocket; use crate::task::Spawn; @@ -14,7 +11,6 @@ use crate::timer::Instant; use futures::AsyncReadExt; use futures::AsyncWriteExt; use futures::FutureExt; -use futures::channel::oneshot; use futures::executor::block_on; use pal_event::Event; use parking_lot::Mutex; @@ -31,12 +27,12 @@ use unix_socket::UnixStream; /// Runs waker-related tests. pub async fn waker_tests() { - let (send, recv) = oneshot::channel(); - std::thread::spawn(|| { + let (send, recv) = async_channel::bounded(1); + std::thread::spawn(move || { std::thread::sleep(Duration::from_millis(100)); - send.send(()).unwrap(); + send.send_blocking(()).unwrap(); }); - recv.await.unwrap(); + recv.recv().await.unwrap(); } /// Runs spawn-related tests. @@ -75,8 +71,8 @@ where { let (spawn, run) = f(); let t = std::thread::spawn(run); - let (send, recv) = oneshot::channel::<()>(); - let mut h = spawn.spawn("pending", recv); + let (send, recv) = async_channel::bounded::<()>(1); + let mut h = spawn.spawn("pending", async move { recv.recv().await }); drop(spawn); std::thread::sleep(Duration::from_millis(100)); assert!((&mut h).now_or_never().is_none()); @@ -100,18 +96,18 @@ pub async fn sleep_tests(driver: impl Driver) { timer .lock() .set_deadline(started + Duration::from_secs(1000)); - let (send, mut recv) = oneshot::channel(); + let (send, recv) = async_channel::bounded(1); std::thread::spawn({ let timer = timer.clone(); move || { let now = block_on(poll_fn(|cx| timer.lock().poll_timer(cx, None))); - send.send(now).unwrap(); + send.send_blocking(now).unwrap(); } }); std::thread::sleep(Duration::from_millis(100)); - assert!((&mut recv).now_or_never().is_none()); + assert!(recv.recv().now_or_never().is_none()); timer.lock().set_deadline(started + duration); - let done_at = recv.await.unwrap(); + let done_at = recv.recv().await.unwrap(); let now = Instant::now(); assert!(done_at >= started + duration); assert!(done_at <= now); diff --git a/support/pal/pal_uring/Cargo.toml b/support/pal/pal_uring/Cargo.toml index 6c5a98694f..dbaf8ddb79 100644 --- a/support/pal/pal_uring/Cargo.toml +++ b/support/pal/pal_uring/Cargo.toml @@ -25,6 +25,7 @@ smallbox.workspace = true tracing.workspace = true [target.'cfg(target_os = "linux")'.dev-dependencies] +async-channel.workspace = true pal_async = { workspace = true, features = ["tests"] } once_cell.workspace = true diff --git a/support/pal/pal_uring/src/threadpool.rs b/support/pal/pal_uring/src/threadpool.rs index 47bae717c9..abf4a74b0a 100644 --- a/support/pal/pal_uring/src/threadpool.rs +++ b/support/pal/pal_uring/src/threadpool.rs @@ -628,11 +628,6 @@ impl> Drop for Io { #[cfg(test)] mod tests { - #![expect( - clippy::disallowed_methods, - reason = "test code using futures channels" - )] - use super::Io; use super::IoRing; use crate::IoUringPool; @@ -998,24 +993,24 @@ mod tests { #[test] fn test_run_until_none() { skip_if_no_io_uring_support!(); - let (send, recv) = futures::channel::oneshot::channel(); - let (send2, recv2) = futures::channel::oneshot::channel(); + let (send, recv) = async_channel::bounded(1); + let (send2, recv2) = async_channel::bounded(1); let pool = IoUringPool::new("test", 16).unwrap(); let done = Arc::new(AtomicBool::new(false)); pool.client() .initiator() .spawn("hmm", { async move { - recv.await.unwrap(); - send2.send(()).unwrap(); + recv.recv().await.unwrap(); + send2.send(()).await.unwrap(); } }) .detach(); pool.client().set_idle_task({ let done = done.clone(); |_ctl| async move { - send.send(()).unwrap(); - recv2.await.unwrap(); + send.send(()).await.unwrap(); + recv2.recv().await.unwrap(); done.store(true, Ordering::SeqCst); } }); From 65a04e571c9dd18cd54bababd4cd435ca84e1c82 Mon Sep 17 00:00:00 2001 From: Steven Malis Date: Wed, 17 Dec 2025 16:29:47 -0500 Subject: [PATCH 2/2] Move this one to mesh too --- vm/devices/net/netvsp/src/lib.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/vm/devices/net/netvsp/src/lib.rs b/vm/devices/net/netvsp/src/lib.rs index cd1668bfb8..a5aeecccf0 100644 --- a/vm/devices/net/netvsp/src/lib.rs +++ b/vm/devices/net/netvsp/src/lib.rs @@ -324,7 +324,7 @@ struct QueueState { struct RxBufferRange { id_range: Range, - remote_buffer_id_recv: Option>, + remote_buffer_id_recv: Option>, remote_ranges: Arc, } @@ -332,7 +332,7 @@ impl RxBufferRange { fn new( ranges: Arc, id_range: Range, - remote_buffer_id_recv: Option>, + remote_buffer_id_recv: Option>, ) -> Self { Self { id_range, @@ -352,7 +352,7 @@ impl RxBufferRange { // the active queues. Any extra buffers are given to the last // queue, so redirect any larger values there. let i = (i as usize).min(self.remote_ranges.buffer_id_send.len() - 1); - let _ = self.remote_ranges.buffer_id_send[i].unbounded_send(id); + self.remote_ranges.buffer_id_send[i].send(id); true } } @@ -360,14 +360,13 @@ impl RxBufferRange { struct RxBufferRanges { buffers_per_queue: u32, - buffer_id_send: Vec>, + buffer_id_send: Vec>, } impl RxBufferRanges { - fn new(buffer_count: u32, queue_count: u32) -> (Self, Vec>) { + fn new(buffer_count: u32, queue_count: u32) -> (Self, Vec>) { let buffers_per_queue = (buffer_count - RX_RESERVED_CONTROL_BUFFERS) / queue_count; - #[expect(clippy::disallowed_methods)] // TODO - let (send, recv): (Vec<_>, Vec<_>) = (0..queue_count).map(|_| mpsc::unbounded()).unzip(); + let (send, recv): (Vec<_>, Vec<_>) = (0..queue_count).map(|_| mesh::channel()).unzip(); ( Self { buffers_per_queue,