diff --git a/src/handlers.rs b/src/handlers.rs index 4631186..033a96b 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -14,6 +14,8 @@ use crate::commands::*; use crate::error::BDKCliError as Error; #[cfg(any(feature = "sqlite", feature = "redb"))] use crate::persister::Persister; +#[cfg(feature = "cbf")] +use crate::utils::BlockchainClient::KyotoClient; use crate::utils::*; #[cfg(feature = "redb")] use bdk_redb::Store as RedbStore; @@ -45,8 +47,6 @@ use bdk_wallet::{ }; use cli_table::{Cell, CellStruct, Style, Table, format::Justify}; use serde_json::json; -#[cfg(feature = "cbf")] -use {crate::utils::BlockchainClient::KyotoClient, bdk_kyoto::LightClient, tokio::select}; #[cfg(feature = "electrum")] use crate::utils::BlockchainClient::Electrum; @@ -602,7 +602,7 @@ pub fn handle_offline_wallet_subcommand( ))] pub(crate) async fn handle_online_wallet_subcommand( wallet: &mut Wallet, - client: BlockchainClient, + client: &mut BlockchainClient, online_subcommand: OnlineWalletSubCommand, ) -> Result { match online_subcommand { @@ -629,7 +629,7 @@ pub(crate) async fn handle_online_wallet_subcommand( client .populate_tx_cache(wallet.tx_graph().full_txs().map(|tx_node| tx_node.tx)); - let update = client.full_scan(request, _stop_gap, batch_size, false)?; + let update = client.full_scan(request, _stop_gap, *batch_size, false)?; wallet.apply_update(update)?; } #[cfg(feature = "esplora")] @@ -638,7 +638,7 @@ pub(crate) async fn handle_online_wallet_subcommand( parallel_requests, } => { let update = client - .full_scan(request, _stop_gap, parallel_requests) + .full_scan(request, _stop_gap, *parallel_requests) .await .map_err(|e| *e)?; wallet.apply_update(update)?; @@ -655,7 +655,7 @@ pub(crate) async fn handle_online_wallet_subcommand( hash: genesis_block.block_hash(), }); let mut emitter = Emitter::new( - &*client, + &**client, genesis_cp.clone(), genesis_cp.height(), NO_EXPECTED_MEMPOOL_TXS, @@ -986,11 +986,12 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result { }; let mut wallet = new_persisted_wallet(network, &mut persister, wallet_opts)?; - let blockchain_client = new_blockchain_client(wallet_opts, &wallet, database_path)?; + let mut blockchain_client = + new_blockchain_client(wallet_opts, &wallet, database_path)?; let result = handle_online_wallet_subcommand( &mut wallet, - blockchain_client, + &mut blockchain_client, online_subcommand, ) .await?; @@ -1000,11 +1001,15 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result { #[cfg(not(any(feature = "sqlite", feature = "redb")))] let result = { let wallet = new_wallet(network, wallet_opts)?; - let blockchain_client = + let mut blockchain_client = crate::utils::new_blockchain_client(wallet_opts, &wallet, database_path)?; let mut wallet = new_wallet(network, wallet_opts)?; - handle_online_wallet_subcommand(&mut wallet, blockchain_client, online_subcommand) - .await? + handle_online_wallet_subcommand( + &mut wallet, + &mut blockchain_client, + online_subcommand, + ) + .await? }; Ok(result) } @@ -1183,9 +1188,9 @@ async fn respond( ReplSubCommand::Wallet { subcommand: WalletSubCommand::OnlineWalletSubCommand(online_subcommand), } => { - let blockchain = + let mut blockchain = new_blockchain_client(wallet_opts, wallet, _datadir).map_err(|e| e.to_string())?; - let value = handle_online_wallet_subcommand(wallet, blockchain, online_subcommand) + let value = handle_online_wallet_subcommand(wallet, &mut blockchain, online_subcommand) .await .map_err(|e| e.to_string())?; Some(value) @@ -1228,7 +1233,7 @@ async fn respond( feature = "rpc" ))] /// Syncs a given wallet using the blockchain client. -pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Result<(), Error> { +pub async fn sync_wallet(client: &mut BlockchainClient, wallet: &mut Wallet) -> Result<(), Error> { #[cfg(any(feature = "electrum", feature = "esplora"))] let request = wallet .start_sync_with_revealed_spks() @@ -1243,7 +1248,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul // already have. client.populate_tx_cache(wallet.tx_graph().full_txs().map(|tx_node| tx_node.tx)); - let update = client.sync(request, batch_size, false)?; + let update = client.sync(request, *batch_size, false)?; wallet .apply_update(update) .map_err(|e| Error::Generic(e.to_string())) @@ -1254,7 +1259,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul parallel_requests, } => { let update = client - .sync(request, parallel_requests) + .sync(request, *parallel_requests) .await .map_err(|e| *e)?; wallet @@ -1269,7 +1274,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul // reload the last 200 blocks in case of a reorg let emitter_height = wallet_cp.height().saturating_sub(200); let mut emitter = Emitter::new( - &*client, + &**client, wallet_cp, emitter_height, wallet @@ -1320,7 +1325,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul ))] /// Broadcasts a given transaction using the blockchain client. pub async fn broadcast_transaction( - client: BlockchainClient, + client: &mut BlockchainClient, tx: Transaction, ) -> Result { match client { @@ -1347,38 +1352,15 @@ pub async fn broadcast_transaction( #[cfg(feature = "cbf")] KyotoClient { client } => { - let LightClient { - requester, - mut info_subscriber, - mut warning_subscriber, - update_subscriber: _, - node, - } = *client; - - let subscriber = tracing_subscriber::FmtSubscriber::new(); - tracing::subscriber::set_global_default(subscriber) - .map_err(|e| Error::Generic(format!("SetGlobalDefault error: {e}")))?; - - tokio::task::spawn(async move { node.run().await }); - tokio::task::spawn(async move { - select! { - info = info_subscriber.recv() => { - if let Some(info) = info { - tracing::info!("{info}"); - } - }, - warn = warning_subscriber.recv() => { - if let Some(warn) = warn { - tracing::warn!("{warn}"); - } - } - } - }); let txid = tx.compute_txid(); - let wtxid = requester.broadcast_random(tx.clone()).await.map_err(|_| { - tracing::warn!("Broadcast was unsuccessful"); - Error::Generic("Transaction broadcast timed out after 30 seconds".into()) - })?; + let wtxid = client + .requester + .broadcast_random(tx.clone()) + .await + .map_err(|_| { + tracing::warn!("Broadcast was unsuccessful"); + Error::Generic("Transaction broadcast timed out after 30 seconds".into()) + })?; tracing::info!("Successfully broadcast WTXID: {wtxid}"); Ok(txid) } diff --git a/src/payjoin/mod.rs b/src/payjoin/mod.rs index f5e1274..d25f3b9 100644 --- a/src/payjoin/mod.rs +++ b/src/payjoin/mod.rs @@ -51,7 +51,7 @@ impl<'a> PayjoinManager<'a> { directory: String, max_fee_rate: Option, ohttp_relays: Vec, - blockchain_client: BlockchainClient, + blockchain_client: &mut BlockchainClient, ) -> Result { let address = self .wallet @@ -119,7 +119,7 @@ impl<'a> PayjoinManager<'a> { uri: String, fee_rate: u64, ohttp_relays: Vec, - blockchain_client: BlockchainClient, + blockchain_client: &mut BlockchainClient, ) -> Result { let uri = payjoin::Uri::try_from(uri) .map_err(|e| Error::Generic(format!("Failed parsing to Payjoin URI: {}", e)))?; @@ -237,7 +237,7 @@ impl<'a> PayjoinManager<'a> { persister: &impl SessionPersister, relay: impl payjoin::IntoUrl, max_fee_rate: FeeRate, - blockchain_client: BlockchainClient, + blockchain_client: &mut BlockchainClient, ) -> Result<(), Error> { match session { ReceiveSession::Initialized(proposal) => { @@ -306,7 +306,7 @@ impl<'a> PayjoinManager<'a> { persister: &impl SessionPersister, relay: impl payjoin::IntoUrl, max_fee_rate: FeeRate, - blockchain_client: BlockchainClient, + blockchain_client: &mut BlockchainClient, ) -> Result<(), Error> { let mut current_receiver_typestate = receiver; let next_receiver_typestate = loop { @@ -353,7 +353,7 @@ impl<'a> PayjoinManager<'a> { receiver: Receiver, persister: &impl SessionPersister, max_fee_rate: FeeRate, - blockchain_client: BlockchainClient, + blockchain_client: &mut BlockchainClient, ) -> Result<(), Error> { let next_receiver_typestate = receiver .assume_interactive_receiver() @@ -386,7 +386,7 @@ impl<'a> PayjoinManager<'a> { receiver: Receiver, persister: &impl SessionPersister, max_fee_rate: FeeRate, - blockchain_client: BlockchainClient, + blockchain_client: &mut BlockchainClient, ) -> Result<(), Error> { let next_receiver_typestate = receiver .check_inputs_not_owned(&mut |input| { @@ -411,7 +411,7 @@ impl<'a> PayjoinManager<'a> { receiver: Receiver, persister: &impl SessionPersister, max_fee_rate: FeeRate, - blockchain_client: BlockchainClient, + blockchain_client: &mut BlockchainClient, ) -> Result<(), Error> { // This is not supported as there is no persistence of previous Payjoin attempts in BDK CLI // yet. If there is support either in the BDK persister or Payjoin persister, this can be @@ -437,7 +437,7 @@ impl<'a> PayjoinManager<'a> { receiver: Receiver, persister: &impl SessionPersister, max_fee_rate: FeeRate, - blockchain_client: BlockchainClient, + blockchain_client: &mut BlockchainClient, ) -> Result<(), Error> { let next_receiver_typestate = receiver.identify_receiver_outputs(&mut |output_script| { Ok(self.wallet.is_mine(output_script.to_owned())) @@ -459,7 +459,7 @@ impl<'a> PayjoinManager<'a> { receiver: Receiver, persister: &impl SessionPersister, max_fee_rate: FeeRate, - blockchain_client: BlockchainClient, + blockchain_client: &mut BlockchainClient, ) -> Result<(), Error> { // This is a typestate to modify existing receiver-owned outputs in case the receiver wants // to do that. This is a very simple implementation of Payjoin so we are just going @@ -483,7 +483,7 @@ impl<'a> PayjoinManager<'a> { receiver: Receiver, persister: &impl SessionPersister, max_fee_rate: FeeRate, - blockchain_client: BlockchainClient, + blockchain_client: &mut BlockchainClient, ) -> Result<(), Error> { let candidate_inputs: Vec = self .wallet @@ -533,7 +533,7 @@ impl<'a> PayjoinManager<'a> { receiver: Receiver, persister: &impl SessionPersister, max_fee_rate: FeeRate, - blockchain_client: BlockchainClient, + blockchain_client: &mut BlockchainClient, ) -> Result<(), Error> { let next_receiver_typestate = receiver.apply_fee_range(None, Some(max_fee_rate)).save(persister).map_err(|e| { Error::Generic(format!("Error occurred when saving after applying the receiver fee range to the transaction: {e}")) @@ -546,7 +546,7 @@ impl<'a> PayjoinManager<'a> { &mut self, receiver: Receiver, persister: &impl SessionPersister, - blockchain_client: BlockchainClient, + blockchain_client: &mut BlockchainClient, ) -> Result<(), Error> { let next_receiver_typestate = receiver .finalize_proposal(|psbt| { @@ -580,7 +580,7 @@ impl<'a> PayjoinManager<'a> { &mut self, receiver: Receiver, persister: &impl SessionPersister, - blockchain_client: BlockchainClient, + blockchain_client: &mut BlockchainClient, ) -> Result<(), Error> { let (req, ctx) = receiver.create_post_request( self.relay_manager @@ -607,72 +607,90 @@ impl<'a> PayjoinManager<'a> { .await; } - /// Syncs the blockchain once and then checks whether the Payjoin was broadcasted by the + /// Polls the blockchain periodically and checks whether the Payjoin was broadcasted by the /// sender. /// - /// The currenty implementation does not support checking for the Payjoin broadcast in a loop - /// and returning only when it is detected or if a timeout is reached because the [`sync_wallet`] - /// function consumes the BlockchainClient. BDK CLI supports multiple blockchain clients, and - /// at the time of writing, Kyoto consumes the client since BDK CLI is not designed for long-running - /// tasks. + /// This function syncs the wallet at regular intervals and checks for the Payjoin transaction + /// in a loop until it is detected or a timeout is reached. Since [`sync_wallet`] now accepts + /// a reference to the BlockchainClient, we can call it multiple times in a loop. async fn monitor_payjoin_proposal( &mut self, receiver: Receiver, persister: &impl SessionPersister, - blockchain_client: BlockchainClient, + blockchain_client: &mut BlockchainClient, ) -> Result<(), Error> { - let wait_time_for_sync = 3; - let poll_internal = tokio::time::Duration::from_secs(wait_time_for_sync); + let poll_interval = tokio::time::Duration::from_millis(200); + let sync_interval = tokio::time::Duration::from_secs(3); + let timeout_duration = tokio::time::Duration::from_secs(15); println!( - "Waiting for {wait_time_for_sync} seconds before syncing the blockchain and checking if the transaction has been broadcast..." + "Polling for Payjoin transaction broadcast. This may take up to {} seconds...", + timeout_duration.as_secs() ); - tokio::time::sleep(poll_internal).await; - sync_wallet(blockchain_client, self.wallet).await?; - - let check_result = receiver - .check_payment( - |txid| { - let Some(tx_details) = self.wallet.tx_details(txid) else { - return Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain")); - }; - - let is_seen = match tx_details.chain_position { - bdk_wallet::chain::ChainPosition::Confirmed { .. } => true, - bdk_wallet::chain::ChainPosition::Unconfirmed { first_seen: Some(_), .. } => true, - _ => false - }; - - if is_seen { - return Ok(Some(tx_details.tx.as_ref().clone())); + let result = tokio::time::timeout(timeout_duration, async { + let mut poll_timer = tokio::time::interval(poll_interval); + let mut sync_timer = tokio::time::interval(sync_interval); + poll_timer.tick().await; + sync_timer.tick().await; + sync_wallet(blockchain_client, self.wallet).await?; + + loop { + tokio::select! { + _ = poll_timer.tick() => { + // Time to check payment + let check_result = receiver + .check_payment( + |txid| { + let Some(tx_details) = self.wallet.tx_details(txid) else { + return Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain")); + }; + + let is_seen = match tx_details.chain_position { + bdk_wallet::chain::ChainPosition::Confirmed { .. } => true, + bdk_wallet::chain::ChainPosition::Unconfirmed { first_seen: Some(_), .. } => true, + _ => false + }; + + if is_seen { + return Ok(Some(tx_details.tx.as_ref().clone())); + } + return Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain")); + }, + |outpoint| { + let utxo = self.wallet.get_utxo(outpoint); + match utxo { + Some(_) => Ok(false), + None => Ok(true), + } + } + ) + .save(persister) + .map_err(|e| { + Error::Generic(format!("Error occurred when saving after checking that sender has broadcasted the Payjoin transaction: {e}")) + }); + + if let Ok(OptionalTransitionOutcome::Progress(_)) = check_result { + println!("Payjoin transaction detected in the mempool!"); + return Ok(()); + } + // For Stasis or Err, continue polling (implicit - falls through to next loop iteration) } - return Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain")); - }, - |outpoint| { - let utxo = self.wallet.get_utxo(outpoint); - match utxo { - Some(_) => Ok(false), - None => Ok(true), + _ = sync_timer.tick() => { + // Time to sync wallet + sync_wallet(blockchain_client, self.wallet).await?; } } - ) - .save(persister) - .map_err(|e| { - Error::Generic(format!("Error occurred when saving after checking that sender has broadcasted the Payjoin transaction: {e}")) - }); - - match check_result { - Ok(_) => { - println!("Payjoin transaction detected in the mempool!"); - } - Err(_) => { - println!( - "Transaction was not found in the mempool after {wait_time_for_sync}. Check the state of the transaction manually after running the sync command." - ); } + }) + .await; + + match result { + Ok(ok) => ok, + Err(_) => Err(Error::Generic(format!( + "Timeout waiting for Payjoin transaction broadcast after {:?}. Check the state of the transaction manually after running the sync command.", + timeout_duration + ))), } - - Ok(()) } async fn handle_error( @@ -734,7 +752,7 @@ impl<'a> PayjoinManager<'a> { session: SendSession, persister: &impl SessionPersister, relay: impl payjoin::IntoUrl, - blockchain_client: BlockchainClient, + blockchain_client: &mut BlockchainClient, ) -> Result { match session { SendSession::WithReplyKey(context) => { @@ -757,7 +775,7 @@ impl<'a> PayjoinManager<'a> { sender: Sender, relay: impl payjoin::IntoUrl, persister: &impl SessionPersister, - blockchain_client: BlockchainClient, + blockchain_client: &mut BlockchainClient, ) -> Result { let (req, ctx) = sender.create_v2_post_request(relay.as_str()).map_err(|e| { Error::Generic(format!( @@ -780,7 +798,7 @@ impl<'a> PayjoinManager<'a> { sender: Sender, relay: impl payjoin::IntoUrl, persister: &impl SessionPersister, - blockchain_client: BlockchainClient, + blockchain_client: &mut BlockchainClient, ) -> Result { let mut sender = sender.clone(); loop { @@ -815,7 +833,7 @@ impl<'a> PayjoinManager<'a> { async fn process_payjoin_proposal( &self, mut psbt: Psbt, - blockchain_client: BlockchainClient, + blockchain_client: &mut BlockchainClient, ) -> Result { if !self.wallet.sign(&mut psbt, SignOptions::default())? { return Err(Error::Generic( diff --git a/src/utils.rs b/src/utils.rs index 8a3ee04..8166529 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -159,7 +159,15 @@ pub(crate) enum BlockchainClient { }, #[cfg(feature = "cbf")] - KyotoClient { client: Box }, + KyotoClient { client: KyotoClientHandle }, +} + +/// Handle for the Kyoto client after the node has been started. +/// Contains only the components needed for sync and broadcast operations. +#[cfg(feature = "cbf")] +pub struct KyotoClientHandle { + pub requester: bdk_kyoto::Requester, + pub update_subscriber: bdk_kyoto::UpdateSubscriber, } #[cfg(any( @@ -216,13 +224,32 @@ pub(crate) fn new_blockchain_client( let scan_type = Sync; let builder = Builder::new(_wallet.network()); - let client = builder + let light_client = builder .required_peers(wallet_opts.compactfilter_opts.conn_count) .data_dir(&_datadir) .build_with_wallet(_wallet, scan_type)?; + let LightClient { + requester, + info_subscriber, + warning_subscriber, + update_subscriber, + node, + } = light_client; + + let subscriber = tracing_subscriber::FmtSubscriber::new(); + let _ = tracing::subscriber::set_global_default(subscriber); + + tokio::task::spawn(async move { node.run().await }); + tokio::task::spawn( + async move { trace_logger(info_subscriber, warning_subscriber).await }, + ); + BlockchainClient::KyotoClient { - client: Box::new(client), + client: KyotoClientHandle { + requester, + update_subscriber, + }, } } }; @@ -334,29 +361,17 @@ pub async fn trace_logger( // Handle Kyoto Client sync #[cfg(feature = "cbf")] -pub async fn sync_kyoto_client(wallet: &mut Wallet, client: Box) -> Result<(), Error> { - let LightClient { - requester, - info_subscriber, - warning_subscriber, - mut update_subscriber, - node, - } = *client; - - let subscriber = tracing_subscriber::FmtSubscriber::new(); - tracing::subscriber::set_global_default(subscriber) - .map_err(|e| Error::Generic(format!("SetGlobalDefault error: {e}")))?; - - tokio::task::spawn(async move { node.run().await }); - tokio::task::spawn(async move { trace_logger(info_subscriber, warning_subscriber).await }); - - if !requester.is_running() { +pub async fn sync_kyoto_client( + wallet: &mut Wallet, + handle: &mut KyotoClientHandle, +) -> Result<(), Error> { + if !handle.requester.is_running() { tracing::error!("Kyoto node is not running"); return Err(Error::Generic("Kyoto node failed to start".to_string())); } tracing::info!("Kyoto node is running"); - let update = update_subscriber.update().await?; + let update = handle.update_subscriber.update().await?; tracing::info!("Received update: applying to wallet"); wallet .apply_update(update)