diff --git a/Cargo.toml b/Cargo.toml index e1459c77d..59ad2b767 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,17 +39,17 @@ default = [] #lightning-liquidity = { version = "0.2.0", features = ["std"] } #lightning-macros = { version = "0.2.0" } -lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "bb5504ec62d4b7e9d5626d8b1a6de60d71e8d370", features = ["std"] } -lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "bb5504ec62d4b7e9d5626d8b1a6de60d71e8d370" } -lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "bb5504ec62d4b7e9d5626d8b1a6de60d71e8d370", features = ["std"] } -lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "bb5504ec62d4b7e9d5626d8b1a6de60d71e8d370" } -lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "bb5504ec62d4b7e9d5626d8b1a6de60d71e8d370", features = ["tokio"] } -lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "bb5504ec62d4b7e9d5626d8b1a6de60d71e8d370" } -lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "bb5504ec62d4b7e9d5626d8b1a6de60d71e8d370" } -lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "bb5504ec62d4b7e9d5626d8b1a6de60d71e8d370", features = ["rest-client", "rpc-client", "tokio"] } -lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "bb5504ec62d4b7e9d5626d8b1a6de60d71e8d370", features = ["esplora-async-https", "time", "electrum-rustls-ring"] } -lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "bb5504ec62d4b7e9d5626d8b1a6de60d71e8d370", features = ["std"] } -lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "bb5504ec62d4b7e9d5626d8b1a6de60d71e8d370" } +lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5bf0d1e2427d759fc1ba4108ddc7e9b32e8bacfc", features = ["std"] } +lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5bf0d1e2427d759fc1ba4108ddc7e9b32e8bacfc" } +lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5bf0d1e2427d759fc1ba4108ddc7e9b32e8bacfc", features = ["std"] } +lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5bf0d1e2427d759fc1ba4108ddc7e9b32e8bacfc" } +lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5bf0d1e2427d759fc1ba4108ddc7e9b32e8bacfc", features = ["tokio"] } +lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5bf0d1e2427d759fc1ba4108ddc7e9b32e8bacfc" } +lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5bf0d1e2427d759fc1ba4108ddc7e9b32e8bacfc" } +lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5bf0d1e2427d759fc1ba4108ddc7e9b32e8bacfc", features = ["rest-client", "rpc-client", "tokio"] } +lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5bf0d1e2427d759fc1ba4108ddc7e9b32e8bacfc", features = ["esplora-async-https", "time", "electrum-rustls-ring"] } +lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5bf0d1e2427d759fc1ba4108ddc7e9b32e8bacfc", features = ["std"] } +lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5bf0d1e2427d759fc1ba4108ddc7e9b32e8bacfc" } bdk_chain = { version = "0.23.0", default-features = false, features = ["std"] } bdk_esplora = { version = "0.22.0", default-features = false, features = ["async-https-rustls", "tokio"]} @@ -82,7 +82,7 @@ prost = { version = "0.11.6", default-features = false} winapi = { version = "0.3", features = ["winbase"] } [dev-dependencies] -lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "bb5504ec62d4b7e9d5626d8b1a6de60d71e8d370", features = ["std", "_test_utils"] } +lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5bf0d1e2427d759fc1ba4108ddc7e9b32e8bacfc", features = ["std", "_test_utils"] } proptest = "1.0.0" regex = "1.5.6" criterion = { version = "0.7.0", features = ["async_tokio"] } diff --git a/src/builder.rs b/src/builder.rs index b5f64c617..ff84505b4 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -70,8 +70,8 @@ use crate::peer_store::PeerStore; use crate::runtime::Runtime; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ - ChainMonitor, ChannelManager, DynStore, GossipSync, Graph, KeysManager, MessageRouter, - OnionMessenger, PaymentStore, PeerManager, Persister, + ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, KeysManager, + MessageRouter, OnionMessenger, PaymentStore, PeerManager, Persister, SyncAndAsyncKVStore, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -544,14 +544,12 @@ impl NodeBuilder { let storage_dir_path = self.config.storage_dir_path.clone(); fs::create_dir_all(storage_dir_path.clone()) .map_err(|_| BuildError::StoragePathAccessFailed)?; - let kv_store = Arc::new( - SqliteStore::new( - storage_dir_path.into(), - Some(io::sqlite_store::SQLITE_DB_FILE_NAME.to_string()), - Some(io::sqlite_store::KV_TABLE_NAME.to_string()), - ) - .map_err(|_| BuildError::KVStoreSetupFailed)?, - ); + let kv_store = SqliteStore::new( + storage_dir_path.into(), + Some(io::sqlite_store::SQLITE_DB_FILE_NAME.to_string()), + Some(io::sqlite_store::KV_TABLE_NAME.to_string()), + ) + .map_err(|_| BuildError::KVStoreSetupFailed)?; self.build_with_store(node_entropy, kv_store) } @@ -563,7 +561,7 @@ impl NodeBuilder { fs::create_dir_all(storage_dir_path.clone()) .map_err(|_| BuildError::StoragePathAccessFailed)?; - let kv_store = Arc::new(FilesystemStore::new(storage_dir_path)); + let kv_store = FilesystemStore::new(storage_dir_path); self.build_with_store(node_entropy, kv_store) } @@ -595,7 +593,7 @@ impl NodeBuilder { BuildError::KVStoreSetupFailed })?; - self.build_with_store(node_entropy, Arc::new(vss_store)) + self.build_with_store(node_entropy, vss_store) } /// Builds a [`Node`] instance with a [VSS] backend and according to the options @@ -622,7 +620,7 @@ impl NodeBuilder { BuildError::KVStoreSetupFailed })?; - self.build_with_store(node_entropy, Arc::new(vss_store)) + self.build_with_store(node_entropy, vss_store) } /// Builds a [`Node`] instance with a [VSS] backend and according to the options @@ -647,12 +645,12 @@ impl NodeBuilder { BuildError::KVStoreSetupFailed })?; - self.build_with_store(node_entropy, Arc::new(vss_store)) + self.build_with_store(node_entropy, vss_store) } /// Builds a [`Node`] instance according to the options previously configured. - pub fn build_with_store( - &self, node_entropy: NodeEntropy, kv_store: Arc, + pub fn build_with_store( + &self, node_entropy: NodeEntropy, kv_store: S, ) -> Result { let logger = setup_logger(&self.log_writer_config, &self.config)?; @@ -678,7 +676,7 @@ impl NodeBuilder { seed_bytes, runtime, logger, - kv_store, + Arc::new(DynStoreWrapper(kv_store)), ) } } @@ -1014,8 +1012,10 @@ impl ArcedNodeBuilder { } /// Builds a [`Node`] instance according to the options previously configured. - pub fn build_with_store( - &self, node_entropy: Arc, kv_store: Arc, + // Note that the generics here don't actually work for Uniffi, but we don't currently expose + // this so its not needed. + pub fn build_with_store( + &self, node_entropy: Arc, kv_store: S, ) -> Result, BuildError> { self.inner.read().unwrap().build_with_store(*node_entropy, kv_store).map(Arc::new) } diff --git a/src/chain/bitcoind.rs b/src/chain/bitcoind.rs index b3d7880d6..0c3b644ca 100644 --- a/src/chain/bitcoind.rs +++ b/src/chain/bitcoind.rs @@ -6,13 +6,14 @@ // accordance with one or both of these licenses. use std::collections::{HashMap, VecDeque}; +use std::future::Future; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use base64::prelude::BASE64_STANDARD; use base64::Engine; -use bitcoin::{BlockHash, FeeRate, Network, Transaction, Txid}; +use bitcoin::{BlockHash, FeeRate, Network, OutPoint, Transaction, Txid}; use lightning::chain::chaininterface::ConfirmationTarget as LdkConfirmationTarget; use lightning::chain::{BestBlock, Listen}; use lightning::util::ser::Writeable; @@ -23,7 +24,7 @@ use lightning_block_sync::poll::{ChainPoller, ChainTip, ValidatedBlockHeader}; use lightning_block_sync::rest::RestClient; use lightning_block_sync::rpc::{RpcClient, RpcError}; use lightning_block_sync::{ - AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource, BlockSourceErrorKind, Cache, + BlockData, BlockHeaderData, BlockSource, BlockSourceError, BlockSourceErrorKind, Cache, SpvClient, }; use serde::Serialize; @@ -117,7 +118,7 @@ impl BitcoindChainSource { } } - pub(super) fn as_utxo_source(&self) -> Arc { + pub(super) fn as_utxo_source(&self) -> UtxoSourceClient { self.api_client.utxo_source() } @@ -639,6 +640,78 @@ impl BitcoindChainSource { } } +#[derive(Clone)] +pub(crate) enum UtxoSourceClient { + Rpc(Arc), + Rest(Arc), +} + +impl std::ops::Deref for UtxoSourceClient { + type Target = Self; + fn deref(&self) -> &Self { + self + } +} + +impl BlockSource for UtxoSourceClient { + fn get_header<'a>( + &'a self, header_hash: &'a BlockHash, height_hint: Option, + ) -> impl Future> + 'a { + async move { + match self { + Self::Rpc(client) => client.get_header(header_hash, height_hint).await, + Self::Rest(client) => client.get_header(header_hash, height_hint).await, + } + } + } + + fn get_block<'a>( + &'a self, header_hash: &'a BlockHash, + ) -> impl Future> + 'a { + async move { + match self { + Self::Rpc(client) => client.get_block(header_hash).await, + Self::Rest(client) => client.get_block(header_hash).await, + } + } + } + + fn get_best_block<'a>( + &'a self, + ) -> impl Future), BlockSourceError>> + 'a { + async move { + match self { + Self::Rpc(client) => client.get_best_block().await, + Self::Rest(client) => client.get_best_block().await, + } + } + } +} + +impl UtxoSource for UtxoSourceClient { + fn get_block_hash_by_height<'a>( + &'a self, block_height: u32, + ) -> impl Future> + 'a { + async move { + match self { + Self::Rpc(client) => client.get_block_hash_by_height(block_height).await, + Self::Rest(client) => client.get_block_hash_by_height(block_height).await, + } + } + } + + fn is_output_unspent<'a>( + &'a self, outpoint: OutPoint, + ) -> impl Future> + 'a { + async move { + match self { + Self::Rpc(client) => client.is_output_unspent(outpoint).await, + Self::Rest(client) => client.is_output_unspent(outpoint).await, + } + } + } +} + pub enum BitcoindClient { Rpc { rpc_client: Arc, @@ -700,12 +773,10 @@ impl BitcoindClient { } } - pub(crate) fn utxo_source(&self) -> Arc { + fn utxo_source(&self) -> UtxoSourceClient { match self { - BitcoindClient::Rpc { rpc_client, .. } => Arc::clone(rpc_client) as Arc, - BitcoindClient::Rest { rest_client, .. } => { - Arc::clone(rest_client) as Arc - }, + Self::Rpc { rpc_client, .. } => UtxoSourceClient::Rpc(Arc::clone(&rpc_client)), + Self::Rest { rest_client, .. } => UtxoSourceClient::Rest(Arc::clone(&rest_client)), } } @@ -1189,38 +1260,40 @@ impl BitcoindClient { impl BlockSource for BitcoindClient { fn get_header<'a>( &'a self, header_hash: &'a bitcoin::BlockHash, height_hint: Option, - ) -> AsyncBlockSourceResult<'a, BlockHeaderData> { - match self { - BitcoindClient::Rpc { rpc_client, .. } => { - Box::pin(async move { rpc_client.get_header(header_hash, height_hint).await }) - }, - BitcoindClient::Rest { rest_client, .. } => { - Box::pin(async move { rest_client.get_header(header_hash, height_hint).await }) - }, + ) -> impl Future> + 'a { + async move { + match self { + BitcoindClient::Rpc { rpc_client, .. } => { + rpc_client.get_header(header_hash, height_hint).await + }, + BitcoindClient::Rest { rest_client, .. } => { + rest_client.get_header(header_hash, height_hint).await + }, + } } } fn get_block<'a>( &'a self, header_hash: &'a bitcoin::BlockHash, - ) -> AsyncBlockSourceResult<'a, BlockData> { - match self { - BitcoindClient::Rpc { rpc_client, .. } => { - Box::pin(async move { rpc_client.get_block(header_hash).await }) - }, - BitcoindClient::Rest { rest_client, .. } => { - Box::pin(async move { rest_client.get_block(header_hash).await }) - }, + ) -> impl Future> + 'a { + async move { + match self { + BitcoindClient::Rpc { rpc_client, .. } => rpc_client.get_block(header_hash).await, + BitcoindClient::Rest { rest_client, .. } => { + rest_client.get_block(header_hash).await + }, + } } } - fn get_best_block(&self) -> AsyncBlockSourceResult<'_, (bitcoin::BlockHash, Option)> { - match self { - BitcoindClient::Rpc { rpc_client, .. } => { - Box::pin(async move { rpc_client.get_best_block().await }) - }, - BitcoindClient::Rest { rest_client, .. } => { - Box::pin(async move { rest_client.get_best_block().await }) - }, + fn get_best_block<'a>( + &'a self, + ) -> impl Future), BlockSourceError>> + 'a { + async move { + match self { + BitcoindClient::Rpc { rpc_client, .. } => rpc_client.get_best_block().await, + BitcoindClient::Rest { rest_client, .. } => rest_client.get_best_block().await, + } } } } diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 2cd98e20d..a73ce7418 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -5,7 +5,7 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -mod bitcoind; +pub(crate) mod bitcoind; mod electrum; mod esplora; @@ -15,9 +15,8 @@ use std::time::Duration; use bitcoin::{Script, Txid}; use lightning::chain::{BestBlock, Filter}; -use lightning_block_sync::gossip::UtxoSource; -use crate::chain::bitcoind::BitcoindChainSource; +use crate::chain::bitcoind::{BitcoindChainSource, UtxoSourceClient}; use crate::chain::electrum::ElectrumChainSource; use crate::chain::esplora::EsploraChainSource; use crate::config::{ @@ -202,7 +201,7 @@ impl ChainSource { } } - pub(crate) fn as_utxo_source(&self) -> Option> { + pub(crate) fn as_utxo_source(&self) -> Option { match &self.kind { ChainSourceKind::Bitcoind(bitcoind_chain_source) => { Some(bitcoind_chain_source.as_utxo_source()) diff --git a/src/data_store.rs b/src/data_store.rs index 87bd831c9..d295ece51 100644 --- a/src/data_store.rs +++ b/src/data_store.rs @@ -177,6 +177,7 @@ mod tests { use super::*; use crate::hex_utils; use crate::io::test_utils::InMemoryStore; + use crate::types::DynStoreWrapper; #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] struct TestObjectId { @@ -235,7 +236,7 @@ mod tests { #[test] fn data_is_persisted() { - let store: Arc = Arc::new(InMemoryStore::new()); + let store: Arc = Arc::new(DynStoreWrapper(InMemoryStore::new())); let logger = Arc::new(TestLogger::new()); let primary_namespace = "datastore_test_primary".to_string(); let secondary_namespace = "datastore_test_secondary".to_string(); diff --git a/src/event.rs b/src/event.rs index 41f76f216..75270bf53 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1801,10 +1801,11 @@ mod tests { use super::*; use crate::io::test_utils::InMemoryStore; + use crate::types::DynStoreWrapper; #[tokio::test] async fn event_queue_persistence() { - let store: Arc = Arc::new(InMemoryStore::new()); + let store: Arc = Arc::new(DynStoreWrapper(InMemoryStore::new())); let logger = Arc::new(TestLogger::new()); let event_queue = Arc::new(EventQueue::new(Arc::clone(&store), Arc::clone(&logger))); assert_eq!(event_queue.next_event(), None); @@ -1842,7 +1843,7 @@ mod tests { #[tokio::test] async fn event_queue_concurrency() { - let store: Arc = Arc::new(InMemoryStore::new()); + let store: Arc = Arc::new(DynStoreWrapper(InMemoryStore::new())); let logger = Arc::new(TestLogger::new()); let event_queue = Arc::new(EventQueue::new(Arc::clone(&store), Arc::clone(&logger))); assert_eq!(event_queue.next_event(), None); diff --git a/src/gossip.rs b/src/gossip.rs index 01aff4742..563d9e1ea 100644 --- a/src/gossip.rs +++ b/src/gossip.rs @@ -71,7 +71,7 @@ impl GossipSource { if let Some(utxo_source) = chain_source.as_utxo_source() { let spawner = RuntimeSpawner::new(Arc::clone(&runtime)); let gossip_verifier = Arc::new(GossipVerifier::new( - utxo_source, + Arc::new(utxo_source), spawner, Arc::clone(gossip_sync), peer_manager, diff --git a/src/io/sqlite_store/mod.rs b/src/io/sqlite_store/mod.rs index 789330cef..e4091b24e 100644 --- a/src/io/sqlite_store/mod.rs +++ b/src/io/sqlite_store/mod.rs @@ -6,12 +6,10 @@ // accordance with one or both of these licenses. //! Objects related to [`SqliteStore`] live here. -use std::boxed::Box; use std::collections::HashMap; use std::fs; use std::future::Future; use std::path::PathBuf; -use std::pin::Pin; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; @@ -92,7 +90,7 @@ impl SqliteStore { impl KVStore for SqliteStore { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> Pin, io::Error>> + Send>> { + ) -> impl Future, io::Error>> + 'static + Send { let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); @@ -100,17 +98,17 @@ impl KVStore for SqliteStore { let fut = tokio::task::spawn_blocking(move || { inner.read_internal(&primary_namespace, &secondary_namespace, &key) }); - Box::pin(async move { + async move { fut.await.unwrap_or_else(|e| { let msg = format!("Failed to IO operation due join error: {}", e); Err(io::Error::new(io::ErrorKind::Other, msg)) }) - }) + } } fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, - ) -> Pin> + Send>> { + ) -> impl Future> + 'static + Send { let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); let primary_namespace = primary_namespace.to_string(); @@ -128,17 +126,17 @@ impl KVStore for SqliteStore { buf, ) }); - Box::pin(async move { + async move { fut.await.unwrap_or_else(|e| { let msg = format!("Failed to IO operation due join error: {}", e); Err(io::Error::new(io::ErrorKind::Other, msg)) }) - }) + } } fn remove( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, - ) -> Pin> + Send>> { + ) -> impl Future> + 'static + Send { let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); let primary_namespace = primary_namespace.to_string(); @@ -155,29 +153,29 @@ impl KVStore for SqliteStore { &key, ) }); - Box::pin(async move { + async move { fut.await.unwrap_or_else(|e| { let msg = format!("Failed to IO operation due join error: {}", e); Err(io::Error::new(io::ErrorKind::Other, msg)) }) - }) + } } fn list( &self, primary_namespace: &str, secondary_namespace: &str, - ) -> Pin, io::Error>> + Send>> { + ) -> impl Future, io::Error>> + 'static + Send { let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let inner = Arc::clone(&self.inner); let fut = tokio::task::spawn_blocking(move || { inner.list_internal(&primary_namespace, &secondary_namespace) }); - Box::pin(async move { + async move { fut.await.unwrap_or_else(|e| { let msg = format!("Failed to IO operation due join error: {}", e); Err(io::Error::new(io::ErrorKind::Other, msg)) }) - }) + } } } diff --git a/src/io/test_utils.rs b/src/io/test_utils.rs index a360b443b..6eb04df3f 100644 --- a/src/io/test_utils.rs +++ b/src/io/test_utils.rs @@ -5,12 +5,10 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -use std::boxed::Box; use std::collections::{hash_map, HashMap}; use std::future::Future; use std::panic::RefUnwindSafe; use std::path::PathBuf; -use std::pin::Pin; use std::sync::Mutex; use lightning::events::ClosureReason; @@ -106,27 +104,27 @@ impl InMemoryStore { impl KVStore for InMemoryStore { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> Pin, io::Error>> + 'static + Send>> { + ) -> impl Future, io::Error>> + 'static + Send { let res = self.read_internal(&primary_namespace, &secondary_namespace, &key); - Box::pin(async move { res }) + async move { res } } fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, - ) -> Pin> + 'static + Send>> { + ) -> impl Future> + 'static + Send { let res = self.write_internal(&primary_namespace, &secondary_namespace, &key, buf); - Box::pin(async move { res }) + async move { res } } fn remove( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, - ) -> Pin> + 'static + Send>> { + ) -> impl Future> + 'static + Send { let res = self.remove_internal(&primary_namespace, &secondary_namespace, &key, lazy); - Box::pin(async move { res }) + async move { res } } fn list( &self, primary_namespace: &str, secondary_namespace: &str, - ) -> Pin, io::Error>> + 'static + Send>> { + ) -> impl Future, io::Error>> + 'static + Send { let res = self.list_internal(primary_namespace, secondary_namespace); - Box::pin(async move { res }) + async move { res } } } diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index 05709caa7..eb439ed10 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -13,7 +13,6 @@ use std::fmt; use std::future::Future; #[cfg(test)] use std::panic::RefUnwindSafe; -use std::pin::Pin; use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -292,27 +291,27 @@ impl KVStoreSync for VssStore { impl KVStore for VssStore { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> Pin, io::Error>> + Send>> { + ) -> impl Future, io::Error>> + 'static + Send { let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); let inner = Arc::clone(&self.inner); - Box::pin(async move { + async move { inner .read_internal(&inner.async_client, primary_namespace, secondary_namespace, key) .await - }) + } } fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, - ) -> Pin> + Send>> { + ) -> impl Future> + 'static + Send { let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); let inner = Arc::clone(&self.inner); - Box::pin(async move { + async move { inner .write_internal( &inner.async_client, @@ -325,11 +324,11 @@ impl KVStore for VssStore { buf, ) .await - }) + } } fn remove( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, - ) -> Pin> + Send>> { + ) -> impl Future> + 'static + Send { let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); let primary_namespace = primary_namespace.to_string(); @@ -349,22 +348,24 @@ impl KVStore for VssStore { ) .await }; - if lazy { - tokio::task::spawn(async { fut.await }); - Box::pin(async { Ok(()) }) - } else { - Box::pin(async { fut.await }) + async move { + if lazy { + tokio::task::spawn(async move { fut.await }); + Ok(()) + } else { + fut.await + } } } fn list( &self, primary_namespace: &str, secondary_namespace: &str, - ) -> Pin, io::Error>> + Send>> { + ) -> impl Future, io::Error>> + 'static + Send { let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let inner = Arc::clone(&self.inner); - Box::pin(async move { + async move { inner.list_internal(&inner.async_client, primary_namespace, secondary_namespace).await - }) + } } } diff --git a/src/lib.rs b/src/lib.rs index bbae8ac72..fdaa0f4f1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -158,12 +158,10 @@ use peer_store::{PeerInfo, PeerStore}; use rand::Rng; use runtime::Runtime; use types::{ - Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, Graph, KeysManager, - OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, Wallet, -}; -pub use types::{ - ChannelDetails, CustomTlvRecord, DynStore, PeerDetails, SyncAndAsyncKVStore, UserChannelId, + Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph, + KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, Wallet, }; +pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, SyncAndAsyncKVStore, UserChannelId}; pub use { bip39, bitcoin, lightning, lightning_invoice, lightning_liquidity, lightning_types, tokio, vss_client, diff --git a/src/payment/asynchronous/static_invoice_store.rs b/src/payment/asynchronous/static_invoice_store.rs index 45125cfee..cd0e2ebd2 100644 --- a/src/payment/asynchronous/static_invoice_store.rs +++ b/src/payment/asynchronous/static_invoice_store.rs @@ -161,11 +161,11 @@ mod tests { use crate::io::test_utils::InMemoryStore; use crate::payment::asynchronous::static_invoice_store::StaticInvoiceStore; - use crate::types::DynStore; + use crate::types::{DynStore, DynStoreWrapper}; #[tokio::test] async fn static_invoice_store_test() { - let store: Arc = Arc::new(InMemoryStore::new()); + let store: Arc = Arc::new(DynStoreWrapper(InMemoryStore::new())); let static_invoice_store = StaticInvoiceStore::new(Arc::clone(&store)); let static_invoice = invoice(); diff --git a/src/peer_store.rs b/src/peer_store.rs index 59cd3d94f..ce8a9810e 100644 --- a/src/peer_store.rs +++ b/src/peer_store.rs @@ -156,10 +156,11 @@ mod tests { use super::*; use crate::io::test_utils::InMemoryStore; + use crate::types::DynStoreWrapper; #[test] fn peer_info_persistence() { - let store: Arc = Arc::new(InMemoryStore::new()); + let store: Arc = Arc::new(DynStoreWrapper(InMemoryStore::new())); let logger = Arc::new(TestLogger::new()); let peer_store = PeerStore::new(Arc::clone(&store), Arc::clone(&logger)); diff --git a/src/scoring.rs b/src/scoring.rs index e85abade3..6385f2f56 100644 --- a/src/scoring.rs +++ b/src/scoring.rs @@ -12,7 +12,8 @@ use crate::config::{ use crate::io::utils::write_external_pathfinding_scores_to_cache; use crate::logger::LdkLogger; use crate::runtime::Runtime; -use crate::{write_node_metrics, DynStore, Logger, NodeMetrics, Scorer}; +use crate::types::DynStore; +use crate::{write_node_metrics, Logger, NodeMetrics, Scorer}; /// Start a background task that periodically downloads scores via an external url and merges them into the local /// pathfinding scores. diff --git a/src/types.rs b/src/types.rs index 38519eca7..ea4de2a63 100644 --- a/src/types.rs +++ b/src/types.rs @@ -6,6 +6,8 @@ // accordance with one or both of these licenses. use std::fmt; +use std::future::Future; +use std::pin::Pin; use std::sync::{Arc, Mutex}; use bitcoin::secp256k1::PublicKey; @@ -23,10 +25,11 @@ use lightning::sign::InMemorySigner; use lightning::util::persist::{KVStore, KVStoreSync, MonitorUpdatingPersister}; use lightning::util::ser::{Readable, Writeable, Writer}; use lightning::util::sweep::OutputSweeper; -use lightning_block_sync::gossip::{GossipVerifier, UtxoSource}; +use lightning_block_sync::gossip::GossipVerifier; use lightning_liquidity::utils::time::DefaultTimeProvider; use lightning_net_tokio::SocketDescriptor; +use crate::chain::bitcoind::UtxoSourceClient; use crate::chain::ChainSource; use crate::config::ChannelConfig; use crate::data_store::DataStore; @@ -47,8 +50,139 @@ where { } -/// A type alias for [`SyncAndAsyncKVStore`] with `Sync`/`Send` markers; -pub type DynStore = dyn SyncAndAsyncKVStore + Sync + Send; +pub(crate) trait DynStoreTrait: Send + Sync { + fn read_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Pin, bitcoin::io::Error>> + Send + 'static>>; + fn write_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Pin> + Send + 'static>>; + fn remove_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Pin> + Send + 'static>>; + fn list_async( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Pin, bitcoin::io::Error>> + Send + 'static>>; + + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result, bitcoin::io::Error>; + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Result<(), bitcoin::io::Error>; + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Result<(), bitcoin::io::Error>; + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, bitcoin::io::Error>; +} + +impl<'a> KVStore for dyn DynStoreTrait + 'a { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> impl Future, bitcoin::io::Error>> + Send + 'static { + DynStoreTrait::read_async(self, primary_namespace, secondary_namespace, key) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> impl Future> + Send + 'static { + DynStoreTrait::write_async(self, primary_namespace, secondary_namespace, key, buf) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> impl Future> + Send + 'static { + DynStoreTrait::remove_async(self, primary_namespace, secondary_namespace, key, lazy) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> impl Future, bitcoin::io::Error>> + Send + 'static { + DynStoreTrait::list_async(self, primary_namespace, secondary_namespace) + } +} + +impl<'a> KVStoreSync for dyn DynStoreTrait + 'a { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result, bitcoin::io::Error> { + DynStoreTrait::read(self, primary_namespace, secondary_namespace, key) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Result<(), bitcoin::io::Error> { + DynStoreTrait::write(self, primary_namespace, secondary_namespace, key, buf) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Result<(), bitcoin::io::Error> { + DynStoreTrait::remove(self, primary_namespace, secondary_namespace, key, lazy) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, bitcoin::io::Error> { + DynStoreTrait::list(self, primary_namespace, secondary_namespace) + } +} + +pub(crate) type DynStore = dyn DynStoreTrait; + +pub(crate) struct DynStoreWrapper(pub(crate) T); + +impl DynStoreTrait for DynStoreWrapper { + fn read_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Pin, bitcoin::io::Error>> + Send + 'static>> { + Box::pin(KVStore::read(&self.0, primary_namespace, secondary_namespace, key)) + } + + fn write_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Pin> + Send + 'static>> { + Box::pin(KVStore::write(&self.0, primary_namespace, secondary_namespace, key, buf)) + } + + fn remove_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Pin> + Send + 'static>> { + Box::pin(KVStore::remove(&self.0, primary_namespace, secondary_namespace, key, lazy)) + } + + fn list_async( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Pin, bitcoin::io::Error>> + Send + 'static>> { + Box::pin(KVStore::list(&self.0, primary_namespace, secondary_namespace)) + } + + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result, bitcoin::io::Error> { + KVStoreSync::read(&self.0, primary_namespace, secondary_namespace, key) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Result<(), bitcoin::io::Error> { + KVStoreSync::write(&self.0, primary_namespace, secondary_namespace, key, buf) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Result<(), bitcoin::io::Error> { + KVStoreSync::remove(&self.0, primary_namespace, secondary_namespace, key, lazy) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, bitcoin::io::Error> { + KVStoreSync::list(&self.0, primary_namespace, secondary_namespace) + } +} pub type Persister = MonitorUpdatingPersister< Arc, @@ -119,7 +253,7 @@ pub(crate) type Scorer = CombinedScorer, Arc>; pub(crate) type Graph = gossip::NetworkGraph>; -pub(crate) type UtxoLookup = GossipVerifier, Arc>; +pub(crate) type UtxoLookup = GossipVerifier, Arc>; pub(crate) type P2PGossipSync = lightning::routing::gossip::P2PGossipSync, Arc, Arc>; diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 2f8daa500..b2366a45f 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -7,7 +7,6 @@ use std::future::Future; use std::ops::Deref; -use std::pin::Pin; use std::str::FromStr; use std::sync::{Arc, Mutex}; @@ -859,20 +858,18 @@ impl Listen for Wallet { impl WalletSource for Wallet { fn list_confirmed_utxos<'a>( &'a self, - ) -> Pin, ()>> + Send + 'a>> { - Box::pin(async move { self.list_confirmed_utxos_inner() }) + ) -> impl Future, ()>> + Send + 'a { + async move { self.list_confirmed_utxos_inner() } } - fn get_change_script<'a>( - &'a self, - ) -> Pin> + Send + 'a>> { - Box::pin(async move { self.get_change_script_inner() }) + fn get_change_script<'a>(&'a self) -> impl Future> + Send + 'a { + async move { self.get_change_script_inner() } } fn sign_psbt<'a>( &'a self, psbt: Psbt, - ) -> Pin> + Send + 'a>> { - Box::pin(async move { self.sign_psbt_inner(psbt) }) + ) -> impl Future> + Send + 'a { + async move { self.sign_psbt_inner(psbt) } } } @@ -1018,17 +1015,15 @@ impl SignerProvider for WalletKeysManager { impl ChangeDestinationSource for WalletKeysManager { fn get_change_destination_script<'a>( &'a self, - ) -> Pin> + Send + 'a>> { - let wallet = Arc::clone(&self.wallet); - let logger = Arc::clone(&self.logger); - Box::pin(async move { - wallet + ) -> impl Future> + Send + 'a { + async move { + self.wallet .get_new_internal_address() .map_err(|e| { - log_error!(logger, "Failed to retrieve new address from wallet: {}", e); + log_error!(self.logger, "Failed to retrieve new address from wallet: {}", e); }) .map(|addr| addr.script_pubkey()) .map_err(|_| ()) - }) + } } } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 1783ee1af..96f58297c 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -10,12 +10,10 @@ pub(crate) mod logging; -use std::boxed::Box; use std::collections::{HashMap, HashSet}; use std::env; use std::future::Future; use std::path::PathBuf; -use std::pin::Pin; use std::sync::{Arc, RwLock}; use std::time::Duration; @@ -423,7 +421,7 @@ pub(crate) fn setup_node_for_async_payments( let node = match config.store_type { TestStoreType::TestSyncStore => { - let kv_store = Arc::new(TestSyncStore::new(config.node_config.storage_dir_path.into())); + let kv_store = TestSyncStore::new(config.node_config.storage_dir_path.into()); builder.build_with_store(config.node_entropy.into(), kv_store).unwrap() }, TestStoreType::Sqlite => builder.build(config.node_entropy.into()).unwrap(), @@ -1291,6 +1289,7 @@ pub(crate) async fn do_channel_full_cycle( } // A `KVStore` impl for testing purposes that wraps all our `KVStore`s and asserts their synchronicity. +#[derive(Clone)] pub(crate) struct TestSyncStore { inner: Arc, } @@ -1305,7 +1304,7 @@ impl TestSyncStore { impl KVStore for TestSyncStore { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> Pin, io::Error>> + Send>> { + ) -> impl Future, io::Error>> + 'static + Send { let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); @@ -1313,16 +1312,16 @@ impl KVStore for TestSyncStore { let fut = tokio::task::spawn_blocking(move || { inner.read_internal(&primary_namespace, &secondary_namespace, &key) }); - Box::pin(async move { + async move { fut.await.unwrap_or_else(|e| { let msg = format!("Failed to IO operation due join error: {}", e); Err(io::Error::new(io::ErrorKind::Other, msg)) }) - }) + } } fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, - ) -> Pin> + Send>> { + ) -> impl Future> + 'static + Send { let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); @@ -1330,16 +1329,16 @@ impl KVStore for TestSyncStore { let fut = tokio::task::spawn_blocking(move || { inner.write_internal(&primary_namespace, &secondary_namespace, &key, buf) }); - Box::pin(async move { + async move { fut.await.unwrap_or_else(|e| { let msg = format!("Failed to IO operation due join error: {}", e); Err(io::Error::new(io::ErrorKind::Other, msg)) }) - }) + } } fn remove( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, - ) -> Pin> + Send>> { + ) -> impl Future> + 'static + Send { let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); @@ -1347,28 +1346,28 @@ impl KVStore for TestSyncStore { let fut = tokio::task::spawn_blocking(move || { inner.remove_internal(&primary_namespace, &secondary_namespace, &key, lazy) }); - Box::pin(async move { + async move { fut.await.unwrap_or_else(|e| { let msg = format!("Failed to IO operation due join error: {}", e); Err(io::Error::new(io::ErrorKind::Other, msg)) }) - }) + } } fn list( &self, primary_namespace: &str, secondary_namespace: &str, - ) -> Pin, io::Error>> + Send>> { + ) -> impl Future, io::Error>> + 'static + Send { let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let inner = Arc::clone(&self.inner); let fut = tokio::task::spawn_blocking(move || { inner.list_internal(&primary_namespace, &secondary_namespace) }); - Box::pin(async move { + async move { fut.await.unwrap_or_else(|e| { let msg = format!("Failed to IO operation due join error: {}", e); Err(io::Error::new(io::ErrorKind::Other, msg)) }) - }) + } } } diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 7c1ed8344..892afedcc 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -31,7 +31,7 @@ use ldk_node::payment::{ ConfirmationStatus, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, QrPaymentResult, }; -use ldk_node::{Builder, DynStore, Event, NodeError}; +use ldk_node::{Builder, Event, NodeError}; use lightning::ln::channelmanager::PaymentId; use lightning::routing::gossip::{NodeAlias, NodeId}; use lightning::routing::router::RouteParametersConfig; @@ -252,15 +252,14 @@ async fn start_stop_reinit() { let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); - let test_sync_store: Arc = - Arc::new(TestSyncStore::new(config.node_config.storage_dir_path.clone().into())); + let test_sync_store = TestSyncStore::new(config.node_config.storage_dir_path.clone().into()); let sync_config = EsploraSyncConfig { background_sync_config: None }; setup_builder!(builder, config.node_config); builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); let node = - builder.build_with_store(config.node_entropy.into(), Arc::clone(&test_sync_store)).unwrap(); + builder.build_with_store(config.node_entropy.into(), test_sync_store.clone()).unwrap(); node.start().unwrap(); let expected_node_id = node.node_id(); @@ -299,7 +298,7 @@ async fn start_stop_reinit() { builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); let reinitialized_node = - builder.build_with_store(config.node_entropy.into(), Arc::clone(&test_sync_store)).unwrap(); + builder.build_with_store(config.node_entropy.into(), test_sync_store).unwrap(); reinitialized_node.start().unwrap(); assert_eq!(reinitialized_node.node_id(), expected_node_id);