feat: rewrite, fix and improve the sync architecture #411

Merged
xdustinface merged 1 commit into v0.42-dev from feat/sync-rewrite
Feb 4, 2026

Conversation

@xdustinface (Collaborator) commented Feb 3, 2026

Modular, parallel, manager-based sync architecture. Each sync concern (BlockHeadersManager, FilterHeadersManager, FiltersManager, BlocksManager, MasternodesManager, ChainLockManager, InstantSendManager) has its own manager implementing a common SyncManager trait while the SyncCoordinator orchestrates them and coordinates the communication between them through a SyncEvent system.
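
Roughly, the shared contract looks like this (a simplified sketch with placeholder types, not the exact signatures in this PR):

use async_trait::async_trait;

// Simplified stand-ins for the real types living under dash-spv/src/sync/*.
type SyncResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ManagerIdentifier {
    BlockHeaders,
    FilterHeaders,
    Filters,
    Blocks,
    Masternodes,
    ChainLock,
    InstantSend,
}

#[derive(Debug, Clone)]
pub enum SyncEvent {
    FilterHeadersSyncComplete { tip_height: u32 },
    // ... one variant per cross-manager notification
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncState {
    Idle,
    Syncing,
    Synced,
    Error,
}

// The common contract each manager implements; the SyncCoordinator spawns one
// task per manager and routes SyncEvents between them.
#[async_trait]
pub trait SyncManager: Send + Sync {
    fn identifier(&self) -> ManagerIdentifier;
    async fn handle_event(&mut self, event: SyncEvent) -> SyncResult<Vec<SyncEvent>>;
    async fn tick(&mut self) -> SyncResult<Vec<SyncEvent>>;
    fn state(&self) -> SyncState;
}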

Progress reporting is streamlined: each manager reports its own progress through a unified SyncProgress system, which notifies subscribers of every progress change. Event handling follows the same pattern, with a consistent event model across the sync, wallet, and network layers. The FFI can now subscribe to all sync progress, wallet, and network events.
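
On the consumer side, subscribing looks roughly like this (sketch with a simplified snapshot type; the actual receiver and field layout may differ):

use tokio::sync::watch;

// Simplified stand-in for the aggregated SyncProgress snapshot (per-manager
// details omitted).
#[derive(Debug, Clone, Default)]
pub struct SyncProgress {
    pub percentage: f64,
}

// Wake on every change the managers report and log the latest snapshot.
pub async fn log_progress(mut rx: watch::Receiver<SyncProgress>) {
    while rx.changed().await.is_ok() {
        let snapshot = rx.borrow_and_update().clone();
        println!("sync progress: {:.1}%", snapshot.percentage * 100.0);
    }
}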

Filter sync received numerous fixes and now processes filters and blocks in ordered batches. When gap-limit maintenance generates new addresses, a rescan is triggered automatically to pick up any transactions that match them.

Post-sync operation is stable now too; I have been running this branch continuously over the last couple of weeks, monitoring things, and have not run into any further issues.

Summary by CodeRabbit

  • New Features

    • Non-blocking background client run with event-driven callbacks for sync, network, wallet and live progress updates.
    • Per-manager sync progress visibility and wallet/network event streams.
  • Improvements

    • Richer, modular progress reporting and lifecycle controls; safer memory/cleanup helpers.
    • CLI: data-directory and mnemonic options; improved logging and startup/shutdown flow.
  • Documentation

    • Expanded API docs and safety notes for callback-based usage.
  • Tests

    • Test suites updated to exercise new event/progress model.

@coderabbitai bot (Contributor) commented Feb 3, 2026

📝 Walkthrough

Replaces the legacy drain/poll FFI model with an asynchronous, callback-driven client run; introduces a parallel, event-driven sync architecture (SyncCoordinator + per-manager SyncManager implementations), new per-manager progress types and destroy helpers, expanded network/request APIs, and substantial storage, filters, masternode, and tests restructuring.

Changes

Cohort / File(s) Summary
FFI API & headers
dash-spv-ffi/FFI_API.md, dash-spv-ffi/include/dash_spv_ffi.h, dash-spv-ffi/dash_spv_ffi.h
Expanded FFI surface: new enums/types (FFIManagerId, FFISyncState), per-manager progress structs + destroy helpers, config APIs, new set/clear callback APIs (sync/network/wallet/progress), replaced drain API with dash_spv_ffi_client_run, added dash_spv_ffi_client_get_manager_sync_progress.
FFI runtime & wiring
dash-spv-ffi/src/callbacks.rs, dash-spv-ffi/src/client.rs, dash-spv-ffi/src/types.rs, dash-spv-ffi/src/bin/ffi_cli.rs
Rewrote to per-client per-domain callback containers and background monitors; added many extern "C" callbacks, progress dispatch + memory ownership, replaced polling/drain with non-blocking client_run, CLI updated to use new callback model and mnemonic/data-dir options.
FFI tests & docs
dash-spv-ffi/tests/*, dash-spv-ffi/FFI_API.md
Removed old drain-based C/Rust tests; added/updated tests for per-manager progress conversions and destroy helpers; docs updated to reflect callback-based API and lifecycle notes.
Client API & lifecycle
dash-spv-ffi/dash_spv_ffi.h, dash-spv-ffi/dash_spv_ffi.h (public header), dash-spv-ffi/src/client.rs, dash-spv/src/client/*.rs
Changed public entrypoint names, added setters/clearers for sync/network/wallet/progress callbacks, added client_run and per-manager progress getter; DashSpvClient rewired to use SyncCoordinator, per-type callback fields, background monitors, and RequestSender exposure.
Sync core & coordinator
dash-spv/src/sync/*, dash-spv/src/sync/sync_coordinator.rs, dash-spv/src/sync/sync_manager.rs, dash-spv/src/sync/progress.rs, dash-spv/src/sync/events.rs, dash-spv/src/sync/identifier.rs
Introduced SyncManager trait, SyncCoordinator, SyncEvent, ManagerIdentifier, SyncProgress/SyncState, SyncManagerTaskContext, progress/event channels, manager run-loop framework, and aggregation of per-manager progress.
Managers, pipelines & progress
dash-spv/src/sync/block_headers/*, dash-spv/src/sync/filter_headers/*, dash-spv/src/sync/filters/*, dash-spv/src/sync/blocks/*, dash-spv/src/sync/masternodes/*, dash-spv/src/sync/chainlock/*, dash-spv/src/sync/instantsend/*, dash-spv/src/sync/download_coordinator.rs
Added many new manager modules, pipelines, progress types and SyncManager implementations (block headers, filter headers, filters with batching/lookahead/rescan, blocks, masternodes QRInfo/MnListDiff, chainlocks, instantsend) and a DownloadCoordinator for pipelined downloads.
Network & request layer
dash-spv/src/network/mod.rs, dash-spv/src/network/manager.rs, dash-spv/src/network/event.rs
Added NetworkEvent, NetworkRequest/RequestSender, outbound request queue/processor, broadcast network-event bus, and NetworkManager extensions to provide request_sender and subscribe_network_events.
Storage & traits
dash-spv/src/storage/*, dash-spv/src/storage/mod.rs
Storage traits tightened to Send+Sync+'static; added hashed-header store APIs and filter_tip_height; DiskStorageManager exposes Arc<RwLock<...>> accessors to support concurrent manager usage.
Wallet & validation
key-wallet-manager/src/*, key-wallet/src/*, dash-spv/src/validation/filter.rs
Added WalletEvent + broadcast in WalletManager (subscribe_events/event_sender) emitting TransactionReceived and BalanceUpdated; ManagedCoreAccount tracks spent_outpoints; added FilterValidator for parallel filter validation.
Client queries & integration
dash-spv/src/client/queries.rs, dash-spv/src/client/events.rs, dash-spv/src/client/lifecycle.rs
Adjusted masternode engine API to return Result<Arc<RwLock<...>>>; queries became async; added subscribe_progress/subscribe_sync_events and changed lifecycle wiring to construct Managers and SyncCoordinator.
Tests removed or migrated
many dash-spv/tests/*, dash-spv-ffi/tests/*, dash-spv-ffi/tests/c_tests/*
Removed multiple legacy integration and drain-based tests; updated unit/integration tests to the new event/callback and per-manager progress model; some tests rewritten to poll/subscribe instead of drain.
Misc & docs
dash-spv/ARCHITECTURE.md, dash-spv/Cargo.toml, dash-spv/src/error.rs, assorted small changes
Updated architecture docs to event-driven parallel design; added tokio-stream/futures deps; added SyncError variant (MasternodeSyncFailed) and small platform integration/error message improvements.

Sequence Diagram(s)

sequenceDiagram
    participant FFI as FFI Client
    participant Runner as Client Run<br/>dash_spv_ffi_client_run
    participant Coord as SyncCoordinator
    participant MgrA as BlockHeaders<br/>Manager
    participant MgrB as Filters<br/>Manager
    participant Net as Network
    participant Stor as Storage
    participant Wallet as WalletMgr
    participant Events as Event Bus

    FFI->>Runner: dash_spv_ffi_client_run()
    Runner->>Coord: start()
    activate Coord
    Coord->>MgrA: spawn manager task
    Coord->>MgrB: spawn manager task
    loop managers operate
      Net-->>MgrA: NetworkMessage / NetworkEvent
      MgrA->>Stor: read/write headers
      MgrA->>Events: emit SyncEvent
      Events-->>MgrB: deliver SyncEvent
      MgrB->>Stor: read/write filters/blocks
      MgrB->>Wallet: notify matches -> WalletEvent
      Wallet->>Events: emit WalletEvent
      MgrA->>Coord: progress update
      MgrB->>Coord: progress update
    end
    Coord->>Runner: aggregated progress / SyncComplete
    Runner->>FFI: invoke registered FFI callbacks (sync/network/wallet/progress)
    deactivate Coord

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

"🐰 I hopped through code at break of dawn,
Managers sprouted on a sync-filled lawn.
Events like carrots fell in a trail,
Progress and callbacks tell the tall tale.
Hop—sync done—two noses twitch in song!"

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title 'feat: rewrite, fix and improve the sync architecture' clearly summarizes the main change—a comprehensive rewrite of the sync system with architectural improvements.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.


@xdustinface force-pushed the feat/sync-rewrite branch 2 times, most recently from 05763c2 to 9680d12 on February 3, 2026 15:28
@xdustinface marked this pull request as ready for review on February 3, 2026 15:29
@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 13

Note

Due to the large number of review comments, Critical and Major severity comments were prioritized as inline comments.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (7)
dash-spv/src/client/chainlock.rs (1)

31-43: ⚠️ Potential issue | 🟠 Major

Move network penalization outside storage and state locks
Drop the state read guard and storage lock before calling penalize_peer_invalid_chainlock to avoid holding locks across an await.

Refactor suggestion
-        let chain_state = self.state.read().await;
-        {
-            let mut storage = self.storage.lock().await;
-            if let Err(e) = self
-                .chainlock_manager
-                .process_chain_lock(chainlock.clone(), &chain_state, &mut *storage)
-                .await
-            {
-                let reason = format!("Invalid ChainLock: {}", e);
-                self.network.penalize_peer_invalid_chainlock(peer_address, &reason).await;
-                return Err(SpvError::Validation(e));
-            }
-        }
-        drop(chain_state);
+        let chain_state = self.state.read().await;
+        let process_result = {
+            let mut storage = self.storage.lock().await;
+            self.chainlock_manager
+                .process_chain_lock(chainlock.clone(), &chain_state, &mut *storage)
+                .await
+        };
+        drop(chain_state);
+
+        if let Err(e) = process_result {
+            let reason = format!("Invalid ChainLock: {}", e);
+            self.network
+                .penalize_peer_invalid_chainlock(peer_address, &reason)
+                .await;
+            return Err(SpvError::Validation(e));
+        }
dash-spv/src/storage/filter_headers.rs (1)

68-74: ⚠️ Potential issue | 🟠 Major

Handle errors in PersistentFilterHeaderStorage::persist
Calling SegmentCache::persist swallows all failures. Use the StorageResult-returning persist API on each segment and propagate errors with ? rather than just logging them.
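
A rough sketch of the suggested shape (the segment accessor name is illustrative):

async fn persist(&mut self) -> StorageResult<()> {
    for segment in self.segments.values_mut() {
        // Propagate I/O failures to the caller instead of logging and continuing.
        segment.persist().await?;
    }
    Ok(())
}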

key-wallet-manager/src/wallet_manager/mod.rs (1)

87-117: ⚠️ Potential issue | 🔴 Critical

Replace broadcast::Sender::new with broadcast::channel
Tokio’s broadcast API requires creating a channel via broadcast::channel(capacity), not Sender::new.

Proposed fix
pub fn new(network: Network) -> Self {
+    #[cfg(feature = "std")]
+    let (event_sender, _event_receiver) = broadcast::channel(DEFAULT_WALLET_EVENT_CAPACITY);

    Self {
        network,
        synced_height: 0,
        wallets: BTreeMap::new(),
        wallet_infos: BTreeMap::new(),
        #[cfg(feature = "std")]
-       event_sender: broadcast::Sender::new(DEFAULT_WALLET_EVENT_CAPACITY),
+       event_sender,
    }
}
dash-spv/ARCHITECTURE.md (1)

76-112: ⚠️ Potential issue | 🟡 Minor

Add language identifiers to fenced code blocks (MD040).
markdownlint flags the untyped fences; adding a language fixes the lint and improves readability.

🛠️ Suggested fix
@@
-```
+```text
@@
-```
+```text
@@
-```
+```text

Also applies to: 999-1042, 1074-1081

dash-spv-ffi/FFI_API.md (1)

1018-1061: ⚠️ Potential issue | 🟡 Minor

Usage example still references removed callback API.
The snippet calls dash_spv_ffi_client_start and dash_spv_ffi_client_set_event_callbacks, but the new flow uses dash_spv_ffi_client_run plus per-domain callback setters. Please update the example to avoid copy‑pasting a dead path.

dash-spv-ffi/src/bin/ffi_cli.rs (1)

264-303: ⚠️ Potential issue | 🟡 Minor

data-dir help text doesn’t match the actual default.
Help says “unique directory in /tmp”, but the default is .tmp/ffi-cli. Align the help text or adjust the default to match.

dash-spv-ffi/src/client.rs (1)

651-689: ⚠️ Potential issue | 🟠 Major

Align get_sync_progress with its destroy function to avoid leaks.
This returns FFISyncProgress (with nested heap allocations), but dash_spv_ffi_sync_progress_destroy only drops the outer box, leaking the nested pointers unless callers manually use the manager destroy. Either return the legacy progress type here, or delegate the destroy function to dash_spv_ffi_manager_sync_progress_destroy and update the docs accordingly. As per coding guidelines: All FFI types must have corresponding _destroy() functions for explicit memory management.

🧹 Example fix (delegate destroy)
-use crate::{
-    null_check, set_last_error, FFIClientConfig, FFIErrorCode, FFINetworkEventCallbacks,
-    FFIProgressCallback, FFISyncEventCallbacks, FFISyncProgress, FFIWalletEventCallbacks,
-    FFIWalletManager,
-};
+use crate::{
+    dash_spv_ffi_manager_sync_progress_destroy, null_check, set_last_error, FFIClientConfig,
+    FFIErrorCode, FFINetworkEventCallbacks, FFIProgressCallback, FFISyncEventCallbacks,
+    FFISyncProgress, FFIWalletEventCallbacks, FFIWalletManager,
+};
@@
 pub unsafe extern "C" fn dash_spv_ffi_sync_progress_destroy(progress: *mut FFISyncProgress) {
-    if !progress.is_null() {
-        let _ = Box::from_raw(progress);
-    }
+    dash_spv_ffi_manager_sync_progress_destroy(progress);
 }
🤖 Fix all issues with AI agents
In `@dash-spv-ffi/include/dash_spv_ffi.h`:
- Around line 178-361: The callback typedefs that accept pointer parameters
(e.g. OnManagerErrorCallback, OnPeerConnectedCallback,
OnTransactionReceivedCallback, OnBlockProcessedCallback,
OnInstantLockReceivedCallback, OnBlocksNeededCallback and the corresponding
fields in FFISyncEventCallbacks, FFINetworkEventCallbacks,
FFIWalletEventCallbacks) need explicit lifetime docs: annotate each typedef’s
comment to state that any const char* or const uint8_t(*)[32] pointer is
borrowed and only valid for the duration of the callback and that callers must
memcpy/duplicate any data they need to retain after the callback returns; update
the summary above each group (Sync/Network/Wallet event callbacks) to mention
this borrowed-pointer lifetime rule so users know to copy strings, hashes,
txids, and addresses if they persist them.
- Around line 621-703: The PR is missing tests for several new FFI entry points;
add Rust unit tests that construct a test FFIDashSpvClient, call and assert
successful return values for dash_spv_ffi_client_run,
dash_spv_ffi_client_get_manager_sync_progress,
dash_spv_ffi_client_set_network_event_callbacks,
dash_spv_ffi_client_set_wallet_event_callbacks, and
dash_spv_ffi_client_set_progress_callback (use simple no-op or counting
callbacks and ensure to call the corresponding clear functions afterwards), and
add C integration tests under dash-spv-ffi/tests/c_tests/ that mirror those
checks: create/initialize an FFIDashSpvClient, register no-op
FFINetworkEventCallbacks/FFIWalletEventCallbacks/FFIProgressCallback, call
dash_spv_ffi_client_run and dash_spv_ffi_client_get_manager_sync_progress to
validate expected return codes/state, then clear callbacks and teardown the
client; ensure tests assert non-error return codes and proper callback
invocation where applicable.

In `@dash-spv-ffi/src/client.rs`:
- Around line 1074-1135: The current
dash_spv_ffi_client_set_wallet_event_callbacks both stores callbacks and spawns
a monitor thread, causing duplicate monitors since dash_spv_ffi_client_run also
spawns one; change this function to only store the callbacks (i.e., set
client.wallet_event_callbacks) and remove the thread-spawning block, or add a
guard (e.g., a boolean like wallet_event_monitor_started or check an existing
handle in client.active_threads) so you never spawn a second monitor; ensure
dash_spv_ffi_client_run remains the sole place that starts the monitor and uses
client.wallet_event_callbacks to subscribe and push the spawned thread handle
into client.active_threads.

In `@dash-spv/src/network/manager.rs`:
- Around line 1083-1150: The code only handles NetworkMessage::GetHeaders when
deciding whether to upgrade/downgrade per-peer; update the outbound-message
match in send_distributed to also handle NetworkMessage::GetHeaders2: when the
incoming message is GetHeaders2, check the selected peer (peer.read().await)
with can_request_headers2() and headers2_disabled to decide whether to send
GetHeaders2 as-is or downgrade to NetworkMessage::GetHeaders; likewise keep the
existing GetHeaders branch behavior (upgrade to GetHeaders2 if peer supports
it). Modify the match arms that currently reference
NetworkMessage::GetHeaders(get_headers) so they include a second arm for
NetworkMessage::GetHeaders2(get_headers2) and perform the peer capability checks
and conversion there.

In `@dash-spv/src/storage/filters.rs`:
- Around line 12-18: The current FilterStorage::filter_tip_height() conflates
"no filters" with height 0; change its signature to async fn
filter_tip_height(&self) -> StorageResult<Option<u32>> (and adjust any impls of
the FilterStorage trait), update any places that currently call
filter_tip_height() and rely on unwrap_or(0) to instead handle None explicitly
(e.g., storage manager / sync progress), and ensure implementations of
load_filters and store_filter remain unchanged but return None when the store is
empty rather than 0 so callers can distinguish absent-tip from a real tip
height.
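
A sketch of the suggested signature and a caller handling the absent-tip case (trait shape abbreviated, caller names illustrative):

#[async_trait::async_trait]
pub trait FilterStorage: Send + Sync + 'static {
    /// `Ok(None)` means "no filters stored yet", distinct from a real tip at height 0.
    async fn filter_tip_height(&self) -> StorageResult<Option<u32>>;
}

// Caller side: handle None explicitly instead of unwrap_or(0).
match storage.filter_tip_height().await? {
    Some(tip) => progress.update_current_height(tip),
    None => tracing::debug!("no filters stored yet; starting filter sync from scratch"),
}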

In `@dash-spv/src/sync/chainlock/sync_manager.rs`:
- Around line 51-63: The code marks ChainLock hashes in requested_chainlocks
before calling requests.request_inventory, so a failing request leaves them
permanently marked; change the flow to call
self.requests.request_inventory(chainlocks_to_request)? first and only upon Ok
iterate over chainlocks_to_request and insert hashes into
self.requested_chainlocks (or if you prefer, perform the insertion in a closure
after a successful call) so that marks are only set when the network request
succeeds; use the existing symbols chainlocks_to_request,
requests.request_inventory, and requested_chainlocks to locate and update the
logic.
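
A sketch of the reordered flow (assuming chainlocks_to_request holds the hashes being tracked):

// Only mark hashes as requested once the network request succeeds, so a failed
// send leaves nothing permanently stuck in requested_chainlocks.
self.requests.request_inventory(chainlocks_to_request.clone())?;
for hash in chainlocks_to_request {
    self.requested_chainlocks.insert(hash);
}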

In `@dash-spv/src/sync/filter_headers/pipeline.rs`:
- Around line 183-193: The loop currently removes stop_hashes with
self.coordinator.take_pending(count) but silently drops any stop_hash for which
self.batch_starts.get(&stop_hash) returns None; modify the error path so the
missing entry is re-queued instead of lost (e.g., call a coordinator reinsert
method such as self.coordinator.return_pending(stop_hash) or implement/rename to
self.coordinator.requeue(&stop_hash)) and then continue; alternatively change
the take_pending usage to peek or only remove after successful lookup and
request (ensure coordinator.mark_sent(&[stop_hash]) is still called only after
request_filter_headers succeeds). Ensure you update or add the matching
coordinator API used (return_pending/requeue) if it does not already exist.

In `@dash-spv/src/sync/filter_headers/sync_manager.rs`:
- Around line 74-77: The batch_end calculation can underflow when count == 0
(count comes from compute_filter_headers(...).len()); update the two places that
compute batch_end (where process_cfheaders(&data, start_height).await? returns
count and later where count is used again) to avoid start_height + count - 1
wrapping by using a saturating subtraction or asserting count > 0; for example,
replace the subtraction with count.saturating_sub(1) when computing batch_end or
add an assert!(count > 0) immediately after obtaining count in the scope around
process_cfheaders and the second occurrence to guarantee safety.
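
A sketch of the guard:

// `count` comes from compute_filter_headers(...).len(); if it can ever be 0,
// `start_height + count - 1` would wrap. The saturating form keeps batch_end
// well-defined (an assert!(count > 0) is the stricter alternative).
let batch_end = start_height + (count as u32).saturating_sub(1);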

In `@dash-spv/src/sync/filters/manager.rs`:
- Around line 104-125: In load_filters, validate that loaded_filters and
loaded_headers lengths match the expected range before zipping to avoid silent
misalignment: compute expected_len = (end_height - start_height + 1) as usize,
call self.filter_storage.read().await.load_filters(...) and
self.header_storage.read().await.load_headers(...), then check
loaded_filters.len() == expected_len and loaded_headers.len() == expected_len
and return an appropriate SyncResult::Err (e.g. a storage/validation error) if
not; only after these checks iterate and construct FilterMatchKey and
BlockFilter to insert into the HashMap (do not rely on zip to detect
mismatches).
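
A sketch of the length check (error variant and loader names are illustrative):

let expected_len = (end_height - start_height + 1) as usize;
let loaded_filters =
    self.filter_storage.read().await.load_filters(start_height, end_height).await?;
let loaded_headers =
    self.header_storage.read().await.load_headers(start_height, end_height).await?;

if loaded_filters.len() != expected_len || loaded_headers.len() != expected_len {
    return Err(SyncError::InvalidState(format!(
        "filter/header count mismatch for heights {}..={}: {} filters, {} headers, expected {}",
        start_height,
        end_height,
        loaded_filters.len(),
        loaded_headers.len(),
        expected_len
    )));
}
// Only now iterate to build the FilterMatchKey -> BlockFilter map.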

In `@dash-spv/src/sync/filters/sync_manager.rs`:
- Around line 113-119: The code silently drops CFilter entries when
header_storage.read().await.get_header_height_by_hash(&cfilter.block_hash).await
returns None; since CFilter is untrusted input, change the behavior in
sync_manager.rs (around the get_header_height_by_hash call) to treat a missing
header as a validation error rather than returning Ok(vec![]). Specifically,
when height is None for a given CFilter, log the condition with context
(including cfilter.block_hash), apply the appropriate penalization/metric for
invalid input, and return an Err (or a ValidationError variant used by the
module) so the filter sync does not silently proceed on data loss.
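
A sketch of the stricter handling (the error variant used here is illustrative):

let height = self
    .header_storage
    .read()
    .await
    .get_header_height_by_hash(&cfilter.block_hash)
    .await?
    .ok_or_else(|| {
        // Untrusted CFilter referencing an unknown block: surface it instead of
        // silently returning an empty result.
        tracing::warn!("CFilter for unknown block {}, rejecting", cfilter.block_hash);
        SyncError::InvalidState(format!(
            "received CFilter for block {} with no stored header",
            cfilter.block_hash
        ))
    })?;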

In `@dash-spv/src/sync/masternodes/manager.rs`:
- Around line 165-199: In verify_and_complete, don’t treat quorum verification
failures or missing masternode lists as benign: when
engine.verify_non_rotating_masternode_list_quorums(height, &[]) returns Err(e)
call self.set_state(SyncState::Error) and return
Err(SyncError::MasternodeSyncFailed(format!("Quorum verification failed at
height {}: {}", height, e))); likewise, if engine.masternode_lists.keys().last()
is None and is_initial_sync is true call self.set_state(SyncState::Error) and
return Err(SyncError::MasternodeSyncFailed("No masternode lists
available".into())); keep references to verify_and_complete,
engine.verify_non_rotating_masternode_list_quorums,
engine.masternode_lists.keys().last(), self.set_state, SyncState::Error and
SyncError::MasternodeSyncFailed to locate and update the logic (ensure any held
locks are dropped before returning if needed).
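
A sketch of the two error paths (lock handling elided):

if let Err(e) = engine.verify_non_rotating_masternode_list_quorums(height, &[]) {
    self.set_state(SyncState::Error);
    return Err(SyncError::MasternodeSyncFailed(format!(
        "Quorum verification failed at height {}: {}",
        height, e
    )));
}

if is_initial_sync && engine.masternode_lists.keys().last().is_none() {
    self.set_state(SyncState::Error);
    return Err(SyncError::MasternodeSyncFailed("No masternode lists available".into()));
}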

In `@dash-spv/src/sync/progress.rs`:
- Around line 44-99: The aggregate logic in state() and is_synced() omits the
ChainLock and InstantSend managers so overall status can wrongly report Synced
or hide Error; update both functions to include the chain lock and instant send
manager states by adding their entries (e.g. self.chainlocks.as_ref().map(|c|
c.state()) and self.instant_send.as_ref().map(|i| i.state())) into the same
array used to build states (the array currently containing headers,
filter_headers, filters, blocks, masternodes) so they are flattened/collected
and then participate in the existing priority checks and the is_synced()
all-Synced check.
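
A sketch of the expanded aggregation (field names follow the comment above):

let states: Vec<SyncState> = [
    self.headers.as_ref().map(|m| m.state()),
    self.filter_headers.as_ref().map(|m| m.state()),
    self.filters.as_ref().map(|m| m.state()),
    self.blocks.as_ref().map(|m| m.state()),
    self.masternodes.as_ref().map(|m| m.state()),
    self.chainlocks.as_ref().map(|c| c.state()),
    self.instant_send.as_ref().map(|i| i.state()),
]
.into_iter()
.flatten()
.collect();
// ...then run the existing priority checks (Error > Syncing > Synced) and the
// is_synced() all-Synced check over this extended list.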

In `@dash-spv/tests/header_sync_test.rs`:
- Around line 43-49: client.start() can spawn async managers so calling
client.sync_progress().headers() immediately is racy; change the test to poll
until headers() returns Ok and current_height() == 0 with a short timeout (use
tokio::time::timeout and std::time::Duration). Specifically, after
client.start().await.unwrap(), repeatedly call client.sync_progress().headers()
(or check headers().map(|h| h.current_height())) until it returns Ok(0) or the
timeout elapses, then assert the result; update imports to include
tokio::time::timeout and std::time::Duration.
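
A sketch of the polling pattern (accessor names follow the comment above and are assumed):

use std::time::Duration;
use tokio::time::timeout;

client.start().await.unwrap();

// Poll until the headers manager reports height 0, bounded by a 5s timeout.
let polled = timeout(Duration::from_secs(5), async {
    loop {
        if let Ok(headers) = client.sync_progress().headers() {
            if headers.current_height() == 0 {
                return;
            }
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
})
.await;
assert!(polled.is_ok(), "headers progress never reported height 0 within 5s");
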
🟡 Minor comments (19)
dash-spv/src/validation/filter.rs-352-381 (1)

352-381: ⚠️ Potential issue | 🟡 Minor

Test test_verify_noncontiguous_heights may be misleading.

This test passes because it builds expected_headers using the same chaining logic as the validator (each height's header becomes prev for the next sorted height). However, in real BIP158 semantics, a filter at height 20 must chain from filter_header[19], not filter_header[10].

If non-contiguous batches are unsupported (as suggested above), consider removing this test or renaming it to clarify it tests internal consistency rather than BIP158 correctness.

dash-spv/src/validation/filter.rs-46-63 (1)

46-63: ⚠️ Potential issue | 🟡 Minor

Non-contiguous heights would produce incorrect validation results.

The chain-building logic uses expected_headers[height] as the prev header for the next height in sorted order. For contiguous batches this is correct, but for non-contiguous heights (e.g., [10, 20, 30]), height 20's prev would incorrectly be filter_header[10] instead of filter_header[19].

If non-contiguous batches are not a supported use case, consider adding a contiguity check or documenting this assumption. If they should be supported, the caller would need to provide prev headers for each height - 1.

🛡️ Suggested contiguity validation
         // Sort keys by height to build chain correctly
         let mut heights: Vec<u32> = input.filters.keys().map(|k| k.height()).collect();
         heights.sort();
 
+        // Verify heights are contiguous (required for correct chain verification)
+        for window in heights.windows(2) {
+            if window[1] != window[0] + 1 {
+                return Err(ValidationError::InvalidFilterHeaderChain(
+                    format!("Non-contiguous heights detected: {} -> {}", window[0], window[1])
+                ));
+            }
+        }
+
         // Build prev_header map
dash-spv/src/sync/filters/progress.rs-6-25 (1)

6-25: ⚠️ Potential issue | 🟡 Minor

PartialEq derive with Instant field may cause unexpected behavior.

The struct derives PartialEq but contains last_activity: Instant. While Instant does implement PartialEq, comparing two FiltersProgress instances will almost never be equal due to differing timestamps, even if all other fields match. This could cause issues in tests or equality checks.

Consider either:

  1. Implementing PartialEq manually to exclude last_activity
  2. Removing PartialEq if not needed for equality comparisons
🔧 Manual PartialEq implementation excluding last_activity
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone)]
 pub struct FiltersProgress {
     // ... fields ...
 }
+
+impl PartialEq for FiltersProgress {
+    fn eq(&self, other: &Self) -> bool {
+        self.state == other.state
+            && self.current_height == other.current_height
+            && self.target_height == other.target_height
+            && self.filter_header_tip_height == other.filter_header_tip_height
+            && self.downloaded == other.downloaded
+            && self.processed == other.processed
+            && self.matched == other.matched
+        // Intentionally exclude last_activity from comparison
+    }
+}
dash-spv/src/client/queries.rs-69-69 (1)

69-69: ⚠️ Potential issue | 🟡 Minor

Typo in variable name: masternode_endine.

Should be masternode_engine for consistency.

✏️ Fix typo
-        let masternode_endine = self.masternode_list_engine()?;
-        let masternode_engine_guard = masternode_endine.read().await;
+        let masternode_engine = self.masternode_list_engine()?;
+        let masternode_engine_guard = masternode_engine.read().await;
dash-spv/src/sync/blocks/progress.rs-127-142 (1)

127-142: ⚠️ Potential issue | 🟡 Minor

Display format label mismatch: "last_relevant" vs field name "last_processed".

The Display implementation shows last_relevant: but the field being displayed is last_processed. This could cause confusion when reading logs or status output.

🔧 Suggested fix
         write!(
             f,
-            "{:?} last_relevant: {} | requested: {}, from_storage: {}, downloaded: {}, processed: {}, relevant: {}, transactions: {}, last_activity: {}s",
+            "{:?} last_processed: {} | requested: {}, from_storage: {}, downloaded: {}, processed: {}, relevant: {}, transactions: {}, last_activity: {}s",
             self.state,
             self.last_processed,
dash-spv/src/client/lifecycle.rs-78-86 (1)

78-86: ⚠️ Potential issue | 🟡 Minor

Same expect() issue for filter-related storage references.

These expect() calls should also be converted to proper error handling for consistency and robustness.

🛠️ Suggested fix
-            let filter_headers_storage = storage
-                .filter_header_storage_ref()
-                .expect("Filters headers storage must exist if filters are enabled");
-            let filters_storage = storage
-                .filter_storage_ref()
-                .expect("Filters storage must exist if filters are enabled");
-            let blocks_storage = storage
-                .block_storage_ref()
-                .expect("Blocks storage must exist if filters are enabled");
+            let filter_headers_storage = storage.filter_header_storage_ref().ok_or_else(|| {
+                SpvError::Config("Filter headers storage must exist if filters are enabled".to_string())
+            })?;
+            let filters_storage = storage.filter_storage_ref().ok_or_else(|| {
+                SpvError::Config("Filters storage must exist if filters are enabled".to_string())
+            })?;
+            let blocks_storage = storage.block_storage_ref().ok_or_else(|| {
+                SpvError::Config("Blocks storage must exist if filters are enabled".to_string())
+            })?;
dash-spv/src/sync/masternodes/pipeline.rs-134-147 (1)

134-147: ⚠️ Potential issue | 🟡 Minor

Doc comment doesn't match implementation: handle_timeouts() returns nothing.

The doc comment states "Returns hashes that exceeded max retries and were dropped" but the function returns (). Either update the doc to match the implementation or change the return type.

📝 Suggested doc fix
     /// Handle timeouts, re-queuing failed requests.
-    ///
-    /// Returns hashes that exceeded max retries and were dropped.
     pub(super) fn handle_timeouts(&mut self) {

Or if the dropped hashes are needed by callers:

-    pub(super) fn handle_timeouts(&mut self) {
+    pub(super) fn handle_timeouts(&mut self) -> Vec<BlockHash> {
+        let mut dropped = Vec::new();
         for target_hash in self.coordinator.check_timeouts() {
             if !self.coordinator.enqueue_retry(target_hash) {
                 tracing::warn!(
                     "MnListDiff request for {} exceeded max retries, dropping",
                     target_hash
                 );
                 self.base_hashes.remove(&target_hash);
+                dropped.push(target_hash);
             }
         }
+        dropped
     }
dash-spv/src/client/lifecycle.rs-67-75 (1)

67-75: ⚠️ Potential issue | 🟡 Minor

Avoid expect() in library code; propagate errors instead.

The coding guidelines state to avoid unwrap() and expect() in library code. These expect() calls will panic if storage references are None, which could occur due to misconfiguration. Consider propagating a proper error.

🛠️ Suggested fix
-        let header_storage = storage.header_storage_ref().expect("Headers storage must exist");
+        let header_storage = storage
+            .header_storage_ref()
+            .ok_or_else(|| SpvError::Config("Headers storage must exist".to_string()))?;
         let checkpoints = match config.network {
             dashcore::Network::Dash => mainnet_checkpoints(),
             dashcore::Network::Testnet => testnet_checkpoints(),
             _ => Vec::new(),
         };
         let checkpoint_manager = Arc::new(CheckpointManager::new(checkpoints));

As per coding guidelines: "Avoid unwrap() and expect() in library code; use explicit error types."

dash-spv/src/sync/block_headers/progress.rs-92-99 (1)

92-99: ⚠️ Potential issue | 🟡 Minor

Doc comment mismatch and inconsistent bump_last_activity behavior.

Two issues:

  1. Line 92's doc comment says "Add a number to the buffered counter" but buffered() is a getter
  2. update_buffered() doesn't call bump_last_activity() unlike all other mutators in this struct and sibling progress types
🔧 Suggested fix
-    /// Add a number to the buffered counter.
+    /// Get the number of headers currently buffered in the pipeline.
     pub fn buffered(&self) -> u32 {
         self.buffered
     }
     /// Update the buffered counter.
     pub fn update_buffered(&mut self, count: u32) {
         self.buffered = count;
+        self.bump_last_activity();
     }
dash-spv/src/client/lifecycle.rs-103-106 (1)

103-106: ⚠️ Potential issue | 🟡 Minor

Same expect() issue for masternode engine.

🛠️ Suggested fix
-            let masternode_list_engine = masternode_engine
-                .clone()
-                .expect("Masternode list engine must exist if masternodes are enabled");
+            let masternode_list_engine = masternode_engine.clone().ok_or_else(|| {
+                SpvError::Config("Masternode list engine must exist if masternodes are enabled".to_string())
+            })?;
dash-spv/src/sync/block_headers/manager.rs-199-203 (1)

199-203: ⚠️ Potential issue | 🟡 Minor

Propagate storage errors instead of masking them.
Treating storage read failures as “header missing” hides I/O or corruption issues and can trigger unnecessary header requests.

🛠️ Suggested fix
@@
-                if let Ok(Some(_)) =
-                    self.header_storage.read().await.get_header_height_by_hash(block_hash).await
-                {
-                    continue;
-                }
+                if self
+                    .header_storage
+                    .read()
+                    .await
+                    .get_header_height_by_hash(block_hash)
+                    .await?
+                    .is_some()
+                {
+                    continue;
+                }
As per coding guidelines: Use proper error types (thiserror) and propagate errors appropriately.
dash-spv/src/sync/block_headers/segment_state.rs-114-152 (1)

114-152: ⚠️ Potential issue | 🟡 Minor

Fix tip/hash drift when stopping at checkpoint.
If a response includes headers past the checkpoint, current_tip_hash is set from headers.last() even though only processed headers were applied. That desyncs height/hash and can confuse later matching or logging. Track the last processed hash and update the tip from that.

🛠️ Suggested fix
@@
-        for header in headers {
+        let mut last_processed_hash = None;
+        for header in headers {
             let hashed = HashedBlockHeader::from(*header);
             let hash = *hashed.hash();
             let height = self.current_height + processed as u32 + 1;
@@
-                        self.buffered_headers.push(hashed);
-                        processed += 1;
-                        self.complete = true;
-                        break;
+                        self.buffered_headers.push(hashed);
+                        processed += 1;
+                        self.complete = true;
+                        last_processed_hash = Some(hash);
+                        break;
                     } else {
@@
-            self.buffered_headers.push(hashed);
-            processed += 1;
+            self.buffered_headers.push(hashed);
+            processed += 1;
+            last_processed_hash = Some(hash);
         }
 
-        if let Some(last) = headers.last() {
-            self.current_tip_hash = last.block_hash();
-            self.current_height += processed as u32;
-        }
+        if let Some(last_hash) = last_processed_hash {
+            self.current_tip_hash = last_hash;
+            self.current_height += processed as u32;
+        }
dash-spv/src/sync/blocks/sync_manager.rs-74-95 (1)

74-95: ⚠️ Potential issue | 🟡 Minor

Avoid buffering blocks before height resolution.
If header height lookup fails, the block has already been buffered, leaving the pipeline in an inconsistent state. Consider resolving height first (using the block hash) and only then buffering.

🛠️ Suggested reordering
@@
-        let hashed_block = HashedBlock::from(block);
-
-        // Check if this is a block we requested (pipeline handles buffering with height)
-        if !self.pipeline.receive_block(block) {
-            tracing::debug!("Received unrequested block {}", hashed_block.hash());
-            return Ok(vec![]);
-        }
-
-        // Look up height for storage
-        let height = self
-            .header_storage
-            .read()
-            .await
-            .get_header_height_by_hash(hashed_block.hash())
-            .await?
-            .ok_or_else(|| {
-                SyncError::InvalidState(format!(
-                    "Block {} has no stored header - cannot determine height",
-                    hashed_block.hash()
-                ))
-            })?;
+        let hash = block.block_hash();
+        // Resolve height first to avoid buffering a block we can't map to a header.
+        let height = self
+            .header_storage
+            .read()
+            .await
+            .get_header_height_by_hash(&hash)
+            .await?
+            .ok_or_else(|| {
+                SyncError::InvalidState(format!(
+                    "Block {} has no stored header - cannot determine height",
+                    hash
+                ))
+            })?;
+
+        // Check if this is a block we requested (pipeline handles buffering with height)
+        if !self.pipeline.receive_block(block) {
+            tracing::debug!("Received unrequested block {}", hash);
+            return Ok(vec![]);
+        }
+
+        let hashed_block = HashedBlock::from(block);
dash-spv/src/sync/block_headers/segment_state.rs-68-75 (1)

68-75: ⚠️ Potential issue | 🟡 Minor

Use the coordinator’s pending queue for retries
handle_timeouts pushes timed-out hashes into coordinator.pending, but send_request always resends current_tip_hash directly, so retries in the pending queue are never consumed. Modify send_request to pull the next hash from coordinator.pending (e.g., via take_pending) before calling mark_sent.
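
A sketch of the suggested change (the header-request helper name is illustrative):

fn send_request(&mut self) -> SyncResult<()> {
    // Prefer a retry hash queued by handle_timeouts; fall back to the current tip.
    let start_hash =
        self.coordinator.take_pending(1).into_iter().next().unwrap_or(self.current_tip_hash);
    self.requests.request_headers(start_hash)?;
    self.coordinator.mark_sent(&[start_hash]);
    Ok(())
}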

dash-spv/src/sync/block_headers/pipeline.rs-238-241 (1)

238-241: ⚠️ Potential issue | 🟡 Minor

Guard is_complete() against uninitialized pipeline.

With no segments, all() returns true and can report completion before init(). Consider gating on initialized.

🐛 Proposed fix
-        self.segments.iter().all(|s| s.complete && s.buffered_headers.is_empty())
+        self.initialized
+            && self.segments.iter().all(|s| s.complete && s.buffered_headers.is_empty())
dash-spv/src/sync/filters/pipeline.rs-182-229 (1)

182-229: ⚠️ Potential issue | 🟡 Minor

Increment filters_received only on new inserts. BatchTracker::insert_filter unconditionally inserts and doesn’t signal duplicates, so retries will inflate the counter. Change it to return a bool (or otherwise detect duplicate keys) and only increment filters_received when a new filter is actually inserted.
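
A sketch of a duplicate-aware insert and the matching counter bump (building on the current insert_filter signature; the caller shape is illustrative):

pub(super) fn insert_filter(
    &mut self,
    height: u32,
    block_hash: BlockHash,
    filter_data: &[u8],
) -> bool {
    let key = FilterMatchKey::new(height, block_hash);
    if self.filters.contains_key(&key) {
        return false; // duplicate delivery (e.g. a retried batch); don't recount it
    }
    self.received.insert(height);
    self.filters.insert(key, BlockFilter::new(filter_data));
    true
}

// Caller side:
if batch.insert_filter(height, block_hash, &cfilter.filter) {
    self.filters_received += 1;
}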

dash-spv/src/sync/filter_headers/pipeline.rs-267-281 (1)

267-281: ⚠️ Potential issue | 🟡 Minor

Memory leak: batch_starts not cleaned up for permanently failed batches.

When enqueue_retry returns false (max retries exceeded), the stop_hash remains in batch_starts indefinitely. The hash should be removed to prevent unbounded memory growth.

🐛 Proposed fix
     pub(super) fn handle_timeouts(&mut self) -> Vec<u32> {
         let mut failed = Vec::new();
         for stop_hash in self.coordinator.check_timeouts() {
             if !self.coordinator.enqueue_retry(stop_hash) {
                 if let Some(&start_height) = self.batch_starts.get(&stop_hash) {
                     tracing::warn!(
                         "CFHeaders batch at height {} exceeded max retries, dropping",
                         start_height
                     );
                     failed.push(start_height);
+                    self.batch_starts.remove(&stop_hash);
                 }
             }
         }
         failed
     }
dash-spv-ffi/src/bin/ffi_cli.rs-96-103 (1)

96-103: ⚠️ Potential issue | 🟡 Minor

Guard against null txid in the InstantLock callback.
This callback dereferences txid unconditionally; a null pointer would be UB. Add a fast null check before encoding. As per coding guidelines: Always validate inputs from untrusted sources.

🛡️ Proposed fix
 extern "C" fn on_instantlock_received(
     txid: *const [u8; 32],
     validated: bool,
     _user_data: *mut c_void,
 ) {
+    if txid.is_null() {
+        return;
+    }
     let txid_hex = unsafe { hex::encode(*txid) };
     println!("[Sync] InstantLock received: txid={} validated={}", txid_hex, validated);
 }
dash-spv-ffi/include/dash_spv_ffi.h-609-703 (1)

609-703: ⚠️ Potential issue | 🟡 Minor

Clarify callback struct lifetime: only user_data must remain valid

Callback structs are copied by value; only the user_data pointer inside needs to remain valid until callbacks are cleared.

Suggested wording tweak (apply to all set_*_callbacks)
- * - The `callbacks` struct and its `user_data` must remain valid until callbacks are cleared.
+ * - Callbacks are copied by value; only `user_data` must remain valid until callbacks are cleared.
🧹 Nitpick comments (30)
dash-spv/src/error.rs (1)

246-246: Consider adding test coverage for the new category.

The category mapping is correct, but test_sync_error_category doesn't verify the MasternodeSyncFailed variant returns "masternode".

💡 Suggested test addition

Add this assertion to the test at line 339:

         assert_eq!(SyncError::MissingDependency("test".to_string()).category(), "dependency");
+        assert_eq!(SyncError::MasternodeSyncFailed("test".to_string()).category(), "masternode");
+        assert_eq!(
+            SyncError::Headers2DecompressionFailed("test".to_string()).category(),
+            "headers2"
+        );
dash-spv/src/validation/filter.rs (1)

40-41: Consider adding configurable validation modes for consistency.

Based on learnings, validators in this crate should implement configurable validation modes (ValidationMode::None, ValidationMode::Basic, ValidationMode::Full). While filter validation may only have one meaningful mode currently, adding a ValidationMode parameter would maintain consistency with other validators and allow future flexibility (e.g., skipping validation in trusted scenarios).

dash-spv/src/client/chainlock.rs (1)

54-80: Drop the write lock before emitting the ChainLock event
Holding the write lock during self.emit_event can block handlers that access client state; capture the event, drop the lock, then call self.emit_event(event).

Suggested change
@@ dash-spv/src/client/chainlock.rs
-        // Emit ChainLock event
-        self.emit_event(SpvEvent::ChainLockReceived {
-            height: chainlock.block_height,
-            hash: chainlock.block_hash,
-        });
+        let event = SpvEvent::ChainLockReceived {
+            height: chainlock.block_height,
+            hash: chainlock.block_hash,
+        };
+        drop(state);
+
+        // Emit ChainLock event
+        self.emit_event(event);
key-wallet-manager/src/events.rs (1)

56-66: Consider including locked in the description for consistency.

The BalanceUpdated variant includes a locked field, but description() omits it. For debugging parity, consider adding it:

📝 Optional: Include locked in description
 WalletEvent::BalanceUpdated {
     spendable,
     unconfirmed,
     immature,
+    locked,
     ..
 } => {
     format!(
-        "BalanceUpdated(spendable={}, unconfirmed={}, immature={})",
-        spendable, unconfirmed, immature
+        "BalanceUpdated(spendable={}, unconfirmed={}, immature={}, locked={})",
+        spendable, unconfirmed, immature, locked
     )
 }
dash-spv/src/sync/filters/batch_tracker.rs (1)

31-36: Consider validating height bounds in insert_filter.

The method accepts any height without validating it's within the batch range. While callers may handle this, defensive validation could prevent subtle bugs.

🛡️ Optional defensive check
 pub(super) fn insert_filter(&mut self, height: u32, block_hash: BlockHash, filter_data: &[u8]) {
+    debug_assert!(height <= self.end_height, "height {} exceeds batch end_height {}", height, self.end_height);
     self.received.insert(height);
     let key = FilterMatchKey::new(height, block_hash);
     let filter = BlockFilter::new(filter_data);
     self.filters.insert(key, filter);
 }
dash-spv/src/test_utils/network.rs (1)

52-52: Consider extracting the broadcast channel buffer size to a constant.

The hardcoded 100 for the broadcast channel buffer is reasonable but could be more maintainable as a named constant.

♻️ Extract constant
+const MOCK_NETWORK_EVENT_BUFFER_SIZE: usize = 100;
+
 impl MockNetworkManager {
     /// Create a new mock network manager
     pub fn new() -> Self {
         let (request_tx, request_rx) = unbounded_channel();
         Self {
             // ...
-            network_event_sender: broadcast::Sender::new(100),
+            network_event_sender: broadcast::Sender::new(MOCK_NETWORK_EVENT_BUFFER_SIZE),
         }
     }
 }
dash-spv/src/sync/filters/batch.rs (1)

108-126: Document the non-standard PartialEq/Eq behavior prominently.

The PartialEq and Ord implementations compare only start_height, which is intentional for BTreeSet ordering but violates the typical invariant that equal structs have identical field values. While this is tested and documented in the test, consider adding a doc comment on the trait implementations to prevent confusion.

📝 Add clarifying documentation
+/// Note: Equality is based solely on `start_height` to support unique batch
+/// identification in ordered collections like `BTreeSet`. Two batches with
+/// the same `start_height` are considered equal regardless of other fields.
 impl PartialEq for FiltersBatch {
     fn eq(&self, other: &Self) -> bool {
         self.start_height == other.start_height
     }
 }
dash-spv-ffi/tests/integration/test_full_workflow.rs (1)

200-200: Assert the broadcast result instead of only printing it.
Line 200 prints the status; the test won’t fail on a failed broadcast.

✅ Suggested assertion
-            println!("Broadcast result: {}", result);
+            assert_eq!(
+                result,
+                FFIErrorCode::Success as i32,
+                "Broadcast transaction failed"
+            );
dash-spv/src/sync/instantsend/progress.rs (1)

5-18: Consider excluding last_activity from equality comparison.

The #[derive(PartialEq)] includes last_activity: Instant, which could cause unexpected behavior in tests or comparisons since Instant values are unlikely to match even for logically equivalent progress states.

Consider manual PartialEq implementation
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone)]
 pub struct InstantSendProgress {
     // ...
 }
+
+impl PartialEq for InstantSendProgress {
+    fn eq(&self, other: &Self) -> bool {
+        self.state == other.state
+            && self.pending == other.pending
+            && self.valid == other.valid
+            && self.invalid == other.invalid
+        // Intentionally exclude last_activity from equality
+    }
+}
dash-spv/src/sync/masternodes/progress.rs (2)

7-7: PartialEq derive may be unreliable due to Instant field.

The Instant type doesn't provide meaningful equality—two Instant::now() calls are virtually never equal. This makes PartialEq unreliable for comparing MasternodesProgress instances (e.g., in tests or state change detection). Consider either:

  1. Excluding last_activity from equality checks via a manual PartialEq impl
  2. Removing PartialEq if not needed

71-75: Inconsistent monotonicity between update_current_height and update_target_height.

update_target_height (line 79) only updates if the new height is greater (monotonic increase), but update_current_height always overwrites. If this is intentional (e.g., to handle reorgs), consider adding a doc comment explaining the design choice. Otherwise, consider aligning the behavior.

dash-spv/src/sync/filter_headers/sync_manager.rs (1)

54-67: Duplicate sync completion logic.

The sync completion check (pipeline complete, state is Syncing, current >= target) and associated logging/event emission appear in two places (lines 54-67 and 127-139). Consider extracting to a private helper method to reduce duplication.

♻️ Suggested helper extraction
fn check_and_emit_sync_complete(&mut self) -> Option<SyncEvent> {
    if self.pipeline.is_complete()
        && self.state() == SyncState::Syncing
        && self.progress.current_height() >= self.progress.target_height()
    {
        self.set_state(SyncState::Synced);
        tracing::info!(
            "Filter header sync complete at height {}",
            self.progress.current_height()
        );
        Some(SyncEvent::FilterHeadersSyncComplete {
            tip_height: self.progress.current_height(),
        })
    } else {
        None
    }
}

Also applies to: 126-139

dash-spv/src/sync/chainlock/progress.rs (2)

45-48: Doc comment mismatch with method name.

The doc comment says "Number of ChainLocks dropped after max retries" but the getter is named invalid() and the field is invalid. This is inconsistent—either the doc should say "Number of ChainLocks that failed validation" to match the field semantics, or the naming should be more specific.

📝 Suggested doc fix
-    /// Number of ChainLocks dropped after max retries.
+    /// Number of ChainLocks that failed validation.
     pub fn invalid(&self) -> u32 {

6-6: Same PartialEq concern with Instant field.

As noted for MasternodesProgress, deriving PartialEq on a struct containing Instant may lead to unreliable equality comparisons.

dash-spv/src/sync/instantsend/sync_manager.rs (1)

91-95: tick() only prunes—no timeout handling unlike other managers.

Other sync managers (e.g., FilterHeadersManager) return timeout errors in tick() when requests exceed max retries. This manager only prunes old entries. This seems intentional since InstantSend is event-driven without a sync target, but consider adding a brief doc comment explaining why no timeout handling is needed here.

dash-spv/src/network/mod.rs (2)

111-121: Hardcoded extra_share: true in request_qr_info.

The extra_share parameter is hardcoded to true. The existing request_qr_info method on NetworkManager trait (lines 174-200) accepts extra_share as a parameter. Consider parameterizing this to maintain consistency and flexibility.

♻️ Suggested fix
     pub fn request_qr_info(
         &self,
         known_block_hashes: Vec<BlockHash>,
         target_block_hash: BlockHash,
+        extra_share: bool,
     ) -> NetworkResult<()> {
         self.send_message(NetworkMessage::GetQRInfo(GetQRInfo {
             base_block_hashes: known_block_hashes,
             block_request_hash: target_block_hash,
-            extra_share: true,
+            extra_share,
         }))
     }

41-46: Single-variant enum may benefit from a doc comment about future extensibility.

NetworkRequest currently has only SendMessage. If there are plans for variants like BroadcastToAll or SendToPeer(SocketAddr, NetworkMessage), a brief doc comment noting the extensibility intent would help maintainers.

dash-spv/src/sync/instantsend/manager.rs (2)

170-181: O(n) removal from front of Vec in queue_pending.

Vec::remove(0) is O(n) because it shifts all elements. If this becomes a hot path, consider using VecDeque which provides O(1) pop_front().

♻️ Using VecDeque for efficient FIFO operations
+use std::collections::VecDeque;
 
-    pending_instantlocks: Vec<PendingInstantLock>,
+    pending_instantlocks: VecDeque<PendingInstantLock>,
 
     fn queue_pending(&mut self, pending: PendingInstantLock) {
         if self.pending_instantlocks.len() >= MAX_PENDING_INSTANTLOCKS {
-            let dropped = self.pending_instantlocks.remove(0);
+            let dropped = self.pending_instantlocks.pop_front().unwrap();
             // ... rest unchanged
         }
-        self.pending_instantlocks.push(pending);
+        self.pending_instantlocks.push_back(pending);
     }

183-195: O(n) scan to find oldest entry when cache exceeds limit.

store_instantlock iterates the entire HashMap to find the oldest entry on every insert when at capacity. For a cache of 5000 entries, this could become a performance concern. Consider using an LRU cache (e.g., lru crate) or IndexMap with insertion-order tracking for O(1) eviction.

dash-spv/src/client/core.rs (1)

109-116: sync_coordinator uses concrete storage types rather than generics.

The sync_coordinator field uses concrete Persistent*Storage types while the struct itself is generic over S: StorageManager. This creates a coupling between the coordinator and specific storage implementations. If this is intentional (e.g., the coordinator needs specific storage features), consider documenting this design decision. Otherwise, it may limit the testability benefit described in the struct's documentation.

dash-spv/src/network/manager.rs (1)

669-716: Bound request-processor concurrency to avoid task spikes.
An unbounded queue plus a spawn-per-request strategy can create large task bursts under load. Consider a bounded channel or a semaphore to cap concurrent sends.

🛠️ Example approach (requires a request_semaphore field)
@@
-        let this = self.clone();
+        let this = self.clone();
+        let request_semaphore = self.request_semaphore.clone();
@@
-                                let this = this.clone();
-                                tokio::spawn(async move {
+                                let this = this.clone();
+                                let permit = request_semaphore.acquire_owned().await;
+                                tokio::spawn(async move {
+                                    let _permit = permit;
                                     let result = match &msg {
                                         // Distribute across peers for parallel sync
dash-spv/src/sync/filters/sync_manager.rs (1)

193-194: Track the TODO with a follow-up issue.
If you want, I can help sketch a decoupled send/processing flow for this TODO.

dash-spv/src/sync/blocks/pipeline.rs (1)

16-27: Make download limits configurable instead of hardcoded.
MAX_CONCURRENT_BLOCK_DOWNLOADS, BLOCK_TIMEOUT, BLOCK_MAX_RETRIES, and BLOCKS_PER_REQUEST are network-tuning knobs; consider wiring them through ClientConfig or a sync settings struct.

As per coding guidelines: Never hardcode network parameters, addresses, or keys.

dash-spv/src/sync/progress.rs (1)

101-109: Revisit overall percentage calculation.

percentage() only averages headers/filter headers/filters and defaults missing managers to 1.0, so overall progress can show 100% while blocks/masternodes/chainlocks/instantsend are still active. If intentional, please document it; otherwise consider averaging only started managers (and include any additional progress types that expose percentage()).

💡 Possible refactor
-        let percentages = [
-            self.headers.as_ref().map(|h| h.percentage()).unwrap_or(1.0),
-            self.filter_headers.as_ref().map(|f| f.percentage()).unwrap_or(1.0),
-            self.filters.as_ref().map(|f| f.percentage()).unwrap_or(1.0),
-        ];
-        percentages.iter().sum::<f64>() / percentages.len() as f64
+        let mut percentages = Vec::new();
+        if let Some(h) = &self.headers { percentages.push(h.percentage()); }
+        if let Some(fh) = &self.filter_headers { percentages.push(fh.percentage()); }
+        if let Some(f) = &self.filters { percentages.push(f.percentage()); }
+        if percentages.is_empty() { return 0.0; }
+        percentages.iter().sum::<f64>() / percentages.len() as f64
dash-spv/src/storage/block_headers.rs (1)

53-62: Track the TODO’d API change explicitly.

The trait-level TODO suggests a significant API shift. Consider filing an issue or adding a short rationale/timeline so it doesn’t get lost.

If you want, I can draft an issue template or outline a migration plan.

dash-spv/src/sync/download_coordinator.rs (2)

194-204: check_and_retry_timeouts silently drops items that exceeded max retries.

The method calls enqueue_retry but ignores its return value, so callers have no way to know which items permanently failed. Consider either returning failed items separately or logging them.

♻️ Proposed fix to return failed items
     /// Check for timed-out items and re-enqueue them for retry.
     ///
     /// Combines `check_timeouts()` and `enqueue_retry()` in one call.
-    /// Returns the items that were re-queued.
-    pub(crate) fn check_and_retry_timeouts(&mut self) -> Vec<K> {
+    /// Returns a tuple of (re-queued items, permanently failed items).
+    pub(crate) fn check_and_retry_timeouts(&mut self) -> (Vec<K>, Vec<K>) {
         let timed_out = self.check_timeouts();
+        let mut requeued = Vec::new();
+        let mut failed = Vec::new();
         for item in &timed_out {
-            let _ = self.enqueue_retry(item.clone());
+            if self.enqueue_retry(item.clone()) {
+                requeued.push(item.clone());
+            } else {
+                failed.push(item.clone());
+            }
         }
-        timed_out
+        (requeued, failed)
     }

74-76: last_progress field is tracked but never used.

The field is updated in receive() and clear() but there's no getter or stall-detection logic. If this is intended for future stall detection, consider either implementing it or removing the field to reduce confusion.
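
💡 Illustrative sketch of stall detection built on last_progress; the struct and method names are hypothetical, not the coordinator's real API.
use std::time::{Duration, Instant};

struct ProgressTracker {
    last_progress: Instant,
}

impl ProgressTracker {
    /// Called from receive()/clear() whenever an item completes.
    fn mark_progress(&mut self) {
        self.last_progress = Instant::now();
    }

    /// True when nothing has completed within `timeout`; callers could use
    /// this to trigger peer rotation or re-requests.
    fn is_stalled(&self, timeout: Duration) -> bool {
        self.last_progress.elapsed() > timeout
    }
}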

dash-spv/src/sync/filter_headers/pipeline.rs (1)

23-25: Duplicate doc comment.

Lines 23-24 contain overlapping documentation for the same constant.

🧹 Proposed fix
-/// Timeout for CFHeaders requests (shorter for faster retry on multi-peer).
-/// Timeout for CFHeaders requests. Single response but allow time for network latency.
+/// Timeout for CFHeaders requests. Allow time for network latency on single response.
 const FILTER_HEADERS_TIMEOUT: Duration = Duration::from_secs(20);
dash-spv/src/sync/sync_coordinator.rs (1)

329-331: Unnecessary identity map in stream transformation.

The .map(move |p| p) is a no-op that adds overhead.

🧹 Proposed fix
     let streams: Vec<_> =
-        receivers.into_iter().map(|rx| WatchStream::new(rx).map(move |p| p)).collect();
+        receivers.into_iter().map(WatchStream::new).collect();
dash-spv-ffi/include/dash_spv_ffi.h (1)

513-574: Disambiguate which destroy helper matches each sync-progress getter.

Two getters return the same FFISyncProgress type, but there are two destroy helpers. Without an explicit contract, consumers can easily pick the wrong destroy path, risking leaks or double-frees. Consider a distinct type or tighten the docs to make the intended pairing unambiguous.

📌 Doc clarification example
 /**
  * Get the current sync progress snapshot.
  *
  * # Safety
  * - `client` must be a valid, non-null pointer.
+ *
+ * # Returns
+ * - Use `dash_spv_ffi_sync_progress_destroy` to free the snapshot from this API.
+ * - Per-manager fields in `FFISyncProgress` are always NULL for this function.
  */
 struct FFISyncProgress *dash_spv_ffi_client_get_sync_progress(struct FFIDashSpvClient *client);

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 12

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
dash-spv/src/client/chainlock.rs (1)

31-44: ⚠️ Potential issue | 🟠 Major

Drop the storage lock before awaiting network operations.

The storage lock is held unnecessarily during the penalize_peer_invalid_chainlock await (lines 34–45), blocking other storage users. While the current network implementation doesn't access storage, this pattern violates the lock-dropping conventions used elsewhere in the codebase (e.g., status_display.rs lines 103–106). Move the network penalization outside the lock scope:

Suggested fix
-        {
-            let mut storage = self.storage.lock().await;
-            if let Err(e) = self
-                .chainlock_manager
-                .process_chain_lock(chainlock.clone(), &chain_state, &mut *storage)
-                .await
-            {
-                // Penalize the peer that relayed the invalid ChainLock
-                let reason = format!("Invalid ChainLock: {}", e);
-                self.network.penalize_peer_invalid_chainlock(peer_address, &reason).await;
-                return Err(SpvError::Validation(e));
-            }
-        }
+        let process_result = {
+            let mut storage = self.storage.lock().await;
+            self.chainlock_manager
+                .process_chain_lock(chainlock.clone(), &chain_state, &mut *storage)
+                .await
+        };
+        if let Err(e) = process_result {
+            // Penalize the peer that relayed the invalid ChainLock
+            let reason = format!("Invalid ChainLock: {}", e);
+            self.network
+                .penalize_peer_invalid_chainlock(peer_address, &reason)
+                .await;
+            return Err(SpvError::Validation(e));
+        }
dash-spv-ffi/src/bin/ffi_cli.rs (1)

265-303: ⚠️ Potential issue | 🟡 Minor

Help text doesn’t match the actual default data dir.
The help says “unique directory in /tmp” but the default is .tmp/ffi-cli. Either adjust the help or change the default.

📝 Suggested doc fix
-                .help("Data directory for storage (default: unique directory in /tmp)"),
+                .help("Data directory for storage (default: .tmp/ffi-cli)"),
🤖 Fix all issues with AI agents
In `@dash-spv-ffi/dash_spv_ffi.h`:
- Around line 339-342: The header exposes two FFI declarations
dash_spv_ffi_client_start_sync and dash_spv_ffi_client_start_sync_with_progress
that have no Rust implementations and will cause linker errors; either remove
these declarations from dash_spv_ffi.h so they are not publicly exposed, or add
corresponding Rust extern "C" implementations with the matching signatures
(including the completion_callback and user_data parameters) so the symbols are
emitted; update the header to reflect only functions that have working Rust
implementations and ensure the function names match exactly between the header
and Rust exports.

In `@dash-spv/src/client/lifecycle.rs`:
- Around line 224-233: The client sets the `running` flag before attempting to
start the sync coordinator and connect to the network, but if either
`self.sync_coordinator.start(&mut self.network).await` or
`self.network.connect().await` fails the flag remains true and future `start()`
calls will mistakenly see the client as running; fix by either moving the
`running` update to after both operations succeed or by resetting `self.running`
to false inside each error branch (e.g., in the `if let Err(e)` block that
returns `SpvError::Sync` and after the `?`-style connect error), ensuring
`self.running` reflects the actual running state when `start()` returns an
error.
- Around line 67-100: The code currently calls .expect(...) on storage refs
(header_storage, filter_header_storage_ref, filter_storage_ref,
block_storage_ref) which panics on missing components; change each .expect to
propagate a typed error by converting the Option/Result into a Result using the
? operator or .ok_or_else(...) to return SpvError::Config with the messages
indicated (e.g., "Headers storage must exist", "Filters headers storage must
exist if filters are enabled", "Filters storage must exist if filters are
enabled", "Blocks storage must exist if filters are enabled"), then update the
surrounding initialization (BlockHeadersManager::new, FilterHeadersManager::new,
FiltersManager::new, BlocksManager::new and assignments to
managers.block_headers/managers.filter_headers/managers.filters/managers.blocks)
to use the unwrapped values; since the function returns Result<Self>, this will
propagate configuration errors instead of panicking.
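
💡 Minimal sketch of the Option-to-Result conversion; SpvError here is a simplified stand-in enum, not the crate's real error type.
#[derive(Debug)]
enum SpvError {
    Config(String),
}

fn require_storage<T>(storage: Option<T>, what: &str) -> Result<T, SpvError> {
    // Propagate a typed configuration error instead of panicking via expect().
    storage.ok_or_else(|| SpvError::Config(format!("{what} must exist")))
}

fn main() {
    let header_storage: Option<&str> = None;
    match require_storage(header_storage, "Headers storage") {
        Ok(_) => println!("storage available"),
        Err(e) => eprintln!("configuration error: {e:?}"),
    }
}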

In `@dash-spv/src/client/queries.rs`:
- Around line 69-70: Rename the misspelled local variable masternode_endine to
masternode_engine where it's assigned from self.masternode_list_engine()? and
update the subsequent read guard variable initialization
(masternode_engine_guard) to reference the corrected masternode_engine; ensure
all occurrences in the function use the new name so the call to
masternode_engine.read().await compiles and is consistent with the
masternode_list_engine() result.

In `@dash-spv/src/sync/block_headers/manager.rs`:
- Around line 74-90: The debug-only check in store_headers allows empty headers
in release builds; update the start of store_headers(&mut self, headers:
&[HashedBlockHeader]) to explicitly guard against an empty slice (either return
early with Ok(current tip) or use a non-debug assertion) before calling
BlockHeaderValidator::new().validate(...) and before interacting with
header_storage, then ensure progress and tip logic (tip(),
progress.update_current_height, progress.add_processed) are only executed when
headers is non-empty so no silent no-op occurs in release builds.
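
💡 Sketch of the explicit empty-slice guard, generic over the header type; the tip arithmetic is illustrative only.
fn store_headers_guarded<H>(headers: &[H], current_tip: u32) -> Result<u32, String> {
    if headers.is_empty() {
        // Nothing to validate or persist; return the unchanged tip instead of
        // relying on a debug_assert that disappears in release builds.
        return Ok(current_tip);
    }
    // ... validate and store the headers here ...
    Ok(current_tip + headers.len() as u32)
}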

In `@dash-spv/src/sync/block_headers/progress.rs`:
- Around line 92-99: The doc comment for the buffered() getter is incorrect and
update_buffered() should mirror other mutators by bumping last activity; change
the doc comment on buffered() to reflect that it returns the buffered counter
(e.g., "Return the buffered counter.") and modify update_buffered(&mut self,
count: u32) to set self.buffered = count and call self.bump_last_activity()
after updating (consistent with update_current_height, update_target_height, and
add_processed); locate these by the symbols buffered(), update_buffered(), and
bump_last_activity().

In `@dash-spv/src/sync/block_headers/segment_state.rs`:
- Around line 170-182: In handle_timeouts, you're ignoring enqueue_retry's
boolean return when retries are exhausted; update handle_timeouts to check the
return value from self.coordinator.enqueue_retry(hash) and when it returns false
emit an error-level log including self.segment_id and hash and mark this segment
as failed/needs recovery (e.g., call an existing method like
self.mark_failed(...) or set a require_recovery flag on the SegmentState) so the
dropped request is not silent; use symbols check_timeouts, enqueue_retry,
handle_timeouts, segment_id, and coordinator to locate the changes.

In `@dash-spv/src/sync/events.rs`:
- Around line 12-19: The doc comment for the SyncStart enum variant incorrectly
describes stored block headers; update the comment to state that a manager has
started a sync operation (e.g., "A sync has started.") and keep/clarify the
existing field docs for identifier: ManagerIdentifier; modify the
emitted/consumed lines if needed so they accurately reflect which components
emit or consume SyncStart (reference the SyncStart variant and ManagerIdentifier
type in events.rs).

In `@dash-spv/src/sync/filter_headers/sync_manager.rs`:
- Around line 48-144: The code uses debug_assert!(count > 0) in handle_message
after calling process_cfheaders, which is removed in release builds; replace
both occurrences with an explicit runtime check that returns
Err(SyncError::Network(...)) when count == 0 to treat an empty CFHeaders batch
as a protocol violation. Locate the two debug_asserts in handle_message (after
calls to self.process_cfheaders(&data, start_height) and
self.process_cfheaders(&data, height)) and change them to check if count == 0
and return Err(SyncError::Network("CFHeaders batch contained no
headers".to_string())) so state and events remain consistent. Ensure the
function signature and error propagation remain compatible with SyncResult so
the Err is returned from handle_message.
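
💡 Sketch of the runtime protocol check replacing the debug_assert; SyncError and SyncResult are simplified stand-ins for the crate's types.
#[derive(Debug)]
enum SyncError {
    Network(String),
}

type SyncResult<T> = Result<T, SyncError>;

fn ensure_cfheaders_not_empty(count: usize) -> SyncResult<()> {
    if count == 0 {
        // Treat an empty CFHeaders batch as a protocol violation in all builds.
        return Err(SyncError::Network("CFHeaders batch contained no headers".to_string()));
    }
    Ok(())
}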

In `@dash-spv/src/sync/sync_manager.rs`:
- Around line 249-300: The error branches for sync_event_receiver.recv() and
context.network_event_receiver.recv() incorrectly break on any recv error;
update the Err(error) handling to match broadcast::RecvError and only break when
it is Closed, while logging and continuing when it is Lagged(n). Specifically,
in the sync_event_receiver.recv() and context.network_event_receiver.recv()
match arms, inspect the recv error (from broadcast::Receiver::recv) and on
RecvError::Lagged(n) call tracing::warn/debug with the lag count and continue
the loop, but on RecvError::Closed break the loop; keep the existing tracing
messages and the surrounding calls to handle_sync_event and handle_network_event
unchanged.
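
💡 Sketch of the suggested recv-error handling with tokio::sync::broadcast; SyncEvent and the loop body are placeholders.
use tokio::sync::broadcast;

#[derive(Clone, Debug)]
struct SyncEvent;

async fn event_loop(mut rx: broadcast::Receiver<SyncEvent>) {
    loop {
        match rx.recv().await {
            Ok(_event) => {
                // handle_sync_event(_event).await;
            }
            Err(broadcast::error::RecvError::Lagged(skipped)) => {
                // Receiver fell behind; log the lag and keep consuming.
                tracing::warn!("sync event receiver lagged, skipped {} events", skipped);
                continue;
            }
            Err(broadcast::error::RecvError::Closed) => break,
        }
    }
}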

In `@dash-spv/src/validation/filter.rs`:
- Around line 51-63: The prev_headers construction incorrectly chains only over
input.filters' heights, allowing gaps to inherit the last seen header; change
the logic to iterate over sorted keys from input.expected_headers (not
input.filters) starting from input.prev_filter_header, ensure heights are
contiguous (reject or return an error on gaps) while inserting into
prev_headers, and update the noncontiguous test to expect rejection when gaps
exist; reference the variables prev_headers, input.prev_filter_header,
input.expected_headers, and the heights collection when locating where to change
the loop.

In `@key-wallet-manager/src/wallet_manager/mod.rs`:
- Around line 525-546: The current loop over check_result.affected_accounts uses
account_match.account_type_match.account_index().unwrap_or(0) which can
misattribute events; change the logic in the loop that builds and sends
WalletEvent::TransactionReceived (the block that calls account_index =
account_type_match.account_index().unwrap_or(0) and then
self.event_sender.send(event)) to skip entries where
account_type_match.account_index() returns None (or alternatively emit a
wallet-level event instead of TransactionReceived for those non-indexed matches)
so only true indexed accounts produce TransactionReceived events and non-indexed
matches are handled explicitly.
🧹 Nitpick comments (20)
dash-spv/src/validation/filter.rs (1)

16-38: Consider threading ValidationMode into FilterValidator.

Validation always runs the full check right now; if the client supports ValidationMode::None/Basic/Full, this should be gated or documented.

Based on learnings: Implement configurable validation modes (ValidationMode::None, ValidationMode::Basic, ValidationMode::Full).
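
💡 Sketch of gating the validator on a mode; the validator shape and the closure passed in are illustrative, only the ValidationMode variants come from the note above.
#[derive(Clone, Copy, PartialEq, Eq)]
enum ValidationMode {
    None,
    Basic,
    Full,
}

struct FilterValidator {
    mode: ValidationMode,
}

impl FilterValidator {
    fn validate(&self, full_check: impl Fn() -> bool) -> bool {
        match self.mode {
            // Skip verification entirely when validation is disabled.
            ValidationMode::None => true,
            // Basic and Full both run the filter-header check in this sketch;
            // Full would layer additional checks on top.
            ValidationMode::Basic | ValidationMode::Full => full_check(),
        }
    }
}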

dash-spv/src/sync/filters/batch.rs (1)

108-126: Consider documenting the equality semantic in the struct docstring.

PartialEq and Eq comparing only start_height means batches with different end_height or filters are considered equal if they share the same start_height. While this is intentional for BTreeSet ordering (and tested at line 186), this non-structural equality can surprise callers.

Consider adding a note to the struct's doc comment clarifying this behavior.

📝 Suggested doc addition
 /// A completed batch of compact block filters ready for verification.
 ///
 /// Represents a contiguous range of filters that have all been received
 /// and can now be verified against their expected filter headers.
 /// Ordered by start_height for sequential processing.
+///
+/// **Note:** Equality and ordering are based solely on `start_height` to support
+/// `BTreeSet` usage. Two batches with the same `start_height` are considered equal
+/// regardless of other fields.
 #[derive(Debug)]
 pub(super) struct FiltersBatch {
dash-spv/src/sync/filters/batch_tracker.rs (1)

52-55: Minor: Potential truncation when casting usize to u32.

On 64-bit systems, if received.len() exceeds u32::MAX, the cast truncates. In practice, filter batches are much smaller, so this is unlikely to be an issue.

🔧 Optional: Use saturating or checked conversion
     pub(super) fn received(&self) -> u32 {
-        self.received.len() as u32
+        self.received.len().try_into().unwrap_or(u32::MAX)
     }
dash-spv/src/client/sync_coordinator.rs (1)

60-62: Consider using debug! level for frequent progress updates.

Logging at info! level on every progress change may produce excessive output during sync. Consider debug! or trace! level, or throttle the logging.

🔧 Suggested change
                 _ = progress_updates.changed() => {
-                    tracing::info!("Sync progress:{}", *progress_updates.borrow());
+                    tracing::debug!("Sync progress:{}", *progress_updates.borrow());
                 }
dash-spv/src/sync/download_coordinator.rs (1)

198-204: Consider returning items that exceeded max retries.

check_and_retry_timeouts discards the bool result from enqueue_retry, silently dropping items that exceed max_retries. Callers may want to know which items permanently failed.

🔧 Suggested enhancement
-    /// Check for timed-out items and re-enqueue them for retry.
-    ///
-    /// Combines `check_timeouts()` and `enqueue_retry()` in one call.
-    /// Returns the items that were re-queued.
-    pub(crate) fn check_and_retry_timeouts(&mut self) -> Vec<K> {
+    /// Check for timed-out items and re-enqueue them for retry.
+    ///
+    /// Combines `check_timeouts()` and `enqueue_retry()` in one call.
+    /// Returns (re-queued items, permanently failed items).
+    pub(crate) fn check_and_retry_timeouts(&mut self) -> (Vec<K>, Vec<K>) {
         let timed_out = self.check_timeouts();
+        let mut requeued = Vec::new();
+        let mut failed = Vec::new();
         for item in &timed_out {
-            let _ = self.enqueue_retry(item.clone());
+            if self.enqueue_retry(item.clone()) {
+                requeued.push(item.clone());
+            } else {
+                failed.push(item.clone());
+            }
         }
-        timed_out
+        (requeued, failed)
     }
dash-spv/src/sync/blocks/pipeline.rs (1)

133-156: Minor documentation inconsistency.

Line 152 references queue_with_heights() but only queue() exists (which does track heights). The fallback path handles direct coordinator.enqueue() usage.

📝 Suggested comment fix
         } else {
             // Block was tracked by coordinator but not by height mapping.
-            // This can happen if queue() was used instead of queue_with_heights().
+            // This can happen if coordinator.enqueue() was called directly instead of queue().
             self.completed_count += 1;
             true
         }
dash-spv-ffi/tests/integration/test_full_workflow.rs (1)

67-91: Remove unused callback definitions.

The callbacks on_sync_progress and on_sync_complete are defined but never registered or used after the migration away from callback-based synchronization. The variables sync_completed and errors (lines 68-69) are also unused since the callbacks that would use them are never invoked.

♻️ Suggested cleanup
     unsafe {
         let mut ctx = IntegrationTestContext::new(FFINetwork::Regtest);
 
-        // Set up callbacks
-        let sync_completed = ctx.sync_completed.clone();
-        let errors = ctx.errors.clone();
-
-        extern "C" fn on_sync_progress(progress: f64, msg: *const c_char, user_data: *mut c_void) {
-            let ctx = unsafe { &*(user_data as *const IntegrationTestContext) };
-            if progress >= 100.0 {
-                ctx.sync_completed.store(true, Ordering::SeqCst);
-            }
-
-            if !msg.is_null() {
-                let msg_str = unsafe { CStr::from_ptr(msg).to_str().unwrap() };
-                ctx.events.lock().unwrap().push(format!("Progress {:.1}%: {}", progress, msg_str));
-            }
-        }
-
-        extern "C" fn on_sync_complete(success: bool, error: *const c_char, user_data: *mut c_void) {
-            let ctx = unsafe { &*(user_data as *const IntegrationTestContext) };
-            ctx.sync_completed.store(true, Ordering::SeqCst);
-
-            if !success && !error.is_null() {
-                let error_str = unsafe { CStr::from_ptr(error).to_str().unwrap() };
-                ctx.errors.lock().unwrap().push(error_str.to_string());
-            }
-        }
-
         // Start the client
         let result = dash_spv_ffi_client_start(ctx.client);
key-wallet-manager/src/events.rs (1)

56-65: Include locked in the BalanceUpdated description for completeness.

♻️ Suggested tweak
-            WalletEvent::BalanceUpdated {
-                spendable,
-                unconfirmed,
-                immature,
-                ..
-            } => {
-                format!(
-                    "BalanceUpdated(spendable={}, unconfirmed={}, immature={})",
-                    spendable, unconfirmed, immature
-                )
-            }
+            WalletEvent::BalanceUpdated {
+                spendable,
+                unconfirmed,
+                immature,
+                locked,
+                ..
+            } => {
+                format!(
+                    "BalanceUpdated(spendable={}, unconfirmed={}, immature={}, locked={})",
+                    spendable, unconfirmed, immature, locked
+                )
+            }
dash-spv/src/sync/instantsend/manager.rs (1)

183-195: Consider using a more efficient eviction strategy for the cache.

The current O(n) scan to find the oldest entry on every insert above capacity could become a performance concern at MAX_CACHE_SIZE=5000. Consider using an IndexMap or a separate BTreeMap<Instant, Txid> for O(log n) eviction, or accept the current approach if cache churn is expected to be low.

♻️ Optional: Use IndexMap for O(1) oldest removal
+use indexmap::IndexMap;
+
 pub struct InstantSendManager {
     pub(super) progress: InstantSendProgress,
     engine: Arc<RwLock<MasternodeListEngine>>,
-    instantlocks: HashMap<Txid, InstantLockEntry>,
+    instantlocks: IndexMap<Txid, InstantLockEntry>,
     pending_instantlocks: Vec<PendingInstantLock>,
 }

 fn store_instantlock(&mut self, txid: Txid, entry: InstantLockEntry) {
     self.instantlocks.insert(txid, entry);
 
     if self.instantlocks.len() > MAX_CACHE_SIZE {
-        let oldest =
-            self.instantlocks.iter().min_by_key(|(_, e)| e.received_at).map(|(k, _)| *k);
-        if let Some(key) = oldest {
-            self.instantlocks.remove(&key);
-        }
+        self.instantlocks.shift_remove_index(0);
     }
 }
dash-spv/ARCHITECTURE.md (2)

116-160: Add language specifier to code block at line 116.

The static analysis tool flagged this code block as missing a language specifier. While this is a minor documentation issue, adding a specifier improves syntax highlighting and consistency.

📝 Add language specifier
-```
+```text
 ┌──────────────────────────────────────────────────────────────────────────┐

999-1042: Add language specifiers to code blocks for consistency.

Similar to the earlier block, these code blocks at lines 999 and 1074 should have language specifiers (e.g., text for ASCII diagrams or directory structures).

Also applies to: 1074-1082

dash-spv/src/storage/block_headers.rs (1)

153-183: Consider avoiding variable shadowing for clarity.

The let mut height = height; on line 173 shadows the function parameter. While this is valid Rust and functionally correct, it can reduce readability. Consider renaming to current_height or similar.

♻️ Rename to avoid shadowing
     async fn store_hashed_headers_at_height(
         &mut self,
         headers: &[HashedBlockHeader],
         height: u32,
     ) -> StorageResult<()> {
-        let mut height = height;
+        let mut current_height = height;
 
-        self.block_headers.write().await.store_items_at_height(headers, height).await?;
+        self.block_headers.write().await.store_items_at_height(headers, current_height).await?;
 
         for header in headers {
-            self.header_hash_index.insert(*header.hash(), height);
-            height += 1;
+            self.header_hash_index.insert(*header.hash(), current_height);
+            current_height += 1;
         }
 
         Ok(())
     }
dash-spv/src/sync/masternodes/pipeline.rs (1)

65-74: Consider logging queued count, not total map size.

The log at line 72 reports base_hashes.len() after insertion, which reflects the total accumulated entries rather than the count just queued in this call. This could be misleading when queue_requests is called multiple times.

♻️ Suggested improvement
     pub(super) fn queue_requests(&mut self, requests: Vec<(BlockHash, BlockHash)>) {
+        let count = requests.len();
         for (base_hash, target_hash) in requests {
             self.coordinator.enqueue([target_hash]);
             self.base_hashes.insert(target_hash, base_hash);
         }

-        if !self.base_hashes.is_empty() {
-            tracing::info!("Queued {} MnListDiff requests", self.base_hashes.len());
+        if count > 0 {
+            tracing::info!("Queued {} MnListDiff requests (total: {})", count, self.base_hashes.len());
         }
     }
dash-spv/src/network/mod.rs (1)

111-121: Consider parameterizing extra_share in request_qr_info.

The extra_share flag is hardcoded to true. If callers might need different behavior, consider adding it as a parameter. However, if true is always the correct value for this SPV client's use case, the current approach is acceptable.

♻️ Optional: parameterize extra_share
     pub fn request_qr_info(
         &self,
         known_block_hashes: Vec<BlockHash>,
         target_block_hash: BlockHash,
+        extra_share: bool,
     ) -> NetworkResult<()> {
         self.send_message(NetworkMessage::GetQRInfo(GetQRInfo {
             base_block_hashes: known_block_hashes,
             block_request_hash: target_block_hash,
-            extra_share: true,
+            extra_share,
         }))
     }
dash-spv/src/client/core.rs (1)

102-126: Document the architectural separation between generic StorageManager and trait-specific storage implementations in sync_coordinator.

The struct maintains two distinct storage layers: the generic storage: Arc<Mutex<S>> parameter (high-level StorageManager interface) and the sync_coordinator's concrete Persistent*Storage implementations (specific traits like BlockHeaderStorage, FilterHeaderStorage). This separation is intentional—the coordinator manages specialized storage concerns while the generic S provides client-level storage operations. Add a doc comment explaining this separation of concerns and clarifying when each storage layer is used to prevent confusion for maintainers.

dash-spv/src/network/manager.rs (2)

104-106: Consider bounded channel for request queue.

The unbounded channel can grow indefinitely if request production outpaces consumption, potentially causing memory pressure under sustained load. A bounded channel with backpressure would be more resilient.

♻️ Optional: Use bounded channel with backpressure
-        // Create request queue for outgoing messages
-        let (request_tx, request_rx) = unbounded_channel();
+        // Create request queue for outgoing messages with bounded capacity
+        let (request_tx, request_rx) = tokio::sync::mpsc::channel(1000);

This would require updating RequestSender to handle SendError from the bounded channel.


1099-1117: Multiple lock acquisitions per peer in headers2 selection loop.

Inside the loop (lines 1103-1111), both peer.read().await and self.headers2_disabled.lock().await are acquired for each peer. This could be optimized by acquiring headers2_disabled once before the loop.

♻️ Proposed optimization
             NetworkMessage::GetHeaders(_) | NetworkMessage::GetHeaders2(_) => {
                 // Prefer headers2 peers, fall back to all
+                let disabled = self.headers2_disabled.lock().await;
                 let headers2_peers: Vec<_> = {
                     let mut result = Vec::new();
                     for (addr, peer) in &peers {
                         let peer_guard = peer.read().await;
                         if peer_guard.supports_headers2()
-                            && !self.headers2_disabled.lock().await.contains(addr)
+                            && !disabled.contains(addr)
                         {
                             result.push((*addr, peer.clone()));
                         }
                     }
                     result
                 };
+                drop(disabled);
dash-spv/src/sync/chainlock/manager.rs (1)

127-145: Silently accepting ChainLock on storage error may mask issues.

When storage fails (line 135-143), the ChainLock is accepted with a warning log. While the comment explains the rationale (verify when header arrives), this could allow invalid ChainLocks to propagate if storage is persistently failing.

Consider tracking unverified ChainLocks separately for later validation, or adding a counter/metric for storage failures to aid debugging.
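
💡 Sketch of tracking unverified ChainLocks and storage failures; struct and field names are illustrative.
#[derive(Default)]
struct ChainLockBookkeeping {
    /// Heights of ChainLocks accepted while storage was failing, so a later
    /// pass can re-validate them once the header is available.
    unverified_heights: Vec<u32>,
    /// Counter of storage failures, exposed for metrics/debugging.
    storage_failures: u64,
}

impl ChainLockBookkeeping {
    fn record_storage_failure(&mut self, height: u32) {
        self.storage_failures += 1;
        self.unverified_heights.push(height);
    }
}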

dash-spv/src/sync/progress.rs (1)

101-109: Percentage calculation excludes blocks/masternodes/chainlocks/instantsend.

The percentage() method only averages headers, filter_headers, and filters progress. This may be intentional since those represent the "main" sync progress, but it could be misleading if other managers are still working.

Consider documenting why only these three components are included, or adding a separate method for full progress calculation.

dash-spv-ffi/src/client.rs (1)

22-101: Avoid panics from poisoned callback mutexes in monitor threads.
callbacks.lock().unwrap() will panic on poisoning and silently kill the monitoring thread. Consider handling lock errors and logging instead so event dispatch remains resilient.

🛠️ Suggested hardening (apply similarly in both monitors)
-                                let guard = callbacks.lock().unwrap();
-                                if let Some(ref cb) = *guard {
-                                    dispatch_fn(cb, &event);
-                                }
+                                if let Ok(guard) = callbacks.lock() {
+                                    if let Some(ref cb) = *guard {
+                                        dispatch_fn(cb, &event);
+                                    }
+                                } else {
+                                    tracing::error!("{} callbacks mutex poisoned; skipping event", name);
+                                }

As per coding guidelines: Avoid unwrap() and expect() in library code; use explicit error types (e.g., thiserror).

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 8

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
dash-spv/src/client/sync_coordinator.rs (1)

51-67: ⚠️ Potential issue | 🟠 Major

Break the loop when channels close to avoid hot spinning.

recv() returning None (when the sender is dropped) immediately matches the branch in tokio::select! and without an explicit break, the loop will repeatedly poll the same closed channel, causing tight spinning and log spam. Similarly, progress_updates.changed() returning Err will be polled again immediately each iteration.

While the code has a self.running flag check and cancellation token handler, these are external signals. Proper channel-state handling is needed for defense-in-depth error recovery.

🔧 Proposed fix
-                received = command_receiver.recv() => {
-                    match received {
-                    None => {tracing::warn!("DashSpvClientCommand channel closed.");},
-                    Some(command) => {
-                            self.handle_command(command).await.unwrap_or_else(|e| tracing::error!("Failed to handle command: {}", e));
-                        }
-                    }
-                }
+                received = command_receiver.recv() => {
+                    match received {
+                        None => {
+                            tracing::warn!("DashSpvClientCommand channel closed.");
+                            break;
+                        }
+                        Some(command) => {
+                            if let Err(e) = self.handle_command(command).await {
+                                tracing::error!("Failed to handle command: {}", e);
+                            }
+                        }
+                    }
+                }
-                _ = progress_updates.changed() => {
-                    tracing::info!("Sync progress:{}", *progress_updates.borrow());
-                }
+                changed = progress_updates.changed() => {
+                    if changed.is_err() {
+                        tracing::debug!("Sync progress channel closed");
+                        break;
+                    }
+                    tracing::info!("Sync progress:{}", *progress_updates.borrow());
+                }
dash-spv-ffi/src/client.rs (1)

651-689: ⚠️ Potential issue | 🟠 Major

Potential memory leak: sync progress destroy doesn’t free nested manager progress.

FFISyncProgress::from allocates nested progress pointers, but dash_spv_ffi_sync_progress_destroy only frees the top-level box. If callers use the legacy destroy function (as the docs suggest), nested allocations will leak. Consider delegating to dash_spv_ffi_manager_sync_progress_destroy or updating the docs/return type to ensure nested pointers are freed.

🧹 Suggested fix (delegate to manager destroy)
 pub unsafe extern "C" fn dash_spv_ffi_sync_progress_destroy(progress: *mut FFISyncProgress) {
-    if !progress.is_null() {
-        let _ = Box::from_raw(progress);
-    }
+    dash_spv_ffi_manager_sync_progress_destroy(progress);
 }

As per coding guidelines: Be careful with FFI memory management.

dash-spv-ffi/src/bin/ffi_cli.rs (1)

265-304: ⚠️ Potential issue | 🟡 Minor

Help text doesn’t match the actual default data directory.

The flag says “default: unique directory in /tmp” but the code defaults to .tmp/ffi-cli. Either update the help text or change the default to match.

🔧 Suggested help‑text fix
         .arg(
             Arg::new("data-dir")
                 .short('d')
                 .long("data-dir")
                 .value_name("DIR")
-                .help("Data directory for storage (default: unique directory in /tmp)"),
+                .help("Data directory for storage (default: .tmp/ffi-cli)"),
         )
🤖 Fix all issues with AI agents
In `@dash-spv-ffi/src/client.rs`:
- Around line 22-113: Both spawn_broadcast_monitor and spawn_progress_monitor
currently invoke dispatch_fn while holding the callbacks mutex, risking
re-entrancy deadlocks; fix by snapping a clone/copy of the callback out of the
Arc<Mutex<Option<C>>> while holding the lock, then drop the lock and call
dispatch_fn on the cloned value (so the lock is not held during callback
execution), and ensure your FFI callback type implements Copy/Clone to make the
snapshot cheap and safe.
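
💡 Sketch of snapshotting the callback before dispatch so the mutex is not held across user code; C: Copy models the plain C-struct callback containers, and the function shape is illustrative.
use std::sync::{Arc, Mutex};

fn dispatch_event<C: Copy, E>(
    callbacks: &Arc<Mutex<Option<C>>>,
    event: &E,
    dispatch_fn: impl Fn(&C, &E),
) {
    // Copy the callback out under the lock, then release the lock before
    // invoking user code to avoid re-entrancy deadlocks.
    let snapshot = match callbacks.lock() {
        Ok(guard) => *guard,
        // Poisoned mutex: skip this event instead of panicking.
        Err(_) => return,
    };
    if let Some(cb) = snapshot {
        dispatch_fn(&cb, event);
    }
}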

In `@dash-spv-ffi/tests/unit/test_client_lifecycle.rs`:
- Around line 193-207: The test dereferences the FFI result and its fields
unsafely: when calling dash_spv_ffi_client_get_sync_progress() the returned
pointer (progress1) and its nested pointers (progress.headers and
progress.filter_headers) can be null; update the test around
dash_spv_ffi_client_get_sync_progress, progress1, progress.headers, and
progress.filter_headers to check for null before dereferencing, skip or assert
appropriate safe behavior if null, and ensure
dash_spv_ffi_sync_progress_destroy(progress1) and
dash_spv_ffi_client_destroy(client) still run (or are guarded) to avoid leaks or
double-free; use explicit null-checks on progress1, then on progress.headers and
progress.filter_headers before accessing their fields.

In `@dash-spv/src/client/queries.rs`:
- Around line 52-58: Update the doc comment for masternode_list_engine to
reflect that it returns Result<Arc<RwLock<MasternodeListEngine>>> and will
return an Err(SpvError::Config) when the masternode engine is not initialized
instead of returning None; mention the success case returns an
Arc<RwLock<MasternodeListEngine>> inside Ok and the failure case returns an
error describing the missing initialization (refer to masternode_list_engine and
SpvError::Config).
- Around line 61-68: The docstring for get_quorum_at_height is out of date: it
says the function returns None when a quorum is not found but the signature
returns Result<QualifiedQuorumEntry> and the implementation returns an error on
missing quorum; update the comment above get_quorum_at_height to describe that
the function returns a Result and will return an Err when the quorum is not
found (include which error variant/type is returned if applicable) and
remove/replace the sentence claiming it returns None so the doc matches the
actual return behavior.

In `@dash-spv/src/network/mod.rs`:
- Around line 80-98: Replace the hardcoded filter_type: 0 in
request_filter_headers and request_filters with a named constant (e.g., pub
const BASIC_FILTER_TYPE: u8 = 0) and use that constant when constructing
GetCFHeaders and GetCFilters (the NetworkMessage::GetCFHeaders/GetCFilters
calls) so the protocol parameter is centralized and not littered as a magic
literal; make the constant public if other modules may need the same protocol
value.

In `@dash-spv/src/sync/filter_headers/manager.rs`:
- Around line 53-90: In process_cfheaders, avoid blindly overwriting the
previous_filter_header at start_height - 1: before calling
storage.store_filter_headers_at_height for the checkpoint case, query the
existing stored header for height start_height - 1 via the filter_header_storage
API (e.g., a get/has method) and only call store_filter_headers_at_height when
no header exists; if a header exists but does not equal
cfheaders.previous_filter_header, return an error (or log and abort the sync)
instead of overwriting; ensure you still clear checkpoint_start_height only
after successful storage or when detecting an identical existing header to avoid
repeated checks.
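
💡 Simplified sketch of the guarded checkpoint write; a HashMap stands in for the filter-header storage API and the error type is illustrative.
use std::collections::HashMap;

fn store_checkpoint_prev_header(
    stored: &mut HashMap<u32, [u8; 32]>,
    height: u32,
    previous_filter_header: [u8; 32],
) -> Result<(), String> {
    match stored.get(&height) {
        // Slot is empty: safe to persist the peer-provided previous header.
        None => {
            stored.insert(height, previous_filter_header);
            Ok(())
        }
        // Already stored and identical: nothing to do.
        Some(existing) if *existing == previous_filter_header => Ok(()),
        // Conflict: abort instead of silently overwriting local state.
        Some(existing) => Err(format!(
            "filter header mismatch at height {}: stored {:?}, peer sent {:?}",
            height, existing, previous_filter_header
        )),
    }
}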

In `@dash-spv/src/sync/filters/batch_tracker.rs`:
- Around line 30-47: The is_complete implementation currently only compares
received.len() to an expected count which allows out-of-range heights or
underflow when start_height > end_height to mark completion; update is_complete
to first validate the range (return false if start_height > self.end_height to
avoid underflow), then compute the inclusive height range and verify membership
for each height in start_height..=self.end_height by checking self.received
contains that exact height (rather than relying on count); ensure this aligns
with how insert_filter populates received and keys (see insert_filter,
FilterMatchKey, received, end_height) so out-of-range entries cannot falsely
satisfy completion.
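
💡 Sketch of the stricter completeness check; the tracker here is a simplified stand-in whose field names follow the comment above.
use std::collections::BTreeSet;

struct BatchTracker {
    start_height: u32,
    end_height: u32,
    received: BTreeSet<u32>,
}

impl BatchTracker {
    fn is_complete(&self) -> bool {
        // Reject inverted ranges up front to avoid underflow in count math.
        if self.start_height > self.end_height {
            return false;
        }
        // Require every height in the inclusive range, so out-of-range
        // entries can never satisfy completion by count alone.
        (self.start_height..=self.end_height).all(|h| self.received.contains(&h))
    }
}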

In `@dash-spv/src/sync/instantsend/manager.rs`:
- Around line 95-112: When structural validation fails in the InstantLock
handling block (the validate_structure(...) branch), increment the invalid
counter on the progress tracker before returning so metrics reflect dropped
locks; call the appropriate method on self.progress (e.g.,
self.progress.add_invalid(1)) and then return early, keeping the existing
behavior of not queuing the lock. This change should be applied around the
validate_structure, self.progress, add_valid, queue_pending, update_pending, and
pending_instantlocks logic so invalid counts are tracked consistently.
🧹 Nitpick comments (15)
key-wallet/src/managed_account/mod.rs (1)

84-96: Consider using a flag to avoid redundant rebuilds for receive-only accounts.

The lazy rebuild condition self.spent_outpoints.is_empty() && !self.transactions.is_empty() will repeatedly trigger rebuilds if the account only has receiving transactions (no spent outpoints). While functionally correct, this causes O(n) iteration on every check for such accounts.

Also, Line 88's self.spent_outpoints.clear() is redundant since the set was just verified to be empty.

♻️ Suggested fix using a rebuild flag
     #[cfg_attr(feature = "serde", serde(skip))]
     spent_outpoints: HashSet<OutPoint>,
+    /// Tracks whether spent_outpoints has been rebuilt after deserialization
+    #[cfg_attr(feature = "serde", serde(skip))]
+    spent_outpoints_rebuilt: bool,
 }

Then update the initialization and helper:

             spent_outpoints: HashSet::new(),
+            spent_outpoints_rebuilt: false,
     fn is_outpoint_spent(&mut self, outpoint: &OutPoint) -> bool {
-        if self.spent_outpoints.is_empty() && !self.transactions.is_empty() {
-            self.spent_outpoints.clear();
+        if !self.spent_outpoints_rebuilt && !self.transactions.is_empty() {
             for tx_record in self.transactions.values() {
                 for input in &tx_record.transaction.input {
                     self.spent_outpoints.insert(input.previous_output);
                 }
             }
+            self.spent_outpoints_rebuilt = true;
         }
         self.spent_outpoints.contains(outpoint)
     }
key-wallet-manager/src/wallet_manager/process_block.rs (1)

177-177: Minor typo in test comment.

"Inrease" should be "Increase".

📝 Suggested fix
-        // Inrease synced height
+        // Increase synced height
key-wallet-manager/src/events.rs (1)

56-66: BalanceUpdated::description() omits the locked field.

The struct has four balance fields (spendable, unconfirmed, immature, locked), but description() only includes three. Consider including locked for consistency with the struct definition and to aid debugging.

📝 Suggested fix
             WalletEvent::BalanceUpdated {
                 spendable,
                 unconfirmed,
                 immature,
-                ..
+                locked,
             } => {
                 format!(
-                    "BalanceUpdated(spendable={}, unconfirmed={}, immature={})",
-                    spendable, unconfirmed, immature
+                    "BalanceUpdated(spendable={}, unconfirmed={}, immature={}, locked={})",
+                    spendable, unconfirmed, immature, locked
                 )
             }
dash-spv/src/validation/filter.rs (1)

52-53: Consider using sort_unstable() for performance.

Since heights are u32 values with no semantic meaning to their relative ordering when equal, sort_unstable() would be more efficient than sort().

📝 Suggested fix
         let mut heights: Vec<u32> = input.filters.keys().map(|k| k.height()).collect();
-        heights.sort();
+        heights.sort_unstable();
dash-spv/src/sync/masternodes/progress.rs (1)

7-21: PartialEq derivation with Instant field may cause unexpected behavior.

Deriving PartialEq includes last_activity: Instant in the comparison, but two Instant values from separate Instant::now() calls are almost never equal, even when taken at effectively the same wall-clock time. This means two otherwise-identical MasternodesProgress instances will usually compare as unequal.

If equality comparison is used (e.g., for change detection or testing), consider either:

  1. Excluding last_activity from equality via manual PartialEq impl, or
  2. Documenting that equality comparison is intentionally strict.
♻️ Manual PartialEq excluding last_activity
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone)]
 pub struct MasternodesProgress {
     // ...fields...
 }
+
+impl PartialEq for MasternodesProgress {
+    fn eq(&self, other: &Self) -> bool {
+        self.state == other.state
+            && self.current_height == other.current_height
+            && self.target_height == other.target_height
+            && self.block_header_tip_height == other.block_header_tip_height
+            && self.diffs_processed == other.diffs_processed
+        // last_activity intentionally excluded
+    }
+}
dash-spv/src/sync/filter_headers/progress.rs (1)

7-21: Same PartialEq with Instant consideration as MasternodesProgress.

This has the same pattern where deriving PartialEq includes last_activity: Instant, which may cause unexpected inequality between otherwise-identical progress instances. Consider consistent handling across all progress types.

dash-spv/src/sync/filters/progress.rs (1)

119-135: Consider saturating counters to avoid wrap in long sessions.
downloaded/processed/matched are u32 and can wrap if the session is long or replay-heavy.

♻️ Suggested change
     pub fn add_downloaded(&mut self, count: u32) {
-        self.downloaded += count;
+        self.downloaded = self.downloaded.saturating_add(count);
         self.bump_last_activity();
     }
@@
     pub fn add_processed(&mut self, count: u32) {
-        self.processed += count;
+        self.processed = self.processed.saturating_add(count);
         self.bump_last_activity();
     }
@@
     pub fn add_matched(&mut self, count: u32) {
-        self.matched += count;
+        self.matched = self.matched.saturating_add(count);
         self.bump_last_activity();
     }
dash-spv/src/sync/block_headers/pipeline.rs (1)

290-395: Good test coverage for pipeline basics.

Tests cover construction, initialization with different starting heights, send behavior, and initial completion state. Consider adding tests for receive_headers and take_ready_to_store with actual header data to validate the full lifecycle.

dash-spv/src/client/lifecycle.rs (1)

102-115: Consider replacing expect() with proper error handling for consistency.

While the expect() on line 106 is guarded by the same condition that creates masternode_engine, using consistent error handling improves maintainability.

♻️ Suggested refactor
         if config.enable_masternodes {
-            let masternode_list_engine = masternode_engine
-                .clone()
-                .expect("Masternode list engine must exist if masternodes are enabled");
+            let masternode_list_engine = masternode_engine.clone().ok_or_else(|| {
+                SpvError::Config("Masternode list engine must exist if masternodes are enabled".to_string())
+            })?;
dash-spv/src/sync/masternodes/pipeline.rs (1)

62-74: Minor: Log message may be misleading.

The log on line 72 reports base_hashes.len() which is the total count after adding new requests, not the count of requests just queued. This could be confusing when called multiple times.

♻️ Suggested improvement
     pub(super) fn queue_requests(&mut self, requests: Vec<(BlockHash, BlockHash)>) {
+        let added = requests.len();
         for (base_hash, target_hash) in requests {
             self.coordinator.enqueue([target_hash]);
             self.base_hashes.insert(target_hash, base_hash);
         }
 
-        if !self.base_hashes.is_empty() {
-            tracing::info!("Queued {} MnListDiff requests", self.base_hashes.len());
+        if added > 0 {
+            tracing::info!("Queued {} MnListDiff requests ({} total pending)", added, self.base_hashes.len());
         }
     }
dash-spv/src/sync/blocks/progress.rs (1)

7-7: Consider removing PartialEq derive due to Instant field.

PartialEq on a struct containing Instant will compare timestamps, which is unlikely to be useful. Two BlocksProgress instances with identical counters but different last_activity times will not compare equal.

If equality comparison is needed for tests, consider implementing a custom PartialEq that ignores last_activity, or remove the derive if not needed.

dash-spv/src/sync/blocks/pipeline.rs (1)

138-156: Consider avoiding block.clone() for large blocks.

Line 147 clones the entire block when inserting into downloaded. Since blocks can be large (up to several MB with transactions), this could cause memory pressure during sync. Consider storing block directly by taking ownership, or using Arc<Block> if shared access is needed.

♻️ Proposed refactor to avoid clone
-    pub(super) fn receive_block(&mut self, block: &Block) -> bool {
+    pub(super) fn receive_block(&mut self, block: Block) -> bool {
         let hash = block.block_hash();
         if !self.coordinator.receive(&hash) {
             tracing::debug!("Ignoring unrequested block: {}", hash);
             return false;
         }
 
         if let Some(height) = self.hash_to_height.remove(&hash) {
             self.pending_heights.remove(&height);
-            self.downloaded.insert(height, block.clone());
+            self.downloaded.insert(height, block);
             self.completed_count += 1;
             true
         } else {
             // Block was tracked by coordinator but not by height mapping.
-            // This can happen if queue() was used instead of queue_with_heights().
             self.completed_count += 1;
             true
         }
     }
dash-spv/src/network/manager.rs (2)

1099-1117: Lock acquired inside loop may cause contention.

The headers2_disabled lock is acquired inside the loop for each peer check (Line 1106). With many peers, this repeatedly acquires and releases the lock. Consider acquiring it once before the loop.

♻️ Proposed refactor to reduce lock contention
             NetworkMessage::GetHeaders(_) | NetworkMessage::GetHeaders2(_) => {
                 // Prefer headers2 peers, fall back to all
+                let disabled = self.headers2_disabled.lock().await;
                 let headers2_peers: Vec<_> = {
                     let mut result = Vec::new();
                     for (addr, peer) in &peers {
                         let peer_guard = peer.read().await;
                         if peer_guard.supports_headers2()
-                            && !self.headers2_disabled.lock().await.contains(addr)
+                            && !disabled.contains(addr)
                         {
                             result.push((*addr, peer.clone()));
                         }
                     }
                     result
                 };
+                drop(disabled);
                 if headers2_peers.is_empty() {
                     (peers.clone(), false)
                 } else {
                     (headers2_peers, false)
                 }
             }

692-715: Consider rate-limiting spawned send tasks.

Each SendMessage request spawns a new tokio task (Line 696). With an unbounded channel and no backpressure, this could lead to many concurrent tasks under high load. While network throughput provides natural backpressure, consider adding a semaphore or bounded task pool for defense-in-depth.
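
💡 Sketch of a semaphore-bounded spawn; the helper name, the cap, and the task body are placeholders. A caller would hold one Arc<Semaphore> (e.g. Semaphore::new(32)) and route every outgoing send through it.
use std::sync::Arc;
use tokio::sync::Semaphore;

async fn spawn_bounded(limiter: Arc<Semaphore>, work: impl FnOnce() + Send + 'static) {
    // Acquire before spawning so producers are back-pressured once the
    // concurrency cap is reached.
    let Ok(permit) = limiter.acquire_owned().await else {
        return; // semaphore closed during shutdown
    };
    tokio::spawn(async move {
        let _permit = permit; // released when the task finishes
        work();
    });
}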

dash-spv/src/storage/mod.rs (1)

59-97: Document thread-safety expectations for shared storage refs.
The new *_storage_ref() methods expose Arc<RwLock<_>> access alongside &mut self APIs; a brief note about interior mutability and intended concurrency would reduce ambiguity for implementers.

♻️ Suggested doc addition
 pub trait StorageManager:
     BlockHeaderStorage
@@
 {
+    /// Thread-safety note: implementations may use interior mutability to remain `Send + Sync`.
+    /// Callers relying on `*_storage_ref` should assume shared access is synchronized via the
+    /// returned `Arc<RwLock<...>>`.
     /// Deletes in-disk and in-memory data
     async fn clear(&mut self) -> StorageResult<()>;
Based on learnings: The `StorageManager` trait in `dash-spv/src/storage/mod.rs` uses `&mut self` methods but is also `Send + Sync`, and implementations often use interior mutability for concurrency. This can be confusing, so explicit documentation should clarify thread-safety expectations and the rationale for the API design.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 14

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
dash-spv/src/storage/block_headers.rs (1)

168-181: ⚠️ Potential issue | 🟠 Major

Fix reverse-index consistency when overwriting header ranges.

If store_items_at_height replaces existing headers (e.g., reorgs or checkpoint replays), the old hashes remain in header_hash_index, so get_header_height_by_hash can return heights for headers that no longer exist. Consider removing old hashes in the overwrite range before inserting new ones.

🐛 Proposed fix
 async fn store_hashed_headers_at_height(
     &mut self,
     headers: &[HashedBlockHeader],
     height: u32,
 ) -> StorageResult<()> {
-    let mut height = height;
-
-    self.block_headers.write().await.store_items_at_height(headers, height).await?;
-
-    for header in headers {
-        self.header_hash_index.insert(*header.hash(), height);
-        height += 1;
-    }
+    if headers.is_empty() {
+        return Ok(());
+    }
+
+    let mut block_headers = self.block_headers.write().await;
+    // Remove existing hashes in the overwrite range.
+    if let Ok(existing) = block_headers
+        .get_items(height..height + headers.len() as u32)
+        .await
+    {
+        for header in existing {
+            self.header_hash_index.remove(header.hash());
+        }
+    }
+
+    block_headers.store_items_at_height(headers, height).await?;
+
+    let mut current_height = height;
+    for header in headers {
+        self.header_hash_index.insert(*header.hash(), current_height);
+        current_height += 1;
+    }
 
     Ok(())
 }
dash-spv/ARCHITECTURE.md (1)

76-112: ⚠️ Potential issue | 🟡 Minor

Add language identifiers to fenced code blocks.
Markdownlint flags these blocks; consider text for diagrams and rust for code samples.

Also applies to: 1025-1042, 1074-1081

dash-spv-ffi/FFI_API.md (1)

1040-1061: ⚠️ Potential issue | 🟡 Minor

Example code references deprecated API.

The "Event Callbacks" example uses FFIEventCallbacks and dash_spv_ffi_client_set_event_callbacks, which have been replaced by the new per-type callback structs (FFISyncEventCallbacks, FFINetworkEventCallbacks, etc.) and their corresponding setter functions.

📝 Suggested updated example
void on_block_headers_stored(uint32_t tip_height, void* user_data) {
    printf("Block headers tip: %u\n", tip_height);
}

void on_peer_connected(const char* address, void* user_data) {
    printf("Peer connected: %s\n", address);
}

// Set up sync callbacks
FFISyncEventCallbacks sync_callbacks = {
    .on_block_headers_stored = on_block_headers_stored,
    .user_data = NULL
};

FFINetworkEventCallbacks network_callbacks = {
    .on_peer_connected = on_peer_connected,
    .user_data = NULL
};

dash_spv_ffi_client_set_sync_event_callbacks(client, sync_callbacks);
dash_spv_ffi_client_set_network_event_callbacks(client, network_callbacks);
🤖 Fix all issues with AI agents
In `@dash-spv-ffi/src/bin/ffi_cli.rs`:
- Around line 87-100: The on_chainlock_received FFI handler dereferences the raw
pointers hash and signature unsafely; add null checks on both pointers (via the
raw pointer is_null() method) at the top of on_chainlock_received and return
early (or log an error) if either is null to avoid UB, then perform the existing
unsafe dereference only after those checks; use the existing variable names
(hash, signature) and ensure any logging clarifies which pointer was null before
returning.
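
💡 Sketch of the null-pointer guard; the callback signature and the 32-byte hash length are assumptions, the real signature comes from the generated header.
use std::os::raw::c_void;

extern "C" fn on_chainlock_received(hash: *const u8, signature: *const u8, _user_data: *mut c_void) {
    if hash.is_null() || signature.is_null() {
        eprintln!("on_chainlock_received: null hash or signature pointer, ignoring event");
        return;
    }
    // Only dereference after the null checks above (assuming a 32-byte hash buffer).
    let hash_bytes = unsafe { std::slice::from_raw_parts(hash, 32) };
    let _ = hash_bytes;
}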

In `@dash-spv-ffi/src/client.rs`:
- Around line 506-551: The run() path only spawns monitors for callbacks present
at startup, so callbacks set later (fields like client.sync_event_callbacks,
client.network_event_callbacks, client.wallet_event_callbacks,
client.progress_callback) will never get a monitor; update the callback setters
to detect when the client is already running (e.g. a run_called or
shutdown_token/state on the Client used by dash_spv_ffi_client_run) and either
(preferred) dynamically spawn the same monitors used in run()
(spawn_broadcast_monitor for broadcast callbacks and spawn_progress_monitor for
progress_callback) and push the returned handle into client.active_threads, or
(simpler) log a clear warning if run has already been called; ensure you check
whether a monitor is already running (via active_threads or the callback Option)
to avoid duplicate monitors and reuse the same closure signatures (|cb, event|
cb.dispatch(event|progress)) as in run().

In `@dash-spv/src/network/mod.rs`:
- Around line 94-100: The request_filters function uses a hardcoded filter_type:
0 which is inconsistent with request_filter_headers; replace the literal 0 in
NetworkMessage::GetCFilters within the request_filters method with the
FILTER_TYPE_DEFAULT constant so both request_filter_headers and request_filters
use the same FILTER_TYPE_DEFAULT value (look for request_filters,
request_filter_headers, NetworkMessage::GetCFilters and the GetCFilters struct
to make the change).
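
💡 Sketch of the shared constant; the usage is shown as a comment since the exact message-builder fields live in the network module.
/// BIP157/158 basic filter type; a single named constant keeps
/// request_filter_headers and request_filters in agreement.
pub const FILTER_TYPE_DEFAULT: u8 = 0;

// Usage sketch inside the request builders:
// NetworkMessage::GetCFilters(GetCFilters {
//     filter_type: FILTER_TYPE_DEFAULT,
//     start_height,
//     stop_hash,
// })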

In `@dash-spv/src/storage/mod.rs`:
- Around line 79-97: The current default storage ref methods
(header_storage_ref, filter_header_storage_ref, filter_storage_ref,
block_storage_ref) return stale Arcs after clear() replaces internal storage;
update clear_storage()/clear() in DiskStorageManager/DashSpvClient to either (A)
reinitialize and replace sync_coordinator and its managers (BlockHeadersManager,
FilterHeadersManager, etc.) so they call the new storage refs, or (B) refactor
storage fields to use an atomic indirection (e.g., ArcSwap) and change
header_storage_ref/filter_header_storage_ref/filter_storage_ref/block_storage_ref
to return clones of the ArcSwap-backed Arc so existing manager clones remain
valid after swap; pick one approach and implement it consistently across the
trait defaults, DiskStorageManager implementations, and
DashSpvClient.clear()/clear_storage().

In `@dash-spv/src/sync/block_headers/segment_state.rs`:
- Around line 114-152: The update of current_tip_hash should use the last header
that was actually processed/buffered rather than headers.last(); when a
checkpoint ends processing early the trailing headers are unbuffered and using
headers.last() advances the tip past current_height. Change the logic in the
end-of-loop update (where current_tip_hash and current_height are set) to pick
the header at index processed-1 from headers (if processed > 0) as the source
for block_hash(), otherwise leave current_tip_hash unchanged; then increment
current_height by processed as before. Refer to symbols: current_tip_hash,
current_height, headers, processed, buffered_headers, segment_id when locating
the code to change.

In `@dash-spv/src/sync/block_headers/sync_manager.rs`:
- Around line 131-156: The code currently leaves the manager in
SyncState::Synced while issuing fallback/catch-up GetHeaders; update the state
machine so that before sending fallback requests you set the manager to
SyncState::Syncing (call whatever setter used by state()/SyncState) and then
transition back to SyncState::Synced when those requests complete or when the
pipeline reports completion; specifically modify the block that detects stale
entries (uses pending_announcements, UNSOLICITED_HEADERS_WAIT_TIMEOUT,
pipeline.reset_tip_segment, pipeline.send_pending) to set state to Syncing prior
to reset_tip_segment/send_pending and ensure the corresponding completion path
(where headers are processed or pipeline callbacks finish) sets state back to
Synced; apply the same change to the analogous handling around lines 182-196.

In `@dash-spv/src/sync/download_coordinator.rs`:
- Around line 194-203: The method check_and_retry_timeouts currently returns all
items from check_timeouts even if enqueue_retry fails; change it to only return
items that were actually re-enqueued by invoking enqueue_retry for each
timed_out item and collecting only those for which enqueue_retry indicates
success (e.g., Ok or true), so the returned Vec<K> reflects real re-queues;
update the docstring to state it returns only successfully re-queued items and
reference the functions check_and_retry_timeouts, check_timeouts, and
enqueue_retry in the change.
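
A hedged sketch of that contract, using simplified stand-ins for the real DownloadCoordinator internals (the check_timeouts/enqueue_retry signatures shown here are assumptions):

/// Simplified stand-in for the real coordinator; only the retry flow is modeled.
struct DownloadCoordinator<K> {
    timed_out: Vec<K>,   // stand-in for real timeout tracking
    retries_left: usize, // stand-in retry budget
}

impl<K: Clone> DownloadCoordinator<K> {
    fn check_timeouts(&mut self) -> Vec<K> {
        std::mem::take(&mut self.timed_out)
    }

    fn enqueue_retry(&mut self, _key: K) -> bool {
        // Returns false once the retry budget is exhausted.
        if self.retries_left == 0 {
            return false;
        }
        self.retries_left -= 1;
        true
    }

    /// Returns only the keys that were actually re-queued.
    fn check_and_retry_timeouts(&mut self) -> Vec<K> {
        let mut retried = Vec::new();
        for key in self.check_timeouts() {
            if self.enqueue_retry(key.clone()) {
                retried.push(key);
            }
            // Keys that exceeded the retry budget are dropped here; callers can
            // surface them separately if needed.
        }
        retried
    }
}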

In `@dash-spv/src/sync/filter_headers/pipeline.rs`:
- Around line 270-284: In handle_timeouts(), when
coordinator.enqueue_retry(stop_hash) returns false you must remove the orphaned
entry from batch_starts to avoid memory growth; locate the loop in
handle_timeouts and after determining let Some(&start_height) =
self.batch_starts.get(&stop_hash) capture start_height, call
self.batch_starts.remove(&stop_hash) to remove the mapping, then log the warning
and push start_height into failed as before so the batch is cleaned up from
batch_starts when max retries are exceeded.
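
A hedged sketch of that cleanup; the field names mirror the comment (batch_starts, stop_hash, start_height) but the surrounding types are simplified stand-ins:

use std::collections::HashMap;

/// Simplified stand-in for the filter-headers pipeline; only the timeout path is shown.
struct FilterHeadersPipeline {
    batch_starts: HashMap<[u8; 32], u32>, // stop_hash -> start_height
}

impl FilterHeadersPipeline {
    fn handle_timeouts(
        &mut self,
        timed_out: Vec<[u8; 32]>,
        mut enqueue_retry: impl FnMut(&[u8; 32]) -> bool,
    ) -> Vec<u32> {
        let mut failed = Vec::new();
        for stop_hash in timed_out {
            if enqueue_retry(&stop_hash) {
                continue; // still in flight; the batch_starts entry stays valid
            }
            // Max retries exceeded: remove the mapping so batch_starts cannot grow
            // without bound, then report the batch start as failed.
            if let Some(start_height) = self.batch_starts.remove(&stop_hash) {
                eprintln!("CFHeaders batch starting at {} exceeded max retries", start_height);
                failed.push(start_height);
            }
        }
        failed
    }
}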

In `@dash-spv/src/sync/filter_headers/sync_manager.rs`:
- Around line 54-67: When the early-return branch after
pipeline.match_response(...) detects completion and calls
self.set_state(SyncState::Synced), also emit the FilterHeadersSyncComplete event
so consumers like FiltersManager are notified; use the same event-emission
mechanism used elsewhere in this module where FilterHeadersSyncComplete is
currently emitted (i.e., call the same function/field that sends
Event::FilterHeadersSyncComplete immediately after set_state), keeping the state
transition and log intact.

In `@dash-spv/src/sync/filters/manager.rs`:
- Around line 678-684: The code currently calls batch.mark_scanned() before
checking batch.filters().is_empty(), which allows empty batches to be marked
scanned and committed prematurely; move the call to batch.mark_scanned() so it
occurs only after the empty-check (or replace the empty branch with an error
return) — locate the scan_batch flow where batch.mark_scanned() and
batch.filters().is_empty() are used and change the order so that the empty check
(batch.filters().is_empty()) runs first and returns/errs, and only then call
batch.mark_scanned() after confirming filters exist.
- Around line 335-363: The code currently builds filter_headers_map from
load_filter_headers without checking that the number of returned headers matches
the requested batch range, which can produce an incomplete expected_headers map
and break FilterValidator::validate; after calling load_filter_headers(...)
verify that filter_headers.len() == (batch.end_height() - batch.start_height() +
1) and if not return an Err (with a clear message) before constructing
filter_headers_map or invoking FilterValidator; keep references to
batch.start_height(), batch.end_height(), load_filter_headers,
filter_headers_map, get_prev_filter_header and FilterValidationInput to locate
and update the logic.

In `@dash-spv/src/sync/masternodes/sync_manager.rs`:
- Around line 354-376: The code currently marks a MnListDiff as received when
get_header_height_by_hash(&diff.block_hash) returns None or Err by calling
self.sync_state.mnlistdiff_pipeline.receive(diff), which incorrectly completes
the request without applying it; instead, in the None/Err branches of the match
inside the apply_mnlistdiff (or equivalent) function, do not call
mnlistdiff_pipeline.receive(diff) — either requeue the diff for retry using the
pipeline's retry/requeue method (if one exists) or return an Err (e.g.,
propagate a specific MissingHeader/Retryable error) so the caller can retry
later; update the branches referencing get_header_height_by_hash,
self.sync_state.mnlistdiff_pipeline.receive, and the surrounding apply routine
accordingly.
- Around line 383-396: The apply_diff failure is currently only logged and the
diff is still marked received, which can hide critical chain gaps; update the
error handling in the match on engine.apply_diff (the block around
engine.apply_diff(diff.clone(), Some(target_height), false, None)) to propagate
failures into the sync state instead of silently continuing: on Err(e) record
the failed target_height and diff (e.g., add to a sync_state.failed_diffs or set
a sync_state.error flag), stop marking the diff as successfully received (remove
insertion into known_mn_list_heights/known_block_hashes when Err), and ensure
verify_and_complete() checks sync_state.failed_diffs or the error flag and
returns a sync error so the caller can retry or abort the sync pipeline.

In `@key-wallet/src/managed_account/mod.rs`:
- Around line 84-96: The lazy-rebuild logic in is_outpoint_spent incorrectly
uses spent_outpoints.is_empty() as a sentinel and also calls
spent_outpoints.clear(); add a dedicated boolean field (e.g.
spent_outpoints_initialized) to the ManagedAccount struct, set it false after
deserialization and true after rebuilding, then change is_outpoint_spent to
check !spent_outpoints_initialized instead of is_empty(), remove the redundant
clear(), and ensure the rebuild loop populates spent_outpoints from transactions
(referencing is_outpoint_spent, spent_outpoints, transactions).
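
A hedged sketch of the initialized-flag approach; the OutPoint and transaction types here are simplified stand-ins for the real key-wallet types:

use std::collections::HashSet;

type OutPoint = ([u8; 32], u32); // stand-in: (txid, vout)

struct ManagedAccount {
    transactions: Vec<Vec<OutPoint>>, // stand-in: each transaction's spent inputs
    spent_outpoints: HashSet<OutPoint>,
    spent_outpoints_initialized: bool, // false right after deserialization
}

impl ManagedAccount {
    fn is_outpoint_spent(&mut self, outpoint: &OutPoint) -> bool {
        if !self.spent_outpoints_initialized {
            // Rebuild the cache exactly once instead of treating "empty" as "not built".
            for inputs in &self.transactions {
                for spent in inputs {
                    self.spent_outpoints.insert(*spent);
                }
            }
            self.spent_outpoints_initialized = true;
        }
        self.spent_outpoints.contains(outpoint)
    }
}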
🧹 Nitpick comments (16)
dash-spv/src/validation/filter.rs (1)

118-122: Avoid temporary String allocation in error message.

unwrap_or(&"unknown".to_string()) creates a temporary String allocation on every call, even when details is non-empty. Use map_or with a &str instead.

♻️ Proposed fix
             return Err(ValidationError::InvalidFilterHeaderChain(format!(
                 "Filter verification failed for {} filters. First failure: {}",
                 failures.len(),
-                details.first().unwrap_or(&"unknown".to_string())
+                details.first().map_or("unknown", String::as_str)
             )));
key-wallet/src/managed_account/mod.rs (1)

361-372: Consider bounded spent-outpoint tracking for long-lived wallets.

The spent_outpoints set grows indefinitely as transactions are processed. For long-running wallets with high transaction volume, this could become a memory concern over time.

A potential optimization would be to prune outpoints that are older than a certain confirmation depth (e.g., 100+ blocks deep), since rescans typically only need to handle recent out-of-order arrivals.
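
A hedged sketch of depth-based pruning. Tracking the spend height per outpoint is an assumption (the current set stores outpoints only), and 100 blocks is an illustrative depth:

use std::collections::HashMap;

type OutPoint = ([u8; 32], u32); // stand-in: (txid, vout)
const PRUNE_DEPTH: u32 = 100;    // illustrative confirmation depth

fn prune_spent_outpoints(spent: &mut HashMap<OutPoint, u32>, tip_height: u32) {
    // Keep only outpoints spent within the last PRUNE_DEPTH blocks; older entries are
    // deep enough that out-of-order rescans no longer need them.
    spent.retain(|_, spend_height| tip_height.saturating_sub(*spend_height) < PRUNE_DEPTH);
}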

key-wallet-manager/src/events.rs (1)

45-68: Consider including locked in the BalanceUpdated description.

Right now the log text omits a field that can change independently.

Proposed tweak
-            WalletEvent::BalanceUpdated {
-                spendable,
-                unconfirmed,
-                immature,
-                ..
-            } => {
-                format!(
-                    "BalanceUpdated(spendable={}, unconfirmed={}, immature={})",
-                    spendable, unconfirmed, immature
-                )
-            }
+            WalletEvent::BalanceUpdated {
+                spendable,
+                unconfirmed,
+                immature,
+                locked,
+                ..
+            } => {
+                format!(
+                    "BalanceUpdated(spendable={}, unconfirmed={}, immature={}, locked={})",
+                    spendable, unconfirmed, immature, locked
+                )
+            }
dash-spv/src/client/chainlock.rs (1)

1-7: Module documentation is stale.

The doc comment mentions "InstantSendLock processing" and "ChainLock validation updates" but the corresponding methods (process_instantsendlock, update_chainlock_validation) have been removed. Update the documentation to reflect the current module contents.

✏️ Suggested fix
 //! ChainLock processing and validation.
 //!
 //! This module contains:
 //! - ChainLock processing
-//! - InstantSendLock processing
-//! - ChainLock validation updates
 //! - Pending ChainLock validation
dash-spv/src/test_utils/network.rs (1)

56-58: Consider documenting that take_receiver can only be called once.

Since this returns Option<...> and uses take(), subsequent calls return None. A doc comment would clarify this one-shot behavior for test authors.

✏️ Suggested documentation
+    /// Takes ownership of the request receiver for test verification.
+    /// 
+    /// Returns `None` if already taken. Can only be called once.
     pub fn take_receiver(&mut self) -> Option<UnboundedReceiver<NetworkRequest>> {
         self.request_rx.take()
     }
dash-spv/src/client/sync_coordinator.rs (1)

60-62: Consider reducing progress logging verbosity.

Logging at info level on every progress change (line 61) may produce excessive output during active sync. Consider using debug level or throttling the log frequency.

♻️ Suggested change
                 _ = progress_updates.changed() => {
-                    tracing::info!("Sync progress:{}", *progress_updates.borrow());
+                    tracing::debug!("Sync progress:{}", *progress_updates.borrow());
                 }
dash-spv/src/sync/filters/pipeline.rs (1)

232-240: Consider using a reverse lookup map for find_batch_for_height.

The linear scan through batch_trackers to find the batch for a given height is O(n) where n is the number of active batches. While bounded by MAX_CONCURRENT_FILTER_BATCHES (20), this is called for every received filter. A reverse lookup map (height_to_batch_start) could provide O(1) lookup.

♻️ Optional optimization
 pub(super) struct FiltersPipeline {
     coordinator: DownloadCoordinator<u32>,
     batch_trackers: HashMap<u32, BatchTracker>,
     completed_batches: BTreeSet<FiltersBatch>,
     target_height: u32,
     filters_received: u32,
     highest_received: u32,
+    /// Reverse lookup: height -> batch start for O(1) batch lookup.
+    height_to_batch: HashMap<u32, u32>,
 }

Populate in send_pending when creating trackers, and use in find_batch_for_height.
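
A hedged sketch of how the reverse map could be populated and queried; BatchTracker and the real pipeline fields are simplified stand-ins here:

use std::collections::HashMap;

struct FiltersPipeline {
    batch_trackers: HashMap<u32, (u32, u32)>, // batch_start -> (start, end), stand-in for BatchTracker
    height_to_batch: HashMap<u32, u32>,       // height -> batch_start
}

impl FiltersPipeline {
    fn track_batch(&mut self, start: u32, end: u32) {
        // Populate the reverse map when the tracker is created (i.e. in send_pending).
        self.batch_trackers.insert(start, (start, end));
        for height in start..=end {
            self.height_to_batch.insert(height, start);
        }
    }

    fn find_batch_for_height(&self, height: u32) -> Option<u32> {
        // O(1) lookup instead of scanning every active tracker.
        // Entries should be removed again when a batch completes.
        self.height_to_batch.get(&height).copied()
    }
}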

dash-spv/src/network/manager.rs (2)

1099-1117: Lock contention inside loop when checking headers2_disabled.

The headers2_disabled lock is acquired inside the loop for each peer when building headers2_peers. For large peer sets, this could cause contention.

♻️ Proposed optimization to acquire lock once
             NetworkMessage::GetHeaders(_) | NetworkMessage::GetHeaders2(_) => {
                 // Prefer headers2 peers, fall back to all
+                let disabled = self.headers2_disabled.lock().await;
                 let headers2_peers: Vec<_> = {
                     let mut result = Vec::new();
                     for (addr, peer) in &peers {
                         let peer_guard = peer.read().await;
                         if peer_guard.supports_headers2()
-                            && !self.headers2_disabled.lock().await.contains(addr)
+                            && !disabled.contains(addr)
                         {
                             result.push((*addr, peer.clone()));
                         }
                     }
                     result
                 };
+                drop(disabled);

686-730: Request processor spawns unbounded concurrent tasks.

Each incoming request spawns a new tokio::spawn without backpressure. Under high load, this could lead to resource exhaustion. Consider using a semaphore or bounded task pool.

♻️ Proposed bounded concurrency
+use tokio::sync::Semaphore;
+
+const MAX_CONCURRENT_REQUESTS: usize = 100;
+
 async fn start_request_processor(&self) {
+    let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_REQUESTS));
     // ... existing setup ...
     
     tasks.spawn(async move {
         loop {
             tokio::select! {
                 request = request_rx.recv() => {
                     match request {
                         Some(NetworkRequest::SendMessage(msg)) => {
+                            let permit = semaphore.clone().acquire_owned().await.unwrap();
                             let this = this.clone();
                             tokio::spawn(async move {
                                 // ... send logic ...
+                                drop(permit);
                             });
dash-spv/src/sync/blocks/pipeline.rs (1)

138-156: Consider avoiding full block clone on receive.

block.clone() on line 147 clones the entire block including all transactions. Since receive_block takes a reference, the caller likely already owns the block. Consider taking ownership to avoid the clone.

♻️ Proposed signature change to take ownership
-    pub(super) fn receive_block(&mut self, block: &Block) -> bool {
-        let hash = block.block_hash();
+    pub(super) fn receive_block(&mut self, block: Block) -> bool {
+        let hash = block.block_hash();
         if !self.coordinator.receive(&hash) {
             tracing::debug!("Ignoring unrequested block: {}", hash);
             return false;
         }

         if let Some(height) = self.hash_to_height.remove(&hash) {
             self.pending_heights.remove(&height);
-            self.downloaded.insert(height, block.clone());
+            self.downloaded.insert(height, block);
             self.completed_count += 1;
             true
         } else {
             self.completed_count += 1;
             true
         }
     }
dash-spv/src/sync/progress.rs (1)

101-109: percentage() excludes blocks/masternodes progress.

The percentage calculation only considers headers, filter_headers, and filters. Depending on requirements, blocks and masternodes progress might also contribute to overall completion percentage.
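
A hedged sketch of an overall percentage that also weights blocks and masternodes; the per-manager accessors and the equal weighting are assumptions, not the current API:

/// components: (completed, total) per manager, e.g. headers, filter headers,
/// filters, blocks, masternodes.
fn overall_percentage(components: &[(u64, u64)]) -> f64 {
    let (done, total) = components
        .iter()
        .fold((0u64, 0u64), |(d, t), &(c, n)| (d + c, t + n));
    if total == 0 {
        return 0.0;
    }
    done as f64 / total as f64 * 100.0
}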

dash-spv/src/sync/chainlock/manager.rs (1)

122-143: verify_block_hash accepts ChainLocks on storage errors.

When storage errors occur (lines 133-141), the method returns true, accepting the ChainLock for further processing. While documented, this could allow invalid ChainLocks to be accepted if storage is intermittently failing. Consider logging at error level or tracking these for later re-validation.

🛡️ Suggested improvement for observability
             Err(e) => {
-                tracing::warn!(
+                tracing::error!(
                     "Storage error checking ChainLock header at height {}: {}",
                     chainlock.block_height,
                     e
                 );
-                // Accept since we can't verify - will validate when header arrives
+                // Accept since we can't verify - signature validation still applies
+                // TODO: Consider tracking for re-validation when storage recovers
                 true
             }
dash-spv/src/sync/download_coordinator.rs (1)

153-163: Clear retry counters after successful receives to avoid growth.
retry_counts only ever grows; clearing entries on success prevents unbounded accumulation in long-running syncs.

♻️ Proposed cleanup
         if self.in_flight.remove(key).is_some() {
+            self.retry_counts.remove(key);
             self.last_progress = Instant::now();
             true
dash-spv/src/sync/filter_headers/pipeline.rs (1)

23-25: Duplicate doc comment for timeout constant.

Line 23 and Line 24 both describe the same constant with slightly different wording. Remove the duplicate.

 /// Timeout for CFHeaders requests (shorter for faster retry on multi-peer).
-/// Timeout for CFHeaders requests. Single response but allow time for network latency.
 const FILTER_HEADERS_TIMEOUT: Duration = Duration::from_secs(20);
dash-spv/src/sync/filters/batch.rs (1)

108-126: Equality based only on start_height may cause unexpected behavior.

PartialEq compares only start_height, meaning two FiltersBatch instances with the same start height but different filters, end heights, or state flags are considered equal. This is explicitly tested (lines 175-187), so it appears intentional for use in sorted collections.

However, this can be surprising:

  • BTreeSet::insert will reject a batch if one with the same start_height already exists
  • HashMap keyed by FiltersBatch would conflate different batches

Consider documenting this behavior prominently or deriving a separate key type for collection membership.
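
A hedged sketch of the separate-key idea: keep ordering and identity on an explicit newtype and store batches in a map keyed by it, instead of overloading PartialEq/Ord on the batch itself (FiltersBatch is a stand-in here):

use std::collections::BTreeMap;

#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)]
struct BatchStart(u32);

struct FiltersBatch {
    start_height: u32,
    end_height: u32,
}

fn insert_batch(batches: &mut BTreeMap<BatchStart, FiltersBatch>, batch: FiltersBatch) {
    // The key makes the "one batch per start height" invariant visible at the call
    // site, and replacing an existing batch is an explicit map insert rather than a
    // silent equality collision.
    batches.insert(BatchStart(batch.start_height), batch);
}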

dash-spv-ffi/src/bin/ffi_cli.rs (1)

493-496: Creates redundant tokio runtime for signal handling.

A new tokio runtime is created just to wait for Ctrl+C (line 493), but the FFI client already has a runtime. Consider using a simpler blocking signal handler or exposing the client's runtime handle.

♻️ Alternative using std signal handling
-        // Wait for Ctrl+C signal using tokio
-        tokio::runtime::Runtime::new()
-            .expect("Failed to create tokio runtime")
-            .block_on(tokio::signal::ctrl_c())
-            .expect("Failed to listen for Ctrl+C");
+        // Wait for Ctrl+C signal
+        let (tx, rx) = std::sync::mpsc::channel();
+        ctrlc::set_handler(move || {
+            let _ = tx.send(());
+        }).expect("Failed to set Ctrl+C handler");
+        let _ = rx.recv();

This requires adding the ctrlc crate as a dependency, or alternatively using platform-specific signal handling.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
dash-spv/src/client/sync_coordinator.rs (1)

51-59: ⚠️ Potential issue | 🟠 Major

Closed command channel can cause a tight loop.
Once recv() yields None, this branch will fire immediately on every select, starving ticks/progress and burning CPU. Break the loop or return.

Proposed fix
-                    None => {tracing::warn!("DashSpvClientCommand channel closed.");},
+                    None => {
+                        tracing::warn!("DashSpvClientCommand channel closed.");
+                        break;
+                    },
dash-spv-ffi/src/client.rs (1)

913-922: ⚠️ Potential issue | 🟠 Major

Deep-free FFISyncProgress to avoid leaks.

FFISyncProgress::from allocates nested per-manager progress; this destructor only frees the outer box.

🛠️ Suggested fix
 pub unsafe extern "C" fn dash_spv_ffi_sync_progress_destroy(progress: *mut FFISyncProgress) {
-    if !progress.is_null() {
-        let _ = Box::from_raw(progress);
-    }
+    crate::dash_spv_ffi_manager_sync_progress_destroy(progress);
 }

As per coding guidelines: Be careful with FFI memory management.

🤖 Fix all issues with AI agents
In `@dash-spv-ffi/src/bin/ffi_cli.rs`:
- Around line 151-171: The on_transaction_received callback dereferences the
txid pointer without a null check; update on_transaction_received to first check
if txid.is_null() (inside the existing unsafe block) and handle the null case
(e.g., log a placeholder like "<null txid>" or return early) instead of
dereferencing, leaving the rest of the logic (wallet_str via ffi_string_to_rust,
addr_str, wallet_short, printing) unchanged so you avoid UB when txid is null.

In `@dash-spv/src/network/mod.rs`:
- Around line 113-122: The public method request_qr_info currently hardcodes
extra_share=true which breaks parity with the NetworkManager::request_qr_info
trait; change request_qr_info signature to accept an extra_share: bool
parameter, pass that boolean into the GetQRInfo struct
(NetworkMessage::GetQRInfo(GetQRInfo { ..., extra_share })) and update any
internal callers to forward the desired extra_share value so the wrapper matches
the trait API.

In `@dash-spv/src/sync/block_headers/pipeline.rs`:
- Around line 55-113: In init, short‑circuit when current_height >=
target_height to avoid creating a tip segment and unnecessary requests: at the
top of HeadersPipeline::init (function init) after clearing self.segments and
resetting self.next_to_store and self.initialized, check if current_height >=
target_height and if so set self.initialized = true (ensure next_to_store == 0
and segments remain empty), optionally log that no segments are needed, and
return early; this prevents building any SegmentState entries and avoids sending
requests for an already‑synced or behind peer.

In `@dash-spv/src/sync/filters/batch.rs`:
- Around line 65-68: Add a setter to flip the verification flag so the
verification state can actually change: implement a method like pub(super) fn
mark_verified(&mut self) (or pub fn if needed externally) on the same type that
contains the verified() getter and set self.verified = true; callers that
perform verification should call Batch::mark_verified() after successful
verification; update any tests or call sites that expect the batch to become
verified.
🧹 Nitpick comments (13)
dash-spv/src/validation/filter.rs (1)

65-70: Avoid potential panic when chaining expected headers.

Line 69 indexes the map directly; using get keeps this fallible and avoids panics if invariants change.

♻️ Proposed safe access
-        for &height in &heights {
-            prev_headers.insert(height, prev);
-            prev = input.expected_headers[&height];
-        }
+        for &height in &heights {
+            prev_headers.insert(height, prev);
+            let Some(&expected) = input.expected_headers.get(&height) else {
+                return Err(ValidationError::InvalidFilterHeaderChain(format!(
+                    "Missing expected header at height {}",
+                    height
+                )));
+            };
+            prev = expected;
+        }

As per coding guidelines: Use proper error types (thiserror) and propagate errors appropriately.

dash-spv/src/error.rs (1)

306-322: From implementations lose original error context.

These conversions stringify the original error, discarding the error chain. This is consistent with the existing SyncError::Network(String), SyncError::Storage(String), etc. pattern, but worth noting that structured error information is lost.

If preserving the original error chain becomes important for debugging, consider wrapping the source errors directly (e.g., SyncError::NetworkSource(NetworkError)).
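
A hedged sketch of a source-preserving variant with thiserror; the variant names (NetworkSource) and the wrapped error shown here are illustrative, not the current enum:

use thiserror::Error;

#[derive(Debug, Error)]
enum NetworkError {
    #[error("peer disconnected")]
    PeerDisconnected,
}

#[derive(Debug, Error)]
enum SyncError {
    /// Existing stringly-typed variant kept for compatibility.
    #[error("network error: {0}")]
    Network(String),
    /// New variant that keeps the original error reachable via source() for debugging.
    #[error("network error")]
    NetworkSource(#[from] NetworkError),
}

fn demo() -> Result<(), SyncError> {
    let result: Result<(), NetworkError> = Err(NetworkError::PeerDisconnected);
    result?; // converted via #[from]; the error chain is preserved
    Ok(())
}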

dash-spv/src/sync/block_headers/progress.rs (1)

61-64: Guard effective_height against overflow.
current_height + buffered can wrap in release builds; a saturating add is safer.

Proposed change
-    pub fn effective_height(&self) -> u32 {
-        self.current_height + self.buffered
-    }
+    pub fn effective_height(&self) -> u32 {
+        self.current_height.saturating_add(self.buffered)
+    }
dash-spv-ffi/tests/integration/test_full_workflow.rs (1)

67-91: Remove unused callback definitions.

The on_sync_progress and on_sync_complete callbacks are defined but never registered with the FFI client. Based on the AI summary, callback-driven synchronization was replaced with direct operations. These appear to be dead code.

♻️ Suggested cleanup
-            // Set up callbacks
-            let sync_completed = ctx.sync_completed.clone();
-            let errors = ctx.errors.clone();
-
-            extern "C" fn on_sync_progress(progress: f64, msg: *const c_char, user_data: *mut c_void) {
-                let ctx = unsafe { &*(user_data as *const IntegrationTestContext) };
-                if progress >= 100.0 {
-                    ctx.sync_completed.store(true, Ordering::SeqCst);
-                }
-
-                if !msg.is_null() {
-                    let msg_str = unsafe { CStr::from_ptr(msg).to_str().unwrap() };
-                    ctx.events.lock().unwrap().push(format!("Progress {:.1}%: {}", progress, msg_str));
-                }
-            }
-
-            extern "C" fn on_sync_complete(success: bool, error: *const c_char, user_data: *mut c_void) {
-                let ctx = unsafe { &*(user_data as *const IntegrationTestContext) };
-                ctx.sync_completed.store(true, Ordering::SeqCst);
-
-                if !success && !error.is_null() {
-                    let error_str = unsafe { CStr::from_ptr(error).to_str().unwrap() };
-                    ctx.errors.lock().unwrap().push(error_str.to_string());
-                }
-            }
-
             // Start the client
key-wallet-manager/src/events.rs (1)

56-66: Consider including locked in the description for completeness.

The BalanceUpdated variant includes a locked field, but description() omits it. For consistency and debugging visibility, consider including it.

✏️ Optional fix
             WalletEvent::BalanceUpdated {
                 spendable,
                 unconfirmed,
                 immature,
+                locked,
                 ..
             } => {
                 format!(
-                    "BalanceUpdated(spendable={}, unconfirmed={}, immature={})",
-                    spendable, unconfirmed, immature
+                    "BalanceUpdated(spendable={}, unconfirmed={}, immature={}, locked={})",
+                    spendable, unconfirmed, immature, locked
                 )
             }
dash-spv/src/client/chainlock.rs (1)

1-7: Update module documentation - InstantSendLock processing was removed.

The module doc comment still mentions "InstantSendLock processing" at line 5, but this functionality was removed according to the AI summary. The documentation should be updated to reflect the current module contents.

✏️ Suggested fix
 //! ChainLock processing and validation.
 //!
 //! This module contains:
 //! - ChainLock processing
-//! - InstantSendLock processing
-//! - ChainLock validation updates
 //! - Pending ChainLock validation
dash-spv/src/sync/blocks/pipeline.rs (1)

133-156: Consider avoiding block.clone() if ownership can be transferred.

Line 147 clones the entire block (including transaction data) when storing in the downloaded buffer. If the caller doesn't need to retain the block, consider accepting ownership (block: Block) to avoid the allocation.

However, this may be intentional if the caller needs to retain the block for other purposes (e.g., broadcasting to other components). The current signature accepting &Block is safer from an API perspective.

dash-spv/src/storage/block_headers.rs (1)

53-62: Track the TODO for the BlockHeaderStorage API cleanup.

There’s a TODO to change the trait API to hashed headers only. Please open/track an issue (or add a link) so this doesn’t get lost.

dash-spv/src/client/core.rs (1)

109-116: Note: SyncCoordinator uses concrete storage types.

The sync_coordinator field uses concrete Persistent*Storage types rather than following the generic S: StorageManager pattern used elsewhere in DashSpvClient. This couples the coordinator to specific storage implementations.

This is an acceptable trade-off given the coordinator's internal complexity with multiple managers, but worth documenting if storage abstraction may be needed in the future.

dash-spv/src/sync/filters/pipeline.rs (1)

212-218: Minor: Redundant map lookup.

The code accesses batch_trackers.get_mut(&batch_start) twice in succession. Since we already have tracker from line 191, consider restructuring to avoid the redundant lookup.

♻️ Suggested simplification
         let end_height = tracker.end_height();
-        // Take the filters before removing the tracker
-        let filters =
-            self.batch_trackers.get_mut(&batch_start).map(|t| t.take_filters()).unwrap_or_default();
-
-        self.batch_trackers.remove(&batch_start);
+        // Take the filters and remove the tracker
+        let filters = self
+            .batch_trackers
+            .remove(&batch_start)
+            .map(|mut t| t.take_filters())
+            .unwrap_or_default();
+
         self.coordinator.receive(&batch_start);
dash-spv/src/sync/filters/sync_manager.rs (1)

130-131: Ignored return value from receive_with_data.

The receive_with_data() method returns Option<u32> indicating when a batch completes, but the return value is discarded. The completion is handled implicitly via store_and_match_batches(), but explicitly checking the return could provide useful logging or early return optimization.

♻️ Optional: Use return value for logging
         // Buffer filter in pipeline
-        self.filter_pipeline.receive_with_data(h, cfilter.block_hash, &cfilter.filter);
+        if let Some(completed_height) = self.filter_pipeline.receive_with_data(h, cfilter.block_hash, &cfilter.filter) {
+            tracing::debug!("Filter batch completed at height {}", completed_height);
+        }
dash-spv-ffi/FFI_API.md (1)

1040-1061: Example code references removed API.

The example code uses dash_spv_ffi_client_set_event_callbacks with an FFIEventCallbacks struct, but the new API uses separate callback types (FFISyncEventCallbacks, FFINetworkEventCallbacks, FFIWalletEventCallbacks) with their respective setters. Since this is auto-generated documentation, consider updating the generation script to emit a current example.

dash-spv/src/sync/filters/batch.rs (1)

108-126: Equality based solely on start_height may cause unintended BTreeSet behavior.

The PartialEq and Ord implementations compare only start_height, meaning two batches with the same start height but different content (filters, end_height, etc.) are considered equal. While intentional for BTreeSet ordering, this prevents inserting multiple batches starting at the same height and could mask replacement bugs. Consider documenting this invariant prominently or using a newtype wrapper for the ordering key.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (5)
dash-spv/src/client/sync_coordinator.rs (1)

39-67: ⚠️ Potential issue | 🟠 Major

Handle progress channel closure to avoid CPU spinning.
If the progress sender is dropped, changed() returns Err immediately, causing a tight loop where the select! branch completes, loops back, and calls changed() again with an immediate error. Guard the result and break when the channel closes.

🔧 Suggested fix
-                _ = progress_updates.changed() => {
-                    tracing::info!("Sync progress:{}", *progress_updates.borrow());
-                }
+                progress_result = progress_updates.changed() => {
+                    if progress_result.is_err() {
+                        tracing::debug!("Sync progress channel closed");
+                        break;
+                    }
+                    tracing::info!("Sync progress:{}", *progress_updates.borrow());
+                }
dash-spv-ffi/src/bin/ffi_cli.rs (1)

276-315: ⚠️ Potential issue | 🟡 Minor

CLI help text doesn’t match the actual data-dir default.

The help string says “unique directory in /tmp”, but the default is ".tmp/ffi-cli". Please align the help text or the default.

dash-spv/src/storage/filter_headers.rs (1)

22-39: ⚠️ Potential issue | 🟡 Minor

Guard against u32 overflow in height + 1.

When height == u32::MAX, the expression height + 1 wraps to 0, creating an invalid Range where start >= end. Although block heights are unrealistic to reach u32::MAX, add an explicit guard to prevent this edge case and ensure robustness.

🛡️ Suggested fix
-        Ok(self.load_filter_headers(height..height + 1).await?.first().copied())
+        if height == u32::MAX {
+            return Ok(None);
+        }
+        Ok(self
+            .load_filter_headers(height..height + 1)
+            .await?
+            .first()
+            .copied())
dash-spv-ffi/src/platform_integration.rs (1)

206-215: ⚠️ Potential issue | 🟠 Major

Extract platform activation heights to a canonical source.
Lines 211–214 hardcode per-network platform activation heights that duplicate values already defined in dash/src/sml/llmq_type/network.rs (lines 137–138). The mainnet height is explicitly marked "placeholder - needs verification," indicating it may change. Rather than hardcoding these values independently, use or call the canonical definition from the Network type, or extract them to a shared constant to avoid drift and maintain a single source of truth.
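
A hedged sketch of what a single source of truth could look like; the Network enum shape, the accessor name, and the heights below are placeholders, not the canonical values from dash/src/sml/llmq_type/network.rs:

#[derive(Clone, Copy)]
enum Network {
    Dash,
    Testnet,
}

impl Network {
    /// One canonical accessor that both the FFI layer and core code call,
    /// instead of re-hardcoding heights in platform_integration.rs.
    const fn platform_activation_height(self) -> u32 {
        match self {
            Network::Dash => 1_888_888,  // placeholder; verify against the canonical source
            Network::Testnet => 850_100, // placeholder
        }
    }
}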

dash-spv-ffi/FFI_API.md (1)

1040-1061: ⚠️ Potential issue | 🟡 Minor

Example still references removed callback API.
The usage snippet uses FFIEventCallbacks and dash_spv_ffi_client_set_event_callbacks, which no longer exist. Please update to the per-domain *_set_*_event_callbacks APIs + dash_spv_ffi_client_run.

🤖 Fix all issues with AI agents
In `@dash-spv/src/client/lifecycle.rs`:
- Around line 219-233: The sync coordinator tasks are started before connecting
and are left running if network.connect() fails; to fix, ensure you call
self.sync_coordinator.shutdown().await (and ignore its result) before returning
the network.connect() error, or move the call to
self.sync_coordinator.start(&mut self.network).await until after
self.network.connect().await succeeds; update the code paths around
sync_coordinator.start, network.connect, and running so that any early return
due to network.connect failure cleans up via sync_coordinator.shutdown() to
avoid lingering SyncError::SyncInProgress state.

In `@dash-spv/src/sync/block_headers/manager.rs`:
- Around line 199-205: The current check swallows storage errors when calling
self.header_storage.read().await.get_header_height_by_hash(block_hash) by only
matching Ok(Some(_)); change it to propagate any Err from the storage lookup
instead of treating it as "missing" — e.g., match the Result and return or
propagate the error through the surrounding function's Result (using ? or
map_err to convert to your thiserror-defined error type) while still continuing
when Ok(None) and skipping when Ok(Some(_)); update any function signature to
return the appropriate error type if needed.

In `@dash-spv/src/sync/blocks/pipeline.rs`:
- Around line 107-120: The code in send_pending currently takes hashes via
coordinator.take_pending and calls requests.request_blocks but marks them sent
unconditionally, losing hashes if request_blocks fails; change the flow to call
requests.request_blocks(hashes.clone()) and if it returns Err, call
self.coordinator.enqueue_retry(hashes) to restore them to the front (honoring
retry limits) and do not call self.coordinator.mark_sent for that batch; only
call mark_sent(&hashes) and increment total_sent after a successful
request_blocks call. Ensure you reference the send_pending method,
RequestSender.request_blocks call, coordinator.take_pending,
coordinator.mark_sent, and coordinator.enqueue_retry when making the change.

In `@dash-spv/src/sync/filters/batch.rs`:
- Around line 31-48: The constructor FiltersBatch::new should validate that
start_height <= end_height and not allow invalid ranges to propagate; change its
signature to return Result<Self, E> (e.g., Result<Self, String> or a small
InvalidRange error) and inside new check if start_height > end_height then
return Err(...) otherwise return Ok(Self { start_height, end_height, filters,
verified: false, scanned: false, pending_blocks: 0, rescan_complete: false,
collected_addresses: HashSet::new() }); update callers of FiltersBatch::new to
propagate or handle the Result accordingly.
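
A hedged sketch of the fallible constructor; the field list follows the comment above, while the error type (a plain String) and the address/filter element types are assumptions:

use std::collections::HashSet;

struct FiltersBatch {
    start_height: u32,
    end_height: u32,
    filters: Vec<Vec<u8>>, // stand-in filter payloads
    verified: bool,
    scanned: bool,
    pending_blocks: u32,
    rescan_complete: bool,
    collected_addresses: HashSet<String>, // stand-in address type
}

impl FiltersBatch {
    fn new(start_height: u32, end_height: u32, filters: Vec<Vec<u8>>) -> Result<Self, String> {
        if start_height > end_height {
            return Err(format!("invalid batch range: {} > {}", start_height, end_height));
        }
        Ok(Self {
            start_height,
            end_height,
            filters,
            verified: false,
            scanned: false,
            pending_blocks: 0,
            rescan_complete: false,
            collected_addresses: HashSet::new(),
        })
    }
}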
🧹 Nitpick comments (14)
dash-spv/src/validation/filter.rs (2)

51-53: Consider using sort_unstable() for minor performance improvement.

Since u32 keys don't have meaningful stability requirements, sort_unstable() avoids the allocation overhead of stable sorting.

♻️ Suggested change
         // Sort expected header heights to build chain correctly
         let mut heights: Vec<u32> = input.expected_headers.keys().copied().collect();
-        heights.sort();
+        heights.sort_unstable();

31-41: Consider supporting configurable validation modes.

Per coding guidelines for dash-spv/src/validation/**/*.rs, validators should implement configurable validation modes (ValidationMode::None, ValidationMode::Basic, ValidationMode::Full). This could allow callers to skip validation during trusted syncs or perform lighter checks for performance.

If filter validation should always be full for security reasons, documenting this design decision would be valuable.

Based on learnings: "Implement configurable validation modes (ValidationMode::None, ValidationMode::Basic, ValidationMode::Full)"
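
A hedged sketch of mode-aware validation; the ValidationMode variants come from the cited guideline, while the check functions below are illustrative stand-ins rather than the real BIP158 logic:

#[derive(Clone, Copy, PartialEq, Eq)]
enum ValidationMode {
    None,
    Basic,
    Full,
}

fn validate_filter(mode: ValidationMode, filter: &[u8], expected_header: &[u8; 32]) -> Result<(), String> {
    match mode {
        ValidationMode::None => Ok(()),
        ValidationMode::Basic => {
            // Cheap structural check only.
            if filter.is_empty() {
                return Err("empty filter".to_string());
            }
            Ok(())
        }
        ValidationMode::Full => {
            // Full check: recompute the filter header and compare (stand-in hash below).
            let computed = compute_filter_header(filter);
            if &computed != expected_header {
                return Err("filter header mismatch".to_string());
            }
            Ok(())
        }
    }
}

fn compute_filter_header(filter: &[u8]) -> [u8; 32] {
    // Placeholder for the real double-SHA256 header chaining.
    let mut out = [0u8; 32];
    for (i, b) in filter.iter().enumerate() {
        out[i % 32] ^= *b;
    }
    out
}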

dash-spv-ffi/tests/integration/test_full_workflow.rs (1)

67-91: Unused callback definitions - consider removing or wiring them.

The callbacks on_sync_progress and on_sync_complete are defined but never registered with the FFI layer. They reference ctx through user_data but the test doesn't pass them to any FFI registration function.

Either wire these callbacks to the new FFI event API or remove them as dead code.

dash-spv/src/sync/events.rs (1)

116-121: Consider using a domain-specific error type for ManagerError.

The error: String field loses type information. Consider using a boxed error trait object or a dedicated enum to preserve error context for better debugging.

     ManagerError {
         /// Which manager encountered the error
         manager: ManagerIdentifier,
-        /// Error description
-        error: String,
+        /// Error that occurred
+        error: Box<dyn std::error::Error + Send + Sync>,
     },

This would require adjusting the Clone derive or using Arc for the error.
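
A hedged sketch of the Arc-based option mentioned above, which keeps the event enum Clone while preserving the error chain; ManagerIdentifier and the event enum are simplified stand-ins:

use std::sync::Arc;

#[derive(Clone, Copy, Debug)]
enum ManagerIdentifier {
    Filters,
    Blocks,
}

#[derive(Clone, Debug)]
enum SyncEvent {
    ManagerError {
        /// Which manager encountered the error
        manager: ManagerIdentifier,
        /// Error that occurred, shared so the event stays cheaply cloneable
        error: Arc<dyn std::error::Error + Send + Sync>,
    },
}

fn manager_error(manager: ManagerIdentifier, err: impl std::error::Error + Send + Sync + 'static) -> SyncEvent {
    let error: Arc<dyn std::error::Error + Send + Sync> = Arc::new(err);
    SyncEvent::ManagerError { manager, error }
}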

dash-spv/src/client/chainlock.rs (1)

32-46: Redundant drop(chain_state) - the lock is already released.

The chain_state read guard is acquired at line 32 but the block at lines 33-45 doesn't use it after line 37. The drop(chain_state) at line 46 is redundant since a new write lock is acquired at line 54 anyway.

♻️ Suggested cleanup
         }
-        drop(chain_state);
+        // chain_state read lock released here

Or restructure to acquire the read lock only within the block where it's needed.

dash-spv/src/network/manager.rs (1)

1099-1117: Minor: Lock acquired inside loop for each peer check.

The headers2_disabled.lock().await is called inside the loop for each peer in the headers2 capability check. For large peer sets, this could add latency. Consider acquiring the lock once before the loop.

Suggested optimization
             NetworkMessage::GetHeaders(_) | NetworkMessage::GetHeaders2(_) => {
                 // Prefer headers2 peers, fall back to all
+                let disabled_peers = self.headers2_disabled.lock().await;
                 let headers2_peers: Vec<_> = {
                     let mut result = Vec::new();
                     for (addr, peer) in &peers {
                         let peer_guard = peer.read().await;
                         if peer_guard.supports_headers2()
-                            && !self.headers2_disabled.lock().await.contains(addr)
+                            && !disabled_peers.contains(addr)
                         {
                             result.push((*addr, peer.clone()));
                         }
                     }
                     result
                 };
+                drop(disabled_peers);
dash-spv/src/storage/block_headers.rs (1)

53-62: Track the TODO with an issue or follow-up task.
Leaving a TODO on a public trait tends to linger; consider filing a ticket or scheduling the HashedBlockHeader-only API change. Happy to help implement it.

dash-spv/src/client/lifecycle.rs (1)

102-115: Replace .expect() with SpvError::Config error type for defensive configuration handling.

Even though the guard if config.enable_masternodes currently protects against panics here, using explicit error handling prevents latent panics if configuration flows evolve. The function already returns Result<Self>, so error propagation is idiomatic.

♻️ Suggested fix
-            let masternode_list_engine = masternode_engine
-                .clone()
-                .expect("Masternode list engine must exist if masternodes are enabled");
+            let masternode_list_engine = masternode_engine.clone().ok_or_else(|| {
+                SpvError::Config(
+                    "Masternode list engine must exist if masternodes are enabled".to_string(),
+                )
+            })?;
dash-spv/src/sync/filters/sync_manager.rs (1)

201-232: Consider extracting the TODO for send_pending decoupling.

The TODO on line 202 about decoupling send_pending from header storage is a reasonable architectural concern. The current implementation works but requires acquiring a read lock on header storage during tick, which could become a bottleneck.

Would you like me to open an issue to track this decoupling work for a future iteration?

dash-spv/src/sync/masternodes/pipeline.rs (1)

65-74: Minor: Log message may be misleading after repeated calls.

Line 72 logs self.base_hashes.len() which represents total queued items, not the count from the current queue_requests call. If called multiple times, this could be confusing.

🔧 Suggested improvement
     pub(super) fn queue_requests(&mut self, requests: Vec<(BlockHash, BlockHash)>) {
+        let added_count = requests.len();
         for (base_hash, target_hash) in requests {
             self.coordinator.enqueue([target_hash]);
             self.base_hashes.insert(target_hash, base_hash);
         }
 
-        if !self.base_hashes.is_empty() {
-            tracing::info!("Queued {} MnListDiff requests", self.base_hashes.len());
+        if added_count > 0 {
+            tracing::info!("Queued {} MnListDiff requests (total: {})", added_count, self.base_hashes.len());
         }
     }
dash-spv/src/sync/instantsend/manager.rs (1)

175-188: Consider using VecDeque for pending queue.

The pending_instantlocks uses Vec with remove(0) at line 179 when at capacity, which is O(n). Since this is a FIFO queue with additions at the end and removals from the front, VecDeque would provide O(1) operations for both.

♻️ Suggested improvement
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};

 pub struct InstantSendManager {
     // ...
-    pending_instantlocks: Vec<PendingInstantLock>,
+    pending_instantlocks: VecDeque<PendingInstantLock>,
 }

 fn queue_pending(&mut self, pending: PendingInstantLock) {
     if self.pending_instantlocks.len() >= MAX_PENDING_INSTANTLOCKS {
-        let dropped = self.pending_instantlocks.remove(0);
+        let dropped = self.pending_instantlocks.pop_front().unwrap();
         // ...
     }
-    self.pending_instantlocks.push(pending);
+    self.pending_instantlocks.push_back(pending);
 }
dash-spv/ARCHITECTURE.md (1)

116-116: Add language specifiers to fenced code blocks.

Static analysis flagged three code blocks without language specifiers. Adding specifiers improves syntax highlighting and accessibility.

🔧 Suggested fixes

For line 116 (architecture diagram):

-```
+```text
 ┌─────────────────────────────────────────────────────────┐

For line 999 (sync coordinator structure):

-```
+```text
 SyncCoordinator

For line 1074 (manager module structure):

-```
+```text
 sync/<manager>/

Also applies to: 999-999, 1074-1074

dash-spv/src/sync/sync_coordinator.rs (1)

322-355: Minor: Redundant identity map and potential misleading default for header_tip.

Two observations:

  1. Line 330: .map(move |p| p) is a no-op identity function that can be removed.
  2. Line 348: When progress.headers() returns Err, header_tip defaults to 0, which could be misleading in the SyncComplete event if headers haven't been tracked.
♻️ Suggested simplification
     let streams: Vec<_> =
-        receivers.into_iter().map(|rx| WatchStream::new(rx).map(move |p| p)).collect();
+        receivers.into_iter().map(WatchStream::new).collect();
dash-spv-ffi/src/types.rs (1)

144-329: Minor: Potential truncation in FFIInstantSendProgress conversion.

Line 323 casts progress.pending() to u32. If pending() returns a usize larger than u32::MAX, this would truncate silently. Consider using saturating conversion for robustness.

♻️ Suggested safer conversion
         FFIInstantSendProgress {
             state: progress.state().into(),
-            pending: progress.pending() as u32,
+            pending: progress.pending().min(u32::MAX as usize) as u32,
             valid: progress.valid(),

Collaborator

@ZocoLini ZocoLini left a comment


I swear I reviewed everything :D

@xdustinface xdustinface merged commit 12abdc2 into v0.42-dev Feb 4, 2026
53 checks passed
@xdustinface xdustinface deleted the feat/sync-rewrite branch February 4, 2026 03:47