Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 35 additions & 70 deletions dash-spv/src/network/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -734,112 +734,83 @@ impl PeerNetworkManager {

/// Start peer connection maintenance loop
async fn start_maintenance_loop(&self) {
let pool = self.pool.clone();
let discovery = self.discovery.clone();
let network = self.network;
let shutdown_token = self.shutdown_token.clone();
let addrv2_handler = self.addrv2_handler.clone();
let peer_store = self.peer_store.clone();
let reputation_manager = self.reputation_manager.clone();
let initial_peers = self.initial_peers.clone();
let connected_peer_count = self.connected_peer_count.clone();

// Check if we're in exclusive mode (explicit flag or peers configured)
let exclusive_mode = self.exclusive_mode;

// Clone self for peer callback
let connect_fn = {
let this = self.clone();
move |addr| {
let this = this.clone();
async move { this.connect_to_peer(addr).await }
}
};

let this = self.clone();
let mut tasks = self.tasks.lock().await;
tasks.spawn(async move {
// Periodic DNS discovery check (only active in non-exclusive mode)
let mut dns_interval = time::interval(DNS_DISCOVERY_DELAY);

while !shutdown_token.is_cancelled() {
while !this.shutdown_token.is_cancelled() {
// Clean up disconnected peers
pool.cleanup_disconnected().await;
this.pool.cleanup_disconnected().await;

let count = pool.peer_count().await;
let count = this.pool.peer_count().await;
log::debug!("Connected peers: {}", count);
// Keep the cached counter in sync with actual pool count
connected_peer_count.store(count, Ordering::Relaxed);
if exclusive_mode {
this.connected_peer_count.store(count, Ordering::Relaxed);
if this.exclusive_mode {
// In exclusive mode, only reconnect to originally specified peers
for addr in initial_peers.iter() {
if !pool.is_connected(addr).await && !pool.is_connecting(addr).await {
for addr in this.initial_peers.iter() {
if !this.pool.is_connected(addr).await
&& !this.pool.is_connecting(addr).await
{
log::info!("Reconnecting to exclusive peer: {}", addr);
tokio::select! {
_= connect_fn(*addr) => {},
_ = shutdown_token.cancelled() => {
log::info!("Maintenance loop shutting down during connection attempt (exclusive)");
break;
}
}
this.connect_to_peer(*addr).await;
}
}
} else {
// Normal mode: try to maintain minimum peer count with discovery
if count < MIN_PEERS {
// Try known addresses first, sorted by reputation
let known = addrv2_handler.get_known_addresses().await;
let known = this.addrv2_handler.get_known_addresses().await;
let needed = TARGET_PEERS.saturating_sub(count);
// Select best peers based on reputation
let best_peers = reputation_manager.select_best_peers(known, needed * 2).await;
let best_peers =
this.reputation_manager.select_best_peers(known, needed * 2).await;
let mut attempted = 0;

for addr in best_peers {
if !pool.is_connected(&addr).await && !pool.is_connecting(&addr).await {
tokio::select! {
_= connect_fn(addr) => {},
_ = shutdown_token.cancelled() => {
log::info!("Maintenance loop shutting down during connection attempt (min peers)");
break;
}
}
if !this.pool.is_connected(&addr).await
&& !this.pool.is_connecting(&addr).await
{
this.connect_to_peer(addr).await;
attempted += 1;
if attempted >= needed {
break;
}
}
}

}
}

// Send ping to all peers if needed
for (addr, peer) in pool.get_all_peers().await {
for (addr, peer) in this.pool.get_all_peers().await {
let mut peer_guard = peer.write().await;
if peer_guard.should_ping() {
if let Err(e) = peer_guard.send_ping().await {
log::error!("Failed to ping {}: {}", addr, e);
// Update reputation for ping failure
reputation_manager.update_reputation(
addr,
misbehavior_scores::TIMEOUT,
"Ping failed",
).await;
this.reputation_manager
.update_reputation(addr, misbehavior_scores::TIMEOUT, "Ping failed")
.await;
}
}
peer_guard.cleanup_old_pings();
}

// Only save known peers if not in exclusive mode
if !exclusive_mode {
let addresses = addrv2_handler.get_addresses_for_peer(MAX_ADDR_TO_STORE).await;
if !this.exclusive_mode {
let addresses =
this.addrv2_handler.get_addresses_for_peer(MAX_ADDR_TO_STORE).await;
if !addresses.is_empty() {
if let Err(e) = peer_store.save_peers(&addresses).await {
if let Err(e) = this.peer_store.save_peers(&addresses).await {
log::warn!("Failed to save peers: {}", e);
}
}

// Save reputation data periodically
if let Err(e) = reputation_manager.save_to_storage(&*peer_store).await {
if let Err(e) = this.reputation_manager.save_to_storage(&*this.peer_store).await
{
log::warn!("Failed to save reputation data: {}", e);
}
}
Expand All @@ -848,37 +819,31 @@ impl PeerNetworkManager {
_ = time::sleep(MAINTENANCE_INTERVAL) => {
log::debug!("Maintenance interval elapsed");
}
_ = dns_interval.tick(), if !exclusive_mode => {
let count = pool.peer_count().await;
_ = dns_interval.tick(), if !this.exclusive_mode => {
let count = this.pool.peer_count().await;
if count >= MIN_PEERS {
continue;
}
let dns_peers = tokio::select! {
peers = discovery.discover_peers(network) => peers,
_ = shutdown_token.cancelled() => {
peers = this.discovery.discover_peers(this.network) => peers,
_ = this.shutdown_token.cancelled() => {
log::info!("Maintenance loop shutting down during DNS discovery");
break;
}
};
let needed = TARGET_PEERS.saturating_sub(count);
let mut dns_attempted = 0;
for addr in dns_peers.iter() {
if !pool.is_connected(addr).await && !pool.is_connecting(addr).await {
tokio::select! {
_= connect_fn(*addr) => {},
_ = shutdown_token.cancelled() => {
log::info!("Maintenance loop shutting down during connection attempt (dns)");
break;
}
}
if !this.pool.is_connected(addr).await && !this.pool.is_connecting(addr).await {
this.connect_to_peer(*addr).await;
dns_attempted += 1;
if dns_attempted >= needed {
break;
}
}
}
}
_ = shutdown_token.cancelled() => {
_ = this.shutdown_token.cancelled() => {
log::info!("Maintenance loop shutting down");
break;
}
Expand Down
Loading