diff --git a/proxy/src/forwarder.rs b/proxy/src/forwarder.rs index 6ffe4024..7d4710d9 100644 --- a/proxy/src/forwarder.rs +++ b/proxy/src/forwarder.rs @@ -40,6 +40,8 @@ use crate::{ resolve_hostname_port, ShredstreamProxyError, }; +use crate::multicast_config::{ipv4_addr_for_device, ifindex_for_device}; + // values copied from https://github.com/solana-labs/solana/blob/33bde55bbdde13003acf45bb6afe6db4ab599ae4/core/src/sigverify_shreds.rs#L20 pub const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001; pub const DEDUPER_NUM_BITS: u64 = 637_534_199; // 76MB @@ -54,6 +56,7 @@ pub fn start_forwarder_threads( src_port: u16, maybe_multicast_socket: Option>, maybe_triton_multicast_socket: Option<(IpAddr, Vec)>, + maybe_multicast_ifname: Option, num_threads: Option, deduper: Arc>>, should_reconstruct_shreds: bool, @@ -240,7 +243,8 @@ pub fn start_forwarder_threads( if let Some((multicast_origin, multicast_socket)) = maybe_triton_multicast_socket { start_multicast_forwarder_thread( multicast_origin, - multicast_socket, + multicast_socket, + maybe_multicast_ifname, recycler, reconstruct_tx, unioned_dest_sockets, @@ -277,6 +281,7 @@ pub struct MulticastSource { pub fn start_multicast_forwarder_thread( multicast_origin: IpAddr, sockets: Vec, + maybe_multicast_ifname: Option, recycler: PacketBatchRecycler, reconstruct_tx: crossbeam_channel::Sender, unioned_dest_sockets: Arc>>, @@ -313,7 +318,7 @@ pub fn start_multicast_forwarder_thread( let reconstruct_tx = reconstruct_tx.clone(); let exit = exit.clone(); - + let ifname_opt = maybe_multicast_ifname.clone(); let send_thread = Builder::new() .name(format!("ssPxyTxMulticast_{thread_id}")) @@ -326,6 +331,11 @@ pub fn start_multicast_forwarder_thread( match try_create_ipv6_socket(ipv6_addr) { Ok(socket) => { info!("Successfully bound send socket to IPv6 dual-stack address."); + if let Some(ifname) = &ifname_opt { + if let Ok(Some(idx)) = ifindex_for_device(ifname) { + let _ = socket.set_multicast_if_v6(idx); + } + } socket.set_multicast_loop_v6(false) .expect("Failed to disable IPv6 multicast loopback"); socket @@ -337,6 +347,19 @@ pub fn start_multicast_forwarder_thread( let socket = UdpSocket::bind(ipv4_addr) .expect("Failed to bind to IPv4 socket after IPv6 failed"); socket.set_multicast_ttl_v4(IP_MULTICAST_TTL).expect("IP_MULTICAST_TTL_V4"); + + if let Some(ifname) = &ifname_opt { + match ipv4_addr_for_device(ifname) { + Ok(Some(device_ip)) => { + info!("Binding multicast output for {} to interface IP: {}", ifname, device_ip); + socket.set_multicast_if_v4(&device_ip) + .expect("Failed to set multicast output interface"); + } + Ok(None) => warn!("Interface {} has no IPv4 address; skipping set_multicast_if_v4", ifname), + Err(e) => warn!("Failed to resolve interface {}: {}", ifname, e), + } + } + socket.set_multicast_loop_v4(false) .expect("Failed to disable IPv4 multicast loopback"); socket diff --git a/proxy/src/main.rs b/proxy/src/main.rs index e7c785fc..e499cff5 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -348,6 +348,7 @@ fn main() -> Result<(), ShredstreamProxyError> { args.src_bind_port, maybe_multicast_socket, maybe_triton_multicast_socket, + maybe_multicast_ifname, args.num_threads, deduper.clone(), args.grpc_service_port.is_some(),