Skip to content

Conversation

@dharjeezy
Copy link
Contributor

closes #415

_ = interval.tick() => {
for peer in &self.peers {
tracing::trace!(target: LOG_TARGET, ?peer, "sending ping");
if let Err(error) = self.service.open_substream(*peer) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The libp2p ping protocol spec states that the repeated pings should go on the same substream: https://github.com/libp2p/specs/blob/master/ping/ping.md

Because of this, the issue is actually more difficult due to the need to not keep the connection alive by this ping substream.

I don't think we should break the spec even if it works with the current libp2p implementation. @lexnv what do you think?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, I would comply with the libp2p spec as much as possible. The main issue would be that we cannot distinguish ping susbtreams for the keep alive connection tracker 🤔

Are we entirely convinced we need to implement this for ping?

  • we'll need to adjust the substream API to expose that the substream is ping (accounting towards the keepalive)
  • how do we treat failures here? If we cannot open a substream should we terminate the connection? If so, we need to propagate again to the higher levels the signal to terminate the connection
    • offhand, from ping we'll need to propagate towards the transport manager, and from the transport manager we'll need to propagate to individual connections

Copy link
Collaborator

@dmitry-markin dmitry-markin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, this is going in the right direction, but there should be a way to simplify the implementation by not introducing separate async tasks for inbound/outbound substreams and not adding command channels. It should be possible to implement polling of inbound substreams by putting them into FuturesUnordered or, may be, tokio_stream::StreamMap. The latter should be quite easy as there is maximum one inbound substream allowed from any remote peer as per ping spec: https://github.com/libp2p/specs/blob/master/ping/ping.md

@dmitry-markin
Copy link
Collaborator

Hey, this is going in the right direction, but there should be a way to simplify the implementation by not introducing separate async tasks for inbound/outbound substreams and not adding command channels. It should be possible to implement polling of inbound substreams by putting them into FuturesUnordered or, may be, tokio_stream::StreamMap. The latter should be quite easy as there is maximum one inbound substream allowed from any remote peer as per ping spec: https://github.com/libp2p/specs/blob/master/ping/ping.md

tokio_stream::StreamMap might not work because we need to also write to substream and not only poll it, but still there should be a way to simplify the code.

@dharjeezy
Copy link
Contributor Author

Hey, this is going in the right direction, but there should be a way to simplify the implementation by not introducing separate async tasks for inbound/outbound substreams and not adding command channels. It should be possible to implement polling of inbound substreams by putting them into FuturesUnordered or, may be, tokio_stream::StreamMap. The latter should be quite easy as there is maximum one inbound substream allowed from any remote peer as per ping spec: https://github.com/libp2p/specs/blob/master/ping/ping.md

tokio_stream::StreamMap might not work because we need to also write to substream and not only poll it, but still there should be a way to simplify the code.

@dmitry-markin can you check my current implementation

Copy link
Collaborator

@dmitry-markin dmitry-markin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey hey, nice use of SplitStream/SplitSink! I think what is missing is a cleaner separation between inbound and outbound substreams. On inbound substreams we should only respond to incoming pings, and on outbound send pings ourself. It would be better than relying on the presence of the time in the map, which can get screwed when we both send pings to a peer and receive pings from it.

@dharjeezy
Copy link
Contributor Author

Hey hey, nice use of SplitStream/SplitSink! I think what is missing is a cleaner separation between inbound and outbound substreams. On inbound substreams we should only respond to incoming pings, and on outbound send pings ourself. It would be better than relying on the presence of the time in the map, which can get screwed when we both send pings to a peer and receive pings from it.

i have done this now @dmitry-markin

@haikoschol
Copy link
Contributor

Out of scope for this PR but I noticed that ping::config::MAX_FAILURES is not used anywhere. Should we create an issue for that?

Copy link
Contributor

@haikoschol haikoschol left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, I ran the following tests, using a litep2p listener and webrtc transport:

Dialer Listener -> Dialer Pings Dialer -> Listener Pings
libp2p
"minimal smoldot env"

The failing outbound pings with libp2p is an unrelated issue (#494).

/// Pending outbound substreams.
pending_outbound: FuturesUnordered<BoxFuture<'static, crate::Result<(PeerId, Duration)>>>,
/// Streams we read Pings from.
/// Keyed by a local counter to handle multiple streams per peer if necessary.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As Dmitry mentioned, there should be only one inbound ping substream per peer. Can we simplify this? I guess just using the PeerId like in outbound_sinks?


/// Log target for the file.
const LOG_TARGET: &str = "litep2p::ipfs::ping";
const PING_TIMEOUT: Duration = Duration::from_secs(10);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const PING_TIMEOUT: Duration = Duration::from_secs(10);

Since it is unused. However, we probably want to have a timeout for receiving pongs. Arguably out of scope for this PR though.


_ = interval.tick() => {
for (peer, sink) in self.outbound_sinks.iter_mut() {
let payload = vec![0u8; 32];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should keep this TODO comment:

// TODO: https://github.com/paritytech/litep2p/issues/134 generate random payload and verify it

Comment on lines +176 to +182
self.ping_times.insert(*peer, Instant::now());
tracing::trace!(target: LOG_TARGET, ?peer, "sending ping");

if let Err(error) = sink.send(Bytes::from(payload)).await {
tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to send ping");

}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.ping_times.insert(*peer, Instant::now());
tracing::trace!(target: LOG_TARGET, ?peer, "sending ping");
if let Err(error) = sink.send(Bytes::from(payload)).await {
tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to send ping");
}
tracing::trace!(target: LOG_TARGET, ?peer, "sending ping");
if let Err(error) = sink.send(Bytes::from(payload)).await {
tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to send ping");
} else {
self.ping_times.insert(*peer, Instant::now());
}

}
}

// Handle Outbound Responses (Ping is expected here)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Handle Outbound Responses (Ping is expected here)
// Handle Inbound Pings

Some((id, event)) = self.inbound_streams.next() => {
match event {
Ok(payload) => {
if let Some(sink) = self.inbound_sinks.get_mut(&id) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider logging when there's no sink for a peer ID.

}
}
}
Err(_) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider logging the error.

ping: elapsed,
})
.await;
Ok(payload) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Ok(payload) => {
Ok(_payload) => {

//! [`/ipfs/ping/1.0.0`](https://github.com/libp2p/specs/blob/master/ping/ping.md) implementation.
use crate::{
error::{Error, SubstreamError},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
error::{Error, SubstreamError},

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Ping: repeat requests periodically

4 participants