diff --git a/dash-spv/src/client/core.rs b/dash-spv/src/client/core.rs index 5a0c5a7b0..ce3424b16 100644 --- a/dash-spv/src/client/core.rs +++ b/dash-spv/src/client/core.rs @@ -18,7 +18,7 @@ use crate::mempool_filter::MempoolFilter; use crate::network::NetworkManager; use crate::storage::{ PersistentBlockHeaderStorage, PersistentBlockStorage, PersistentFilterHeaderStorage, - PersistentFilterStorage, StorageManager, + PersistentFilterStorage, PersistentMetadataStorage, StorageManager, }; use crate::sync::SyncCoordinator; use crate::types::MempoolState; @@ -107,6 +107,7 @@ pub struct DashSpvClient, pub(super) running: Arc>, diff --git a/dash-spv/src/client/lifecycle.rs b/dash-spv/src/client/lifecycle.rs index d4d0bc374..1379972a3 100644 --- a/dash-spv/src/client/lifecycle.rs +++ b/dash-spv/src/client/lifecycle.rs @@ -19,7 +19,7 @@ use crate::mempool_filter::MempoolFilter; use crate::network::NetworkManager; use crate::storage::{ PersistentBlockHeaderStorage, PersistentBlockStorage, PersistentFilterHeaderStorage, - PersistentFilterStorage, StorageManager, + PersistentFilterStorage, PersistentMetadataStorage, StorageManager, }; use crate::sync::{ BlockHeadersManager, BlocksManager, ChainLockManager, FilterHeadersManager, FiltersManager, @@ -57,6 +57,7 @@ impl DashSpvClient = Managers::default(); @@ -94,6 +95,7 @@ impl DashSpvClient StorageResult<()>; async fn load_metadata(&self, key: &str) -> StorageResult>>; diff --git a/dash-spv/src/storage/mod.rs b/dash-spv/src/storage/mod.rs index f05924860..1efb6c996 100644 --- a/dash-spv/src/storage/mod.rs +++ b/dash-spv/src/storage/mod.rs @@ -26,7 +26,6 @@ use tokio::sync::RwLock; use crate::error::StorageResult; use crate::storage::lockfile::LockFile; -use crate::storage::metadata::PersistentMetadataStorage; use crate::storage::transactions::PersistentTransactionStorage; use crate::types::{HashedBlock, HashedBlockHeader, MempoolState, UnconfirmedTransaction}; use crate::ClientConfig; @@ -38,7 +37,7 @@ pub use crate::storage::blocks::{BlockStorage, PersistentBlockStorage}; pub use crate::storage::filter_headers::{FilterHeaderStorage, PersistentFilterHeaderStorage}; pub use crate::storage::filters::{FilterStorage, PersistentFilterStorage}; pub use crate::storage::masternode::{MasternodeStateStorage, PersistentMasternodeStateStorage}; -pub use crate::storage::metadata::MetadataStorage; +pub use crate::storage::metadata::{MetadataStorage, PersistentMetadataStorage}; pub use crate::storage::peers::{PeerStorage, PersistentPeerStorage}; pub use crate::storage::transactions::TransactionStorage; @@ -83,6 +82,9 @@ pub trait StorageManager: /// Returns shared access to the block storage. fn blocks(&self) -> Arc>; + + /// Returns shared access to the metadata storage. + fn metadata(&self) -> Arc>; } /// Disk-based storage manager with segmented files and async background saving. @@ -274,6 +276,10 @@ impl StorageManager for DiskStorageManager { fn blocks(&self) -> Arc> { Arc::clone(&self.blocks) } + + fn metadata(&self) -> Arc> { + Arc::clone(&self.metadata) + } } #[async_trait] diff --git a/dash-spv/src/sync/chainlock/manager.rs b/dash-spv/src/sync/chainlock/manager.rs index 5fca675a4..da089980a 100644 --- a/dash-spv/src/sync/chainlock/manager.rs +++ b/dash-spv/src/sync/chainlock/manager.rs @@ -14,9 +14,12 @@ use std::collections::HashSet; use tokio::sync::RwLock; use crate::error::SyncResult; -use crate::storage::BlockHeaderStorage; +use crate::storage::{BlockHeaderStorage, MetadataStorage}; use crate::sync::{ChainLockProgress, SyncEvent}; +/// Metadata key for persisting the best validated ChainLock. +const BEST_CHAINLOCK_KEY: &str = "best_chainlock"; + /// ChainLock manager for the parallel sync coordinator. /// /// This manager: @@ -24,11 +27,13 @@ use crate::sync::{ChainLockProgress, SyncEvent}; /// - Validates ChainLocks only after masternode sync is complete /// - Tracks only the best (highest) validated ChainLock /// - Emits ChainLockReceived events -pub struct ChainLockManager { +pub struct ChainLockManager { /// Current progress of the manager. pub(super) progress: ChainLockProgress, /// Block header storage for hash verification. header_storage: Arc>, + /// Metadata storage for persisting the best chainlock. + metadata_storage: Arc>, /// Masternode engine for BLS signature validation. masternode_engine: Arc>, /// The best (highest height) validated ChainLock. @@ -39,15 +44,17 @@ pub struct ChainLockManager { masternode_ready: bool, } -impl ChainLockManager { +impl ChainLockManager { /// Create a new ChainLock manager. pub fn new( header_storage: Arc>, + metadata_storage: Arc>, masternode_engine: Arc>, ) -> Self { Self { progress: ChainLockProgress::default(), header_storage, + metadata_storage, masternode_engine, best_chainlock: None, requested_chainlocks: HashSet::new(), @@ -107,8 +114,9 @@ impl ChainLockManager { self.progress.add_valid(1); self.progress.update_best_validated_height(height); - // Update best ChainLock + // Update best ChainLock and persist to storage self.best_chainlock = Some(chainlock.clone()); + self.save_best_chainlock().await; } else { self.progress.add_invalid(1); } @@ -119,6 +127,48 @@ impl ChainLockManager { }]) } + /// Persist the best chainlock to metadata storage. + async fn save_best_chainlock(&self) { + let Some(chainlock) = &self.best_chainlock else { + return; + }; + match serde_json::to_vec(chainlock) { + Ok(bytes) => { + let mut storage = self.metadata_storage.write().await; + if let Err(e) = storage.store_metadata(BEST_CHAINLOCK_KEY, &bytes).await { + tracing::warn!("Failed to persist best chainlock: {}", e); + } + } + Err(e) => { + tracing::warn!("Failed to serialize best chainlock: {}", e); + } + } + } + + /// Load the best chainlock from metadata storage and restore progress. + pub(super) async fn load_best_chainlock(&mut self) { + let storage = self.metadata_storage.read().await; + match storage.load_metadata(BEST_CHAINLOCK_KEY).await { + Ok(Some(bytes)) => match serde_json::from_slice::(&bytes) { + Ok(chainlock) => { + let height = chainlock.block_height; + tracing::info!("Restored persisted ChainLock at height {}", height); + self.progress.update_best_validated_height(height); + self.best_chainlock = Some(chainlock); + } + Err(e) => { + tracing::warn!("Failed to deserialize persisted chainlock: {}", e); + } + }, + Ok(None) => { + tracing::debug!("No persisted chainlock found (fresh start)"); + } + Err(e) => { + tracing::warn!("Failed to load persisted chainlock: {}", e); + } + } + } + /// Verify that the ChainLock block hash matches our stored header. /// Returns true if the hash matches or we don't have the header yet. /// Returns false if we have the header and the hash doesn't match. @@ -177,7 +227,7 @@ impl ChainLockManager { } } -impl std::fmt::Debug for ChainLockManager { +impl std::fmt::Debug for ChainLockManager { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ChainLockManager") .field("progress", &self.progress) @@ -191,20 +241,31 @@ impl std::fmt::Debug for ChainLockManager { mod tests { use super::*; use crate::network::MessageType; - use crate::storage::{DiskStorageManager, PersistentBlockHeaderStorage, StorageManager}; + use crate::storage::{ + DiskStorageManager, PersistentBlockHeaderStorage, PersistentMetadataStorage, StorageManager, + }; use crate::sync::{ManagerIdentifier, SyncManager, SyncManagerProgress, SyncState}; use crate::Network; use dashcore::bls_sig_utils::BLSSignature; use dashcore::hashes::Hash; use dashcore::BlockHash; - type TestChainLockManager = ChainLockManager; + type TestChainLockManager = + ChainLockManager; async fn create_test_manager() -> TestChainLockManager { let storage = DiskStorageManager::with_temp_dir().await.unwrap(); let engine = Arc::new(RwLock::new(MasternodeListEngine::default_for_network(Network::Testnet))); - ChainLockManager::new(storage.block_headers(), engine) + ChainLockManager::new(storage.block_headers(), storage.metadata(), engine) + } + + async fn create_test_manager_with_storage( + storage: &DiskStorageManager, + ) -> TestChainLockManager { + let engine = + Arc::new(RwLock::new(MasternodeListEngine::default_for_network(Network::Testnet))); + ChainLockManager::new(storage.block_headers(), storage.metadata(), engine) } fn create_test_chainlock(height: u32) -> ChainLock { @@ -307,4 +368,141 @@ mod tests { assert!(manager.is_block_chainlocked(500)); assert!(!manager.is_block_chainlocked(501)); } + + #[tokio::test] + async fn test_load_from_empty_storage_returns_none() { + let mut manager = create_test_manager().await; + + manager.load_best_chainlock().await; + + assert!(manager.best_chainlock().is_none()); + assert_eq!(manager.progress.best_validated_height(), 0); + } + + #[tokio::test] + async fn test_save_and_load_chainlock_round_trip() { + let storage = DiskStorageManager::with_temp_dir().await.unwrap(); + let chainlock = create_test_chainlock(42000); + + // Save a chainlock via the first manager + { + let mut manager = create_test_manager_with_storage(&storage).await; + manager.best_chainlock = Some(chainlock.clone()); + manager.save_best_chainlock().await; + } + + // Load it back via a fresh manager sharing the same storage + { + let mut manager = create_test_manager_with_storage(&storage).await; + assert!(manager.best_chainlock().is_none()); + + manager.load_best_chainlock().await; + + let restored = manager.best_chainlock().expect("chainlock should be restored"); + assert_eq!(restored.block_height, 42000); + assert_eq!(restored.block_hash, chainlock.block_hash); + assert_eq!(restored.signature, chainlock.signature); + assert_eq!(manager.progress.best_validated_height(), 42000); + } + } + + #[tokio::test] + async fn test_initialize_restores_persisted_chainlock() { + let storage = DiskStorageManager::with_temp_dir().await.unwrap(); + let chainlock = create_test_chainlock(99999); + + // Persist a chainlock directly via metadata storage + { + let bytes = serde_json::to_vec(&chainlock).unwrap(); + let meta_storage = storage.metadata(); + let mut meta = meta_storage.write().await; + meta.store_metadata(BEST_CHAINLOCK_KEY, &bytes).await.unwrap(); + } + + // Create a new manager and call initialize (the SyncManager trait method) + let mut manager = create_test_manager_with_storage(&storage).await; + manager.initialize().await.unwrap(); + + let restored = + manager.best_chainlock().expect("chainlock should be restored after initialize"); + assert_eq!(restored.block_height, 99999); + assert_eq!(manager.progress.best_validated_height(), 99999); + assert_eq!(manager.state(), SyncState::WaitingForConnections); + } + + #[tokio::test] + async fn test_process_chainlock_persists_on_validation() { + let storage = DiskStorageManager::with_temp_dir().await.unwrap(); + let mut manager = create_test_manager_with_storage(&storage).await; + + // Without masternode ready, chainlocks are not validated and not persisted + let chainlock = create_test_chainlock(500); + manager.process_chainlock(&chainlock).await.unwrap(); + assert!(manager.best_chainlock().is_none()); + + // Verify nothing was persisted + { + let meta_storage = storage.metadata(); + let meta = meta_storage.read().await; + let loaded = meta.load_metadata(BEST_CHAINLOCK_KEY).await.unwrap(); + assert!(loaded.is_none()); + } + } + + #[tokio::test] + async fn test_save_overwrites_previous_chainlock() { + let storage = DiskStorageManager::with_temp_dir().await.unwrap(); + + // Save first chainlock + { + let mut manager = create_test_manager_with_storage(&storage).await; + manager.best_chainlock = Some(create_test_chainlock(100)); + manager.save_best_chainlock().await; + } + + // Save a higher chainlock + { + let mut manager = create_test_manager_with_storage(&storage).await; + manager.best_chainlock = Some(create_test_chainlock(200)); + manager.save_best_chainlock().await; + } + + // Load and verify only the latest is stored + { + let mut manager = create_test_manager_with_storage(&storage).await; + manager.load_best_chainlock().await; + + let restored = manager.best_chainlock().expect("chainlock should be restored"); + assert_eq!(restored.block_height, 200); + } + } + + #[tokio::test] + async fn test_lower_chainlock_rejected_after_load() { + let storage = DiskStorageManager::with_temp_dir().await.unwrap(); + + // Save chainlock at height 200 + { + let mut manager = create_test_manager_with_storage(&storage).await; + manager.best_chainlock = Some(create_test_chainlock(200)); + manager.save_best_chainlock().await; + } + + // Load and try to process a lower chainlock + { + let mut manager = create_test_manager_with_storage(&storage).await; + manager.load_best_chainlock().await; + + // Try to process a lower chainlock + let lower_chainlock = create_test_chainlock(100); + let events = manager.process_chainlock(&lower_chainlock).await.unwrap(); + + // Should be rejected (no events) + assert_eq!(events.len(), 0); + + // Best should still be 200 + let best = manager.best_chainlock().expect("should have best chainlock"); + assert_eq!(best.block_height, 200); + } + } } diff --git a/dash-spv/src/sync/chainlock/sync_manager.rs b/dash-spv/src/sync/chainlock/sync_manager.rs index 8082f3450..0f9537c9b 100644 --- a/dash-spv/src/sync/chainlock/sync_manager.rs +++ b/dash-spv/src/sync/chainlock/sync_manager.rs @@ -1,6 +1,6 @@ use crate::error::SyncResult; use crate::network::{Message, MessageType, RequestSender}; -use crate::storage::BlockHeaderStorage; +use crate::storage::{BlockHeaderStorage, MetadataStorage}; use crate::sync::{ ChainLockManager, ManagerIdentifier, SyncEvent, SyncManager, SyncManagerProgress, SyncState, }; @@ -9,7 +9,7 @@ use dashcore::network::message::NetworkMessage; use dashcore::network::message_blockdata::Inventory; #[async_trait] -impl SyncManager for ChainLockManager { +impl SyncManager for ChainLockManager { fn identifier(&self) -> ManagerIdentifier { ManagerIdentifier::ChainLock } @@ -26,6 +26,13 @@ impl SyncManager for ChainLockManager { &[MessageType::CLSig, MessageType::Inv] } + async fn initialize(&mut self) -> SyncResult<()> { + self.load_best_chainlock().await; + self.set_state(SyncState::WaitingForConnections); + tracing::info!("{} initialized", self.identifier()); + Ok(()) + } + async fn handle_message( &mut self, msg: Message, diff --git a/dash-spv/src/sync/sync_coordinator.rs b/dash-spv/src/sync/sync_coordinator.rs index 1999505b8..35d50fdb2 100644 --- a/dash-spv/src/sync/sync_coordinator.rs +++ b/dash-spv/src/sync/sync_coordinator.rs @@ -14,7 +14,9 @@ use tokio_util::sync::CancellationToken; use crate::error::SyncResult; use crate::network::NetworkManager; -use crate::storage::{BlockHeaderStorage, BlockStorage, FilterHeaderStorage, FilterStorage}; +use crate::storage::{ + BlockHeaderStorage, BlockStorage, FilterHeaderStorage, FilterStorage, MetadataStorage, +}; use crate::sync::{ BlockHeadersManager, BlocksManager, ChainLockManager, FilterHeadersManager, FiltersManager, InstantSendManager, ManagerIdentifier, MasternodesManager, SyncEvent, SyncManager, @@ -59,12 +61,13 @@ macro_rules! spawn_manager { } /// Container for all manager instances. -pub struct Managers +pub struct Managers where H: BlockHeaderStorage, FH: FilterHeaderStorage, F: FilterStorage, B: BlockStorage, + M: MetadataStorage, W: WalletInterface + 'static, { pub block_headers: Option>, @@ -72,16 +75,17 @@ where pub filters: Option>, pub blocks: Option>, pub masternode: Option>, - pub chainlock: Option>, + pub chainlock: Option>, pub instantsend: Option, } -impl Default for Managers +impl Default for Managers where H: BlockHeaderStorage, FH: FilterHeaderStorage, F: FilterStorage, B: BlockStorage, + M: MetadataStorage, W: WalletInterface + 'static, { fn default() -> Self { @@ -102,16 +106,17 @@ where /// - Spawns each manager in its own tokio task /// - Tracks and aggregates progress via watch channels /// - Coordinates graceful shutdown -pub struct SyncCoordinator +pub struct SyncCoordinator where H: BlockHeaderStorage, FH: FilterHeaderStorage, F: FilterStorage, B: BlockStorage, + M: MetadataStorage, W: WalletInterface + 'static, { /// Manager instances provided on construction and consumed in start spawned tasks. - managers: Managers, + managers: Managers, /// Progress receivers from spawned manager tasks. progress_receivers: Vec>, /// JoinSet for managing spawned tasks. @@ -130,18 +135,19 @@ where progress_task: Option>, } -impl SyncCoordinator +impl SyncCoordinator where H: BlockHeaderStorage, FH: FilterHeaderStorage, F: FilterStorage, B: BlockStorage, + M: MetadataStorage, W: WalletInterface + 'static, { /// Create a new coordinator with the given config. /// /// Managers are passed to `start()` when sync begins. - pub fn new(managers: Managers) -> Self { + pub fn new(managers: Managers) -> Self { let (progress_sender, progress_receiver) = watch::channel(SyncProgress::default()); Self { managers, @@ -299,12 +305,13 @@ where } } -impl std::fmt::Debug for SyncCoordinator +impl std::fmt::Debug for SyncCoordinator where H: BlockHeaderStorage, FH: FilterHeaderStorage, F: FilterStorage, B: BlockStorage, + M: MetadataStorage, W: WalletInterface + 'static, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {