Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions dash-spv/src/network/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::network::{
};
use crate::storage::{PeerStorage, PersistentPeerStorage, PersistentStorage};
use async_trait::async_trait;
use dashcore::network::address::AddrV2Message;
use dashcore::network::constants::ServiceFlags;
use dashcore::network::message::NetworkMessage;
use dashcore::network::message_headers2::CompressionState;
Expand Down Expand Up @@ -152,7 +153,11 @@ impl PeerNetworkManager {
pub async fn start(&self) -> Result<(), Error> {
log::info!("Starting peer network manager for {:?}", self.network);

let mut peer_addresses = self.initial_peers.clone();
let mut peer_addresses: Vec<AddrV2Message> = self
.initial_peers
.iter()
.map(|addr| AddrV2Message::new(*addr, ServiceFlags::NETWORK))
.collect();

if self.exclusive_mode {
log::info!(
Expand All @@ -161,7 +166,10 @@ impl PeerNetworkManager {
);
} else {
// Load saved peers from disk
let saved_peers = self.peer_store.load_peers().await.unwrap_or_default();
let saved_peers = self.peer_store.load_peers().await.unwrap_or_else(|e| {
tracing::warn!("Failed to load peers: {}", e);
Vec::new()
});
peer_addresses.extend(saved_peers);

// If we still have no peers, immediately discover via DNS
Expand All @@ -171,10 +179,16 @@ impl PeerNetworkManager {
self.network
);
let dns_peers = self.discovery.discover_peers(self.network).await;
peer_addresses.extend(dns_peers.iter().take(TARGET_PEERS));
let dns_peers_found = dns_peers.len();
peer_addresses.extend(
dns_peers
.into_iter()
.take(TARGET_PEERS)
.map(|addr| AddrV2Message::new(addr, ServiceFlags::NETWORK)),
);
log::info!(
"DNS discovery found {} peers, using {} for startup",
dns_peers.len(),
dns_peers_found,
peer_addresses.len()
);
} else {
Expand All @@ -185,15 +199,7 @@ impl PeerNetworkManager {
}
}

// Connect to peers (all in exclusive mode, or up to TARGET_PEERS in normal mode)
let max_connections = if self.exclusive_mode {
peer_addresses.len()
} else {
TARGET_PEERS
};
for addr in peer_addresses.iter().take(max_connections) {
self.connect_to_peer(*addr).await;
}
self.addrv2_handler.handle_addrv2(peer_addresses.clone()).await;

// Start maintenance loop
self.start_maintenance_loop().await;
Expand Down
8 changes: 3 additions & 5 deletions dash-spv/src/storage/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub trait PeerStorage {
peers: &[dashcore::network::address::AddrV2Message],
) -> StorageResult<()>;

async fn load_peers(&self) -> StorageResult<Vec<SocketAddr>>;
async fn load_peers(&self) -> StorageResult<Vec<AddrV2Message>>;

async fn save_peers_reputation(
&self,
Expand Down Expand Up @@ -90,7 +90,7 @@ impl PeerStorage for PersistentPeerStorage {
Ok(())
}

async fn load_peers(&self) -> StorageResult<Vec<SocketAddr>> {
async fn load_peers(&self) -> StorageResult<Vec<AddrV2Message>> {
let peers_file = self.peers_data_file();

if !fs::try_exists(&peers_file).await? {
Expand Down Expand Up @@ -122,8 +122,6 @@ impl PeerStorage for PersistentPeerStorage {
.await
.map_err(|e| StorageError::ReadFailed(format!("Failed to load peers: {e}")))??;

let peers = peers.into_iter().filter_map(|p| p.socket_addr().ok()).collect();

Ok(peers)
}

Expand Down Expand Up @@ -190,7 +188,7 @@ mod tests {

let loaded = store.load_peers().await.expect("Failed to load peers in test");
assert_eq!(loaded.len(), 1);
assert_eq!(loaded[0], addr);
assert_eq!(loaded[0].socket_addr().unwrap(), addr);
}

#[tokio::test]
Expand Down
16 changes: 16 additions & 0 deletions dash/src/network/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,22 @@ pub struct AddrV2Message {
}

impl AddrV2Message {
pub fn new(socket_addr: SocketAddr, flags: ServiceFlags) -> Self {
let time = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as u32;
let addr_v2 = match socket_addr.ip() {
std::net::IpAddr::V4(v4) => AddrV2::Ipv4(v4),
std::net::IpAddr::V6(v6) => AddrV2::Ipv6(v6),
};
Self {
time,
services: flags,
addr: addr_v2,
port: socket_addr.port(),
}
}
/// Extract socket address from an [AddrV2Message] message.
/// This will return [io::Error] [io::ErrorKind::AddrNotAvailable]
/// if the address type can't be converted into a [SocketAddr].
Expand Down
Loading