-
Notifications
You must be signed in to change notification settings - Fork 43
Pipeline Operations #110
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Pipeline Operations #110
Conversation
Pipelining should make communication much more efficient for large numbers of sets and deletes.
…p the protocol in sync.
…ulti and leave a "gets" alias. All the _multi methods accept IntoIterator<Item=(AsRef...)>, making them compatible with many kinds of collections.
When a line spanned multiple read packets, CappedLineReader would put the line back together. Line reassembly had two bugs: 1. After assembling the line and passing it to the caller, read_line() should have called self.consume(filled + n), but it only called self.consume(n), causing part of the line to be read again later. 2. If a \r\n sequence happened to straddle two packets, read_line() would fail to notice the end of line. If packets contained only a single byte at a time, read_line() would never notice any end of line at all. Fixed and added tests.
Codecov Report
@@ Coverage Diff @@
## master #110 +/- ##
=========================================
Coverage ? 53.67%
=========================================
Files ? 10
Lines ? 857
Branches ? 0
=========================================
Hits ? 460
Misses ? 397
Partials ? 0
Continue to review full report at Codecov.
|
|
I also fixed 2 ASCII protocol bugs that were accidentally exposed by this new code. I'm not sure what CommitCheck is or why it doesn't like my commit messages. ;-) |
|
Sorry the commit check's style not adding to docs yes, for now please see #106 (comment) . |
|
Is the |
|
Yes, |
|
Hi @hathawsh I think the So I think we can replace |
|
So you prefer the names |
|
I think gets is good because we had it already and this change dose not make API level breaking change. |
|
Sounds good. I changed the names. |
letmutx
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey, thanks for the PR! I have suggested some changes. Let me know what you think!
|
Great feedback! I will try out all of these suggestions. |
- In ascii::gets, instead of building a string, use BufWriter. Also remove unnecessary allocation. There is some necessary allocation due to the signature of the method. We want to maintain the signature. - Don't add unnecessary indentation just to drop variables. Instead, use drop() or avoid creating variables that need to be dropped. - Reworked the way we collect pipelined responses from the server. Now there's a simple ``final_result`` var and we update that var for each result. - Handle recoverable errors differently from unrecoverable errors. Exit fast on most errors, since they're unrecoverable, but collect all responses on CommandError, since CommandError means we got a full response.
…gets can use a slice: &[K]. This reduces the need to copy Strings.
| check_key_len(k.as_ref())?; | ||
| } | ||
|
|
||
| let mut writer = BufWriter::new(self.reader.get_mut()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps I wasn't clear, I meant that we should put the BufWriter in the AsciiProtocol struct. Use try_clone method on the streams and wrap them in BufReader and BufWriter.
struct AsciiProtocol<C> {
reader: CappedLineReader<C>,
writer: BufWriter<C>
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried something like that, but I discovered we can't use BufWriter for any code that writes values to the stream because the values are of the type ToMemcacheValue<Stream>, which means the write_to(W) methods all expect W to be a Stream, not a BufWriter. At some point we might want to think about a less rigid design.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is because the impl is for AsciiProtocol<Stream>. If you make it impl<C: Read + Write> AsciiProtocol<C> and change all the ToMemcacheValue to ToMemcacheValue<C>, I think you should be good to go.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would that work? The public interface (Client) is littered with ToMemcacheValue<Stream>. Doesn't that mean we're stuck with using ToMemcacheValue<Stream> values unless we change the public interface? If we want to do that, I would prefer to start a new PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW, it just occurred to me that we could probably squeeze BufWriter (and maybe BufReader?) inside the Stream enum, thus making everything use buffers without changing the public interface. That would be a new PR for sure though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't that mean we're stuck with using ToMemcacheValue values unless we change the public interface?
No. AsciiProtocol works for all C where Read + Write, just that it's implemented for Stream. We will generalize it. I was not sure whether this would work, so tried it out myself. The following seems to build. Can you run travis with this if that's okay?
diff --git a/src/protocol/ascii.rs b/src/protocol/ascii.rs
index 1b81ec4..be28580 100644
--- a/src/protocol/ascii.rs
+++ b/src/protocol/ascii.rs
@@ -6,7 +6,6 @@ use super::check_key_len;
use client::Stats;
use error::{ClientError, CommandError, MemcacheError, ServerError};
use std::borrow::Cow;
-use stream::Stream;
use value::{FromMemcacheValueExt, ToMemcacheValue};
#[derive(Default)]
@@ -136,26 +135,50 @@ impl<C: Read> CappedLineReader<C> {
}
}
+pub struct MyBufWriter<C: Read + Write>(BufWriter<C>);
+
+impl<C: Read + Write> MyBufWriter<C> {
+ fn get_mut(&mut self) -> &mut C {
+ self.0.get_mut()
+ }
+}
+
+impl<C: Read + Write> Read for MyBufWriter<C> {
+ fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+ self.0.get_mut().read(buf)
+ }
+}
+
+impl<C: Read + Write> Write for MyBufWriter<C> {
+ fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+ self.0.write(buf)
+ }
+
+ fn flush(&mut self) -> std::io::Result<()> {
+ self.0.flush()
+ }
+}
+
pub struct AsciiProtocol<C: Read + Write + Sized> {
- reader: CappedLineReader<C>,
+ stream: CappedLineReader<MyBufWriter<C>>,
}
-impl AsciiProtocol<Stream> {
- pub(crate) fn new(stream: Stream) -> Self {
+impl<C: Read + Write> AsciiProtocol<C> {
+ pub(crate) fn new(stream: C) -> Self {
Self {
- reader: CappedLineReader::new(stream),
+ stream: CappedLineReader::new(MyBufWriter(BufWriter::with_capacity(4096, stream))),
}
}
- pub(crate) fn stream(&mut self) -> &mut Stream {
- self.reader.get_mut()
+ pub(crate) fn stream(&mut self) -> &mut C {
+ self.stream.get_mut().get_mut()
}
pub(super) fn auth(&mut self, username: &str, password: &str) -> Result<(), MemcacheError> {
return self.set("auth", format!("{} {}", username, password), 0);
}
- fn store<V: ToMemcacheValue<Stream>>(
+ fn store<V: ToMemcacheValue<C>>(
&mut self,
command: StoreCommand,
key: &str,
@@ -165,7 +188,7 @@ impl AsciiProtocol<Stream> {
Ok(self.stores(command, Some((key, value)), options)?)
}
- fn stores<V: ToMemcacheValue<Stream>, K: AsRef<str>, I: IntoIterator<Item = (K, V)>>(
+ fn stores<V: ToMemcacheValue<C>, K: AsRef<str>, I: IntoIterator<Item = (K, V)>>(
&mut self,
command: StoreCommand,
entries: I,
@@ -187,7 +210,7 @@ impl AsciiProtocol<Stream> {
check_key_len(key)?;
if options.cas.is_some() {
write!(
- self.reader.get_mut(),
+ self.stream.get_mut(),
"{command} {key} {flags} {exptime} {vlen} {cas}{noreply}\r\n",
command = command,
key = key,
@@ -199,7 +222,7 @@ impl AsciiProtocol<Stream> {
)?;
} else {
write!(
- self.reader.get_mut(),
+ self.stream.get_mut(),
"{command} {key} {flags} {exptime} {vlen}{noreply}\r\n",
command = command,
key = key,
@@ -210,13 +233,13 @@ impl AsciiProtocol<Stream> {
)?;
}
- value.write_to(self.reader.get_mut())?;
- self.reader.get_mut().write_all(b"\r\n")?;
+ value.write_to(self.stream.get_mut().get_mut())?;
+ self.stream.get_mut().write_all(b"\r\n")?;
sent_count += 1;
}
// Flush now that all the requests have been written.
- self.reader.get_mut().flush()?;
+ self.stream.get_mut().flush()?;
if options.noreply {
return Ok(true);
@@ -230,7 +253,7 @@ impl AsciiProtocol<Stream> {
let mut final_result = Ok(true);
for _ in 0..sent_count {
- let one_result = self.reader.read_line(|response| {
+ let one_result = self.stream.read_line(|response| {
let response = MemcacheError::try_from(response)?;
match response {
"STORED\r\n" => Ok(true),
@@ -261,9 +284,9 @@ impl AsciiProtocol<Stream> {
}
pub(super) fn version(&mut self) -> Result<String, MemcacheError> {
- self.reader.get_mut().write_all(b"version\r\n")?;
- self.reader.get_mut().flush()?;
- self.reader.read_line(|response| {
+ self.stream.get_mut().write_all(b"version\r\n")?;
+ self.stream.get_mut().flush()?;
+ self.stream.read_line(|response| {
let response = MemcacheError::try_from(response)?;
if !response.starts_with("VERSION") {
Err(ServerError::BadResponse(Cow::Owned(response.into())))?
@@ -274,7 +297,7 @@ impl AsciiProtocol<Stream> {
}
fn parse_ok_response(&mut self) -> Result<(), MemcacheError> {
- self.reader.read_line(|response| {
+ self.stream.read_line(|response| {
let response = MemcacheError::try_from(response)?;
if response == "OK\r\n" {
Ok(())
@@ -285,18 +308,18 @@ impl AsciiProtocol<Stream> {
}
pub(super) fn flush(&mut self) -> Result<(), MemcacheError> {
- write!(self.reader.get_mut(), "flush_all\r\n")?;
+ write!(self.stream.get_mut(), "flush_all\r\n")?;
self.parse_ok_response()
}
pub(super) fn flush_with_delay(&mut self, delay: u32) -> Result<(), MemcacheError> {
- write!(self.reader.get_mut(), "flush_all {}\r\n", delay)?;
- self.reader.get_mut().flush()?;
+ write!(self.stream.get_mut(), "flush_all {}\r\n", delay)?;
+ self.stream.get_mut().flush()?;
self.parse_ok_response()
}
pub(super) fn get<V: FromMemcacheValueExt>(&mut self, key: &str) -> Result<Option<V>, MemcacheError> {
- write!(self.reader.get_mut(), "get {}\r\n", key)?;
+ write!(self.stream.get_mut(), "get {}\r\n", key)?;
if let Some((k, v)) = self.parse_get_response(false)? {
if k != key {
@@ -317,7 +340,7 @@ impl AsciiProtocol<Stream> {
&mut self,
has_cas: bool,
) -> Result<Option<(String, V)>, MemcacheError> {
- let result = self.reader.read_line(|buf| {
+ let result = self.stream.read_line(|buf| {
let buf = MemcacheError::try_from(buf)?;
if buf == END {
return Ok(None);
@@ -344,7 +367,7 @@ impl AsciiProtocol<Stream> {
match result {
Some((key, flags, length, cas)) => {
let mut value = vec![0u8; length + 2];
- self.reader.read_exact(value.as_mut_slice())?;
+ self.stream.read_exact(value.as_mut_slice())?;
if &value[length..] != b"\r\n" {
return Err(ServerError::BadResponse(Cow::Owned(String::from_utf8(value)?)))?;
}
@@ -367,7 +390,7 @@ impl AsciiProtocol<Stream> {
check_key_len(k.as_ref())?;
}
- let mut writer = BufWriter::new(self.reader.get_mut());
+ let writer = self.stream.get_mut();
writer.write_all(b"gets")?;
for k in keys.iter() {
writer.write_all(b" ")?;
@@ -375,7 +398,6 @@ impl AsciiProtocol<Stream> {
}
writer.write_all(b"\r\n")?;
writer.flush()?;
- drop(writer);
let mut result: HashMap<String, V> = HashMap::with_capacity(keys.len());
// there will be atmost keys.len() "VALUE <...>" responses and one END response
@@ -391,7 +413,7 @@ impl AsciiProtocol<Stream> {
Err(ServerError::BadResponse(Cow::Borrowed("Expected end of gets response")))?
}
- pub(super) fn cas<V: ToMemcacheValue<Stream>>(
+ pub(super) fn cas<V: ToMemcacheValue<C>>(
&mut self,
key: &str,
value: V,
@@ -412,7 +434,7 @@ impl AsciiProtocol<Stream> {
}
}
- pub(super) fn set<V: ToMemcacheValue<Stream>>(
+ pub(super) fn set<V: ToMemcacheValue<C>>(
&mut self,
key: &str,
value: V,
@@ -425,7 +447,7 @@ impl AsciiProtocol<Stream> {
self.store(StoreCommand::Set, key, value, &options).map(|_| ())
}
- pub(super) fn sets<V: ToMemcacheValue<Stream>, K: AsRef<str>, I: IntoIterator<Item = (K, V)>>(
+ pub(super) fn sets<V: ToMemcacheValue<C>, K: AsRef<str>, I: IntoIterator<Item = (K, V)>>(
&mut self,
entries: I,
expiration: u32,
@@ -437,7 +459,7 @@ impl AsciiProtocol<Stream> {
self.stores(StoreCommand::Set, entries, &options).map(|_| ())
}
- pub(super) fn add<V: ToMemcacheValue<Stream>>(
+ pub(super) fn add<V: ToMemcacheValue<C>>(
&mut self,
key: &str,
value: V,
@@ -450,7 +472,7 @@ impl AsciiProtocol<Stream> {
self.store(StoreCommand::Add, key, value, &options).map(|_| ())
}
- pub(super) fn replace<V: ToMemcacheValue<Stream>>(
+ pub(super) fn replace<V: ToMemcacheValue<C>>(
&mut self,
key: &str,
value: V,
@@ -463,13 +485,13 @@ impl AsciiProtocol<Stream> {
self.store(StoreCommand::Replace, key, value, &options).map(|_| ())
}
- pub(super) fn append<V: ToMemcacheValue<Stream>>(&mut self, key: &str, value: V) -> Result<(), MemcacheError> {
+ pub(super) fn append<V: ToMemcacheValue<C>>(&mut self, key: &str, value: V) -> Result<(), MemcacheError> {
check_key_len(key)?;
self.store(StoreCommand::Append, key, value, &Default::default())
.map(|_| ())
}
- pub(super) fn prepend<V: ToMemcacheValue<Stream>>(&mut self, key: &str, value: V) -> Result<(), MemcacheError> {
+ pub(super) fn prepend<V: ToMemcacheValue<C>>(&mut self, key: &str, value: V) -> Result<(), MemcacheError> {
check_key_len(key)?;
self.store(StoreCommand::Prepend, key, value, &Default::default())
.map(|_| ())
@@ -482,10 +504,10 @@ impl AsciiProtocol<Stream> {
}
for k in keys {
- write!(self.reader.get_mut(), "delete {}\r\n", k.as_ref())?;
+ write!(self.stream.get_mut(), "delete {}\r\n", k.as_ref())?;
}
// Flush now that all the requests have been written.
- self.reader.get_mut().flush()?;
+ self.stream.get_mut().flush()?;
// Receive all the responses. If there were errors, return the first.
@@ -493,7 +515,7 @@ impl AsciiProtocol<Stream> {
for _ in 0..keys.len() {
let one_result = self
- .reader
+ .stream
.read_line(|response| match MemcacheError::try_from(response) {
Ok(s) => {
if s == "DELETED\r\n" {
@@ -530,7 +552,7 @@ impl AsciiProtocol<Stream> {
}
fn parse_u64_response(&mut self) -> Result<u64, MemcacheError> {
- self.reader.read_line(|response| {
+ self.stream.read_line(|response| {
let s = MemcacheError::try_from(response)?;
Ok(s.trim_end_matches("\r\n").parse::<u64>()?)
})
@@ -538,21 +560,21 @@ impl AsciiProtocol<Stream> {
pub(super) fn increment(&mut self, key: &str, amount: u64) -> Result<u64, MemcacheError> {
check_key_len(key)?;
- write!(self.reader.get_mut(), "incr {} {}\r\n", key, amount)?;
+ write!(self.stream.get_mut(), "incr {} {}\r\n", key, amount)?;
self.parse_u64_response()
}
pub(super) fn decrement(&mut self, key: &str, amount: u64) -> Result<u64, MemcacheError> {
check_key_len(key)?;
- write!(self.reader.get_mut(), "decr {} {}\r\n", key, amount)?;
+ write!(self.stream.get_mut(), "decr {} {}\r\n", key, amount)?;
self.parse_u64_response()
}
pub(super) fn touch(&mut self, key: &str, expiration: u32) -> Result<bool, MemcacheError> {
check_key_len(key)?;
- write!(self.reader.get_mut(), "touch {} {}\r\n", key, expiration)?;
- self.reader.get_mut().flush()?;
- self.reader
+ write!(self.stream.get_mut(), "touch {} {}\r\n", key, expiration)?;
+ self.stream.get_mut().flush()?;
+ self.stream
.read_line(|response| match MemcacheError::try_from(response) {
Ok(s) => {
if s == "TOUCHED\r\n" {
@@ -567,8 +589,8 @@ impl AsciiProtocol<Stream> {
}
pub(super) fn stats(&mut self) -> Result<Stats, MemcacheError> {
- self.reader.get_mut().write_all(b"stats\r\n")?;
- self.reader.get_mut().flush()?;
+ self.stream.get_mut().write_all(b"stats\r\n")?;
+ self.stream.get_mut().flush()?;
enum Loop {
Break,
@@ -577,7 +599,7 @@ impl AsciiProtocol<Stream> {
let mut stats: Stats = HashMap::new();
loop {
- let status = self.reader.read_line(|response| {
+ let status = self.stream.read_line(|response| {
if response != END {
return Ok(Loop::Break);
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Found the test will hang on flush command after apply this patch, will dig it latter.
…tmutx and use it to make error handling more robust
|
Sorry for the late about this PR. I thought is PR is ok now (except just need for a rebase)? @letmutx |
|
I think this patch should be added: #110 (comment) |
This PR adds set_multi() and delete_multi() methods, which use pipelining to reduce the number of server round trips when setting or deleting multiple keys. The new methods accept generic collection arguments. For example, set_multi() accepts a HashMap or Vec of many types of strings.
This PR also adds get_multi() and changes client.gets() to use get_multi(), so that users of get_multi() can also benefit from the generic collection arguments.
Tests included.