Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,26 @@ and this project (will try to) adhere to [Semantic Versioning](https://semver.or

### Legend

The following icons are used to distinguish breaking changes from non-breaking changes.
The following icons are used to distinguish breaking changes from non-breaking changes.

- 🔥: Breaking change (high impact: will require code changes for most users)
- 💔: Breaking change (low impact: won't require code changes for most users)

## 0.7.6

### Added

- Added `merge_join_by` combinator

### Changed

- Minimum rust version bumped to `1.85.0` as part of moving to rust 2024 edition

### Security

- Optional dependency `tokio` bumped from a version with a [RustSec advisory](https://rustsec.org/advisories/RUSTSEC-2025-0023)


## 0.7.5

### Fixed
Expand All @@ -38,7 +53,7 @@ The following icons are used to distinguish breaking changes from non-breaking c

## 0.7.0

### Added
### Added

- 💔 Added `ThrottleLast` combinator.
- Added "test-util" feature with some helper methods for testing such as `record_delay` and `delay_items`.
Expand Down
20 changes: 15 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,23 +1,33 @@
[package]
name = "streamtools"
version = "0.7.5"
edition = "2021"
version = "0.7.6"
edition = "2024"
authors = ["extremeandy"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/extremeandy/streamtools"
description = "Additional stream combinators"
readme = "README.md"
rust-version = "1.65.0"
# Oldest 2024 edition compatible version
rust-version = "1.85.0"

[dependencies]
futures = "0.3.28"
pin-project-lite = "0.2.9"
parking_lot = "0.12.1"
tokio = { version = "1.28.0", optional = true, features = ["time"] }
tokio = { version = "1.48.0", optional = true, features = ["time"] }
tokio-stream = { version = "0.1.14", optional = true, features = ["time"] }

[dependencies.either-or-both]
version = "0.3.0"
# Disabling the `either` feature which is not needed currently.
default-features = false
features = ["std"]


[dev-dependencies]
tokio = { version = "1.28.0", features = ["rt-multi-thread", "sync", "macros", "time"] }
quickcheck = "1.0.3"
quickcheck_macros = "1.1.0"
tokio = { version = "1.48.0", features = ["rt-multi-thread", "sync", "macros", "time"] }
tokio-stream = { version = "0.1.14", features = ["sync", "time"] }
tokio-test = "0.4.2"

Expand Down
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ Please read the [API documentation here](https://docs.rs/streamtools/).

How to use with Cargo:

```toml
[dependencies]
streamtools = "0.7.5"
```commandline
$ cargo add streamtools
```

## License
Expand All @@ -22,4 +21,4 @@ Licensed under the Apache License, Version 2.0
https://www.apache.org/licenses/LICENSE-2.0 or the MIT license
https://opensource.org/licenses/MIT, at your
option. This file may not be copied, modified, or distributed
except according to those terms.
except according to those terms.
36 changes: 36 additions & 0 deletions examples/merge_join_by_trystream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
//! How to use `StreamTools::merge_join_by()` with a `TryStream`,
//! that is, a `Stream` of `Result`s.

use either_or_both::EitherOrBoth::{self, Both};
use futures::stream::{self, StreamExt, TryStreamExt};
use std::cmp::Ordering;
use streamtools::StreamTools;

#[tokio::main]
async fn main() {
let left = stream::iter(vec![Ok(1), Ok(3), Err(4), Ok(5)]);
let right = stream::iter(vec![Ok(2), Ok(3), Err(3)]);

// Instead of using your comparison function directly, you can embed it into a closure that
// gives priority to errors. Then afterwards you can use usual try_* combinators in `futures::stream::TryStreamExt`.
let stream = left
.merge_join_by(right, |left_result, right_result| {
match (left_result, right_result) {
// Use the actual comparison function only if both are Ok.
(Ok(asset), Ok(foreign_asset)) => Ord::cmp(asset, foreign_asset),
// In the error cases return such ordering that the error(s) will be forwarded asap,
// and so we can short circuit faster.
(Err(_), Ok(_)) => Ordering::Less,
(Ok(_), Err(_)) => Ordering::Greater,
(Err(_), Err(_)) => Ordering::Equal,
}
})
// Flip the results from inside the EitherOrs.
// EitherOrBoth has a convenient method for just this.
.map(EitherOrBoth::<Result<_, _>, Result<_, _>>::transpose);

// Run the stream with short circuiting.
let result: Result<Vec<_>, EitherOrBoth<_>> = stream.try_collect().await;

assert_eq!(result, Err(Both(4, 3)));
}
6 changes: 3 additions & 3 deletions src/fast_forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ mod tests {
#[tokio::test]
async fn test_fast_forward() {
let waker = futures::task::noop_waker_ref();
let mut cx = std::task::Context::from_waker(&waker);
let mut cx = std::task::Context::from_waker(waker);

let (mut tx, rx) = futures::channel::mpsc::unbounded();

Expand Down Expand Up @@ -123,7 +123,7 @@ mod tests {
#[tokio::test]
async fn test_fast_forward_empty_stream() {
let waker = futures::task::noop_waker_ref();
let mut cx = std::task::Context::from_waker(&waker);
let mut cx = std::task::Context::from_waker(waker);

let mut stream = FastForward::new(stream::empty::<()>());
assert_ready_eq!(stream.poll_next_unpin(&mut cx), None);
Expand All @@ -132,7 +132,7 @@ mod tests {
#[tokio::test]
async fn test_fast_forward_drop_before_polled() {
let waker = futures::task::noop_waker_ref();
let mut cx = std::task::Context::from_waker(&waker);
let mut cx = std::task::Context::from_waker(waker);

let (mut tx, rx) = futures::channel::mpsc::unbounded();

Expand Down
4 changes: 2 additions & 2 deletions src/flatten_switch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ mod tests {
use tokio_stream::wrappers::BroadcastStream;

let waker = futures::task::noop_waker_ref();
let mut cx = std::task::Context::from_waker(&waker);
let mut cx = std::task::Context::from_waker(waker);

let (tx_inner1, rx_inner1) = broadcast::channel(32);
let (tx_inner2, rx_inner2) = broadcast::channel(32);
Expand Down Expand Up @@ -277,7 +277,7 @@ mod tests {
let mut stream = FlattenSwitch::new(outer_stream);

let waker = futures::task::noop_waker_ref();
let mut cx = std::task::Context::from_waker(&waker);
let mut cx = std::task::Context::from_waker(waker);

assert_ready_eq!(stream.poll_next_unpin(&mut cx), Some(1));
assert_inner_polled();
Expand Down
82 changes: 80 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#![warn(missing_docs)]
#![crate_name = "streamtools"]
#![cfg_attr(all(doc, CHANNEL_NIGHTLY), feature(doc_auto_cfg))]

//! Additional stream combinators.
//!
//! See what's included in [`StreamTools`] trait docs.
//!
//!
//! ## Feature flags
//!
//! - `tokio-time`: Enables combinators which depend on the tokio crate and its time feature, in particular:
Expand All @@ -12,12 +14,16 @@
//! - `test-util`: Exposes utilities for testing streams, in particular:
//! - [`delay_items`](crate::test_util::delay_items)
//! - [`record_delay`](crate::StreamTools::record_delay)
#![doc(html_root_url = "https://docs.rs/streamtools/0.7.5/")]
#![doc(html_root_url = "https://docs.rs/streamtools/0.7.6/")]

use either_or_both::EitherOrBoth;
use futures::{stream::Map, Stream};
use merge_join_by::MergeJoinBy;
use std::cmp::Ordering;

mod fast_forward;
mod flatten_switch;
mod merge_join_by;
mod outer_waker;
mod sample;

Expand Down Expand Up @@ -93,6 +99,78 @@ pub trait StreamTools: Stream {
assert_stream::<U::Item, _>(stream)
}

/// A stream that merges items from two streams in ascending order,
/// while also preserving information of where the items came from.
///
/// The resulting stream will look at the tips of the two input streams `L` and `R`
/// and compare the items `l: L::Item` and `r: R::Item` using the provided `comparison` function.
/// The stream will yield:
///
/// - `EitherOrBoth::Left(l)` if `l < r` or if `R` is done, and remove `l` from its source stream
/// - `EitherOrBoth::Both(l, r)` if `l == r` and remove both `l` and `r` from their source streams
/// - `EitherOrBoth::Right(r)` if `l > r` or if `L` is done and remove `r` from its source stream
///
/// That is to say it chooses the *smaller* item, or both when they are equal.
///
///
/// # Lengths
///
/// The input streams can be of different length. After one stream has run out, the items of the other
/// will just be appended to the output stream using the appropriate `Left`/`Right` variant.
///
///
/// # Sort
///
/// If the input streams are sorted into ascending order according to the same criteria as provided by `comparison`,
/// then the output stream will be sorted too.
///
///
/// # Example
///
/// ```rust
/// # futures::executor::block_on(async {
/// use streamtools::StreamTools;
/// use futures::stream::{self, StreamExt};
/// use either_or_both::EitherOrBoth::{Left, Right, Both};
///
/// let left = stream::iter(vec![1, 3, 4, 5]);
/// let right = stream::iter(vec![2, 3, 3]);
///
/// let stream = left.merge_join_by(right, Ord::cmp);
///
/// let result: Vec<_> = stream.collect().await;
///
/// assert_eq!(result,
/// vec![
/// Left(1),
/// Right(2),
/// Both(3, 3),
/// Right(3), // The right stream is exhausted here.
/// Left(4),
/// Left(5)
/// ]
/// );
/// # });
/// ```
///
///
/// # See also
///
/// [`Itertools::merge_join_by`](https://docs.rs/itertools/latest/itertools/trait.Itertools.html#method.merge_join_by) implements the same combinator for iterators.
fn merge_join_by<St, F>(
self,
other: St,
comparison: F,
) -> impl Stream<Item = EitherOrBoth<Self::Item, St::Item>>
where
Self: Sized,
St: Stream,
F: Fn(&Self::Item, &St::Item) -> Ordering,
{
let stream = MergeJoinBy::new(self, other, comparison);
assert_stream(stream)
}

/// Samples values from the stream when the sampler yields.
///
/// The stream terminates when either the input stream or the sampler stream terminate.
Expand Down
Loading