Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ jobs:
- name: "Setup Rust Toolchain"
uses: actions-rust-lang/setup-rust-toolchain@v1
- name: "Run Tests"
run: cargo test --all-features
run: cargo test --all-features --doc
- name: "Run Example"
run: cd examples/echo_chat && cargo run

# Check formatting with rustfmt
formatting:
Expand Down
15 changes: 8 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "socketeer"
version = "0.0.3"
version = "0.1.0"
edition = "2021"
description = "Simplified websocket client based on Tokio-Tungstenite"
authors = ["Zach Heylmun <zheylmun@gmail.com>"]
Expand Down
18 changes: 14 additions & 4 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,35 @@
use thiserror::Error;
use tokio_tungstenite::tungstenite::Message;

/// Error type for the Socketeer library.
/// This type is used to represent all possible external errors that can occur when using the Socketeer library.
#[derive(Debug, Error)]
pub enum Error {
/// Url Parse Error
#[error("Failed to parse URL: {}", 0)]
UrlParse {
/// The URL that failed to parse
url: String,
/// The source of the error, from the [URL crate](https://docs.rs/url/2.2.2/url/enum.ParseError.html)
#[source]
source: url::ParseError,
},
/// Websocket Error
/// Error thrown by the Tungstenite library when there is an issue with the websocket connection.
#[error("Tungstenite error: {0}")]
WebsocketError(#[from] tokio_tungstenite::tungstenite::Error),
/// Socketeer error when the websocket connection is closed unexpectedly.
#[error("Socket Closed")]
WebsocketClosed,
#[error("Channel Full")]
ChannelFull,
/// Error thrown if a message type not handled by `socketeer` is received.
#[error("Unexpected Message type: {0}")]
UnexpectedMessage(Message),
UnexpectedMessageType(Message),
/// Error thrown if the message received fails to serialize or deserialize.
#[error("Serialization Error: {0}")]
SerializationError(#[from] serde_json::Error),
/// Error thrown if socketeer is dropped without closing the connection.
/// This error will be removed once async destructors are stabilized.
/// See [issue](https://github.com/rust-lang/rust/issues/126482)
#[error("Socketeer dropped without closing")]
SocketeerDropped,
SocketeerDroppedWithoutClosing,
}
53 changes: 43 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![doc = include_str!("../README.md")]

#![deny(missing_docs)]
mod error;
#[cfg(feature = "mocking")]
mod mock_server;
Expand Down Expand Up @@ -32,6 +32,13 @@ struct TxChannelPayload {
response_tx: oneshot::Sender<Result<(), Error>>,
}

/// A WebSocket client that manages the connection to a WebSocket server.
/// The client can send and receive messages, and will transparently handle protocol messages.
/// # Type Parameters
/// - `RxMessage`: The type of message that the client will receive from the server.
/// - `TxMessage`: The type of message that the client will send to the server.
/// - `CHANNEL_SIZE`: The size of the internal channels used to communicate between
/// the task managing the WebSocket connection and the client.
#[derive(Debug)]
pub struct Socketeer<
RxMessage: for<'a> Deserialize<'a> + Debug,
Expand Down Expand Up @@ -84,6 +91,12 @@ impl<
})
}

/// Wait for the next parsed message from the WebSocket connection.
///
/// # Errors
///
/// - If the WebSocket connection is closed or otherwise errored
/// - If the message cannot be deserialized
#[cfg_attr(feature = "tracing", instrument)]
pub async fn next_message(&mut self) -> Result<RxMessage, Error> {
let Some(message) = self.receiever.recv().await else {
Expand All @@ -102,17 +115,24 @@ impl<
let message = serde_json::from_slice(&message)?;
Ok(message)
}
_ => Err(Error::UnexpectedMessage(message)),
_ => Err(Error::UnexpectedMessageType(message)),
}
}

/// Send a message to the WebSocket connection.
/// This function will wait for the message to be sent before returning.
///
/// # Errors
///
/// - If the message cannot be serialized
/// - If the WebSocket connection is closed, or otherwise errored
#[cfg_attr(feature = "tracing", instrument)]
pub async fn send(&self, message: TxMessage) -> Result<(), Error> {
#[cfg(feature = "tracing")]
debug!("Sending message: {:?}", message);

let (tx, rx) = oneshot::channel::<Result<(), Error>>();
let message = serde_json::to_string(&message).unwrap();
let message = serde_json::to_string(&message)?;

self.sender
.send(TxChannelPayload {
Expand All @@ -122,7 +142,10 @@ impl<
.await
.map_err(|_| Error::WebsocketClosed)?;
// We'll ensure that we always respond before dropping the tx channel
rx.await.unwrap()
match rx.await {
Ok(result) => result,
Err(_) => unreachable!("Socket loop always sends response before dropping one-shot"),
}
}

/// Consume self, closing down any remaining send/recieve, and return a new Socketeer instance if successful
Expand All @@ -149,6 +172,11 @@ impl<
Self::connect(&url).await
}

/// Close the WebSocket connection gracefully.
/// This function will wait for the connection to close before returning.
/// # Errors
/// - If the WebSocket connection is already closed
/// - If the WebSocket connection cannot be closed
#[cfg_attr(feature = "tracing", instrument)]
pub async fn close_connection(self) -> Result<(), Error> {
#[cfg(feature = "tracing")]
Expand All @@ -164,9 +192,14 @@ impl<
})
.await
.map_err(|_| Error::WebsocketClosed)?;
rx.await.unwrap()?;
self.socket_handle.await.unwrap()?;
Ok(())
match rx.await {
Ok(result) => result,
Err(_) => unreachable!("Socket loop always sends response before dropping one-shot"),
}?;
match self.socket_handle.await {
Ok(result) => result,
Err(_) => unreachable!("Socket loop does not panic, and is not cancelled"),
}
}
}

Expand Down Expand Up @@ -219,12 +252,12 @@ async fn send_socket_message(
LoopState::Running
}
}
Err(_) => LoopState::Error(Error::SocketeerDropped),
Err(_) => LoopState::Error(Error::SocketeerDroppedWithoutClosing),
}
} else {
#[cfg(feature = "tracing")]
error!("Socketeer dropped without closing connection");
LoopState::Error(Error::SocketeerDropped)
LoopState::Error(Error::SocketeerDroppedWithoutClosing)
}
}

Expand Down Expand Up @@ -264,7 +297,7 @@ async fn socket_message_received(
}
Message::Text(_) | Message::Binary(_) => match sender.send(message).await {
Ok(()) => LoopState::Running,
Err(_) => LoopState::Error(Error::SocketeerDropped),
Err(_) => LoopState::Error(Error::SocketeerDroppedWithoutClosing),
},
_ => LoopState::Running,
},
Expand Down
3 changes: 3 additions & 0 deletions src/mock_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ use tracing::debug;
/// Control messages for testing with the echo server.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, PartialOrd)]
pub enum EchoControlMessage {
/// Send a message which the server should echo back
Message(String),
/// Request that the server send the client a ping
SendPing,
/// Request that the server close the connection
Close,
}

Expand Down
Loading