Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 34 additions & 66 deletions dash-spv/src/network/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use std::time::Duration;
use tokio::sync::{broadcast, Mutex};
use tokio::task::JoinSet;
use tokio::time;
Expand Down Expand Up @@ -57,8 +57,6 @@ pub struct PeerNetworkManager {
tasks: Arc<Mutex<JoinSet<()>>>,
/// Initial peer addresses
initial_peers: Vec<SocketAddr>,
/// When we first started needing peers (for DNS delay)
peer_search_started: Arc<Mutex<Option<SystemTime>>>,
/// Current sync peer (sticky during sync operations)
current_sync_peer: Arc<Mutex<Option<SocketAddr>>>,
/// Data directory for storage
Expand Down Expand Up @@ -115,7 +113,6 @@ impl PeerNetworkManager {
shutdown_token: CancellationToken::new(),
tasks: Arc::new(Mutex::new(JoinSet::new())),
initial_peers: config.peers.clone(),
peer_search_started: Arc::new(Mutex::new(None)),
current_sync_peer: Arc::new(Mutex::new(None)),
data_dir,
mempool_strategy: config.mempool_strategy,
Expand Down Expand Up @@ -744,7 +741,6 @@ impl PeerNetworkManager {
let addrv2_handler = self.addrv2_handler.clone();
let peer_store = self.peer_store.clone();
let reputation_manager = self.reputation_manager.clone();
let peer_search_started = self.peer_search_started.clone();
let initial_peers = self.initial_peers.clone();
let connected_peer_count = self.connected_peer_count.clone();

Expand All @@ -762,6 +758,9 @@ impl PeerNetworkManager {

let mut tasks = self.tasks.lock().await;
tasks.spawn(async move {
// Periodic DNS discovery check (only active in non-exclusive mode)
let mut dns_interval = time::interval(DNS_DISCOVERY_DELAY);

while !shutdown_token.is_cancelled() {
// Clean up disconnected peers
pool.cleanup_disconnected().await;
Expand All @@ -787,21 +786,6 @@ impl PeerNetworkManager {
} else {
// Normal mode: try to maintain minimum peer count with discovery
if count < MIN_PEERS {
// Track when we first started needing peers
let mut search_started = peer_search_started.lock().await;
if search_started.is_none() {
*search_started = Some(SystemTime::now());
log::info!("Below minimum peers ({}/{}), starting peer search (will try DNS after {}s)", count, MIN_PEERS, DNS_DISCOVERY_DELAY.as_secs());
}
let search_time = match *search_started {
Some(time) => time,
None => {
log::error!("Search time not set when expected");
continue;
}
};
drop(search_started);

// Try known addresses first, sorted by reputation
let known = addrv2_handler.get_known_addresses().await;
let needed = TARGET_PEERS.saturating_sub(count);
Expand All @@ -825,51 +809,6 @@ impl PeerNetworkManager {
}
}

// If still need more, check if we can use DNS (after 10 second delay)
let count = pool.peer_count().await;
if count < MIN_PEERS {
let elapsed = SystemTime::now()
.duration_since(search_time)
.unwrap_or_else(|e| {
log::warn!("System time error calculating elapsed time: {}", e);
Duration::ZERO
});
if elapsed >= DNS_DISCOVERY_DELAY {
log::info!("Using DNS discovery after {}s delay", elapsed.as_secs());
let dns_peers = tokio::select! {
peers = discovery.discover_peers(network) => peers,
_ = shutdown_token.cancelled() => {
log::info!("Maintenance loop shutting down during DNS discovery");
break;
}
};
let mut dns_attempted = 0;
for addr in dns_peers.into_iter() {
if !pool.is_connected(&addr).await && !pool.is_connecting(&addr).await {
tokio::select! {
_= connect_fn(addr) => {},
_ = shutdown_token.cancelled() => {
log::info!("Maintenance loop shutting down during connection attempt (dns)");
break;
}
}
dns_attempted += 1;
if dns_attempted >= needed {
break;
}
}
}
} else {
log::debug!("Waiting for DNS delay: {}s elapsed, need {}s", elapsed.as_secs(), DNS_DISCOVERY_DELAY.as_secs());
}
}
} else {
// We have enough peers, reset the search timer
let mut search_started = peer_search_started.lock().await;
if search_started.is_some() {
log::trace!("Peer count restored, resetting DNS delay timer");
*search_started = None;
}
}
}

Expand Down Expand Up @@ -909,6 +848,36 @@ impl PeerNetworkManager {
_ = time::sleep(MAINTENANCE_INTERVAL) => {
log::debug!("Maintenance interval elapsed");
}
_ = dns_interval.tick(), if !exclusive_mode => {
let count = pool.peer_count().await;
if count >= MIN_PEERS {
continue;
}
let dns_peers = tokio::select! {
peers = discovery.discover_peers(network) => peers,
_ = shutdown_token.cancelled() => {
log::info!("Maintenance loop shutting down during DNS discovery");
break;
}
};
let needed = TARGET_PEERS.saturating_sub(count);
let mut dns_attempted = 0;
for addr in dns_peers.iter() {
if !pool.is_connected(addr).await && !pool.is_connecting(addr).await {
tokio::select! {
_= connect_fn(*addr) => {},
_ = shutdown_token.cancelled() => {
log::info!("Maintenance loop shutting down during connection attempt (dns)");
break;
}
}
dns_attempted += 1;
if dns_attempted >= needed {
break;
}
}
}
}
_ = shutdown_token.cancelled() => {
log::info!("Maintenance loop shutting down");
break;
Expand Down Expand Up @@ -1296,7 +1265,6 @@ impl Clone for PeerNetworkManager {
shutdown_token: self.shutdown_token.clone(),
tasks: self.tasks.clone(),
initial_peers: self.initial_peers.clone(),
peer_search_started: self.peer_search_started.clone(),
current_sync_peer: self.current_sync_peer.clone(),
data_dir: self.data_dir.clone(),
mempool_strategy: self.mempool_strategy,
Expand Down
Loading