Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
311 changes: 270 additions & 41 deletions crates/core/src/transport/connection_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ const INITIAL_INTERVAL: Duration = Duration::from_millis(50);

/// Window size passed to the `PacketRateLimiter` that serves as the bandwidth tracker.
const DEFAULT_BW_TRACKER_WINDOW_SIZE: Duration = Duration::from_secs(10);

/// Size of RSA-2048 encrypted intro packets (PKCS#1 v1.5 padding).
/// RSA-2048 produces 256-byte ciphertext for any input up to 245 bytes.
/// Used to detect new peer identities from existing addresses (issue #2277).
///
/// On a gateway, a packet arriving from an address that already has a
/// connection is only tried as an RSA intro when its size equals this value,
/// so the size comparison acts as a cheap filter ahead of the RSA decryption.
const RSA_INTRO_PACKET_SIZE: usize = 256;

/// Minimum interval between RSA decryption attempts for the same address.
/// Prevents CPU exhaustion from attackers sending 256-byte packets.
///
/// While an address is inside this interval, a 256-byte packet from it is not
/// decrypted asymmetrically; it is forwarded to the existing connection like
/// any other packet.
const RSA_DECRYPTION_RATE_LIMIT: Duration = Duration::from_secs(1);

pub type SerializedMessage = Vec<u8>;

