diff --git a/CHANGELOG.md b/CHANGELOG.md index 2402176..37500e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,11 +7,26 @@ and this project (will try to) adhere to [Semantic Versioning](https://semver.or ### Legend -The following icons are used to distinguish breaking changes from non-breaking changes. +The following icons are used to distinguish breaking changes from non-breaking changes. - 🔥: Breaking change (high impact: will require code changes for most users) - 💔: Breaking change (low impact: won't require code changes for most users) +## 0.7.6 + +### Added + +- Added `merge_join_by` combinator + +### Changed + +- Minimum rust version bumped to `1.85.0` as part of moving to rust 2024 edition + +### Security + +- Optional dependency `tokio` bumped from a version with a [RustSec advisory](https://rustsec.org/advisories/RUSTSEC-2025-0023) + + ## 0.7.5 ### Fixed @@ -38,7 +53,7 @@ The following icons are used to distinguish breaking changes from non-breaking c ## 0.7.0 -### Added +### Added - 💔 Added `ThrottleLast` combinator. - Added "test-util" feature with some helper methods for testing such as `record_delay` and `delay_items`. diff --git a/Cargo.toml b/Cargo.toml index 6079d98..de516b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,23 +1,33 @@ [package] name = "streamtools" -version = "0.7.5" -edition = "2021" +version = "0.7.6" +edition = "2024" authors = ["extremeandy"] license = "MIT OR Apache-2.0" repository = "https://github.com/extremeandy/streamtools" description = "Additional stream combinators" readme = "README.md" -rust-version = "1.65.0" +# Oldest 2024 edition compatible version +rust-version = "1.85.0" [dependencies] futures = "0.3.28" pin-project-lite = "0.2.9" parking_lot = "0.12.1" -tokio = { version = "1.28.0", optional = true, features = ["time"] } +tokio = { version = "1.48.0", optional = true, features = ["time"] } tokio-stream = { version = "0.1.14", optional = true, features = ["time"] } +[dependencies.either-or-both] +version = "0.3.0" +# Disabling the `either` feature which is not needed currently. +default-features = false +features = ["std"] + + [dev-dependencies] -tokio = { version = "1.28.0", features = ["rt-multi-thread", "sync", "macros", "time"] } +quickcheck = "1.0.3" +quickcheck_macros = "1.1.0" +tokio = { version = "1.48.0", features = ["rt-multi-thread", "sync", "macros", "time"] } tokio-stream = { version = "0.1.14", features = ["sync", "time"] } tokio-test = "0.4.2" diff --git a/README.md b/README.md index 625f08c..043dc03 100644 --- a/README.md +++ b/README.md @@ -9,9 +9,8 @@ Please read the [API documentation here](https://docs.rs/streamtools/). How to use with Cargo: -```toml -[dependencies] -streamtools = "0.7.5" +```commandline +$ cargo add streamtools ``` ## License @@ -22,4 +21,4 @@ Licensed under the Apache License, Version 2.0 https://www.apache.org/licenses/LICENSE-2.0 or the MIT license https://opensource.org/licenses/MIT, at your option. This file may not be copied, modified, or distributed -except according to those terms. \ No newline at end of file +except according to those terms. diff --git a/examples/merge_join_by_trystream.rs b/examples/merge_join_by_trystream.rs new file mode 100644 index 0000000..54982c1 --- /dev/null +++ b/examples/merge_join_by_trystream.rs @@ -0,0 +1,36 @@ +//! How to use `StreamTools::merge_join_by()` with a `TryStream`, +//! that is, a `Stream` of `Result`s. + +use either_or_both::EitherOrBoth::{self, Both}; +use futures::stream::{self, StreamExt, TryStreamExt}; +use std::cmp::Ordering; +use streamtools::StreamTools; + +#[tokio::main] +async fn main() { + let left = stream::iter(vec![Ok(1), Ok(3), Err(4), Ok(5)]); + let right = stream::iter(vec![Ok(2), Ok(3), Err(3)]); + + // Instead of using your comparison function directly, you can embed it into a closure that + // gives priority to errors. Then afterwards you can use usual try_* combinators in `futures::stream::TryStreamExt`. + let stream = left + .merge_join_by(right, |left_result, right_result| { + match (left_result, right_result) { + // Use the actual comparison function only if both are Ok. + (Ok(asset), Ok(foreign_asset)) => Ord::cmp(asset, foreign_asset), + // In the error cases return such ordering that the error(s) will be forwarded asap, + // and so we can short circuit faster. + (Err(_), Ok(_)) => Ordering::Less, + (Ok(_), Err(_)) => Ordering::Greater, + (Err(_), Err(_)) => Ordering::Equal, + } + }) + // Flip the results from inside the EitherOrs. + // EitherOrBoth has a convenient method for just this. + .map(EitherOrBoth::, Result<_, _>>::transpose); + + // Run the stream with short circuiting. + let result: Result, EitherOrBoth<_>> = stream.try_collect().await; + + assert_eq!(result, Err(Both(4, 3))); +} diff --git a/src/fast_forward.rs b/src/fast_forward.rs index c4967f7..4358a7a 100644 --- a/src/fast_forward.rs +++ b/src/fast_forward.rs @@ -93,7 +93,7 @@ mod tests { #[tokio::test] async fn test_fast_forward() { let waker = futures::task::noop_waker_ref(); - let mut cx = std::task::Context::from_waker(&waker); + let mut cx = std::task::Context::from_waker(waker); let (mut tx, rx) = futures::channel::mpsc::unbounded(); @@ -123,7 +123,7 @@ mod tests { #[tokio::test] async fn test_fast_forward_empty_stream() { let waker = futures::task::noop_waker_ref(); - let mut cx = std::task::Context::from_waker(&waker); + let mut cx = std::task::Context::from_waker(waker); let mut stream = FastForward::new(stream::empty::<()>()); assert_ready_eq!(stream.poll_next_unpin(&mut cx), None); @@ -132,7 +132,7 @@ mod tests { #[tokio::test] async fn test_fast_forward_drop_before_polled() { let waker = futures::task::noop_waker_ref(); - let mut cx = std::task::Context::from_waker(&waker); + let mut cx = std::task::Context::from_waker(waker); let (mut tx, rx) = futures::channel::mpsc::unbounded(); diff --git a/src/flatten_switch.rs b/src/flatten_switch.rs index 7512a18..fcdc26e 100644 --- a/src/flatten_switch.rs +++ b/src/flatten_switch.rs @@ -147,7 +147,7 @@ mod tests { use tokio_stream::wrappers::BroadcastStream; let waker = futures::task::noop_waker_ref(); - let mut cx = std::task::Context::from_waker(&waker); + let mut cx = std::task::Context::from_waker(waker); let (tx_inner1, rx_inner1) = broadcast::channel(32); let (tx_inner2, rx_inner2) = broadcast::channel(32); @@ -277,7 +277,7 @@ mod tests { let mut stream = FlattenSwitch::new(outer_stream); let waker = futures::task::noop_waker_ref(); - let mut cx = std::task::Context::from_waker(&waker); + let mut cx = std::task::Context::from_waker(waker); assert_ready_eq!(stream.poll_next_unpin(&mut cx), Some(1)); assert_inner_polled(); diff --git a/src/lib.rs b/src/lib.rs index 31a872a..e28dc9a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,11 @@ #![warn(missing_docs)] #![crate_name = "streamtools"] -#![cfg_attr(all(doc, CHANNEL_NIGHTLY), feature(doc_auto_cfg))] //! Additional stream combinators. //! +//! See what's included in [`StreamTools`] trait docs. +//! +//! //! ## Feature flags //! //! - `tokio-time`: Enables combinators which depend on the tokio crate and its time feature, in particular: @@ -12,12 +14,16 @@ //! - `test-util`: Exposes utilities for testing streams, in particular: //! - [`delay_items`](crate::test_util::delay_items) //! - [`record_delay`](crate::StreamTools::record_delay) -#![doc(html_root_url = "https://docs.rs/streamtools/0.7.5/")] +#![doc(html_root_url = "https://docs.rs/streamtools/0.7.6/")] +use either_or_both::EitherOrBoth; use futures::{stream::Map, Stream}; +use merge_join_by::MergeJoinBy; +use std::cmp::Ordering; mod fast_forward; mod flatten_switch; +mod merge_join_by; mod outer_waker; mod sample; @@ -93,6 +99,78 @@ pub trait StreamTools: Stream { assert_stream::(stream) } + /// A stream that merges items from two streams in ascending order, + /// while also preserving information of where the items came from. + /// + /// The resulting stream will look at the tips of the two input streams `L` and `R` + /// and compare the items `l: L::Item` and `r: R::Item` using the provided `comparison` function. + /// The stream will yield: + /// + /// - `EitherOrBoth::Left(l)` if `l < r` or if `R` is done, and remove `l` from its source stream + /// - `EitherOrBoth::Both(l, r)` if `l == r` and remove both `l` and `r` from their source streams + /// - `EitherOrBoth::Right(r)` if `l > r` or if `L` is done and remove `r` from its source stream + /// + /// That is to say it chooses the *smaller* item, or both when they are equal. + /// + /// + /// # Lengths + /// + /// The input streams can be of different length. After one stream has run out, the items of the other + /// will just be appended to the output stream using the appropriate `Left`/`Right` variant. + /// + /// + /// # Sort + /// + /// If the input streams are sorted into ascending order according to the same criteria as provided by `comparison`, + /// then the output stream will be sorted too. + /// + /// + /// # Example + /// + /// ```rust + /// # futures::executor::block_on(async { + /// use streamtools::StreamTools; + /// use futures::stream::{self, StreamExt}; + /// use either_or_both::EitherOrBoth::{Left, Right, Both}; + /// + /// let left = stream::iter(vec![1, 3, 4, 5]); + /// let right = stream::iter(vec![2, 3, 3]); + /// + /// let stream = left.merge_join_by(right, Ord::cmp); + /// + /// let result: Vec<_> = stream.collect().await; + /// + /// assert_eq!(result, + /// vec![ + /// Left(1), + /// Right(2), + /// Both(3, 3), + /// Right(3), // The right stream is exhausted here. + /// Left(4), + /// Left(5) + /// ] + /// ); + /// # }); + /// ``` + /// + /// + /// # See also + /// + /// [`Itertools::merge_join_by`](https://docs.rs/itertools/latest/itertools/trait.Itertools.html#method.merge_join_by) implements the same combinator for iterators. + fn merge_join_by( + self, + other: St, + comparison: F, + ) -> impl Stream> + where + Self: Sized, + St: Stream, + F: Fn(&Self::Item, &St::Item) -> Ordering, + { + let stream = MergeJoinBy::new(self, other, comparison); + assert_stream(stream) + } + /// Samples values from the stream when the sampler yields. /// /// The stream terminates when either the input stream or the sampler stream terminate. diff --git a/src/merge_join_by.rs b/src/merge_join_by.rs new file mode 100644 index 0000000..a57cb0b --- /dev/null +++ b/src/merge_join_by.rs @@ -0,0 +1,212 @@ +//! Implementation of the `StreamTools::merge_join_by` combinator. + +use either_or_both::EitherOrBoth; +use futures::stream::Fuse; +use futures::Stream; +use futures::StreamExt; +use pin_project_lite::pin_project; +use std::{ + cmp::{self, Ordering}, + pin::Pin, + task::{Context, Poll}, +}; + +pin_project! { + /// Stream for [`StreamTools::merge_join_by()`]. + #[must_use = "streams do nothing unless polled"] + pub (crate) struct MergeJoinBy + where + L: Stream, + R: Stream, + // Not allowing any funny business with `FnMut`. + F: Fn(&L::Item, &R::Item) -> Ordering + { + #[pin] + left: Fuse, + #[pin] + right: Fuse, + left_queued: Option, + right_queued: Option, + comparison: F, + } +} + +impl MergeJoinBy +where + L: Stream, + R: Stream, + F: Fn(&L::Item, &R::Item) -> Ordering, +{ + pub(crate) fn new(left_stream: L, right_stream: R, comparison: F) -> Self { + Self { + // Fusing the streams because they might not be the same length. + left: left_stream.fuse(), + right: right_stream.fuse(), + left_queued: None, + right_queued: None, + comparison, + } + } +} + +// Implementation inspired by `futures::Zip` +impl Stream for MergeJoinBy +where + L: Stream, + R: Stream, + // Not allowing any funny business with `FnMut`. + F: Fn(&L::Item, &R::Item) -> Ordering, +{ + type Item = EitherOrBoth; + + // Implementation is inspired by `Zip` in the `futures` crate: + // https://docs.rs/futures-util/0.3.31/src/futures_util/stream/stream/zip.rs.html#71-102 + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + // Poll the streams as necessary. + if this.left_queued.is_none() { + match this.left.as_mut().poll_next(cx) { + Poll::Ready(Some(item)) => *this.left_queued = Some(item), + Poll::Ready(None) | Poll::Pending => {} + } + } + if this.right_queued.is_none() { + match this.right.as_mut().poll_next(cx) { + Poll::Ready(Some(item)) => *this.right_queued = Some(item), + Poll::Ready(None) | Poll::Pending => {} + } + } + + // The actual logic + if this.left_queued.is_some() && this.right_queued.is_some() { + // This is the non-trivial case where we actually do a comparison. + match (this.comparison)( + this.left_queued.as_ref().unwrap(), + this.right_queued.as_ref().unwrap(), + ) { + Ordering::Less => { + let just_left = EitherOrBoth::Left(this.left_queued.take().unwrap()); + Poll::Ready(Some(just_left)) + } + Ordering::Equal => { + let both = EitherOrBoth::Both( + this.left_queued.take().unwrap(), + this.right_queued.take().unwrap(), + ); + Poll::Ready(Some(both)) + } + Ordering::Greater => { + let just_right = EitherOrBoth::Right(this.right_queued.take().unwrap()); + Poll::Ready(Some(just_right)) + } + } + } else if this.left_queued.is_some() { + if this.right.is_done() { + let just_left = EitherOrBoth::Left(this.left_queued.take().unwrap()); + Poll::Ready(Some(just_left)) + } else { + Poll::Pending // Still waiting for the other one. + } + } else if this.right_queued.is_some() { + if this.left.is_done() { + let just_right = EitherOrBoth::Right(this.right_queued.take().unwrap()); + Poll::Ready(Some(just_right)) + } else { + Poll::Pending // Still waiting for the other one. + } + } + // Both queueds are None from here on. + else if this.left.is_done() && this.right.is_done() { + // All done + Poll::Ready(None) + } else { + // Waiting for both, or only one if the other stream is done already. + Poll::Pending + } + } + + fn size_hint(&self) -> (usize, Option) { + let left_queued_len = usize::from(self.left_queued.is_some()); + let right_queued_len = usize::from(self.right_queued.is_some()); + + let (left_lower, left_upper) = self.left.size_hint(); + let (right_lower, right_upper) = self.right.size_hint(); + + let left_total_lower = left_lower.saturating_add(left_queued_len); + let right_total_lower = right_lower.saturating_add(right_queued_len); + + // Achieved when all the comparisons turn out equal and we just yield Boths. + let lower = cmp::max(left_total_lower, right_total_lower); + + let upper = match (left_upper, right_upper) { + (Some(l_upper), Some(r_upper)) => { + // The upper limit is reached when none of the comparisons turn out equal. + // I.e. we never yield Both. + // The limit is then just the sum of upper limits, + // with the possibility of overflowing usize. + l_upper + .checked_add(left_queued_len) + .and_then(|left_total_upper| left_total_upper.checked_add(r_upper)) + .and_then(|all_but_right_queue| { + all_but_right_queue.checked_add(right_queued_len) + }) + } + // Either or both of the limits are unknown, + // thus can't compute their sums either. + _ => None, + }; + + (lower, upper) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::StreamTools; + use futures::{executor::block_on_stream, stream}; + use quickcheck::TestResult; + use quickcheck_macros::quickcheck; + use std::collections::BTreeSet; + + #[quickcheck] + fn merge_of_sorteds_is_sorted(left: BTreeSet, right: BTreeSet) -> TestResult { + // BtreeSets are conveniently already sorted. + // They don't contain duplicates though so that's not covered. + let left_stream = stream::iter(left); + let right_stream = stream::iter(right); + + let stream = left_stream.merge_join_by(right_stream, Ord::cmp); + + let sorted = block_on_stream(stream) + // Remove the EitherOrs for sortedness check. + .flat_map(|either_or_both| { + either_or_both.into_iter() // Rather convenient I must admit. + }) + .is_sorted(); + + TestResult::from_bool(sorted) + } + + #[quickcheck] + fn size_hints_dont_lie(left: Vec, right: Vec) -> bool { + let expected_lower = cmp::max(left.len(), right.len()); + let expected_upper = left.len().checked_add(right.len()); // Sum + + let left_stream = stream::iter(left); + let right_stream = stream::iter(right); + + let stream = left_stream.merge_join_by(right_stream, Ord::cmp); + + let (lower, upper) = stream.size_hint(); + + assert_eq!(expected_lower, lower); + assert_eq!(expected_upper, upper); + + let actual_size = block_on_stream(stream).count(); + + lower <= actual_size && upper.is_none_or(|limit| actual_size <= limit) + } +} diff --git a/src/sample.rs b/src/sample.rs index ad1f4fe..212885f 100644 --- a/src/sample.rs +++ b/src/sample.rs @@ -144,7 +144,7 @@ mod tests { #[tokio::test] async fn test_sample() { let waker = futures::task::noop_waker_ref(); - let mut cx = std::task::Context::from_waker(&waker); + let mut cx = std::task::Context::from_waker(waker); let (mut tx, rx) = futures::channel::mpsc::unbounded(); let (mut tx_sampler, rx_sampler) = futures::channel::mpsc::unbounded(); @@ -181,7 +181,7 @@ mod tests { #[tokio::test] async fn test_sample_underlying_terminates() { let waker = futures::task::noop_waker_ref(); - let mut cx = std::task::Context::from_waker(&waker); + let mut cx = std::task::Context::from_waker(waker); let (mut tx, rx) = futures::channel::mpsc::unbounded(); let (mut tx_sampler, rx_sampler) = futures::channel::mpsc::unbounded(); @@ -204,7 +204,7 @@ mod tests { #[tokio::test] async fn test_sample_underlying_terminates_but_sample_yields() { let waker = futures::task::noop_waker_ref(); - let mut cx = std::task::Context::from_waker(&waker); + let mut cx = std::task::Context::from_waker(waker); let (mut tx_sampler, rx_sampler) = futures::channel::mpsc::unbounded();