Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dash-spv/src/client/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::mempool_filter::MempoolFilter;
use crate::network::NetworkManager;
use crate::storage::{
PersistentBlockHeaderStorage, PersistentBlockStorage, PersistentFilterHeaderStorage,
PersistentFilterStorage, StorageManager,
PersistentFilterStorage, PersistentMetadataStorage, StorageManager,
};
use crate::sync::SyncCoordinator;
use crate::types::MempoolState;
Expand Down Expand Up @@ -107,6 +107,7 @@ pub struct DashSpvClient<W: WalletInterface, N: NetworkManager, S: StorageManage
PersistentFilterHeaderStorage,
PersistentFilterStorage,
PersistentBlockStorage,
PersistentMetadataStorage,
W,
>,
pub(super) running: Arc<RwLock<bool>>,
Expand Down
4 changes: 3 additions & 1 deletion dash-spv/src/client/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::mempool_filter::MempoolFilter;
use crate::network::NetworkManager;
use crate::storage::{
PersistentBlockHeaderStorage, PersistentBlockStorage, PersistentFilterHeaderStorage,
PersistentFilterStorage, StorageManager,
PersistentFilterStorage, PersistentMetadataStorage, StorageManager,
};
use crate::sync::{
BlockHeadersManager, BlocksManager, ChainLockManager, FilterHeadersManager, FiltersManager,
Expand Down Expand Up @@ -57,6 +57,7 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
PersistentFilterHeaderStorage,
PersistentFilterStorage,
PersistentBlockStorage,
PersistentMetadataStorage,
W,
> = Managers::default();

Expand Down Expand Up @@ -94,6 +95,7 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
));
managers.chainlock = Some(ChainLockManager::new(
storage.block_headers(),
storage.metadata(),
masternode_list_engine.clone(),
));
managers.instantsend = Some(InstantSendManager::new(masternode_list_engine.clone()));
Expand Down
2 changes: 1 addition & 1 deletion dash-spv/src/storage/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
};

#[async_trait]
pub trait MetadataStorage {
pub trait MetadataStorage: Send + Sync + 'static {
async fn store_metadata(&mut self, key: &str, value: &[u8]) -> StorageResult<()>;

async fn load_metadata(&self, key: &str) -> StorageResult<Option<Vec<u8>>>;
Expand Down
10 changes: 8 additions & 2 deletions dash-spv/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use tokio::sync::RwLock;

use crate::error::StorageResult;
use crate::storage::lockfile::LockFile;
use crate::storage::metadata::PersistentMetadataStorage;
use crate::storage::transactions::PersistentTransactionStorage;
use crate::types::{HashedBlock, HashedBlockHeader, MempoolState, UnconfirmedTransaction};
use crate::ClientConfig;
Expand All @@ -38,7 +37,7 @@ pub use crate::storage::blocks::{BlockStorage, PersistentBlockStorage};
pub use crate::storage::filter_headers::{FilterHeaderStorage, PersistentFilterHeaderStorage};
pub use crate::storage::filters::{FilterStorage, PersistentFilterStorage};
pub use crate::storage::masternode::{MasternodeStateStorage, PersistentMasternodeStateStorage};
pub use crate::storage::metadata::MetadataStorage;
pub use crate::storage::metadata::{MetadataStorage, PersistentMetadataStorage};
pub use crate::storage::peers::{PeerStorage, PersistentPeerStorage};
pub use crate::storage::transactions::TransactionStorage;

Expand Down Expand Up @@ -83,6 +82,9 @@ pub trait StorageManager:

/// Returns shared access to the block storage.
fn blocks(&self) -> Arc<RwLock<PersistentBlockStorage>>;

/// Returns shared access to the metadata storage.
fn metadata(&self) -> Arc<RwLock<PersistentMetadataStorage>>;
}

/// Disk-based storage manager with segmented files and async background saving.
Expand Down Expand Up @@ -274,6 +276,10 @@ impl StorageManager for DiskStorageManager {
fn blocks(&self) -> Arc<RwLock<PersistentBlockStorage>> {
Arc::clone(&self.blocks)
}

fn metadata(&self) -> Arc<RwLock<PersistentMetadataStorage>> {
Arc::clone(&self.metadata)
}
}

#[async_trait]
Expand Down
214 changes: 206 additions & 8 deletions dash-spv/src/sync/chainlock/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,26 @@ use std::collections::HashSet;
use tokio::sync::RwLock;

use crate::error::SyncResult;
use crate::storage::BlockHeaderStorage;
use crate::storage::{BlockHeaderStorage, MetadataStorage};
use crate::sync::{ChainLockProgress, SyncEvent};

/// Metadata key for persisting the best validated ChainLock.
const BEST_CHAINLOCK_KEY: &str = "best_chainlock";

