Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use futures::{stream::Map, Stream};

mod fast_forward;
mod flatten_switch;
mod merge_round_robin;
mod outer_waker;
mod sample;

Expand All @@ -33,6 +34,7 @@ pub mod test_util;

pub use fast_forward::*;
pub use flatten_switch::*;
pub use merge_round_robin::*;
pub use sample::*;

#[cfg(feature = "tokio-time")]
Expand Down Expand Up @@ -172,6 +174,47 @@ pub trait StreamTools: Stream {
{
RecordDelay::new(self)
}

/// Merge two streams into one, allowing a custom round robin policy
///
/// The resulting stream emits `nb_self` elements from the first stream,
/// then `nb_other` from the other. When one of the stream finishes, the
/// second is then used.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also add a comment explaining that nb_self and nb_other must be non-zero or the method will panic?

///
/// ```
/// # futures::executor::block_on(async {
/// use futures::stream::{self, StreamExt};
/// use streamies::Streamies as _;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// use streamies::Streamies as _;
/// use streamtools::StreamTools as _;

///
/// let stream1 = stream::iter(vec!["a", "a"]);
/// let stream2 = stream::iter(vec!["b", "b", "b", "b", "c"]);
///
/// let stream = stream1.merge_round_robin(stream2, 1, 2);
///
/// let result: Vec<_> = stream.collect().await;
/// assert_eq!(result, vec![
/// "a",
/// "b",
/// "b",
/// "a",
/// "b",
/// "b",
/// "c"
/// ]);
/// # });
/// ```
fn merge_round_robin<St>(
self,
other: St,
nb_self: usize,
nb_other: usize,
) -> MergeRoundRobin<Self, St>
where
St: Stream<Item = Self::Item>,
Self: Sized,
{
MergeRoundRobin::new(self, other, nb_self, nb_other)
}
}

impl<T: Stream> StreamTools for T {}
Expand Down
150 changes: 150 additions & 0 deletions src/merge_round_robin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
use core::num::NonZeroUsize;
use core::pin::Pin;
use core::task::Context;
use core::task::Poll;

use futures::ready;
use futures::stream::FusedStream;
use futures::Stream;
use pin_project_lite::pin_project;

pin_project! {
/// Stream for the [`merge_round_robin`](crate::Streamies::merge_round_robin) method.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Stream for the [`merge_round_robin`](crate::Streamies::merge_round_robin) method.
/// Stream for the [`merge_round_robin`](crate::StreamTools::merge_round_robin) method.

#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct MergeRoundRobin<St1, St2> {
#[pin]
first: Option<St1>,
#[pin]
second: Option<St2>,

first_nb_ele: NonZeroUsize,
second_nb_ele: NonZeroUsize,

first_count: usize,
second_count: usize
}
}

impl<St1, St2> MergeRoundRobin<St1, St2>
where
St1: Stream,
St2: Stream<Item = St1::Item>,
{
pub(super) fn new(
stream1: St1,
stream2: St2,
first_nb_ele: usize,
second_nb_ele: usize,
) -> Self {
Self {
first: Some(stream1),
second: Some(stream2),
first_nb_ele: NonZeroUsize::new(first_nb_ele).expect(
"Couldn't convert `first_nb_ele` to `NonZeroUsize`. The value must no be 0",
),
second_nb_ele: NonZeroUsize::new(second_nb_ele).expect(
"Couldn't convert `second_nb_ele` to `NonZeroUsize`. The value must no be 0",
Comment on lines +44 to +47
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Couldn't convert `first_nb_ele` to `NonZeroUsize`. The value must no be 0",
),
second_nb_ele: NonZeroUsize::new(second_nb_ele).expect(
"Couldn't convert `second_nb_ele` to `NonZeroUsize`. The value must no be 0",
"Couldn't convert `first_nb_ele` to `NonZeroUsize`. The value must not be 0",
),
second_nb_ele: NonZeroUsize::new(second_nb_ele).expect(
"Couldn't convert `second_nb_ele` to `NonZeroUsize`. The value must not be 0",

),
first_count: 0,
second_count: 0,
}
}
}

impl<St1, St2> FusedStream for MergeRoundRobin<St1, St2>
where
St1: FusedStream,
St2: FusedStream<Item = St1::Item>,
{
fn is_terminated(&self) -> bool {
self.first.as_ref().is_none_or(|s| s.is_terminated())
&& self.second.as_ref().is_none_or(|s| s.is_terminated())
}
}

impl<St1, St2> Stream for MergeRoundRobin<St1, St2>
where
St1: Stream,
St2: Stream<Item = St1::Item>,
{
type Item = St1::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
// Check if second stream has finished it's turn
if this.second_count == &mut this.second_nb_ele.get() {
// It finished. Let's reset the turns
*this.first_count = 0;
*this.second_count = 0;
}

// Check if we should be polling from the first stream.
// This means:
// - It's our turn to be polled AND the se
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incomplete comment

// - The stream isn't ended
if this.first_count < &mut this.first_nb_ele.get() {
if let Some(first) = this.first.as_mut().as_pin_mut() {
if let Some(item) = ready!(first.poll_next(cx)) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this behaviour intentional - to wait for the first stream to yield, even if there are items available on the second stream?

My intuition would have been to fall back to the other stream if the first is pending, but I guess it depends on the use case.

Could possibly make this parameterisable or have a different variant which has the fallback behaviour.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RustyNova016 any thoughts on this? I think it'd be nice to have two variants: merge_round_robin and merge_round_robin_strict.

// We have an item! Increment the count for the next poll
*this.first_count += 1;
return Poll::Ready(Some(item));
}

// The stream has finished. Let's dispose of the stream
this.first.set(None);
} else {
// The stream is empty. We can just poll `second` now
return this
.second
.as_mut()
.as_pin_mut()
.map(|second| second.poll_next(cx))
.unwrap_or_else(|| Poll::Ready(None));
}
}

// First stream wasn't polled, so we poll the second stream
if let Some(second) = this.second.as_mut().as_pin_mut() {
if let Some(item) = ready!(second.poll_next(cx)) {
// We have an item! Increment the count for the next poll
*this.second_count += 1;
return Poll::Ready(Some(item));
}

// The stream has finished. Let's dispose of the stream
this.second.set(None);
}

// The second stream is empty. We can just poll `first` now
this.first
.as_mut()
.as_pin_mut()
.map(|first| first.poll_next(cx))
.unwrap_or_else(|| Poll::Ready(None))
}

fn size_hint(&self) -> (usize, Option<usize>) {
match &self.first {
Some(first) => match &self.second {
Some(second) => {
let first_size = first.size_hint();
let second_size = second.size_hint();

(
first_size.0.saturating_add(second_size.0),
match (first_size.1, second_size.1) {
(Some(x), Some(y)) => x.checked_add(y),
_ => None,
},
)
}
None => first.size_hint(),
},
None => match &self.second {
Some(second) => second.size_hint(),
None => (0, Some(0)),
},
}
}
}