type GatewayConnectionFuture = BoxFuture<
Expand Down Expand Up @@ -188,6 +197,7 @@ impl OutboundConnectionHandler {
last_drop_warning: Instant::now(),
bandwidth_limit,
expected_non_gateway: expected_non_gateway.clone(),
last_rsa_attempt: HashMap::new(),
};
let bw_tracker = super::rate_limiter::PacketRateLimiter::new(
DEFAULT_BW_TRACKER_WINDOW_SIZE,
Expand Down Expand Up @@ -292,6 +302,8 @@ struct UdpPacketsListener<S = UdpSocket> {
last_drop_warning: Instant,
bandwidth_limit: Option<usize>,
expected_non_gateway: Arc<DashSet<IpAddr>>,
/// Rate limiting for RSA decryption attempts to prevent DoS (issue #2277).
last_rsa_attempt: HashMap<SocketAddr, Instant>,
}

type OngoingConnection = (
Expand Down Expand Up @@ -378,53 +390,113 @@ impl<S: Socket> UdpPacketsListener<S> {
);

if let Some(remote_conn) = self.remote_connections.remove(&remote_addr) {
match remote_conn.inbound_packet_sender.try_send(packet_data) {
Ok(_) => {
self.remote_connections.insert(remote_addr, remote_conn);
continue;
// Issue #2277: Check if this is a new intro packet from a peer that
// restarted with a new identity. RSA intro packets are exactly 256 bytes.
// If we can decrypt it as an intro, a new peer is connecting from the
// same IP:port (e.g., NAT assigned the same mapping after restart).
let is_new_identity = if self.is_gateway
&& size == RSA_INTRO_PACKET_SIZE
{
// Rate limit RSA decryption attempts to prevent DoS
let now = Instant::now();
let rate_limited = self
.last_rsa_attempt
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we clean up from this map after connections are dropped?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 8224600 - added cleanup at all 4 sites where connections are removed. Thanks for catching this!

[AI-assisted - Claude]

.get(&remote_addr)
.is_some_and(|last| now.duration_since(*last) < RSA_DECRYPTION_RATE_LIMIT);

if rate_limited {
false
} else {
self.last_rsa_attempt.insert(remote_addr, now);
// Try RSA decryption and validate intro packet structure
match packet_data.try_decrypt_asym(&self.this_peer_keypair.secret) {
Ok(decrypted) => {
// Validate intro packet structure:
// 1. Protocol version (PROTOC_VERSION.len() bytes)
// 2. Outbound symmetric key (16 bytes)
let decrypted_data = decrypted.data();
let proto_len = PROTOC_VERSION.len();
if decrypted_data.len() >= proto_len + 16
&& decrypted_data[..proto_len] == PROTOC_VERSION
{
true
} else {
tracing::debug!(
peer_addr = %remote_addr,
"256-byte packet decrypted but not valid intro structure"
);
false
}
}
Err(_) => false, // Not an RSA intro packet for us
}
}
Err(fast_channel::TrySendError::Full(_)) => {
// Channel full, reinsert connection
self.remote_connections.insert(remote_addr, remote_conn);
} else {
false
};

// Track dropped packets and log warnings periodically
let dropped_count = self.dropped_packets.entry(remote_addr).or_insert(0);
*dropped_count += 1;
if is_new_identity {
tracing::info!(
peer_addr = %remote_addr,
"Detected new peer identity from existing address (issue #2277). \
Peer likely restarted with new identity. Resetting session."
);
// Clean up rate-limit tracking for the old peer
self.last_rsa_attempt.remove(&remote_addr);
// Don't reinsert - let the packet fall through to gateway_connection
// which will establish a fresh session with the new peer
} else {
// Forward packet to existing connection
match remote_conn.inbound_packet_sender.try_send(packet_data) {
Ok(_) => {
self.remote_connections.insert(remote_addr, remote_conn);
continue;
}
Err(fast_channel::TrySendError::Full(_)) => {
// Channel full, reinsert connection
self.remote_connections.insert(remote_addr, remote_conn);

// Track dropped packets and log warnings periodically
let dropped_count = self.dropped_packets.entry(remote_addr).or_insert(0);
*dropped_count += 1;

// Log warning every 10 seconds if packets are being dropped
let now = Instant::now();
if now.duration_since(self.last_drop_warning) > Duration::from_secs(10) {
let total_dropped: u64 = self.dropped_packets.values().sum();
tracing::warn!(
total_dropped,
elapsed_secs = 10,
"Channel overflow: dropped packets (bandwidth limit may be too high or receiver too slow)"
);
for (addr, count) in &self.dropped_packets {
if *count > 100 {
tracing::warn!(
peer_addr = %addr,
dropped_count = count,
"High packet drop rate for peer"
);
}
}
self.dropped_packets.clear();
self.last_drop_warning = now;
}

// Log warning every 10 seconds if packets are being dropped
let now = Instant::now();
if now.duration_since(self.last_drop_warning) > Duration::from_secs(10) {
let total_dropped: u64 = self.dropped_packets.values().sum();
// Drop the packet instead of falling through - prevents symmetric packets
// from being sent to RSA decryption handlers
continue;
}
Err(fast_channel::TrySendError::Disconnected(_)) => {
// Channel closed, connection is dead
tracing::warn!(
total_dropped,
elapsed_secs = 10,
"Channel overflow: dropped packets (bandwidth limit may be too high or receiver too slow)"
peer_addr = %remote_addr,
"Connection closed, removing from active connections"
);
for (addr, count) in &self.dropped_packets {
if *count > 100 {
tracing::warn!(
peer_addr = %addr,
dropped_count = count,
"High packet drop rate for peer"
);
}
}
self.dropped_packets.clear();
self.last_drop_warning = now;
// Clean up rate-limit tracking
self.last_rsa_attempt.remove(&remote_addr);
// Don't reinsert - connection is truly dead
continue;
}

// Drop the packet instead of falling through - prevents symmetric packets
// from being sent to RSA decryption handlers
continue;
}
Err(fast_channel::TrySendError::Disconnected(_)) => {
// Channel closed, connection is dead
tracing::warn!(
peer_addr = %remote_addr,
"Connection closed, removing from active connections"
);
// Don't reinsert - connection is truly dead
continue;
}
}
}
Expand Down Expand Up @@ -504,6 +576,8 @@ impl<S: Socket> UdpPacketsListener<S> {
.collect();
for stale_addr in stale_addrs {
self.remote_connections.remove(&stale_addr);
// Clean up rate-limit tracking for the stale connection
self.last_rsa_attempt.remove(&stale_addr);
tracing::debug!(
stale_peer_addr = %stale_addr,
new_peer_addr = %remote_addr,
Expand Down Expand Up @@ -687,6 +761,8 @@ impl<S: Socket> UdpPacketsListener<S> {
if existing_conn.inbound_packet_sender.is_closed() {
// Connection is dead, remove it
self.remote_connections.remove(&remote_addr);
// Clean up rate-limit tracking
self.last_rsa_attempt.remove(&remote_addr);
tracing::warn!(
peer_addr = %remote_addr,
direction = "outbound",
Expand Down Expand Up @@ -2509,4 +2585,157 @@ pub mod mock_transport {

Ok(())
}

/// Build a mock peer bound to a caller-chosen socket address.
///
/// Tests use this to model a peer that comes back on the very same IP:port,
/// which requires control over the address instead of a generated one.
///
/// A fresh `TransportKeypair` is generated on every call, so two peers created
/// at the same `addr` still have distinct identities. Returns the peer's public
/// key, its outbound connection handler, and `addr` echoed back.
///
/// # Panics
///
/// Panics with "failed to create peer" if `OutboundConnectionHandler::new_test`
/// returns an error.
#[allow(dead_code)]
async fn create_mock_peer_at_addr(
    packet_drop_policy: PacketDropPolicy,
    addr: SocketAddr,
    channels: Channels,
) -> anyhow::Result<(TransportPublicKey, OutboundConnectionHandler, SocketAddr)> {
    let keypair = TransportKeypair::new();
    // Keep a copy of the public half; the keypair itself is moved into the handler.
    let public_key = keypair.public.clone();
    let mock_socket = MockSocket::new(packet_drop_policy, addr, channels).await;
    // The second value returned by `new_test` is not needed by callers and is
    // dropped when this function returns.
    // NOTE(review): the trailing `false` is presumably the "is gateway" flag — confirm.
    let (handler, _inbound_conn) =
        OutboundConnectionHandler::new_test(addr, Arc::new(mock_socket), keypair, false)
            .expect("failed to create peer");
    Ok((public_key, handler, addr))
}

/// Test that a peer reconnecting from the SAME IP:port with a NEW identity is handled correctly.
///
/// This simulates the scenario from issue #2277 where:
/// 1. Peer A (identity X) connects to gateway from IP:port
/// 2. Peer A disconnects (peer restarts, clears state, gets new identity)
/// 3. Peer B (identity Y) tries to connect from SAME IP:port (NAT assigns same mapping)
/// 4. Gateway should recognize this as a new peer and establish a fresh session
///
/// The bug was that the gateway retained the old session with peer A's encryption keys,
/// causing peer B's handshake packets to fail decryption and be silently dropped.
///
/// The test fails if either handshake does not complete within its timeout, if the
/// gateway never yields an inbound connection for a peer, or if peer B's handshake
/// takes two seconds or longer.
#[tokio::test]
async fn gateway_handles_peer_reconnection_same_addr_new_identity() -> anyhow::Result<()> {
    let channels = Arc::new(DashMap::new());

    // Create the gateway
    let (gw_pub, (gw_outbound, mut gw_conn), gw_addr) =
        create_mock_gateway(Default::default(), channels.clone()).await?;

    // Use a fixed port for testing the same-address scenario
    let peer_addr: SocketAddr = (Ipv4Addr::LOCALHOST, 44444).into();

    // Step 1: Create peer A with identity X at the fixed address
    tracing::info!(
        "Step 1: Creating peer A (identity X) at {} connecting to gateway at {}",
        peer_addr,
        gw_addr
    );

    let (peer_a_pub, mut peer_a, _) =
        create_mock_peer_at_addr(Default::default(), peer_addr, channels.clone()).await?;

    tracing::info!("Peer A public key: {:?}", peer_a_pub);

    // Peer A connects to gateway. The gateway side runs in its own task so that it
    // can accept the inbound connection while this task drives peer A's handshake.
    let gw_task = tokio::spawn(async move {
        let conn = tokio::time::timeout(Duration::from_secs(10), gw_conn.recv())
            .await?
            // Build the error lazily: it is only needed when the channel yields nothing.
            .ok_or_else(|| anyhow::anyhow!("gateway: no inbound connection from peer A"))?;
        tracing::info!(
            "Gateway received connection from peer A at {}",
            conn.remote_addr()
        );
        // Hand the receiver back so it can be reused for peer B in step 4.
        Ok::<_, anyhow::Error>((gw_conn, conn))
    });

    let peer_a_conn = tokio::time::timeout(
        Duration::from_secs(10),
        peer_a.connect(gw_pub.clone(), gw_addr).await,
    )
    .await??;
    tracing::info!("Peer A connected successfully");

    // The gateway-side handle for peer A is intentionally kept alive until the end of
    // the test: the gateway must still hold the old session when peer B shows up.
    let (mut gw_conn, _gw_peer_a_conn) = gw_task.await??;

    // Step 2: Simulate peer restart by dropping everything related to peer A
    // This simulates: peer process killed, state cleared, restarted with new identity
    tracing::info!("Step 2: Simulating peer A restart (dropping connection and handler)");
    drop(peer_a_conn);
    drop(peer_a);

    // Give time for cleanup (in reality this could be much longer)
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Step 3: Create peer B with NEW identity (Y) at the SAME address
    // This simulates the NAT assigning the same external IP:port to the restarted peer
    tracing::info!(
        "Step 3: Creating peer B (NEW identity Y) at SAME address {}",
        peer_addr
    );

    let (peer_b_pub, mut peer_b, _) =
        create_mock_peer_at_addr(Default::default(), peer_addr, channels.clone()).await?;

    tracing::info!("Peer B public key: {:?}", peer_b_pub);

    // Verify this test is actually testing new identity
    assert_ne!(
        peer_a_pub, peer_b_pub,
        "Peer B must have different identity than peer A"
    );

    // Step 4: Peer B (new identity) connects to gateway from same IP:port
    // This is where the bug manifests:
    // - Gateway still has remote_connections entry for peer_addr with peer A's session
    // - Peer B's handshake packets arrive, get sent to peer A's session handler
    // - Decryption fails because peer B is using different keys
    // - Handshake silently fails
    tracing::info!("Step 4: Peer B (new identity) connecting to gateway from same address");

    let gw_task = tokio::spawn(async move {
        let conn = tokio::time::timeout(Duration::from_secs(5), gw_conn.recv())
            .await?
            // Build the error lazily: it is only needed when the channel yields nothing.
            .ok_or_else(|| {
                anyhow::anyhow!("gateway: no inbound connection from peer B (bug #2277)")
            })?;
        tracing::info!(
            "Gateway received connection from peer B at {}",
            conn.remote_addr()
        );
        Ok::<_, anyhow::Error>(conn)
    });

    // Measure only peer B's handshake; the gateway task result is awaited afterwards.
    let start = std::time::Instant::now();
    let peer_b_conn = tokio::time::timeout(
        Duration::from_secs(5),
        peer_b.connect(gw_pub.clone(), gw_addr).await,
    )
    .await??;
    let elapsed = start.elapsed();

    tracing::info!("Peer B connected in {:?}", elapsed);

    let _gw_peer_b_conn = gw_task.await??;

    // Verify the connection works
    assert_eq!(peer_b_conn.remote_addr(), gw_addr);

    // The connection should complete quickly
    assert!(
        elapsed < Duration::from_secs(2),
        "Connection took {:?}, which suggests the gateway failed to recognize the new identity. \
         This is bug #2277: gateway doesn't reset session when peer restarts with new identity.",
        elapsed
    );

    // Cleanup
    drop(peer_b_conn);
    drop(peer_b);
    drop(gw_outbound);

    Ok(())
}
}