Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3990,9 +3990,9 @@ dependencies = [
name = "mesh_node"
version = "0.0.0"
dependencies = [
"async-channel",
"bitfield-struct 0.11.0",
"futures",
"futures-channel",
"getrandom 0.3.3",
"mesh_derive",
"mesh_protobuf",
Expand Down Expand Up @@ -4047,6 +4047,7 @@ dependencies = [
name = "mesh_remote"
version = "0.0.0"
dependencies = [
"async-channel",
"event-listener",
"futures",
"futures-concurrency",
Expand Down Expand Up @@ -5432,6 +5433,7 @@ dependencies = [
name = "pal_uring"
version = "0.0.0"
dependencies = [
"async-channel",
"futures",
"inspect",
"io-uring",
Expand Down
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,6 @@ futures = "0.3.31"
futures-concurrency = "7.6.3"
futures-executor = "0.3"
futures-core = "0.3"
futures-channel = "0.3"
futures-io = "0.3"
gdbstub = "0.6"
gdbstub_arch = "0.2"
Expand Down
2 changes: 1 addition & 1 deletion support/mesh/mesh_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ mesh_derive.workspace = true
mesh_protobuf.workspace = true
open_enum.workspace = true

async-channel.workspace = true
bitfield-struct.workspace = true
futures-channel.workspace = true
getrandom.workspace = true
parking_lot.workspace = true
thiserror.workspace = true
Expand Down
23 changes: 9 additions & 14 deletions support/mesh/mesh_node/src/local_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use crate::message::Message;
use crate::message::OwnedMessage;
use crate::resource::OsResource;
use crate::resource::Resource;
use futures_channel::oneshot;
use mesh_protobuf::DefaultEncoding;
use mesh_protobuf::buffer::Buf;
use mesh_protobuf::buffer::Buffer;
Expand Down Expand Up @@ -1479,8 +1478,9 @@ impl PortInner {
let shutdown = state.shutdown.take();
drop(state);
// Trace outside the lock to avoid deadlocks.
if shutdown.is_some() {
if let Some(shutdown) = shutdown {
tracing::trace!(node = ?local_node.id, "waking shutdown waiter");
shutdown.close();
}
}
}
Expand Down Expand Up @@ -1627,7 +1627,7 @@ impl Clone for RemoteNodeHandle {
struct LocalNodeState {
ports: HashMap<PortId, Arc<PortInner>>,
nodes: HashMap<NodeId, Arc<RemoteNode>>,
shutdown: Option<oneshot::Sender<()>>,
shutdown: Option<async_channel::Sender<()>>,
}

/// The deserialized event for processing by a local port.
Expand Down Expand Up @@ -1891,8 +1891,7 @@ impl LocalNode {
/// process of being sent to another node.
pub async fn wait_for_ports(&self, all_ports: bool) {
loop {
#[allow(clippy::disallowed_methods)] // TODO
let (send, recv) = oneshot::channel::<()>();
let (send, recv) = async_channel::bounded::<()>(1);
let ports: Vec<_> = {
let mut state = self.inner.state.lock();
state.shutdown = Some(send);
Expand Down Expand Up @@ -1921,7 +1920,7 @@ impl LocalNode {
return;
}
tracing::trace!(node = ?self.id(), count = left, "waiting for ports");
let _ = recv.await;
let _ = recv.recv().await;
}
}

Expand Down Expand Up @@ -2474,7 +2473,7 @@ pub mod tests {
struct RemoteLocalNode {
_task: Task<()>,
node: Arc<LocalNode>,
send: futures_channel::mpsc::UnboundedSender<RemoteEvent>,
send: async_channel::Sender<RemoteEvent>,
}

struct RemoteEvent {
Expand All @@ -2492,11 +2491,7 @@ pub mod tests {

impl RemoteLocalNode {
fn new(driver: &impl Spawn) -> Self {
#[expect(
clippy::disallowed_methods,
reason = "can't use mesh channels from mesh_node"
)]
let (send, recv) = futures_channel::mpsc::unbounded::<RemoteEvent>();
let (send, recv) = async_channel::unbounded::<RemoteEvent>();
let node = Arc::new(LocalNode::with_id(NodeId::new(), Box::new(NullConnect)));
let task = driver.spawn("test", {
let node = node.clone();
Expand Down Expand Up @@ -2526,7 +2521,7 @@ pub mod tests {

struct EventsFrom {
node_id: NodeId,
send: futures_channel::mpsc::UnboundedSender<RemoteEvent>,
send: async_channel::Sender<RemoteEvent>,
}

impl SendEvent for EventsFrom {
Expand All @@ -2535,7 +2530,7 @@ pub mod tests {
let mut os_resources = Vec::new();
event.write_to(&mut buffer, &mut os_resources);
self.send
.unbounded_send(RemoteEvent {
.send_blocking(RemoteEvent {
node_id: self.node_id,
data: buffer,
resources: os_resources,
Expand Down
1 change: 1 addition & 0 deletions support/mesh/mesh_remote/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pal_async.workspace = true
tracing_helpers.workspace = true
unix_socket = { workspace = true, features = ["mesh"] }

async-channel.workspace = true
futures.workspace = true
futures-concurrency.workspace = true
parking_lot.workspace = true
Expand Down
21 changes: 10 additions & 11 deletions support/mesh/mesh_remote/src/alpc_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use crate::common::InvitationAddress;
use crate::protocol;
use futures::FutureExt;
use futures::StreamExt;
use futures::channel::mpsc;
use futures::future::AbortHandle;
use futures::future::abortable;
use mesh_node::common::Address;
Expand Down Expand Up @@ -88,7 +87,7 @@ pub struct AlpcNode {
recv_abort: AbortHandle,
recv_task: Task<()>,
connect_task: Task<()>,
connect_send: mpsc::UnboundedSender<(NodeId, RemoteNodeHandle)>,
connect_send: async_channel::Sender<(NodeId, RemoteNodeHandle)>,
}

trait InvitationDriver: Spawn + Send {}
Expand Down Expand Up @@ -179,8 +178,7 @@ impl AlpcNode {
let port = PolledWait::new(&driver, port)?;

let invitations = Default::default();
#[expect(clippy::disallowed_methods)] // TODO
let (connect_send, connect_recv) = mpsc::unbounded();
let (connect_send, connect_recv) = async_channel::unbounded();
let local_node = Arc::new(LocalNode::with_id(
local_id,
Box::new(AlpcConnector {
Expand Down Expand Up @@ -223,15 +221,16 @@ impl AlpcNode {
driver: &(impl ?Sized + Driver),
local_id: NodeId,
directory: Arc<OwnedHandle>,
mut connect_recv: mpsc::UnboundedReceiver<(NodeId, RemoteNodeHandle)>,
connect_recv: async_channel::Receiver<(NodeId, RemoteNodeHandle)>,
) {
let teardowns: Mutex<HashMap<NodeId, mesh_channel::OneshotSender<()>>> = Default::default();
let mut connect_tasks = FuturesUnordered::new();
let mut recv = std::pin::pin!(connect_recv);
loop {
// Receive a new request or drive any in-progress connection tasks
// forward.
let (remote_id, handle) = futures::select! { // merge semantics
msg = connect_recv.next().fuse() => {
msg = recv.next().fuse() => {
match msg {
Some(msg) => msg,
None => break,
Expand Down Expand Up @@ -498,7 +497,7 @@ impl AlpcNode {
/// returns, data loss could occur for other mesh nodes.
pub async fn shutdown(self) {
self.local_node.wait_for_ports(false).await;
self.connect_send.close_channel();
self.connect_send.close();
self.connect_task.await;
self.recv_abort.abort();
self.recv_task.await;
Expand All @@ -510,7 +509,7 @@ impl AlpcNode {
local_node: Arc<LocalNode>,
mut port: PolledWait<alpc::Port>,
invitations: InvitationMap,
connect_send: mpsc::UnboundedSender<(NodeId, RemoteNodeHandle)>,
connect_send: async_channel::Sender<(NodeId, RemoteNodeHandle)>,
) -> io::Result<()> {
struct Connection {
comm: alpc::Port,
Expand Down Expand Up @@ -595,7 +594,7 @@ impl AlpcNode {
// If this is a connection from a node that was invited,
// then it's now safe to connect to the node. Send the
// deferred connection to the connection task.
let _ = connect_send.unbounded_send((node_id, handle.clone()));
let _ = connect_send.send_blocking((node_id, handle.clone()));
handle
} else {
// Otherwise, establish a connection now to get a
Expand Down Expand Up @@ -658,13 +657,13 @@ impl AlpcNode {
/// Connector for connecting to remote ALPC nodes.
#[derive(Debug)]
struct AlpcConnector {
connect_send: mpsc::UnboundedSender<(NodeId, RemoteNodeHandle)>,
connect_send: async_channel::Sender<(NodeId, RemoteNodeHandle)>,
}

impl Connect for AlpcConnector {
fn connect(&self, remote_id: NodeId, handle: RemoteNodeHandle) {
// Send the handle to the connect task.
let _ = self.connect_send.unbounded_send((remote_id, handle));
let _ = self.connect_send.send_blocking((remote_id, handle));
}
}

Expand Down
56 changes: 28 additions & 28 deletions support/mesh/mesh_remote/src/unix_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crate::common::InvitationAddress;
use crate::protocol;
use futures::FutureExt;
use futures::StreamExt;
use futures::channel::mpsc;
use futures::future;
use futures::future::BoxFuture;
use io::ErrorKind;
Expand Down Expand Up @@ -398,7 +397,7 @@ enum SenderCommand {

#[derive(Clone)]
struct PacketSender {
send: mpsc::UnboundedSender<SenderCommand>,
send: async_channel::Sender<SenderCommand>,
socket: Arc<UnixSocket>,
}

Expand Down Expand Up @@ -435,9 +434,7 @@ impl SendEvent for PacketSender {
)
.is_err()
{
let _ = self
.send
.unbounded_send(SenderCommand::Send { packet, fds });
let _ = self.send.send_blocking(SenderCommand::Send { packet, fds });
}
}
}
Expand Down Expand Up @@ -494,7 +491,7 @@ impl Drop for PacketSender {
fn drop(&mut self) {
// Explicitly close the send channel so that the send task returns, even
// though the send channel is also in use by the receive task.
self.send.close_channel();
self.send.close();
}
}

Expand All @@ -506,8 +503,7 @@ fn start_connection(
handle: RemoteNodeHandle,
socket: UnixSocket,
) {
#[expect(clippy::disallowed_methods)] // TODO
let (send, recv) = mpsc::unbounded();
let (send, recv) = async_channel::unbounded();
let socket = Arc::new(socket);
let sender = PacketSender {
send: send.clone(),
Expand All @@ -533,8 +529,8 @@ fn start_connection(
async fn run_connection(
local_node: Arc<LocalNode>,
remote_id: NodeId,
send_send: mpsc::UnboundedSender<SenderCommand>,
send_recv: mpsc::UnboundedReceiver<SenderCommand>,
send_send: async_channel::Sender<SenderCommand>,
send_recv: async_channel::Receiver<SenderCommand>,
socket: Arc<UnixSocket>,
handle: RemoteNodeHandle,
) {
Expand Down Expand Up @@ -620,7 +616,7 @@ async fn run_receive(
local_node: &LocalNode,
remote_id: &NodeId,
socket: &UnixSocket,
send: &mpsc::UnboundedSender<SenderCommand>,
send: &async_channel::Sender<SenderCommand>,
) -> Result<(), ReceiveError> {
let mut buf = vec![0; MAX_PACKET_SIZE];
let mut fds = Vec::new();
Expand All @@ -631,18 +627,20 @@ async fn run_receive(
}
if cfg!(target_os = "macos") && !fds.is_empty() {
// Tell the opposite endpoint to release the fds it sent.
let _ = send.unbounded_send(SenderCommand::Send {
packet: protocol::ReleaseFds {
header: protocol::PacketHeader {
packet_type: protocol::PacketType::RELEASE_FDS,
..FromZeros::new_zeroed()
},
count: fds.len() as u64,
}
.as_bytes()
.to_vec(),
fds: Vec::new(),
});
let _ = send
.send(SenderCommand::Send {
packet: protocol::ReleaseFds {
header: protocol::PacketHeader {
packet_type: protocol::PacketType::RELEASE_FDS,
..FromZeros::new_zeroed()
},
count: fds.len() as u64,
}
.as_bytes()
.to_vec(),
fds: Vec::new(),
})
.await;
}

let buf = &buf[..len];
Expand All @@ -658,9 +656,11 @@ async fn run_receive(
let release_fds = protocol::ReleaseFds::read_from_prefix(buf)
.map_err(|_| ReceiveError::BadReleaseFds)?
.0; // TODO: zerocopy: map_err (https://github.com/microsoft/openvmm/issues/759)
let _ = send.unbounded_send(SenderCommand::ReleaseFds {
count: release_fds.count as usize,
});
let _ = send
.send(SenderCommand::ReleaseFds {
count: release_fds.count as usize,
})
.await;
}
#[cfg(target_os = "linux")]
protocol::PacketType::LARGE_EVENT => {
Expand Down Expand Up @@ -688,11 +688,11 @@ enum ProtocolError {

/// Handles send processing for the socket.
async fn run_send(
mut recv: mpsc::UnboundedReceiver<SenderCommand>,
recv: async_channel::Receiver<SenderCommand>,
socket: &UnixSocket,
retained_fds: &mut VecDeque<OsResource>,
) -> io::Result<()> {
while let Some(command) = recv.next().await {
while let Ok(command) = recv.recv().await {
match command {
SenderCommand::Send { packet, fds } => {
match socket.send(&packet, &fds).await {
Expand Down
Loading