From d8c7b297393cf478b29fabf06581fd27c3992535 Mon Sep 17 00:00:00 2001 From: Aminu 'Seun Joshua <34725212+seun-ja@users.noreply.github.com> Date: Fri, 16 May 2025 16:15:35 +0100 Subject: [PATCH 1/4] replace duplicate code with common function --- src/codec/identity.rs | 2 +- src/protocol/libp2p/kademlia/bucket.rs | 4 +-- src/protocol/libp2p/kademlia/routing_table.rs | 34 ++++++++----------- src/protocol/mod.rs | 18 +++++++++- src/protocol/transport_service.rs | 15 +++----- src/transport/manager/mod.rs | 6 ++-- tests/conformance/rust/kademlia.rs | 2 +- 7 files changed, 43 insertions(+), 38 deletions(-) diff --git a/src/codec/identity.rs b/src/codec/identity.rs index 266bff4b..618dfa0c 100644 --- a/src/codec/identity.rs +++ b/src/codec/identity.rs @@ -102,7 +102,7 @@ mod tests { let bytes = [3u8; 64]; let mut bytes = BytesMut::from(&bytes[..]); - let decoded = codec.decode(&mut bytes); + let _decoded = codec.decode(&mut bytes); } #[test] diff --git a/src/protocol/libp2p/kademlia/bucket.rs b/src/protocol/libp2p/kademlia/bucket.rs index e1b115cf..4c999efc 100644 --- a/src/protocol/libp2p/kademlia/bucket.rs +++ b/src/protocol/libp2p/kademlia/bucket.rs @@ -128,7 +128,7 @@ mod tests { .collect::>(); let target = Key::from(PeerId::random()); - let mut iter = bucket.closest_iter(&target); + let iter = bucket.closest_iter(&target); let mut prev = None; for node in iter { @@ -173,7 +173,7 @@ mod tests { .collect::>(); let target = Key::from(PeerId::random()); - let mut iter = bucket.closest_iter(&target); + let iter = bucket.closest_iter(&target); let mut prev = None; let mut num_peers = 0usize; diff --git a/src/protocol/libp2p/kademlia/routing_table.rs b/src/protocol/libp2p/kademlia/routing_table.rs index e012318e..12d8258b 100644 --- a/src/protocol/libp2p/kademlia/routing_table.rs +++ b/src/protocol/libp2p/kademlia/routing_table.rs @@ -22,9 +22,12 @@ //! Kademlia routing table implementation. use crate::{ - protocol::libp2p::kademlia::{ - bucket::{KBucket, KBucketEntry}, - types::{ConnectionType, Distance, KademliaPeer, Key, U256}, + protocol::{ + libp2p::kademlia::{ + bucket::{KBucket, KBucketEntry}, + types::{ConnectionType, Distance, KademliaPeer, Key, U256}, + }, + sort_address, }, transport::{ manager::address::{scores, AddressRecord}, @@ -33,8 +36,7 @@ use crate::{ PeerId, }; -use multiaddr::{Multiaddr, Protocol}; -use multihash::Multihash; +use multiaddr::Multiaddr; /// Number of k-buckets. const NUM_BUCKETS: usize = 256; @@ -188,17 +190,7 @@ impl RoutingTable { ); // TODO: https://github.com/paritytech/litep2p/issues/337 this has to be moved elsewhere at some point - let addresses: Vec = addresses - .into_iter() - .filter_map(|address| { - let last = address.iter().last(); - if std::matches!(last, Some(Protocol::P2p(_))) { - Some(address) - } else { - Some(address.with(Protocol::P2p(Multihash::from_bytes(&peer.to_bytes()).ok()?))) - } - }) - .collect(); + let addresses: Vec = sort_address(addresses.into_iter(), peer); if addresses.is_empty() { tracing::debug!( @@ -304,7 +296,7 @@ impl Iterator for ClosestBucketsIter { self.state = ClosestBucketsIterState::ZoomIn(i); Some(i) } - ClosestBucketsIterState::ZoomIn(i) => + ClosestBucketsIterState::ZoomIn(i) => { if let Some(i) = self.next_in(i) { self.state = ClosestBucketsIterState::ZoomIn(i); Some(i) @@ -312,15 +304,17 @@ impl Iterator for ClosestBucketsIter { let i = BucketIndex(0); self.state = ClosestBucketsIterState::ZoomOut(i); Some(i) - }, - ClosestBucketsIterState::ZoomOut(i) => + } + } + ClosestBucketsIterState::ZoomOut(i) => { if let Some(i) = self.next_out(i) { self.state = ClosestBucketsIterState::ZoomOut(i); Some(i) } else { self.state = ClosestBucketsIterState::Done; None - }, + } + } ClosestBucketsIterState::Done => None, } } diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 36591912..ca80b1c7 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -29,7 +29,8 @@ use crate::{ PeerId, }; -use multiaddr::Multiaddr; +use multiaddr::{Multiaddr, Protocol}; +use multihash::Multihash; use std::fmt::Debug; @@ -141,3 +142,18 @@ pub trait UserProtocol: Send { /// Start the the user protocol event loop. async fn run(self: Box, service: TransportService) -> crate::Result<()>; } + +pub fn sort_address(addresses: impl Iterator, peer_id: PeerId) -> Vec { + addresses + .filter_map(|address| { + let last = address.iter().last(); + if std::matches!(last, Some(Protocol::P2p(_))) { + Some(address) + } else { + Some(address.with(Protocol::P2p( + Multihash::from_bytes(&peer_id.to_bytes()).ok()?, + ))) + } + }) + .collect() +} diff --git a/src/protocol/transport_service.rs b/src/protocol/transport_service.rs index 4e8bc93a..8e039661 100644 --- a/src/protocol/transport_service.rs +++ b/src/protocol/transport_service.rs @@ -28,8 +28,7 @@ use crate::{ }; use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt}; -use multiaddr::{Multiaddr, Protocol}; -use multihash::Multihash; +use multiaddr::Multiaddr; use tokio::sync::mpsc::{channel, Receiver, Sender}; use std::{ @@ -44,6 +43,8 @@ use std::{ time::{Duration, Instant}, }; +use super::sort_address; + /// Logging target for the file. const LOG_TARGET: &str = "litep2p::transport-service"; @@ -467,15 +468,7 @@ impl TransportService { /// /// The list is filtered for duplicates and unsupported transports. pub fn add_known_address(&mut self, peer: &PeerId, addresses: impl Iterator) { - let addresses: HashSet = addresses - .filter_map(|address| { - if !std::matches!(address.iter().last(), Some(Protocol::P2p(_))) { - Some(address.with(Protocol::P2p(Multihash::from_bytes(&peer.to_bytes()).ok()?))) - } else { - Some(address) - } - }) - .collect(); + let addresses = sort_address(addresses.into_iter(), *peer); self.transport_handle.add_known_address(peer, addresses.into_iter()); } diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index 5fc9e330..c84cd216 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -552,10 +552,11 @@ impl TransportManager { #[cfg(feature = "websocket")] Some(Protocol::Ws(_)) | Some(Protocol::Wss(_)) => SupportedTransport::WebSocket, Some(Protocol::P2p(_)) => SupportedTransport::Tcp, - _ => + _ => { return Err(Error::TransportNotSupported( address_record.address().clone(), - )), + )) + } }, #[cfg(feature = "quic")] Protocol::Udp(_) => match protocol_stack @@ -1358,6 +1359,7 @@ mod tests { rx: tokio::sync::mpsc::Receiver, } + #[allow(dead_code)] impl MockTransport { fn new(rx: tokio::sync::mpsc::Receiver) -> Self { Self { rx } diff --git a/tests/conformance/rust/kademlia.rs b/tests/conformance/rust/kademlia.rs index 8c9afbfa..84761914 100644 --- a/tests/conformance/rust/kademlia.rs +++ b/tests/conformance/rust/kademlia.rs @@ -24,7 +24,7 @@ use libp2p::{ identify, identity, kad::{ self, store::RecordStore, AddProviderOk, GetProvidersOk, InboundRequest, - KademliaEvent as Libp2pKademliaEvent, QueryResult, RecordKey as Libp2pRecordKey, + KademliaEvent as Libp2pKademliaEvent, QueryResult, }, swarm::{keep_alive, AddressScore, NetworkBehaviour, SwarmBuilder, SwarmEvent}, PeerId, Swarm, From 0323cc85e88fc92369d7b094f07ef183fedf21d4 Mon Sep 17 00:00:00 2001 From: Aminu 'Seun Joshua <34725212+seun-ja@users.noreply.github.com> Date: Wed, 4 Jun 2025 08:54:15 +0100 Subject: [PATCH 2/4] refactor: renaming function + cleanup --- src/protocol/libp2p/kademlia/routing_table.rs | 5 ++--- src/protocol/mod.rs | 5 ++++- src/protocol/transport_service.rs | 4 ++-- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/protocol/libp2p/kademlia/routing_table.rs b/src/protocol/libp2p/kademlia/routing_table.rs index 12d8258b..b5ea757b 100644 --- a/src/protocol/libp2p/kademlia/routing_table.rs +++ b/src/protocol/libp2p/kademlia/routing_table.rs @@ -23,11 +23,11 @@ use crate::{ protocol::{ + ensure_address_with_peer, libp2p::kademlia::{ bucket::{KBucket, KBucketEntry}, types::{ConnectionType, Distance, KademliaPeer, Key, U256}, }, - sort_address, }, transport::{ manager::address::{scores, AddressRecord}, @@ -189,8 +189,7 @@ impl RoutingTable { "add known peer" ); - // TODO: https://github.com/paritytech/litep2p/issues/337 this has to be moved elsewhere at some point - let addresses: Vec = sort_address(addresses.into_iter(), peer); + let addresses: Vec = ensure_address_with_peer(addresses.into_iter(), peer); if addresses.is_empty() { tracing::debug!( diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index ca80b1c7..758ac553 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -143,7 +143,10 @@ pub trait UserProtocol: Send { async fn run(self: Box, service: TransportService) -> crate::Result<()>; } -pub fn sort_address(addresses: impl Iterator, peer_id: PeerId) -> Vec { +pub fn ensure_address_with_peer( + addresses: impl Iterator, + peer_id: PeerId, +) -> Vec { addresses .filter_map(|address| { let last = address.iter().last(); diff --git a/src/protocol/transport_service.rs b/src/protocol/transport_service.rs index 8e039661..190f1448 100644 --- a/src/protocol/transport_service.rs +++ b/src/protocol/transport_service.rs @@ -43,7 +43,7 @@ use std::{ time::{Duration, Instant}, }; -use super::sort_address; +use super::ensure_address_with_peer; /// Logging target for the file. const LOG_TARGET: &str = "litep2p::transport-service"; @@ -468,7 +468,7 @@ impl TransportService { /// /// The list is filtered for duplicates and unsupported transports. pub fn add_known_address(&mut self, peer: &PeerId, addresses: impl Iterator) { - let addresses = sort_address(addresses.into_iter(), *peer); + let addresses = ensure_address_with_peer(addresses.into_iter(), *peer); self.transport_handle.add_known_address(peer, addresses.into_iter()); } From a375a0c11c905c2c6c8f3bd2b22d61065f1bb007 Mon Sep 17 00:00:00 2001 From: Aminu 'Seun Joshua <34725212+seun-ja@users.noreply.github.com> Date: Wed, 4 Jun 2025 13:12:30 +0100 Subject: [PATCH 3/4] fixes fmt warnings --- src/protocol/libp2p/kademlia/routing_table.rs | 10 ++++------ src/transport/manager/mod.rs | 5 ++--- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/src/protocol/libp2p/kademlia/routing_table.rs b/src/protocol/libp2p/kademlia/routing_table.rs index b5ea757b..8d31dedd 100644 --- a/src/protocol/libp2p/kademlia/routing_table.rs +++ b/src/protocol/libp2p/kademlia/routing_table.rs @@ -295,7 +295,7 @@ impl Iterator for ClosestBucketsIter { self.state = ClosestBucketsIterState::ZoomIn(i); Some(i) } - ClosestBucketsIterState::ZoomIn(i) => { + ClosestBucketsIterState::ZoomIn(i) => if let Some(i) = self.next_in(i) { self.state = ClosestBucketsIterState::ZoomIn(i); Some(i) @@ -303,17 +303,15 @@ impl Iterator for ClosestBucketsIter { let i = BucketIndex(0); self.state = ClosestBucketsIterState::ZoomOut(i); Some(i) - } - } - ClosestBucketsIterState::ZoomOut(i) => { + }, + ClosestBucketsIterState::ZoomOut(i) => if let Some(i) = self.next_out(i) { self.state = ClosestBucketsIterState::ZoomOut(i); Some(i) } else { self.state = ClosestBucketsIterState::Done; None - } - } + }, ClosestBucketsIterState::Done => None, } } diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index 87333b3d..93562292 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -572,11 +572,10 @@ impl TransportManager { #[cfg(feature = "websocket")] Some(Protocol::Ws(_)) | Some(Protocol::Wss(_)) => SupportedTransport::WebSocket, Some(Protocol::P2p(_)) => SupportedTransport::Tcp, - _ => { + _ => return Err(Error::TransportNotSupported( address_record.address().clone(), - )) - } + )), }, #[cfg(feature = "quic")] Protocol::Udp(_) => match protocol_stack From 2a30ddcb83baebc2e82be31793f424734818bc89 Mon Sep 17 00:00:00 2001 From: Aminu 'Seun Joshua <34725212+seun-ja@users.noreply.github.com> Date: Thu, 26 Jun 2025 15:30:55 +0100 Subject: [PATCH 4/4] duplicate removed --- src/protocol/transport_service.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/protocol/transport_service.rs b/src/protocol/transport_service.rs index fe0fbc8b..f889c8fd 100644 --- a/src/protocol/transport_service.rs +++ b/src/protocol/transport_service.rs @@ -468,7 +468,10 @@ impl TransportService { /// /// The list is filtered for duplicates and unsupported transports. pub fn add_known_address(&mut self, peer: &PeerId, addresses: impl Iterator) { - let addresses = ensure_address_with_peer(addresses.into_iter(), *peer); + let mut addresses = ensure_address_with_peer(addresses.into_iter(), *peer); + + addresses.sort(); + addresses.dedup(); self.transport_handle.add_known_address(peer, addresses.into_iter()); }