Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions proxy/src/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ use crate::{
resolve_hostname_port, ShredstreamProxyError,
};

use crate::multicast_config::{ipv4_addr_for_device, ifindex_for_device};

// values copied from https://github.com/solana-labs/solana/blob/33bde55bbdde13003acf45bb6afe6db4ab599ae4/core/src/sigverify_shreds.rs#L20
pub const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
pub const DEDUPER_NUM_BITS: u64 = 637_534_199; // 76MB
Expand All @@ -54,6 +56,7 @@ pub fn start_forwarder_threads(
src_port: u16,
maybe_multicast_socket: Option<Vec<UdpSocket>>,
maybe_triton_multicast_socket: Option<(IpAddr, Vec<UdpSocket>)>,
maybe_multicast_ifname: Option<String>,
num_threads: Option<usize>,
deduper: Arc<RwLock<Deduper<2, [u8]>>>,
should_reconstruct_shreds: bool,
Expand Down Expand Up @@ -240,7 +243,8 @@ pub fn start_forwarder_threads(
if let Some((multicast_origin, multicast_socket)) = maybe_triton_multicast_socket {
start_multicast_forwarder_thread(
multicast_origin,
multicast_socket,
multicast_socket,
maybe_multicast_ifname,
recycler,
reconstruct_tx,
unioned_dest_sockets,
Expand Down Expand Up @@ -277,6 +281,7 @@ pub struct MulticastSource {
pub fn start_multicast_forwarder_thread(
multicast_origin: IpAddr,
sockets: Vec<UdpSocket>,
maybe_multicast_ifname: Option<String>,
recycler: PacketBatchRecycler,
reconstruct_tx: crossbeam_channel::Sender<PacketBatch>,
unioned_dest_sockets: Arc<ArcSwap<Vec<SocketAddr>>>,
Expand Down Expand Up @@ -313,7 +318,7 @@ pub fn start_multicast_forwarder_thread(
let reconstruct_tx = reconstruct_tx.clone();
let exit = exit.clone();


let ifname_opt = maybe_multicast_ifname.clone();

let send_thread = Builder::new()
.name(format!("ssPxyTxMulticast_{thread_id}"))
Expand All @@ -326,6 +331,11 @@ pub fn start_multicast_forwarder_thread(
match try_create_ipv6_socket(ipv6_addr) {
Ok(socket) => {
info!("Successfully bound send socket to IPv6 dual-stack address.");
if let Some(ifname) = &ifname_opt {
if let Ok(Some(idx)) = ifindex_for_device(ifname) {
let _ = socket.set_multicast_if_v6(idx);
}
}
socket.set_multicast_loop_v6(false)
.expect("Failed to disable IPv6 multicast loopback");
socket
Expand All @@ -337,6 +347,19 @@ pub fn start_multicast_forwarder_thread(
let socket = UdpSocket::bind(ipv4_addr)
.expect("Failed to bind to IPv4 socket after IPv6 failed");
socket.set_multicast_ttl_v4(IP_MULTICAST_TTL).expect("IP_MULTICAST_TTL_V4");

if let Some(ifname) = &ifname_opt {
match ipv4_addr_for_device(ifname) {
Ok(Some(device_ip)) => {
info!("Binding multicast output for {} to interface IP: {}", ifname, device_ip);
socket.set_multicast_if_v4(&device_ip)
.expect("Failed to set multicast output interface");
}
Ok(None) => warn!("Interface {} has no IPv4 address; skipping set_multicast_if_v4", ifname),
Err(e) => warn!("Failed to resolve interface {}: {}", ifname, e),
}
}

socket.set_multicast_loop_v4(false)
.expect("Failed to disable IPv4 multicast loopback");
socket
Expand Down
1 change: 1 addition & 0 deletions proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ fn main() -> Result<(), ShredstreamProxyError> {
args.src_bind_port,
maybe_multicast_socket,
maybe_triton_multicast_socket,
maybe_multicast_ifname,
args.num_threads,
deduper.clone(),
args.grpc_service_port.is_some(),
Expand Down
Loading