diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 789e1ad22..d2885ac87 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -19,7 +19,7 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream}; mod stream; pub use self::stream::{ All, Any, Chain, Collect, Concat, Count, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, - Fold, ForEach, Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, + Fold, ForEach, Fuse, Inspect, Last, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, TryFold, TryForEach, Unzip, Zip, }; diff --git a/futures-util/src/stream/stream/last.rs b/futures-util/src/stream/stream/last.rs new file mode 100644 index 000000000..2473ae3ee --- /dev/null +++ b/futures-util/src/stream/stream/last.rs @@ -0,0 +1,70 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`last`](super::StreamExt::last) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Last { + #[pin] + stream: St, + last: Option, + done: bool, + } +} + +impl fmt::Debug for Last +where + St: Stream + fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Last").field("stream", &self.stream).finish() + } +} + +impl Last +where + St: Stream, +{ + pub(super) fn new(stream: St) -> Self { + Self { stream, last: None, done: false } + } +} + +impl FusedFuture for Last +where + St: FusedStream, +{ + fn is_terminated(&self) -> bool { + self.done + } +} + +impl Future for Last +where + St: Stream, +{ + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + if *this.done { + panic!("Last polled after completion"); + } + + Poll::Ready(loop { + match ready!(this.stream.as_mut().poll_next(cx)) { + Some(item) => *this.last = Some(item), + None => { + *this.done = true; + break this.last.take(); + } + } + }) + } +} diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index ee30f8da6..5b5df2c5a 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -68,6 +68,9 @@ pub use self::any::Any; mod all; pub use self::all::All; +mod last; +pub use self::last::Last; + #[cfg(feature = "sink")] mod forward; @@ -716,6 +719,32 @@ pub trait StreamExt: Stream { assert_future::(All::new(self, f)) } + /// Returns the last element of the stream, or `None` if the stream is empty. + /// + /// This function will consume the entire stream to return the last item. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// + /// let number_stream = stream::iter(1..=5); + /// let last_number = number_stream.last().await; + /// assert_eq!(last_number, Some(5)); + /// + /// let empty_stream = stream::iter(Vec::::new()); + /// let last_number = empty_stream.last().await; + /// assert_eq!(last_number, None); + /// # }); + /// ``` + fn last(self) -> Last + where + Self: Sized, + { + assert_future::, _>(Last::new(self)) + } + /// Flattens a stream of streams into just one continuous stream. /// /// # Examples diff --git a/futures/tests/stream.rs b/futures/tests/stream.rs index db74b9658..6008fe1eb 100644 --- a/futures/tests/stream.rs +++ b/futures/tests/stream.rs @@ -592,3 +592,21 @@ fn any() { assert!(!any); }); } + +#[test] +fn last() { + block_on(async { + let empty: [u8; 0] = []; + let st = stream::iter(empty); + let last = st.last().await; + assert_eq!(last, None); + + let st = stream::iter([1]); + let last = st.last().await; + assert_eq!(last, Some(1)); + + let st = stream::iter([1, 2, 3, 4, 5]); + let last = st.last().await; + assert_eq!(last, Some(5)); + }); +}