diff --git a/dash-spv/src/network/addrv2.rs b/dash-spv/src/network/addrv2.rs index 896b24596..819a9e6fe 100644 --- a/dash-spv/src/network/addrv2.rs +++ b/dash-spv/src/network/addrv2.rs @@ -1,7 +1,7 @@ //! AddrV2 message handling for modern peer exchange protocol use rand::prelude::*; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::net::SocketAddr; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; @@ -13,10 +13,23 @@ use dashcore::network::message::NetworkMessage; use crate::network::constants::{MAX_ADDR_TO_SEND, MAX_ADDR_TO_STORE}; +const ONE_WEEK: u32 = 7 * 24 * 60 * 60; +const TEN_MINUTES: u32 = 600; + +/// Evict oldest entries if the map exceeds capacity, keeping the freshest addresses. +fn evict_if_needed(peers: &mut HashMap) { + if peers.len() > MAX_ADDR_TO_STORE { + let mut entries: Vec<_> = peers.drain().collect(); + entries.sort_by_key(|(_, msg)| std::cmp::Reverse(msg.time)); + entries.truncate(MAX_ADDR_TO_STORE); + peers.extend(entries); + } +} + /// Handler for AddrV2 peer exchange protocol pub struct AddrV2Handler { /// Known peer addresses from AddrV2 messages - known_peers: Arc>>, + known_peers: Arc>>, /// Peers that support AddrV2 supports_addrv2: Arc>>, } @@ -25,7 +38,7 @@ impl AddrV2Handler { /// Create a new AddrV2 handler pub fn new() -> Self { Self { - known_peers: Arc::new(RwLock::new(Vec::new())), + known_peers: Arc::new(RwLock::new(HashMap::new())), supports_addrv2: Arc::new(RwLock::new(HashSet::new())), } } @@ -47,44 +60,43 @@ impl AddrV2Handler { }) .as_secs() as u32; - let _initial_count = known_peers.len(); + let received = messages.len(); let mut added = 0; + let mut updated = 0; for msg in messages { - // Validate timestamp - // Accept addresses from up to 3 hours ago and up to 10 minutes in the future - if msg.time <= now.saturating_sub(10800) || msg.time > now + 600 { + // Accept addresses seen within the last week. Older addresses are likely stale. + // Also, reject timestamps more than 10 minutes in the future which are invalid. + if msg.time < now.saturating_sub(ONE_WEEK) || msg.time > now + TEN_MINUTES { log::trace!("Ignoring AddrV2 with invalid timestamp: {}", msg.time); continue; } - // Only store if we can convert to socket address - if msg.socket_addr().is_ok() { - known_peers.push(msg); - added += 1; + let Ok(socket_addr) = msg.socket_addr() else { + continue; + }; + + // Only update if new or has fresher timestamp + match known_peers.get(&socket_addr) { + Some(existing) if existing.time >= msg.time => continue, + Some(_) => { + known_peers.insert(socket_addr, msg); + updated += 1; + } + None => { + known_peers.insert(socket_addr, msg); + added += 1; + } } } - // Sort by timestamp (newest first) and deduplicate - known_peers.sort_by_key(|a| std::cmp::Reverse(a.time)); + evict_if_needed(&mut known_peers); - // Deduplicate by socket address - let mut seen = HashSet::new(); - known_peers.retain(|addr| { - if let Ok(socket_addr) = addr.socket_addr() { - seen.insert(socket_addr) - } else { - false - } - }); - - // Keep only the most recent addresses - known_peers.truncate(MAX_ADDR_TO_STORE); - - let _processed_count = added; log::info!( - "Processed AddrV2 messages: added {}, total known peers: {}", + "Processed AddrV2 messages: received {}, added {}, updated {}, total known peers: {}", + received, added, + updated, known_peers.len() ); } @@ -97,14 +109,13 @@ impl AddrV2Handler { return vec![]; } - // Select random subset + // Select random subset from values let mut rng = thread_rng(); let count = count.min(MAX_ADDR_TO_SEND).min(known_peers.len()); let addresses: Vec = - known_peers.choose_multiple(&mut rng, count).cloned().collect(); + known_peers.values().choose_multiple(&mut rng, count).into_iter().cloned().collect(); - log::debug!("Sharing {} addresses with peer", addresses.len()); addresses } @@ -115,7 +126,7 @@ impl AddrV2Handler { /// Get all known socket addresses pub async fn get_known_addresses(&self) -> Vec { - self.known_peers.read().await.clone() + self.known_peers.read().await.values().cloned().collect() } /// Add a known peer address @@ -141,13 +152,8 @@ impl AddrV2Handler { }; let mut known_peers = self.known_peers.write().await; - known_peers.push(addr_msg); - - // Keep size under control - if known_peers.len() > MAX_ADDR_TO_STORE { - known_peers.sort_by_key(|a| std::cmp::Reverse(a.time)); - known_peers.truncate(MAX_ADDR_TO_STORE); - } + known_peers.insert(addr, addr_msg); + evict_if_needed(&mut known_peers); } /// Build a GetAddr response message diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index 60f41b5b7..fc69a4630 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -26,6 +26,7 @@ use crate::network::{ }; use crate::storage::{PeerStorage, PersistentPeerStorage, PersistentStorage}; use async_trait::async_trait; +use dashcore::network::address::{AddrV2, AddrV2Message}; use dashcore::network::constants::ServiceFlags; use dashcore::network::message::NetworkMessage; use dashcore::network::message_headers2::CompressionState; @@ -251,6 +252,11 @@ impl PeerNetworkManager { Ok(_) => { log::info!("Successfully connected to {}", addr); + // Request addresses from the peer for discovery + if let Err(e) = peer.send_message(NetworkMessage::GetAddr).await { + log::warn!("Failed to send GetAddr to {}: {}", addr, e); + } + // Record successful connection reputation_manager.record_successful_connection(addr).await; @@ -455,9 +461,32 @@ impl PeerNetworkManager { ); continue; } - NetworkMessage::Addr(_) => { - // Handle legacy addr messages (convert to AddrV2 if needed) - log::trace!("Received legacy addr message from {}", addr); + NetworkMessage::Addr(addresses) => { + // Convert legacy addr messages to AddrV2 format + let converted: Vec = addresses + .iter() + .filter_map(|(time, a)| { + let socket = a.socket_addr().ok()?; + let addr_v2 = match socket.ip() { + std::net::IpAddr::V4(v4) => AddrV2::Ipv4(v4), + std::net::IpAddr::V6(v6) => AddrV2::Ipv6(v6), + }; + Some(AddrV2Message { + time: *time, + services: a.services, + addr: addr_v2, + port: socket.port(), + }) + }) + .collect(); + if !converted.is_empty() { + log::debug!( + "Converted {} legacy addr entries from {}", + converted.len(), + addr + ); + addrv2_handler.handle_addrv2(converted).await; + } continue; } NetworkMessage::Headers(headers) => {