Skip to content

Commit 3a3db48

Browse files
sanity and claude authored
fix(transport): reset gateway session when peer restarts with new identity (#2281)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent fab8b66 commit 3a3db48

File tree

1 file changed

+270
-41
lines changed

1 file changed

+270
-41
lines changed

crates/core/src/transport/connection_handler.rs

Lines changed: 270 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -42,6 +42,15 @@ const INITIAL_INTERVAL: Duration = Duration::from_millis(50);
4242

4343
const DEFAULT_BW_TRACKER_WINDOW_SIZE: Duration = Duration::from_secs(10);
4444

45+
/// Size in bytes of an RSA-2048 encrypted intro packet (PKCS#1 v1.5 padding).
46+
/// RSA-2048 produces a 256-byte ciphertext for any input of up to 245 bytes.
47+
/// Used to detect a new peer identity arriving from an existing address (issue #2277).
48+
const RSA_INTRO_PACKET_SIZE: usize = 256;
49+
50+
/// Minimum interval between RSA decryption attempts for the same remote address.
51+
/// Prevents CPU exhaustion from attackers flooding the gateway with 256-byte packets.
52+
const RSA_DECRYPTION_RATE_LIMIT: Duration = Duration::from_secs(1);
53+
4554
pub type SerializedMessage = Vec<u8>;
4655

4756
type GatewayConnectionFuture = BoxFuture<
@@ -188,6 +197,7 @@ impl OutboundConnectionHandler {
188197
last_drop_warning: Instant::now(),
189198
bandwidth_limit,
190199
expected_non_gateway: expected_non_gateway.clone(),
200+
last_rsa_attempt: HashMap::new(),
191201
};
192202
let bw_tracker = super::rate_limiter::PacketRateLimiter::new(
193203
DEFAULT_BW_TRACKER_WINDOW_SIZE,
@@ -292,6 +302,8 @@ struct UdpPacketsListener<S = UdpSocket> {
292302
last_drop_warning: Instant,
293303
bandwidth_limit: Option<usize>,
294304
expected_non_gateway: Arc<DashSet<IpAddr>>,
305+
/// Rate limiting for RSA decryption attempts to prevent DoS (issue #2277).
306+
last_rsa_attempt: HashMap<SocketAddr, Instant>,
295307
}
296308

297309
type OngoingConnection = (
@@ -378,53 +390,113 @@ impl<S: Socket> UdpPacketsListener<S> {
378390
);
379391

380392
if let Some(remote_conn) = self.remote_connections.remove(&remote_addr) {
381-
match remote_conn.inbound_packet_sender.try_send(packet_data) {
382-
Ok(_) => {
383-
self.remote_connections.insert(remote_addr, remote_conn);
384-
continue;
393+
// Issue #2277: Check if this is a new intro packet from a peer that
394+
// restarted with a new identity. RSA intro packets are exactly 256 bytes.
395+
// If we can decrypt it as an intro, a new peer is connecting from the
396+
// same IP:port (e.g., NAT assigned the same mapping after restart).
397+
let is_new_identity = if self.is_gateway
398+
&& size == RSA_INTRO_PACKET_SIZE
399+
{
400+
// Rate limit RSA decryption attempts to prevent DoS
401+
let now = Instant::now();
402+
let rate_limited = self
403+
.last_rsa_attempt
404+
.get(&remote_addr)
405+
.is_some_and(|last| now.duration_since(*last) < RSA_DECRYPTION_RATE_LIMIT);
406+
407+
if rate_limited {
408+
false
409+
} else {
410+
self.last_rsa_attempt.insert(remote_addr, now);
411+
// Try RSA decryption and validate intro packet structure
412+
match packet_data.try_decrypt_asym(&self.this_peer_keypair.secret) {
413+
Ok(decrypted) => {
414+
// Validate intro packet structure:
415+
// 1. Protocol version (PROTOC_VERSION.len() bytes)
416+
// 2. Outbound symmetric key (16 bytes)
417+
let decrypted_data = decrypted.data();
418+
let proto_len = PROTOC_VERSION.len();
419+
if decrypted_data.len() >= proto_len + 16
420+
&& decrypted_data[..proto_len] == PROTOC_VERSION
421+
{
422+
true
423+
} else {
424+
tracing::debug!(
425+
peer_addr = %remote_addr,
426+
"256-byte packet decrypted but not valid intro structure"
427+
);
428+
false
429+
}
430+
}
431+
Err(_) => false, // Not an RSA intro packet for us
432+
}
385433
}
386-
Err(fast_channel::TrySendError::Full(_)) => {
387-
// Channel full, reinsert connection
388-
self.remote_connections.insert(remote_addr, remote_conn);
434+
} else {
435+
false
436+
};
389437

390-
// Track dropped packets and log warnings periodically
391-
let dropped_count = self.dropped_packets.entry(remote_addr).or_insert(0);
392-
*dropped_count += 1;
438+
if is_new_identity {
439+
tracing::info!(
440+
peer_addr = %remote_addr,
441+
"Detected new peer identity from existing address (issue #2277). \
442+
Peer likely restarted with new identity. Resetting session."
443+
);
444+
// Clean up rate-limit tracking for the old peer
445+
self.last_rsa_attempt.remove(&remote_addr);
446+
// Don't reinsert - let the packet fall through to gateway_connection
447+
// which will establish a fresh session with the new peer
448+
} else {
449+
// Forward packet to existing connection
450+
match remote_conn.inbound_packet_sender.try_send(packet_data) {
451+
Ok(_) => {
452+
self.remote_connections.insert(remote_addr, remote_conn);
453+
continue;
454+
}
455+
Err(fast_channel::TrySendError::Full(_)) => {
456+
// Channel full, reinsert connection
457+
self.remote_connections.insert(remote_addr, remote_conn);
458+
459+
// Track dropped packets and log warnings periodically
460+
let dropped_count = self.dropped_packets.entry(remote_addr).or_insert(0);
461+
*dropped_count += 1;
462+
463+
// Log warning every 10 seconds if packets are being dropped
464+
let now = Instant::now();
465+
if now.duration_since(self.last_drop_warning) > Duration::from_secs(10) {
466+
let total_dropped: u64 = self.dropped_packets.values().sum();
467+
tracing::warn!(
468+
total_dropped,
469+
elapsed_secs = 10,
470+
"Channel overflow: dropped packets (bandwidth limit may be too high or receiver too slow)"
471+
);
472+
for (addr, count) in &self.dropped_packets {
473+
if *count > 100 {
474+
tracing::warn!(
475+
peer_addr = %addr,
476+
dropped_count = count,
477+
"High packet drop rate for peer"
478+
);
479+
}
480+
}
481+
self.dropped_packets.clear();
482+
self.last_drop_warning = now;
483+
}
393484

394-
// Log warning every 10 seconds if packets are being dropped
395-
let now = Instant::now();
396-
if now.duration_since(self.last_drop_warning) > Duration::from_secs(10) {
397-
let total_dropped: u64 = self.dropped_packets.values().sum();
485+
// Drop the packet instead of falling through - prevents symmetric packets
486+
// from being sent to RSA decryption handlers
487+
continue;
488+
}
489+
Err(fast_channel::TrySendError::Disconnected(_)) => {
490+
// Channel closed, connection is dead
398491
tracing::warn!(
399-
total_dropped,
400-
elapsed_secs = 10,
401-
"Channel overflow: dropped packets (bandwidth limit may be too high or receiver too slow)"
492+
peer_addr = %remote_addr,
493+
"Connection closed, removing from active connections"
402494
);
403-
for (addr, count) in &self.dropped_packets {
404-
if *count > 100 {
405-
tracing::warn!(
406-
peer_addr = %addr,
407-
dropped_count = count,
408-
"High packet drop rate for peer"
409-
);
410-
}
411-
}
412-
self.dropped_packets.clear();
413-
self.last_drop_warning = now;
495+
// Clean up rate-limit tracking
496+
self.last_rsa_attempt.remove(&remote_addr);
497+
// Don't reinsert - connection is truly dead
498+
continue;
414499
}
415-
416-
// Drop the packet instead of falling through - prevents symmetric packets
417-
// from being sent to RSA decryption handlers
418-
continue;
419-
}
420-
Err(fast_channel::TrySendError::Disconnected(_)) => {
421-
// Channel closed, connection is dead
422-
tracing::warn!(
423-
peer_addr = %remote_addr,
424-
"Connection closed, removing from active connections"
425-
);
426-
// Don't reinsert - connection is truly dead
427-
continue;
428500
}
429501
}
430502
}
@@ -504,6 +576,8 @@ impl<S: Socket> UdpPacketsListener<S> {
504576
.collect();
505577
for stale_addr in stale_addrs {
506578
self.remote_connections.remove(&stale_addr);
579+
// Clean up rate-limit tracking for the stale connection
580+
self.last_rsa_attempt.remove(&stale_addr);
507581
tracing::debug!(
508582
stale_peer_addr = %stale_addr,
509583
new_peer_addr = %remote_addr,
@@ -687,6 +761,8 @@ impl<S: Socket> UdpPacketsListener<S> {
687761
if existing_conn.inbound_packet_sender.is_closed() {
688762
// Connection is dead, remove it
689763
self.remote_connections.remove(&remote_addr);
764+
// Clean up rate-limit tracking
765+
self.last_rsa_attempt.remove(&remote_addr);
690766
tracing::warn!(
691767
peer_addr = %remote_addr,
692768
direction = "outbound",
@@ -2509,4 +2585,157 @@ pub mod mock_transport {
25092585

25102586
Ok(())
25112587
}
2588+
2589+
/// Create a mock peer at a specific socket address for testing.
2590+
///
2591+
/// This allows testing scenarios where a peer reconnects from the same IP:port.
2592+
#[allow(dead_code)]
2593+
async fn create_mock_peer_at_addr(
2594+
packet_drop_policy: PacketDropPolicy,
2595+
addr: SocketAddr,
2596+
channels: Channels,
2597+
) -> anyhow::Result<(TransportPublicKey, OutboundConnectionHandler, SocketAddr)> {
2598+
let peer_keypair = TransportKeypair::new();
2599+
let peer_pub = peer_keypair.public.clone();
2600+
let socket = Arc::new(MockSocket::new(packet_drop_policy, addr, channels).await);
2601+
let (peer_conn, _inbound_conn) =
2602+
OutboundConnectionHandler::new_test(addr, socket, peer_keypair, false)
2603+
.expect("failed to create peer");
2604+
Ok((peer_pub, peer_conn, addr))
2605+
}
2606+
2607+
/// Test that a peer reconnecting from the SAME IP:port with a NEW identity is handled correctly.
2608+
///
2609+
/// This simulates the scenario from issue #2277 where:
2610+
/// 1. Peer A (identity X) connects to gateway from IP:port
2611+
/// 2. Peer A disconnects (peer restarts, clears state, gets new identity)
2612+
/// 3. Peer B (identity Y) tries to connect from SAME IP:port (NAT assigns same mapping)
2613+
/// 4. Gateway should recognize this as a new peer and establish a fresh session
2614+
///
2615+
/// The bug was that the gateway retained the old session with peer A's encryption keys,
2616+
/// causing peer B's handshake packets to fail decryption and be silently dropped.
2617+
#[tokio::test]
2618+
async fn gateway_handles_peer_reconnection_same_addr_new_identity() -> anyhow::Result<()> {
2619+
let channels = Arc::new(DashMap::new());
2620+
2621+
// Create the gateway
2622+
let (gw_pub, (gw_outbound, mut gw_conn), gw_addr) =
2623+
create_mock_gateway(Default::default(), channels.clone()).await?;
2624+
2625+
// Use a fixed port for testing the same-address scenario
2626+
let peer_addr: SocketAddr = (Ipv4Addr::LOCALHOST, 44444).into();
2627+
2628+
// Step 1: Create peer A with identity X at the fixed address
2629+
tracing::info!(
2630+
"Step 1: Creating peer A (identity X) at {} connecting to gateway at {}",
2631+
peer_addr,
2632+
gw_addr
2633+
);
2634+
2635+
let (peer_a_pub, mut peer_a, _) =
2636+
create_mock_peer_at_addr(Default::default(), peer_addr, channels.clone()).await?;
2637+
2638+
tracing::info!("Peer A public key: {:?}", peer_a_pub);
2639+
2640+
// Peer A connects to gateway
2641+
let gw_task = tokio::spawn(async move {
2642+
let conn = tokio::time::timeout(Duration::from_secs(10), gw_conn.recv())
2643+
.await?
2644+
.ok_or(anyhow::anyhow!(
2645+
"gateway: no inbound connection from peer A"
2646+
))?;
2647+
tracing::info!(
2648+
"Gateway received connection from peer A at {}",
2649+
conn.remote_addr()
2650+
);
2651+
Ok::<_, anyhow::Error>((gw_conn, conn))
2652+
});
2653+
2654+
let peer_a_conn = tokio::time::timeout(
2655+
Duration::from_secs(10),
2656+
peer_a.connect(gw_pub.clone(), gw_addr).await,
2657+
)
2658+
.await??;
2659+
tracing::info!("Peer A connected successfully");
2660+
2661+
let (mut gw_conn, _gw_peer_a_conn) = gw_task.await??;
2662+
2663+
// Step 2: Simulate peer restart by dropping everything related to peer A
2664+
// This simulates: peer process killed, state cleared, restarted with new identity
2665+
tracing::info!("Step 2: Simulating peer A restart (dropping connection and handler)");
2666+
drop(peer_a_conn);
2667+
drop(peer_a);
2668+
2669+
// Give time for cleanup (in reality this could be much longer)
2670+
tokio::time::sleep(Duration::from_millis(100)).await;
2671+
2672+
// Step 3: Create peer B with NEW identity (Y) at the SAME address
2673+
// This simulates the NAT assigning the same external IP:port to the restarted peer
2674+
tracing::info!(
2675+
"Step 3: Creating peer B (NEW identity Y) at SAME address {}",
2676+
peer_addr
2677+
);
2678+
2679+
let (peer_b_pub, mut peer_b, _) =
2680+
create_mock_peer_at_addr(Default::default(), peer_addr, channels.clone()).await?;
2681+
2682+
tracing::info!("Peer B public key: {:?}", peer_b_pub);
2683+
2684+
// Verify this test is actually testing new identity
2685+
assert_ne!(
2686+
peer_a_pub, peer_b_pub,
2687+
"Peer B must have different identity than peer A"
2688+
);
2689+
2690+
// Step 4: Peer B (new identity) connects to gateway from same IP:port
2691+
// This is where the bug manifests:
2692+
// - Gateway still has remote_connections entry for peer_addr with peer A's session
2693+
// - Peer B's handshake packets arrive, get sent to peer A's session handler
2694+
// - Decryption fails because peer B is using different keys
2695+
// - Handshake silently fails
2696+
tracing::info!("Step 4: Peer B (new identity) connecting to gateway from same address");
2697+
2698+
let gw_task = tokio::spawn(async move {
2699+
let conn = tokio::time::timeout(Duration::from_secs(5), gw_conn.recv())
2700+
.await?
2701+
.ok_or(anyhow::anyhow!(
2702+
"gateway: no inbound connection from peer B (bug #2277)"
2703+
))?;
2704+
tracing::info!(
2705+
"Gateway received connection from peer B at {}",
2706+
conn.remote_addr()
2707+
);
2708+
Ok::<_, anyhow::Error>(conn)
2709+
});
2710+
2711+
let start = std::time::Instant::now();
2712+
let peer_b_conn = tokio::time::timeout(
2713+
Duration::from_secs(5),
2714+
peer_b.connect(gw_pub.clone(), gw_addr).await,
2715+
)
2716+
.await??;
2717+
let elapsed = start.elapsed();
2718+
2719+
tracing::info!("Peer B connected in {:?}", elapsed);
2720+
2721+
let _gw_peer_b_conn = gw_task.await??;
2722+
2723+
// Verify the connection works
2724+
assert_eq!(peer_b_conn.remote_addr(), gw_addr);
2725+
2726+
// The connection should complete quickly
2727+
assert!(
2728+
elapsed < Duration::from_secs(2),
2729+
"Connection took {:?}, which suggests the gateway failed to recognize the new identity. \
2730+
This is bug #2277: gateway doesn't reset session when peer restarts with new identity.",
2731+
elapsed
2732+
);
2733+
2734+
// Cleanup
2735+
drop(peer_b_conn);
2736+
drop(peer_b);
2737+
drop(gw_outbound);
2738+
2739+
Ok(())
2740+
}
25122741
}

0 commit comments

Comments
 (0)