From 1cf46045a32c35739ba865906320f34eb19cc53d Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto Date: Fri, 12 Sep 2025 13:29:04 +0200 Subject: [PATCH] Add WebSocket support Signed-off-by: Yuki Kishimoto --- Cargo.lock | 174 ++++++++++++++++++++++++++++++++++++++++++-- Cargo.toml | 12 ++- examples/mempool.rs | 23 ++++++ src/client.rs | 14 ++++ src/error.rs | 34 +++++++++ src/lib.rs | 2 + src/prelude.rs | 2 + src/response.rs | 54 ++++++++++++++ src/websocket.rs | 145 ++++++++++++++++++++++++++++++++++++ 9 files changed, 453 insertions(+), 7 deletions(-) create mode 100644 src/websocket.rs diff --git a/Cargo.lock b/Cargo.lock index 1a07d32..44034cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -120,6 +120,15 @@ version = "2.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "bumpalo" version = "3.19.0" @@ -169,6 +178,41 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "data-encoding" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" + +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -187,7 +231,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -241,6 +285,23 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + [[package]] name = "futures-task" version = "0.3.31" @@ -254,9 +315,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-core", + "futures-macro", + "futures-sink", "futures-task", "pin-project-lite", "pin-utils", + "slab", +] + +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", ] [[package]] @@ -380,7 +454,7 @@ dependencies = [ "tokio", "tokio-rustls", "tower-service", - "webpki-roots", + "webpki-roots 1.0.1", ] [[package]] @@ -614,10 +688,13 @@ name = "mempoolspace" version = "0.2.0" dependencies = [ "bitcoin", + "futures-util", "reqwest", "serde", "serde_json", "tokio", + "tokio-tungstenite", + "tracing", "url", ] @@ -820,7 +897,7 @@ dependencies = [ "once_cell", "socket2", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -905,7 +982,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "webpki-roots", + "webpki-roots 1.0.1", ] [[package]] @@ -944,7 +1021,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1090,6 +1167,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "shlex" version = "1.3.0" @@ -1171,7 +1259,7 @@ dependencies = [ "getrandom 0.3.3", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1268,6 +1356,22 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "489a59b6730eda1b0171fcfda8b121f4bee2b35cba8645ca35c5f7ba3eb736c1" +dependencies = [ + "futures-util", + "log", + "rustls", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tungstenite", + "webpki-roots 0.26.11", +] + [[package]] name = "tower" version = "0.5.2" @@ -1320,9 +1424,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tracing-core" version = "0.1.34" @@ -1338,6 +1454,31 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eadc29d668c91fcc564941132e17b28a7ceb2f3ebf0b9dae3e03fd7a6748eb0d" +dependencies = [ + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand", + "rustls", + "rustls-pki-types", + "sha1", + "thiserror", + "utf-8", +] + +[[package]] +name = "typenum" +version = "1.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f" + [[package]] name = "unicode-ident" version = "1.0.18" @@ -1361,6 +1502,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8_iter" version = "1.0.4" @@ -1373,6 +1520,12 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + [[package]] name = "want" version = "0.3.1" @@ -1488,6 +1641,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "0.26.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" +dependencies = [ + "webpki-roots 1.0.1", +] + [[package]] name = "webpki-roots" version = "1.0.1" diff --git a/Cargo.toml b/Cargo.toml index ad9aafa..b94be07 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,13 +12,15 @@ keywords = ["bitcoin", "mempool"] [features] # Default features -default = ["rustls"] +default = ["rustls", "ws"] # Enable rust TLS rustls = ["reqwest/rustls-tls"] # Enable natile TLS (openssl) nativetls = ["reqwest/native-tls"] # Enable socks proxy socks = ["reqwest/socks"] +# Enable WebSocket client +ws = ["dep:futures-util", "dep:serde_json", "dep:tokio", "dep:tokio-tungstenite", "dep:tracing"] [dependencies] bitcoin = { version = "0.32", default-features = false, features = ["std", "serde"] } @@ -26,9 +28,17 @@ reqwest = { version = "0.12", default-features = false, features = ["json"] } serde = { version = "1.0", features = ["derive"] } url = "2.5" +# WebSocket +futures-util = { version = "0.3", optional = true } +serde_json = { version = "1.0", optional = true } +tokio = { version = "1", features = ["sync", "time"], optional = true } +tokio-tungstenite = { version = "0.27", features = ["rustls-tls-webpki-roots"], optional = true } +tracing = { version = "0.1", optional = true } + [dev-dependencies] serde_json = "1.0" tokio = { version = "1.46", features = ["macros", "rt-multi-thread"] } [[example]] name = "mempool" +required-features = ["ws"] diff --git a/examples/mempool.rs b/examples/mempool.rs index 4aa947a..83377d7 100644 --- a/examples/mempool.rs +++ b/examples/mempool.rs @@ -1,6 +1,7 @@ use std::str::FromStr; use mempoolspace::prelude::*; +use tokio::sync::mpsc::UnboundedReceiver; #[tokio::main] async fn main() { @@ -28,4 +29,26 @@ async fn main() { // Get recommended fees let fees = client.get_recommended_fees().await.unwrap(); println!("{:?}", fees); + + // Subscribe + let req = MempoolSubscriptionRequest::LiveData { + action: LiveDataAction::Want, + data: vec![LiveDataType::Stats], + }; + let sub = client.subscribe(req).await.unwrap(); + + tokio::select! { + _ = sub.worker => { + println!("Worker exited"); + }, + _ = handle_messages(sub.receiver) => { + println!("Receiver exited"); + } + } +} + +async fn handle_messages(mut rx: UnboundedReceiver) { + while let Some(message) = rx.recv().await { + println!("{message:?}"); + } } diff --git a/src/client.rs b/src/client.rs index 907ef46..f9ee90a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -10,6 +10,8 @@ use crate::response::{ AddressStats, BlockInfo, DifficultyAdjustment, FeeRecommendations, HashrateStats, MempoolBlockFees, MempoolStats, Prices, }; +#[cfg(feature = "ws")] +use crate::websocket::{self, MempoolSubscription, MempoolSubscriptionRequest}; /// Hashrate time period #[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] @@ -160,4 +162,16 @@ impl MempoolClient { let response: Response = self.client.get(url).send().await?; Ok(response.json().await?) } + + /// Subscribe to mempool space websocket. + /// + /// This creates a new websocket connection! + #[inline] + #[cfg(feature = "ws")] + pub async fn subscribe( + &self, + req: MempoolSubscriptionRequest, + ) -> Result { + websocket::subscribe(&self.url, req).await + } } diff --git a/src/error.rs b/src/error.rs index 65f4d61..a568bb4 100644 --- a/src/error.rs +++ b/src/error.rs @@ -11,6 +11,18 @@ pub enum Error { Url(ParseError), /// Reqwest error Reqwest(reqwest::Error), + /// Tungstenite error + #[cfg(feature = "ws")] + Tungstenite(tokio_tungstenite::tungstenite::Error), + /// JSON error + #[cfg(feature = "ws")] + Json(serde_json::Error), + /// Can't forward websocket message + #[cfg(feature = "ws")] + CantForwardMessage, + /// Unexpected URL scheme + #[cfg(feature = "ws")] + UnexpectedScheme, } impl std::error::Error for Error {} @@ -20,6 +32,14 @@ impl fmt::Display for Error { match self { Self::Url(e) => write!(f, "{e}"), Self::Reqwest(e) => write!(f, "{e}"), + #[cfg(feature = "ws")] + Self::Tungstenite(e) => write!(f, "{e}"), + #[cfg(feature = "ws")] + Self::Json(e) => write!(f, "{e}"), + #[cfg(feature = "ws")] + Self::CantForwardMessage => write!(f, "Can't forward websocket message"), + #[cfg(feature = "ws")] + Self::UnexpectedScheme => write!(f, "Unexpected URL scheme"), } } } @@ -35,3 +55,17 @@ impl From for Error { Self::Reqwest(e) } } + +#[cfg(feature = "ws")] +impl From for Error { + fn from(e: tokio_tungstenite::tungstenite::Error) -> Self { + Self::Tungstenite(e) + } +} + +#[cfg(feature = "ws")] +impl From for Error { + fn from(e: serde_json::Error) -> Self { + Self::Json(e) + } +} diff --git a/src/lib.rs b/src/lib.rs index 5608abb..5c78b63 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,3 +11,5 @@ mod deser; pub mod error; pub mod prelude; pub mod response; +#[cfg(feature = "ws")] +pub mod websocket; diff --git a/src/prelude.rs b/src/prelude.rs index 8410a29..6f38206 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -11,4 +11,6 @@ pub use crate::builder::*; pub use crate::client::*; pub use crate::error::*; pub use crate::response::*; +#[cfg(feature = "ws")] +pub use crate::websocket::*; pub use crate::*; diff --git a/src/response.rs b/src/response.rs index c31a4c9..c90d9fc 100644 --- a/src/response.rs +++ b/src/response.rs @@ -313,6 +313,60 @@ pub struct MempoolBlockFees { pub fee_range: Vec, } +/// Mempool info +#[cfg(feature = "ws")] +#[derive(Debug, Clone, PartialEq, PartialOrd, Deserialize)] +pub struct MempoolInfo { + /// Loaded + pub loaded: bool, + /// Mempool size + pub size: usize, + /// Bytes + pub bytes: usize, + /// Usage + pub usage: usize, + /// Total fee + pub total_fee: f64, + /// Max mempool + #[serde(rename = "maxmempool")] + pub max_mempool: usize, + /// Mempool min fee + #[serde(rename = "mempoolminfee")] + pub mempool_min_fee: f64, + /// Min relay tx fee + #[serde(rename = "minrelaytxfee")] + pub min_relay_tx_fee: f64, + /// Incremental relay fee + #[serde(rename = "incrementalrelayfee")] + pub incremental_relay_fee: f64, + /// Unbroadcast count + #[serde(rename = "unbroadcastcount")] + pub unbroadcast_count: usize, + /// Full RBF + #[serde(rename = "fullrbf")] + pub full_rbf: bool, +} + +/// Mempool subscription response +#[cfg(feature = "ws")] +#[derive(Debug, Clone, PartialEq, PartialOrd, Deserialize)] +pub struct MempoolSubscriptionResponse { + /// vByte/sec + #[serde(rename = "vBytesPerSecond")] + pub vbyte_per_second: Option, + /// Mempool blocks + #[serde(rename = "mempool-blocks")] + pub mempool_blocks: Option>, + /// Mempool info + #[serde(rename = "mempoolInfo")] + pub mempool_info: Option, + /// Fees + pub fees: Option, + /// Difficulty adjustment + #[serde(rename = "da")] + pub difficulty_adjustment: Option, +} + #[cfg(test)] mod tests { use std::str::FromStr; diff --git a/src/websocket.rs b/src/websocket.rs new file mode 100644 index 0000000..acc2e04 --- /dev/null +++ b/src/websocket.rs @@ -0,0 +1,145 @@ +//! WebSocket + +use std::pin::Pin; +use std::time::Duration; + +use futures_util::{SinkExt, StreamExt}; +use serde::Serialize; +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; +use tokio_tungstenite::connect_async; +use tokio_tungstenite::tungstenite::Message; +use url::Url; + +use crate::error::Error; +use crate::response::MempoolSubscriptionResponse; + +const RECONNECT_DELAY: Duration = Duration::from_secs(10); + +/// Live data action +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)] +pub enum LiveDataAction { + /// Want + #[serde(rename = "want")] + Want, +} + +/// Live data type +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)] +pub enum LiveDataType { + /// Blocks + #[serde(rename = "blocks")] + Blocks, + /// Mempool blocks + #[serde(rename = "mempool-blocks")] + MempoolBlocks, + /// Live 2h chart + #[serde(rename = "live-2h-chart")] + Live2hChart, + /// Stats + #[serde(rename = "stats")] + Stats, +} + +/// Mempool subscription request +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)] +#[serde(untagged)] +pub enum MempoolSubscriptionRequest { + /// Live data + LiveData { + /// Action + action: LiveDataAction, + /// Data + data: Vec, + }, +} + +/// Mempool subscription +pub struct MempoolSubscription { + /// Worker future + pub worker: Pin + Send + 'static>>, + /// Receiver for messages + pub receiver: UnboundedReceiver, +} + +fn upgrade_scheme_from_http_to_wss(url: &Url) -> Result { + match url.scheme() { + "http" => { + let mut url = url.clone(); + let _ = url.set_scheme("ws"); + Ok(url) + } + "https" => { + let mut url = url.clone(); + let _ = url.set_scheme("wss"); + Ok(url) + } + _ => Err(Error::UnexpectedScheme), + } +} + +pub(crate) async fn subscribe( + url: &Url, + payload: MempoolSubscriptionRequest, +) -> Result { + let url: Url = upgrade_scheme_from_http_to_wss(url)?; + let url: Url = url.join("/api/v1/ws")?; + + let (tx, rx) = mpsc::unbounded_channel(); + + let worker = async move { + loop { + match connect_and_subscribe(&url, &tx, &payload).await { + Ok(()) => tracing::warn!( + "Stream terminated. Reconnecting in {} seconds...", + RECONNECT_DELAY.as_secs() + ), + Err(e) => { + tracing::error!(error = %e, "Stream terminated with error. Reconnecting in {} seconds...", RECONNECT_DELAY.as_secs()) + } + } + + tokio::time::sleep(RECONNECT_DELAY).await; + } + }; + + Ok(MempoolSubscription { + worker: Box::pin(worker), + receiver: rx, + }) +} + +async fn connect_and_subscribe( + url: &Url, + tx: &UnboundedSender, + payload: &MempoolSubscriptionRequest, +) -> Result<(), Error> { + tracing::debug!("Connecting to {}", url); + + let (stream, _) = connect_async(url.as_str()).await?; + + tracing::info!("Connected to {}", url); + + // Split stream + let (mut ws_tx, mut ws_rx) = stream.split(); + + tracing::debug!("Subscribing to mempool"); + + // Subscribe to mempool + let payload: String = serde_json::to_string(&payload)?; + ws_tx.send(Message::text(payload)).await?; + + tracing::info!("Subscribed to mempool"); + + // Listen for messages + while let Some(message) = ws_rx.next().await { + if let Message::Text(text) = message? { + // Parse message + let msg: MempoolSubscriptionResponse = serde_json::from_str(&text)?; + + // Send message to receiver + tx.send(msg).map_err(|_| Error::CantForwardMessage)?; + } + } + + Ok(()) +}