diff --git a/src/lib.rs b/src/lib.rs index 31a872a..4d12bb6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,6 +18,7 @@ use futures::{stream::Map, Stream}; mod fast_forward; mod flatten_switch; +mod merge_round_robin; mod outer_waker; mod sample; @@ -33,6 +34,7 @@ pub mod test_util; pub use fast_forward::*; pub use flatten_switch::*; +pub use merge_round_robin::*; pub use sample::*; #[cfg(feature = "tokio-time")] @@ -172,6 +174,47 @@ pub trait StreamTools: Stream { { RecordDelay::new(self) } + + /// Merge two streams into one, allowing a custom round robin policy + /// + /// The resulting stream emits `nb_self` elements from the first stream, + /// then `nb_other` from the other. When one of the stream finishes, the + /// second is then used. + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// use streamies::Streamies as _; + /// + /// let stream1 = stream::iter(vec!["a", "a"]); + /// let stream2 = stream::iter(vec!["b", "b", "b", "b", "c"]); + /// + /// let stream = stream1.merge_round_robin(stream2, 1, 2); + /// + /// let result: Vec<_> = stream.collect().await; + /// assert_eq!(result, vec![ + /// "a", + /// "b", + /// "b", + /// "a", + /// "b", + /// "b", + /// "c" + /// ]); + /// # }); + /// ``` + fn merge_round_robin( + self, + other: St, + nb_self: usize, + nb_other: usize, + ) -> MergeRoundRobin + where + St: Stream, + Self: Sized, + { + MergeRoundRobin::new(self, other, nb_self, nb_other) + } } impl StreamTools for T {} diff --git a/src/merge_round_robin.rs b/src/merge_round_robin.rs new file mode 100644 index 0000000..b076c91 --- /dev/null +++ b/src/merge_round_robin.rs @@ -0,0 +1,150 @@ +use core::num::NonZeroUsize; +use core::pin::Pin; +use core::task::Context; +use core::task::Poll; + +use futures::ready; +use futures::stream::FusedStream; +use futures::Stream; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`merge_round_robin`](crate::Streamies::merge_round_robin) method. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct MergeRoundRobin { + #[pin] + first: Option, + #[pin] + second: Option, + + first_nb_ele: NonZeroUsize, + second_nb_ele: NonZeroUsize, + + first_count: usize, + second_count: usize + } +} + +impl MergeRoundRobin +where + St1: Stream, + St2: Stream, +{ + pub(super) fn new( + stream1: St1, + stream2: St2, + first_nb_ele: usize, + second_nb_ele: usize, + ) -> Self { + Self { + first: Some(stream1), + second: Some(stream2), + first_nb_ele: NonZeroUsize::new(first_nb_ele).expect( + "Couldn't convert `first_nb_ele` to `NonZeroUsize`. The value must no be 0", + ), + second_nb_ele: NonZeroUsize::new(second_nb_ele).expect( + "Couldn't convert `second_nb_ele` to `NonZeroUsize`. The value must no be 0", + ), + first_count: 0, + second_count: 0, + } + } +} + +impl FusedStream for MergeRoundRobin +where + St1: FusedStream, + St2: FusedStream, +{ + fn is_terminated(&self) -> bool { + self.first.as_ref().is_none_or(|s| s.is_terminated()) + && self.second.as_ref().is_none_or(|s| s.is_terminated()) + } +} + +impl Stream for MergeRoundRobin +where + St1: Stream, + St2: Stream, +{ + type Item = St1::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + // Check if second stream has finished it's turn + if this.second_count == &mut this.second_nb_ele.get() { + // It finished. Let's reset the turns + *this.first_count = 0; + *this.second_count = 0; + } + + // Check if we should be polling from the first stream. + // This means: + // - It's our turn to be polled AND the se + // - The stream isn't ended + if this.first_count < &mut this.first_nb_ele.get() { + if let Some(first) = this.first.as_mut().as_pin_mut() { + if let Some(item) = ready!(first.poll_next(cx)) { + // We have an item! Increment the count for the next poll + *this.first_count += 1; + return Poll::Ready(Some(item)); + } + + // The stream has finished. Let's dispose of the stream + this.first.set(None); + } else { + // The stream is empty. We can just poll `second` now + return this + .second + .as_mut() + .as_pin_mut() + .map(|second| second.poll_next(cx)) + .unwrap_or_else(|| Poll::Ready(None)); + } + } + + // First stream wasn't polled, so we poll the second stream + if let Some(second) = this.second.as_mut().as_pin_mut() { + if let Some(item) = ready!(second.poll_next(cx)) { + // We have an item! Increment the count for the next poll + *this.second_count += 1; + return Poll::Ready(Some(item)); + } + + // The stream has finished. Let's dispose of the stream + this.second.set(None); + } + + // The second stream is empty. We can just poll `first` now + this.first + .as_mut() + .as_pin_mut() + .map(|first| first.poll_next(cx)) + .unwrap_or_else(|| Poll::Ready(None)) + } + + fn size_hint(&self) -> (usize, Option) { + match &self.first { + Some(first) => match &self.second { + Some(second) => { + let first_size = first.size_hint(); + let second_size = second.size_hint(); + + ( + first_size.0.saturating_add(second_size.0), + match (first_size.1, second_size.1) { + (Some(x), Some(y)) => x.checked_add(y), + _ => None, + }, + ) + } + None => first.size_hint(), + }, + None => match &self.second { + Some(second) => second.size_hint(), + None => (0, Some(0)), + }, + } + } +} \ No newline at end of file