/// ChainLock manager for the parallel sync coordinator.
///
/// This manager:
/// - Subscribes to CLSig messages from the network
/// - Validates ChainLocks only after masternode sync is complete
/// - Tracks only the best (highest) validated ChainLock
/// - Emits ChainLockReceived events
pub struct ChainLockManager<H: BlockHeaderStorage> {
pub struct ChainLockManager<H: BlockHeaderStorage, M: MetadataStorage> {
/// Current progress of the manager.
pub(super) progress: ChainLockProgress,
/// Block header storage for hash verification.
header_storage: Arc<RwLock<H>>,
/// Metadata storage for persisting the best chainlock.
metadata_storage: Arc<RwLock<M>>,
/// Masternode engine for BLS signature validation.
masternode_engine: Arc<RwLock<MasternodeListEngine>>,
/// The best (highest height) validated ChainLock.
Expand All @@ -39,15 +44,17 @@ pub struct ChainLockManager<H: BlockHeaderStorage> {
masternode_ready: bool,
}

impl<H: BlockHeaderStorage> ChainLockManager<H> {
impl<H: BlockHeaderStorage, M: MetadataStorage> ChainLockManager<H, M> {
/// Create a new ChainLock manager.
pub fn new(
header_storage: Arc<RwLock<H>>,
metadata_storage: Arc<RwLock<M>>,
masternode_engine: Arc<RwLock<MasternodeListEngine>>,
) -> Self {
Self {
progress: ChainLockProgress::default(),
header_storage,
metadata_storage,
masternode_engine,
best_chainlock: None,
requested_chainlocks: HashSet::new(),
Expand Down Expand Up @@ -107,8 +114,9 @@ impl<H: BlockHeaderStorage> ChainLockManager<H> {
self.progress.add_valid(1);
self.progress.update_best_validated_height(height);

// Update best ChainLock
// Update best ChainLock and persist to storage
self.best_chainlock = Some(chainlock.clone());
self.save_best_chainlock().await;
} else {
self.progress.add_invalid(1);
}
Expand All @@ -119,6 +127,48 @@ impl<H: BlockHeaderStorage> ChainLockManager<H> {
}])
}

/// Persist the best chainlock to metadata storage.
async fn save_best_chainlock(&self) {
let Some(chainlock) = &self.best_chainlock else {
return;
};
match serde_json::to_vec(chainlock) {
Ok(bytes) => {
let mut storage = self.metadata_storage.write().await;
if let Err(e) = storage.store_metadata(BEST_CHAINLOCK_KEY, &bytes).await {
tracing::warn!("Failed to persist best chainlock: {}", e);
}
}
Err(e) => {
tracing::warn!("Failed to serialize best chainlock: {}", e);
}
}
}

/// Load the best chainlock from metadata storage and restore progress.
pub(super) async fn load_best_chainlock(&mut self) {
let storage = self.metadata_storage.read().await;
match storage.load_metadata(BEST_CHAINLOCK_KEY).await {
Ok(Some(bytes)) => match serde_json::from_slice::<ChainLock>(&bytes) {
Ok(chainlock) => {
let height = chainlock.block_height;
tracing::info!("Restored persisted ChainLock at height {}", height);
self.progress.update_best_validated_height(height);
self.best_chainlock = Some(chainlock);
}
Err(e) => {
tracing::warn!("Failed to deserialize persisted chainlock: {}", e);
}
},
Ok(None) => {
tracing::debug!("No persisted chainlock found (fresh start)");
}
Err(e) => {
tracing::warn!("Failed to load persisted chainlock: {}", e);
}
}
}

/// Verify that the ChainLock block hash matches our stored header.
/// Returns true if the hash matches or we don't have the header yet.
/// Returns false if we have the header and the hash doesn't match.
Expand Down Expand Up @@ -177,7 +227,7 @@ impl<H: BlockHeaderStorage> ChainLockManager<H> {
}
}

impl<H: BlockHeaderStorage> std::fmt::Debug for ChainLockManager<H> {
impl<H: BlockHeaderStorage, M: MetadataStorage> std::fmt::Debug for ChainLockManager<H, M> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ChainLockManager")
.field("progress", &self.progress)
Expand All @@ -191,20 +241,31 @@ impl<H: BlockHeaderStorage> std::fmt::Debug for ChainLockManager<H> {
mod tests {
use super::*;
use crate::network::MessageType;
use crate::storage::{DiskStorageManager, PersistentBlockHeaderStorage, StorageManager};
use crate::storage::{
DiskStorageManager, PersistentBlockHeaderStorage, PersistentMetadataStorage, StorageManager,
};
use crate::sync::{ManagerIdentifier, SyncManager, SyncManagerProgress, SyncState};
use crate::Network;
use dashcore::bls_sig_utils::BLSSignature;
use dashcore::hashes::Hash;
use dashcore::BlockHash;

type TestChainLockManager = ChainLockManager<PersistentBlockHeaderStorage>;
type TestChainLockManager =
ChainLockManager<PersistentBlockHeaderStorage, PersistentMetadataStorage>;

async fn create_test_manager() -> TestChainLockManager {
let storage = DiskStorageManager::with_temp_dir().await.unwrap();
let engine =
Arc::new(RwLock::new(MasternodeListEngine::default_for_network(Network::Testnet)));
ChainLockManager::new(storage.block_headers(), engine)
ChainLockManager::new(storage.block_headers(), storage.metadata(), engine)
}

async fn create_test_manager_with_storage(
storage: &DiskStorageManager,
) -> TestChainLockManager {
let engine =
Arc::new(RwLock::new(MasternodeListEngine::default_for_network(Network::Testnet)));
ChainLockManager::new(storage.block_headers(), storage.metadata(), engine)
}

fn create_test_chainlock(height: u32) -> ChainLock {
Expand Down Expand Up @@ -307,4 +368,141 @@ mod tests {
assert!(manager.is_block_chainlocked(500));
assert!(!manager.is_block_chainlocked(501));
}

#[tokio::test]
async fn test_load_from_empty_storage_returns_none() {
let mut manager = create_test_manager().await;

manager.load_best_chainlock().await;

assert!(manager.best_chainlock().is_none());
assert_eq!(manager.progress.best_validated_height(), 0);
}

#[tokio::test]
async fn test_save_and_load_chainlock_round_trip() {
let storage = DiskStorageManager::with_temp_dir().await.unwrap();
let chainlock = create_test_chainlock(42000);

// Save a chainlock via the first manager
{
let mut manager = create_test_manager_with_storage(&storage).await;
manager.best_chainlock = Some(chainlock.clone());
manager.save_best_chainlock().await;
}

// Load it back via a fresh manager sharing the same storage
{
let mut manager = create_test_manager_with_storage(&storage).await;
assert!(manager.best_chainlock().is_none());

manager.load_best_chainlock().await;

let restored = manager.best_chainlock().expect("chainlock should be restored");
assert_eq!(restored.block_height, 42000);
assert_eq!(restored.block_hash, chainlock.block_hash);
assert_eq!(restored.signature, chainlock.signature);
assert_eq!(manager.progress.best_validated_height(), 42000);
}
}

#[tokio::test]
async fn test_initialize_restores_persisted_chainlock() {
let storage = DiskStorageManager::with_temp_dir().await.unwrap();
let chainlock = create_test_chainlock(99999);

// Persist a chainlock directly via metadata storage
{
let bytes = serde_json::to_vec(&chainlock).unwrap();
let meta_storage = storage.metadata();
let mut meta = meta_storage.write().await;
meta.store_metadata(BEST_CHAINLOCK_KEY, &bytes).await.unwrap();
}

// Create a new manager and call initialize (the SyncManager trait method)
let mut manager = create_test_manager_with_storage(&storage).await;
manager.initialize().await.unwrap();

let restored =
manager.best_chainlock().expect("chainlock should be restored after initialize");
assert_eq!(restored.block_height, 99999);
assert_eq!(manager.progress.best_validated_height(), 99999);
assert_eq!(manager.state(), SyncState::WaitingForConnections);
}

#[tokio::test]
async fn test_process_chainlock_persists_on_validation() {
let storage = DiskStorageManager::with_temp_dir().await.unwrap();
let mut manager = create_test_manager_with_storage(&storage).await;

// Without masternode ready, chainlocks are not validated and not persisted
let chainlock = create_test_chainlock(500);
manager.process_chainlock(&chainlock).await.unwrap();
assert!(manager.best_chainlock().is_none());

// Verify nothing was persisted
{
let meta_storage = storage.metadata();
let meta = meta_storage.read().await;
let loaded = meta.load_metadata(BEST_CHAINLOCK_KEY).await.unwrap();
assert!(loaded.is_none());
}
}

#[tokio::test]
async fn test_save_overwrites_previous_chainlock() {
let storage = DiskStorageManager::with_temp_dir().await.unwrap();

// Save first chainlock
{
let mut manager = create_test_manager_with_storage(&storage).await;
manager.best_chainlock = Some(create_test_chainlock(100));
manager.save_best_chainlock().await;
}

// Save a higher chainlock
{
let mut manager = create_test_manager_with_storage(&storage).await;
manager.best_chainlock = Some(create_test_chainlock(200));
manager.save_best_chainlock().await;
}

// Load and verify only the latest is stored
{
let mut manager = create_test_manager_with_storage(&storage).await;
manager.load_best_chainlock().await;

let restored = manager.best_chainlock().expect("chainlock should be restored");
assert_eq!(restored.block_height, 200);
}
}

#[tokio::test]
async fn test_lower_chainlock_rejected_after_load() {
let storage = DiskStorageManager::with_temp_dir().await.unwrap();

// Save chainlock at height 200
{
let mut manager = create_test_manager_with_storage(&storage).await;
manager.best_chainlock = Some(create_test_chainlock(200));
manager.save_best_chainlock().await;
}

// Load and try to process a lower chainlock
{
let mut manager = create_test_manager_with_storage(&storage).await;
manager.load_best_chainlock().await;

// Try to process a lower chainlock
let lower_chainlock = create_test_chainlock(100);
let events = manager.process_chainlock(&lower_chainlock).await.unwrap();

// Should be rejected (no events)
assert_eq!(events.len(), 0);

// Best should still be 200
let best = manager.best_chainlock().expect("should have best chainlock");
assert_eq!(best.block_height, 200);
}
}
}
Loading
Loading