Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion futures-util/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream};
mod stream;
pub use self::stream::{
All, Any, Chain, Collect, Concat, Count, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten,
Fold, ForEach, Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan,
Fold, ForEach, Fuse, Inspect, Last, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan,
SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then,
TryFold, TryForEach, Unzip, Zip,
};
Expand Down
70 changes: 70 additions & 0 deletions futures-util/src/stream/stream/last.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
use pin_project_lite::pin_project;

pin_project! {
/// Future for the [`last`](super::StreamExt::last) method.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Last<St: Stream> {
#[pin]
stream: St,
last: Option<St::Item>,
done: bool,
}
}

impl<St> fmt::Debug for Last<St>
where
St: Stream + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Last").field("stream", &self.stream).finish()
}
}

impl<St> Last<St>
where
St: Stream,
{
pub(super) fn new(stream: St) -> Self {
Self { stream, last: None, done: false }
}
}

impl<St> FusedFuture for Last<St>
where
St: FusedStream,
{
fn is_terminated(&self) -> bool {
self.done
}
}

impl<St> Future for Last<St>
where
St: Stream,
{
type Output = Option<St::Item>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

if *this.done {
panic!("Last polled after completion");
}

Poll::Ready(loop {
match ready!(this.stream.as_mut().poll_next(cx)) {
Some(item) => *this.last = Some(item),
None => {
*this.done = true;
break this.last.take();
}
}
})
}
}
29 changes: 29 additions & 0 deletions futures-util/src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ pub use self::any::Any;
mod all;
pub use self::all::All;

mod last;
pub use self::last::Last;

#[cfg(feature = "sink")]
mod forward;

Expand Down Expand Up @@ -716,6 +719,32 @@ pub trait StreamExt: Stream {
assert_future::<bool, _>(All::new(self, f))
}

/// Returns the last element of the stream, or `None` if the stream is empty.
///
/// This function will consume the entire stream to return the last item.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::stream::{self, StreamExt};
///
/// let number_stream = stream::iter(1..=5);
/// let last_number = number_stream.last().await;
/// assert_eq!(last_number, Some(5));
///
/// let empty_stream = stream::iter(Vec::<i32>::new());
/// let last_number = empty_stream.last().await;
/// assert_eq!(last_number, None);
/// # });
/// ```
fn last(self) -> Last<Self>
where
Self: Sized,
{
assert_future::<Option<Self::Item>, _>(Last::new(self))
}

/// Flattens a stream of streams into just one continuous stream.
///
/// # Examples
Expand Down
18 changes: 18 additions & 0 deletions futures/tests/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,3 +592,21 @@ fn any() {
assert!(!any);
});
}

#[test]
fn last() {
block_on(async {
let empty: [u8; 0] = [];
let st = stream::iter(empty);
let last = st.last().await;
assert_eq!(last, None);

let st = stream::iter([1]);
let last = st.last().await;
assert_eq!(last, Some(1));

let st = stream::iter([1, 2, 3, 4, 5]);
let last = st.last().await;
assert_eq!(last, Some(5));
});
}
Loading