Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 45 additions & 39 deletions dash-spv/src/network/addrv2.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! AddrV2 message handling for modern peer exchange protocol

use rand::prelude::*;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
Expand All @@ -13,10 +13,23 @@ use dashcore::network::message::NetworkMessage;

use crate::network::constants::{MAX_ADDR_TO_SEND, MAX_ADDR_TO_STORE};

const ONE_WEEK: u32 = 7 * 24 * 60 * 60;
const TEN_MINUTES: u32 = 600;

/// Evict oldest entries if the map exceeds capacity, keeping the freshest addresses.
fn evict_if_needed(peers: &mut HashMap<SocketAddr, AddrV2Message>) {
if peers.len() > MAX_ADDR_TO_STORE {
let mut entries: Vec<_> = peers.drain().collect();
entries.sort_by_key(|(_, msg)| std::cmp::Reverse(msg.time));
entries.truncate(MAX_ADDR_TO_STORE);
peers.extend(entries);
}
}

/// Handler for AddrV2 peer exchange protocol
pub struct AddrV2Handler {
/// Known peer addresses from AddrV2 messages
known_peers: Arc<RwLock<Vec<AddrV2Message>>>,
known_peers: Arc<RwLock<HashMap<SocketAddr, AddrV2Message>>>,
/// Peers that support AddrV2
supports_addrv2: Arc<RwLock<HashSet<SocketAddr>>>,
}
Expand All @@ -25,7 +38,7 @@ impl AddrV2Handler {
/// Create a new AddrV2 handler
pub fn new() -> Self {
Self {
known_peers: Arc::new(RwLock::new(Vec::new())),
known_peers: Arc::new(RwLock::new(HashMap::new())),
supports_addrv2: Arc::new(RwLock::new(HashSet::new())),
}
}
Expand All @@ -47,44 +60,43 @@ impl AddrV2Handler {
})
.as_secs() as u32;

let _initial_count = known_peers.len();
let received = messages.len();
let mut added = 0;
let mut updated = 0;

for msg in messages {
// Validate timestamp
// Accept addresses from up to 3 hours ago and up to 10 minutes in the future
if msg.time <= now.saturating_sub(10800) || msg.time > now + 600 {
// Accept addresses seen within the last week. Older addresses are likely stale.
// Also, reject timestamps more than 10 minutes in the future which are invalid.
if msg.time < now.saturating_sub(ONE_WEEK) || msg.time > now + TEN_MINUTES {
log::trace!("Ignoring AddrV2 with invalid timestamp: {}", msg.time);
continue;
}

// Only store if we can convert to socket address
if msg.socket_addr().is_ok() {
known_peers.push(msg);
added += 1;
let Ok(socket_addr) = msg.socket_addr() else {
continue;
};

// Only update if new or has fresher timestamp
match known_peers.get(&socket_addr) {
Some(existing) if existing.time >= msg.time => continue,
Some(_) => {
known_peers.insert(socket_addr, msg);
updated += 1;
}
None => {
known_peers.insert(socket_addr, msg);
added += 1;
}
}
}

// Sort by timestamp (newest first) and deduplicate
known_peers.sort_by_key(|a| std::cmp::Reverse(a.time));
evict_if_needed(&mut known_peers);

// Deduplicate by socket address
let mut seen = HashSet::new();
known_peers.retain(|addr| {
if let Ok(socket_addr) = addr.socket_addr() {
seen.insert(socket_addr)
} else {
false
}
});

// Keep only the most recent addresses
known_peers.truncate(MAX_ADDR_TO_STORE);

let _processed_count = added;
log::info!(
"Processed AddrV2 messages: added {}, total known peers: {}",
"Processed AddrV2 messages: received {}, added {}, updated {}, total known peers: {}",
received,
added,
updated,
known_peers.len()
);
}
Expand All @@ -97,14 +109,13 @@ impl AddrV2Handler {
return vec![];
}

// Select random subset
// Select random subset from values
let mut rng = thread_rng();
let count = count.min(MAX_ADDR_TO_SEND).min(known_peers.len());

let addresses: Vec<AddrV2Message> =
known_peers.choose_multiple(&mut rng, count).cloned().collect();
known_peers.values().choose_multiple(&mut rng, count).into_iter().cloned().collect();

log::debug!("Sharing {} addresses with peer", addresses.len());
addresses
}

Expand All @@ -115,7 +126,7 @@ impl AddrV2Handler {

/// Get all known socket addresses
pub async fn get_known_addresses(&self) -> Vec<AddrV2Message> {
self.known_peers.read().await.clone()
self.known_peers.read().await.values().cloned().collect()
}

/// Add a known peer address
Expand All @@ -141,13 +152,8 @@ impl AddrV2Handler {
};

let mut known_peers = self.known_peers.write().await;
known_peers.push(addr_msg);

// Keep size under control
if known_peers.len() > MAX_ADDR_TO_STORE {
known_peers.sort_by_key(|a| std::cmp::Reverse(a.time));
known_peers.truncate(MAX_ADDR_TO_STORE);
}
known_peers.insert(addr, addr_msg);
evict_if_needed(&mut known_peers);
}

/// Build a GetAddr response message
Expand Down
35 changes: 32 additions & 3 deletions dash-spv/src/network/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::network::{
};
use crate::storage::{PeerStorage, PersistentPeerStorage, PersistentStorage};
use async_trait::async_trait;
use dashcore::network::address::{AddrV2, AddrV2Message};
use dashcore::network::constants::ServiceFlags;
use dashcore::network::message::NetworkMessage;
use dashcore::network::message_headers2::CompressionState;
Expand Down Expand Up @@ -251,6 +252,11 @@ impl PeerNetworkManager {
Ok(_) => {
log::info!("Successfully connected to {}", addr);

// Request addresses from the peer for discovery
if let Err(e) = peer.send_message(NetworkMessage::GetAddr).await {
log::warn!("Failed to send GetAddr to {}: {}", addr, e);
}

// Record successful connection
reputation_manager.record_successful_connection(addr).await;

Expand Down Expand Up @@ -455,9 +461,32 @@ impl PeerNetworkManager {
);
continue;
}
NetworkMessage::Addr(_) => {
// Handle legacy addr messages (convert to AddrV2 if needed)
log::trace!("Received legacy addr message from {}", addr);
NetworkMessage::Addr(addresses) => {
// Convert legacy addr messages to AddrV2 format
let converted: Vec<AddrV2Message> = addresses
.iter()
.filter_map(|(time, a)| {
let socket = a.socket_addr().ok()?;
let addr_v2 = match socket.ip() {
std::net::IpAddr::V4(v4) => AddrV2::Ipv4(v4),
std::net::IpAddr::V6(v6) => AddrV2::Ipv6(v6),
};
Some(AddrV2Message {
time: *time,
services: a.services,
addr: addr_v2,
port: socket.port(),
})
})
.collect();
if !converted.is_empty() {
log::debug!(
"Converted {} legacy addr entries from {}",
converted.len(),
addr
);
addrv2_handler.handle_addrv2(converted).await;
}
continue;
}
NetworkMessage::Headers(headers) => {
Expand Down
Loading