From 5527bfda9d1241eee11d945fa25c2356ae8c84d5 Mon Sep 17 00:00:00 2001 From: tycho garen Date: Sun, 3 Mar 2024 18:27:08 -0500 Subject: [PATCH 1/2] feat: add 'respect eof' option for reading from bounded sources --- src/lib.rs | 62 +++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 43 insertions(+), 19 deletions(-) mode change 100644 => 100755 src/lib.rs diff --git a/src/lib.rs b/src/lib.rs old mode 100644 new mode 100755 index fec8ae6..19cd042 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,14 +28,14 @@ //! } //! } //! ``` -use std::collections::VecDeque; -use std::ops::Deref; -use std::pin::Pin; -use std::task::{Context, Poll, ready}; use futures::Stream; -use serde::de::DeserializeOwned; use pin_project::pin_project; +use serde::de::DeserializeOwned; use serde_json::Deserializer; +use std::collections::VecDeque; +use std::ops::Deref; +use std::pin::Pin; +use std::task::{ready, Context, Poll}; use tracing::trace; // should be 2^n - 1 for VecDeque to work efficiently @@ -60,6 +60,7 @@ pub struct JsonStream { byte_buffer: VecDeque, finished: bool, max_buffer_capacity: usize, + respect_eof: bool, } impl JsonStream { @@ -85,12 +86,26 @@ impl JsonStream { Self { stream, entry_buffer: Vec::new(), - byte_buffer: VecDeque::with_capacity(std::cmp::min(DEFAULT_BUFFER_CAPACITY, max_capacity)), + byte_buffer: VecDeque::with_capacity(std::cmp::min( + DEFAULT_BUFFER_CAPACITY, + max_capacity, + )), finished: false, - max_buffer_capacity: max_capacity + max_buffer_capacity: max_capacity, + respect_eof: false, } } + /// Enables a mode where the stream will "complete" when the + /// buffer is empty, there are no processed messages, and the + /// underlying reader returns an EOF error. All other errors, + /// either from parsing (because there might be a partial + /// JSON object) or from the IO reader (because more data may + /// arrive or it may recover eventually), are ignored. 
+ pub fn set_respect_eof(&mut self) { + self.respect_eof = true + } + /// Controls how large the internal buffer can grow in bytes. If the buffer grows larger than this /// the stream is terminated as it is assumed that the stream is malformed. If this number is too /// large, a malformed stream can cause the server to run out of memory. @@ -115,10 +130,10 @@ impl JsonStream { } impl Stream for JsonStream - where - T: DeserializeOwned, - B: Deref, - S: Stream> + Unpin +where + T: DeserializeOwned, + B: Deref, + S: Stream> + Unpin, { type Item = Result; @@ -143,7 +158,7 @@ impl Stream for JsonStream Some(Err(err)) => { self.finish(); return Poll::Ready(Some(Err(err))); - }, + } None => { self.finish(); return Poll::Ready(None); @@ -157,7 +172,7 @@ impl Stream for JsonStream // no room for this chunk self.finish(); return Poll::Ready(None); - }, + } None => { // overflow occurred self.finish(); @@ -182,14 +197,23 @@ impl Stream for JsonStream Some(Ok(entry)) => { last_read_pos = json_iter.byte_offset(); this.entry_buffer.push(entry); - }, + } // if there was an error, log it but move on because this could be a partial entry Some(Err(err)) => { - trace!(err = ?err, "failed to parse json entry"); - break - }, + if self.respect_eof + && err.is_eof() + && this.entry_buffer.is_empty() + && buffer.is_empty() + { + self.finish(); + return Poll::Ready(Some(Err(err))); + } else { + trace!(err = ?err, "failed to parse json entry"); + break; + }; + } // nothing left then we move on - None => break + None => break, } } @@ -201,4 +225,4 @@ impl Stream for JsonStream this.byte_buffer.make_contiguous(); } } -} \ No newline at end of file +} From 9dcbb7e89860d84a85dddf076d50d544e7e31a5f Mon Sep 17 00:00:00 2001 From: tycho garen Date: Sun, 3 Mar 2024 20:20:54 -0500 Subject: [PATCH 2/2] fixup --- src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 19cd042..0bd1446 100755 --- a/src/lib.rs +++ b/src/lib.rs @@ -200,13 +200,13 @@ 
where } // if there was an error, log it but move on because this could be a partial entry Some(Err(err)) => { - if self.respect_eof + if *this.respect_eof && err.is_eof() && this.entry_buffer.is_empty() && buffer.is_empty() { self.finish(); - return Poll::Ready(Some(Err(err))); + return Poll::Ready(None); } else { trace!(err = ?err, "failed to parse json entry"); break;