diff --git a/docs/state_pruning.md b/docs/state_pruning.md
new file mode 100644
index 0000000000..ffc975417c
--- /dev/null
+++ b/docs/state_pruning.md
@@ -0,0 +1,83 @@
+# State Pruning Guide
+
+As of `v0.21.0`, the state pruning feature is available.
+It allows users to prune the block and state databases, reducing their size and improving performance.
+
+The state pruning feature has two parts:
+1. Pruning the blocks; and
+2. Pruning the state.
+
+## How to Use It
+
+If you wish to run a pruned node, the easiest way is to:
+1. Restore the node from the most recent checkpoint.
+This avoids the need to migrate any legacy keys.
+Since you're going to prune anyway, the older blocks/state are unnecessary overhead.
+2. Let the node active-sync and catch up with the head of the chain.
+This ensures that the node is up-to-date.
+3. Set the `sync.prune_interval` configuration option to a positive integer.
+If the number is too small, you may end up storing too many snapshots between compaction runs.
+A good number is about a day's worth of blocks/state.
+4. Restart the node.
+You can monitor the logs for the "Snapshot:" messages.
+
+### Configuration
+
+To prune the blocks/state, set the `sync.prune_interval` configuration option.
+If this option is set to a positive integer, the node will retain only the most recent `sync.prune_interval` blocks.
+Any block older than that will be deleted from the SQL database.
+The node will also prune the state database to reduce the size of the storage.
+
+As part of this process, the node will *drop* the SQL `state_trie` table.
+This only impacts nodes that have existing state prior to `v0.18.0`.
+However, the node does not run `VACUUM` to reclaim the space from the SQL database, as this can take a long time.
+If the SQL disk space needs to be reclaimed, node operators should schedule and manually run the `VACUUM` command on the SQL database.
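+
+For example, to keep roughly one day's worth of blocks on a network producing about one block per second, the relevant `config.toml` entry would look something like the sketch below (the value is illustrative; size it to your network's block rate and the configured minimum):
+
+```toml
+# Illustrative only - about a day's worth of blocks at 1 block/s.
+[sync]
+prune_interval = 86400
+```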
+
+## How It Works
+
+The state pruning feature exploits the RocksDB compaction feature.
+Storage is recovered as part of RocksDB's normal background compaction, which happens periodically and incrementally.
+In fact, when the feature is first turned on, you may see an initial increase in storage usage.
+Over time, the storage usage will decrease after several compaction runs.
+
+We have chosen to implement a feature that is adapted from the [User-defined Timestamp](https://github.com/facebook/rocksdb/wiki/User-defined-Timestamp) feature of RocksDB.
+
+- When writing data to the trie-storage in RocksDB, each key is tagged with a *timestamp* suffix that is used to determine the order of the keys.
+```
+|user-key + tag|seqno|type|
+|<-----internal key------>|
+```
+
+- When reading trie-storage data from RocksDB, the timestamp is taken into account to ensure that only the most *recent* value is returned for a given key.
+> Instead of using the block height as the timestamp, we use the block view.
+> This is because there can only be one block per view, while there could potentially be more than one block per height.
+> This ensures that the timestamp is always monotonically increasing per block.
+
+- When the built-in compaction feature is triggered, any *stale* keys are removed and the disk space is recovered.
+> By default, RocksDB ensures that compaction is triggered at least once every 30 days.
+
+### Operation
+
+1. At each epoch, the node increments its internal timestamp *ceiling*, which results in any new state being tagged with a higher timestamp.
+2. The node triggers a background operation to *snapshot* all active state that should be retained, by duplicating the entire state-trie with the higher timestamp.
+This operation may take some time to complete, possibly several epochs, and the node will only allow one such operation at a time.
+3. After the operation is complete, the node increments its internal timestamp *floor*; any state with a timestamp below the *floor* will eventually be compacted away.
+
+### Conditions
+
+The conditions for the *ceiling* and the *floor* are:
+- The *ceiling* is always incremented i.e. new ceiling > old ceiling.
+- The *floor* is always incremented i.e. new floor > old floor.
+- The *ceiling* is always greater than the *floor* i.e. ceiling > floor.
+- The *floor* always lags the *ceiling* i.e. floor == old ceiling.
+
+The snapshot operation is only triggered if the lowest block view is greater than the current *ceiling*.
+This condition ensures safety: the only states pruned are those that are absolutely no longer needed, since the corresponding blocks no longer exist in the SQL database.
+This also means that the node may retain more state than it strictly needs.
+Considering the amount of state saved through pruning, this bit of extra state is negligible.
+
+### Key Migration
+
+For nodes that disable pruning, the existing keys need to be migrated to the new tagged-keys.
+This is done lazily, as nodes are read from time to time.
+Whenever a legacy-key node is read from the database, the key is migrated to the new tagged-key format and the legacy key is deleted.
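+
+To make the tag ordering concrete, here is a minimal, self-contained sketch of the reverse-ordered tag encoding described above (illustrative only; the actual implementation lives in `trie_storage.rs`):
+
+```rust
+/// Tags are the reverse view (`u64::MAX - view`) encoded big-endian, so a
+/// *newer* view yields a lexicographically *smaller* suffix and sorts first.
+fn tag_for_view(view: u64) -> [u8; 8] {
+    (u64::MAX - view).to_be_bytes()
+}
+
+fn main() {
+    let older = tag_for_view(50); // state written at view 50
+    let newer = tag_for_view(150); // state written at view 150
+    // A RocksDB prefix scan returns keys in lexicographic order, so the
+    // newest tagged copy of a user-key is the first one encountered.
+    assert!(newer < older);
+}
+```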
diff --git a/zilliqa/src/cfg.rs b/zilliqa/src/cfg.rs
index 18b3fa03ff..3a8b6d76b0 100644
--- a/zilliqa/src/cfg.rs
+++ b/zilliqa/src/cfg.rs
@@ -174,18 +174,12 @@ pub struct DbConfig {
     /// Whether to enable state-sync/state-migration
     #[serde(default)]
     pub state_sync: bool,
+    /// Whether to enable state pruning. Derived from `sync.prune_interval` at runtime; the config.toml setting is ignored.
+    #[serde(default)]
+    pub state_prune: bool,
     /// RocksDB block cache size, in bytes.
     #[serde(default = "rocksdb_cache_size_default")]
     pub rocksdb_cache_size: usize,
-    /// RocksDB periodic compaction seconds.
-    #[serde(default = "rocksdb_compaction_period_default")]
-    pub rocksdb_compaction_period: u64,
-    /// RocksDB max open files.
-    #[serde(default = "rocksdb_max_open_files_default")]
-    pub rocksdb_max_open_files: i32,
-    /// RocksDB cache index/filters
-    #[serde(default = "rocksdb_cache_index_filters_default")]
-    pub rocksdb_cache_index_filters: bool,
     /// State cache size
     #[serde(default = "rocksdb_state_cache_size_default")]
    pub rocksdb_state_cache_size: usize,
@@ -193,28 +187,16 @@ pub struct DbConfig {
     #[serde(default = "rocksdb_block_size_default")]
     pub rocksdb_block_size: usize,
-    /// Target File Size
-    #[serde(default = "rocksdb_target_file_size_default")]
-    pub rocksdb_target_file_size: u64,
+    /// RocksDB memtable memory budget, in bytes.
+    #[serde(default = "rocksdb_memtable_budget_default")]
+    pub rocksdb_memtable_budget: usize,
 }
 
 fn rocksdb_block_size_default() -> usize {
     1 << 14 // 16KB reduces in-memory indexes
 }
 
-fn rocksdb_target_file_size_default() -> u64 {
-    1 << 28 // 256MB reduces number of open files
-}
-
-fn rocksdb_cache_index_filters_default() -> bool {
-    false // true: mitigate OOM; false: better performance.
-}
-
-fn rocksdb_max_open_files_default() -> i32 {
-    -1 // Set max_open_files to -1 to always keep all files open, which avoids expensive table cache calls.
-}
-
-fn rocksdb_compaction_period_default() -> u64 {
-    u64::MAX - 1 // allow rocksdb to decide
+fn rocksdb_memtable_budget_default() -> usize {
+    1 << 31 // 2GB is a reasonable default
 }
 
 fn rocksdb_cache_size_default() -> usize {
@@ -222,7 +204,7 @@ fn rocksdb_cache_size_default() -> usize {
 }
 
 fn rocksdb_state_cache_size_default() -> usize {
-    1 << 31
+    1 << 31 // 2GB is a reasonable default
 }
 
 fn sql_cache_size_default() -> usize {
@@ -239,13 +221,11 @@ impl Default for DbConfig {
             conn_cache_size: sql_cache_size_default(),
             auto_checkpoint: sql_auto_checkpoint_default(),
             state_sync: false,
+            state_prune: false,
             rocksdb_cache_size: rocksdb_cache_size_default(),
-            rocksdb_compaction_period: rocksdb_compaction_period_default(),
-            rocksdb_max_open_files: rocksdb_max_open_files_default(),
-            rocksdb_cache_index_filters: rocksdb_cache_index_filters_default(),
            rocksdb_state_cache_size: rocksdb_state_cache_size_default(),
             rocksdb_block_size: rocksdb_block_size_default(),
-            rocksdb_target_file_size: rocksdb_target_file_size_default(),
+            rocksdb_memtable_budget: rocksdb_memtable_budget_default(),
         }
     }
 }
@@ -376,31 +356,33 @@ impl NodeConfig {
         }
     }
 
+        // deprecated settings
         anyhow::ensure!(
             self.state_cache_size == state_cache_size_default(),
             "state_cache_size is deprecated. Use db.rocksdb_cache_size and db.rocksdb_state_cache_size instead."
         );
-
+        anyhow::ensure!(!self.db.state_prune, "db.state_prune must not be set; it is derived from sync.prune_interval");
+        // sync/prune settings
         anyhow::ensure!(
             self.sync.base_height == u64_max() || self.sync.prune_interval == u64_max(),
-            "base_height and prune_interval cannot be set at the same time"
+            "sync.base_height and sync.prune_interval cannot be set at the same time"
         );
-        // when set, >> 15 to avoid pruning forks; > 256 to be EVM-safe; arbitrarily picked.
         anyhow::ensure!(
             self.sync.prune_interval >= MIN_PRUNE_INTERVAL,
-            "prune_interval must be at least {MIN_PRUNE_INTERVAL}",
+            "sync.prune_interval must be at least {MIN_PRUNE_INTERVAL}",
+        );
+        anyhow::ensure!(
+            self.sync.prune_interval == u64::MAX || !self.do_checkpoints,
+            "sync.prune_interval and do_checkpoints cannot be set at the same time"
         );
-        // 10 is a reasonable minimum for a node to be useful.
         anyhow::ensure!(
             self.sync.block_request_batch_size >= 10,
             "block_request_batch_size must be at least 10"
         );
-        // 1000 would saturate a typical node.
         anyhow::ensure!(
             self.sync.max_blocks_in_flight <= 1000,
             "max_blocks_in_flight must be at most 1000"
         );
-        // the minimum required for the next leader selection
         anyhow::ensure!(
             self.max_missed_view_age >= MISSED_VIEW_WINDOW,
             "max_missed_view_age must be at least {MISSED_VIEW_WINDOW}"
diff --git a/zilliqa/src/consensus.rs b/zilliqa/src/consensus.rs
index c71a0ddb51..75abe11315 100644
--- a/zilliqa/src/consensus.rs
+++ b/zilliqa/src/consensus.rs
@@ -247,6 +247,8 @@ pub struct Consensus {
     force_view: Option<(u64, DateTime<Utc>)>,
     /// Mark if this node is in the committee at it's current head block height
     in_committee: bool,
+    /// Prune interval, if applicable
+    prune_interval: u64,
 }
 
 impl Consensus {
@@ -387,11 +389,15 @@ impl Consensus {
         let forks = config.consensus.get_forks()?;
         let enable_ots_indices = config.enable_ots_indices;
 
+        // pre-compute how often state snapshots are taken, if at all.
+        let bpe = config.consensus.blocks_per_epoch;
+        let prune_interval = ((config.sync.prune_interval / bpe) * bpe).saturating_add(bpe);
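+        // e.g. prune_interval = 1000 with blocks_per_epoch = 3 gives (1000 / 3) * 3 + 3 = 1002,
+        // so the effective interval always lands on an epoch boundary.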
+
         let mut consensus = Consensus {
             secret_key,
             config,
             sync,
-            message_sender,
+            message_sender: message_sender.clone(),
             reset_timeout,
             votes: DashMap::new(),
             buffered_votes: DashMap::new(),
@@ -411,6 +417,7 @@ impl Consensus {
             new_transaction_hashes: broadcast::Sender::new(128),
             force_view: None,
             in_committee: true,
+            prune_interval,
         };
 
         // If we're at genesis, add the genesis block and return
@@ -579,7 +586,8 @@ impl Consensus {
         }
 
         // Initialize state trie storage
-        consensus.db.state_trie()?.init_state_trie(forks)?;
+        let state_trie = consensus.db.state_trie()?;
+        state_trie.init_state_trie(forks)?;
 
         // If timestamp of when current high_qc was written exists then use it to estimate the minimum number of blocks the network has moved on since shut down
         // This is useful in scenarios in which consensus has failed since this node went down
@@ -2415,46 +2423,57 @@ impl Consensus {
             }
         }
 
-        if self.block_is_first_in_epoch(block.number())
-            && !block.is_genesis()
-            && self.config.do_checkpoints
-            && self.epoch_is_checkpoint(self.epoch_number(block.number()))
-            && let Some(checkpoint_path) = self.db.get_checkpoint_dir()?
-        {
-            let parent = self
-                .db
-                .get_block(block.parent_hash().into())?
-                .ok_or(anyhow!(
-                    "Trying to checkpoint block, but we don't have its parent"
-                ))?;
-            let transactions: Vec<SignedTransaction> = block
-                .transactions
-                .iter()
-                .map(|txn_hash| {
-                    let tx = self.db.get_transaction(txn_hash)?.ok_or(anyhow!(
-                        "failed to fetch transaction {} for checkpoint parent {}",
-                        txn_hash,
-                        parent.hash()
-                    ))?;
-                    Ok::<_, anyhow::Error>(tx.tx)
-                })
-                .collect::<Result<Vec<_>>>()?;
-
-            self.message_sender.send_message_to_coordinator(
-                InternalMessage::ExportBlockCheckpoint(
-                    Box::new(block),
-                    transactions,
-                    Box::new(parent),
-                    self.db.state_trie()?.clone(),
-                    self.state.view_history.read().clone(),
-                    checkpoint_path,
-                ),
-            )?;
+        if self.block_is_first_in_epoch(block.number()) && !block.is_genesis() {
+            // Do snapshots at epoch/block boundaries to avoid state inconsistencies.
+            if block.number().is_multiple_of(self.prune_interval) {
+                let range = self.db.available_range()?;
+                self.snapshot_at(*range.start(), block.view())?;
+            }
+            // Do checkpoints
+            if self.config.do_checkpoints
+                && self.db.get_checkpoint_dir()?.is_some()
+                && self.epoch_is_checkpoint(self.epoch_number(block.number()))
+            {
+                self.checkpoint_at(block.number())?;
+            }
         }
 
         Ok(())
     }
 
+    /// Trigger a snapshot
+    pub fn snapshot_at(&self, block_number: u64, new_ceil: u64) -> Result<()> {
+        // skip if there is a snapshot in progress.
+        let Some(mut tag_lock) = self.db.tag_view.try_lock() else {
+            return Ok(());
+        };
+        // error if the lowest block does not exist
+        let Some(block) = self.get_canonical_block_by_number(block_number)? else {
+            return Err(anyhow::format_err!("Snapshot: missing block"));
+        };
+        // skip if the lowest block is unsafe to snapshot.
+        // 'unsafe' means that the block exists in a tag range that could be pruned away during compaction.
+        if block.view() < *tag_lock {
+            return Ok(());
+        }
+
+        let trie_storage = self.db.state_trie()?;
+        // raise the ceiling to the new tag, promoting all new state
+        let old_ceil = trie_storage.set_tag_ceil(new_ceil)?;
+        // store the previous tag, which is the next floor.
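+        // The floor is only raised after the snapshot completes (see db::snapshot_trie), so an
+        // interrupted snapshot leaves the floor unchanged and compaction prunes nothing extra.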
+        *tag_lock = old_ceil;
+        tracing::info!(block_number, new_ceil, old_ceil, "Snapshot: trigger");
+
+        // trigger snapshot
+        self.message_sender
+            .send_message_to_coordinator(InternalMessage::SnapshotTrie(
+                trie_storage,
+                block.state_root_hash().into(),
+                block_number,
+            ))
+    }
+
     /// Trigger a checkpoint, for debugging.
     /// Returns (file_name, block_hash). At some time after you call this function, hopefully a checkpoint will end up in the file
     pub fn checkpoint_at(&self, block_number: u64) -> Result<(String, String)> {
diff --git a/zilliqa/src/db.rs b/zilliqa/src/db.rs
index d10531b2ed..63a940ae35 100644
--- a/zilliqa/src/db.rs
+++ b/zilliqa/src/db.rs
@@ -6,7 +6,10 @@ use std::{
     num::NonZeroUsize,
     ops::RangeInclusive,
     path::{Path, PathBuf},
-    sync::Arc,
+    sync::{
+        Arc,
+        atomic::{AtomicU64, Ordering},
+    },
     time::Duration,
 };
 
@@ -15,10 +18,13 @@ use anyhow::{Context, Result, anyhow};
 #[allow(unused_imports)]
 use eth_trie::{DB, EthTrie, MemoryDB, Trie};
 use lru::LruCache;
-use parking_lot::RwLock;
+use parking_lot::{Mutex, RwLock};
 use r2d2::Pool;
 use r2d2_sqlite::SqliteConnectionManager;
-use rocksdb::{BlockBasedOptions, Cache, DBWithThreadMode, Options, SingleThreaded};
+use revm::primitives::B256;
+use rocksdb::{
+    BlockBasedOptions, Cache, CompactionDecision, DBWithThreadMode, Options, SingleThreaded,
+};
 use rusqlite::{
     Connection, OptionalExtension, Row, ToSql, named_params,
     types::{FromSql, FromSqlError, ToSqlOutput},
@@ -34,7 +40,7 @@ use crate::{
     precompiles::ViewHistory,
     time::SystemTime,
     transaction::{EvmGas, Log, SignedTransaction, TransactionReceipt, VerifiedTransaction},
-    trie_storage::TrieStorage,
+    trie_storage::{LEGACY_KEY_LEN, ROCKSDB_TAGGING_AT, TAGGED_KEY_LEN, TrieStorage},
 };
 
 const MAX_KEYS_IN_SINGLE_QUERY: usize = 32765;
@@ -256,6 +262,10 @@ pub struct Db {
     executable_blocks_height: Option<u64>,
     /// Clone of DbConfig
     pub config: DbConfig,
+    /// State Pruning
+    rev_ceil: Arc<AtomicU64>, // always set to the finalised view height; set by Db::set_finalised_view()
+    rev_floor: Arc<AtomicU64>, // resets to u64::MAX at startup; gets set by Db::snapshot()
+    pub tag_view: Arc<Mutex<u64>>, // used to lock the promotion process
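+    // Note: the rev_* values are *reverse* views (u64::MAX - view): a larger view maps to a
+    // smaller tag, so newer state sorts first under RocksDB's lexicographic key ordering.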
 }
 
 impl Db {
@@ -311,46 +321,13 @@ impl Db {
         let connection = pool.get()?;
         Self::ensure_schema(&connection)?;
 
-        // RocksDB configuration
-        let mut block_opts = BlockBasedOptions::default();
-        // reduce disk and memory usage - https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#ribbon-filter
-        block_opts.set_ribbon_filter(10.0);
-        block_opts.set_optimize_filters_for_memory(true); // reduce memory wastage with JeMalloc
-        // Mitigate OOM
-        block_opts.set_cache_index_and_filter_blocks(config.rocksdb_cache_index_filters);
-        // Improve cache utilisation
-        block_opts.set_pin_top_level_index_and_filter(true);
-        block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
-        block_opts.set_index_type(rocksdb::BlockBasedIndexType::TwoLevelIndexSearch);
-        block_opts.set_partition_filters(true);
-        block_opts.set_block_size(config.rocksdb_block_size);
-        block_opts.set_metadata_block_size(config.rocksdb_block_size);
-
-        let cache =
-            Cache::new_hyper_clock_cache(config.rocksdb_cache_size, config.rocksdb_block_size);
-        block_opts.set_block_cache(&cache);
-
-        let mut rdb_opts = Options::default();
-        rdb_opts.create_if_missing(true);
-        rdb_opts.set_block_based_table_factory(&block_opts);
-        rdb_opts.set_periodic_compaction_seconds(config.rocksdb_compaction_period);
-        // Mitigate OOM - prevent opening too many files at a time
-        rdb_opts.set_max_open_files(config.rocksdb_max_open_files);
-        // Reduce reads
-        rdb_opts.set_level_compaction_dynamic_level_bytes(true);
-        rdb_opts.set_target_file_size_base(config.rocksdb_target_file_size);
-        rdb_opts.set_max_bytes_for_level_base(config.rocksdb_target_file_size << 2);
-        // Reduce storage
-        rdb_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
-        rdb_opts.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
-        rdb_opts.set_bottommost_zstd_max_train_bytes(0, true);
-
         // Should be safe in single-threaded mode
         // https://docs.rs/rocksdb/latest/rocksdb/type.DB.html#limited-performance-implication-for-single-threaded-mode
         let rdb_path = path.as_ref().map_or_else(
             || tempfile::tempdir().unwrap().path().join("state.rocksdb"),
             |p| p.join("state.rocksdb"),
         );
+        let (rdb_opts, rev_floor) = Self::init_rocksdb(config.clone());
         let rdb = DBWithThreadMode::<SingleThreaded>::open(&rdb_opts, rdb_path)?;
 
         tracing::info!(
@@ -363,13 +340,23 @@ impl Db {
         let cache =
             LruCache::new(NonZeroUsize::new(config.rocksdb_state_cache_size / 500).unwrap());
 
+        // Use the last known tag for keys.
+        let last_tag = rdb.get(ROCKSDB_TAGGING_AT)?.map_or(u64::MAX, |v| {
+            u64::from_be_bytes(v.try_into().expect("8-bytes"))
+        });
+        let rev_ceil = Arc::new(AtomicU64::new(last_tag)); // stores the reverse view
+        let tag_view = Arc::new(Mutex::new(u64::MAX.saturating_sub(last_tag))); // stores the equivalent view
+
         Ok(Db {
             pool: Arc::new(pool),
-            path,
-            executable_blocks_height,
             kvdb: Arc::new(rdb),
             cache: Arc::new(RwLock::new(cache)),
+            path,
+            executable_blocks_height,
             config,
+            rev_ceil,
+            rev_floor,
+            tag_view,
         })
     }
 
@@ -651,6 +638,71 @@ impl Db {
         Ok(())
     }
 
+    fn init_rocksdb(config: DbConfig) -> (rocksdb::Options, Arc<AtomicU64>) {
+        let mut block_opts = BlockBasedOptions::default();
+        // reduce disk and memory usage - https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#ribbon-filter
+        block_opts.set_ribbon_filter(10.0);
+        block_opts.set_optimize_filters_for_memory(true); // reduce memory wastage with JeMalloc
+        // Mitigate OOM
+        block_opts.set_cache_index_and_filter_blocks(true);
+        // Improve cache utilisation
+        block_opts.set_pin_top_level_index_and_filter(true);
+        block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
+        block_opts.set_index_type(rocksdb::BlockBasedIndexType::TwoLevelIndexSearch);
+        block_opts.set_partition_filters(true);
+        block_opts.set_block_size(config.rocksdb_block_size);
+        block_opts.set_metadata_block_size(config.rocksdb_block_size);
+        // RocksDB cache
+        let cache = Cache::new_lru_cache(config.rocksdb_cache_size);
+        block_opts.set_block_cache(&cache);
+
+        // RocksDB configuration
+        let mut rdb_opts = Options::default();
+        // Set optimised defaults
+        rdb_opts.optimize_level_style_compaction(config.rocksdb_memtable_budget);
+        rdb_opts.create_if_missing(true);
+        rdb_opts.set_block_based_table_factory(&block_opts);
+        // Reduce storage
+        rdb_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
+        rdb_opts.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
+        rdb_opts.set_bottommost_zstd_max_train_bytes(0, true);
+        // Increase background threads
+        rdb_opts.increase_parallelism(crate::available_threads() as i32);
+
+        let tag_floor = Arc::new(AtomicU64::new(u64::MAX));
+        // Use the default compaction filter, unless pruning is enabled
+        if config.state_prune {
+            tracing::info!("SnapshotCompactionFilter");
+            let rev_floor = tag_floor.clone();
+            // Keys are only removed if they are older than the floor value, which is set during a snapshot.
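+            // e.g. with the floor at u64::MAX - 100 (view 100), a key tagged u64::MAX - 50
+            // (view 50) sits above the floor and gets removed, while a key tagged
+            // u64::MAX - 150 (view 150) is kept.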
+            // After pruning, old and legacy keys are eventually removed when the background compaction runs.
+            rdb_opts.set_compaction_filter(
+                "SnapshotCompactionFilter",
+                move |_lvl, key, _value| -> CompactionDecision {
+                    match key.len() {
+                        // 40-bytes: remove tagged key, if the key is 'older' than the floor.
+                        // The `rev_floor` is set to u64::MAX during startup; and only changed *after* a snapshot is taken.
+                        // If a snapshot gets interrupted for whatever reason, the floor remains unchanged; partial snapshots are safe.
+                        TAGGED_KEY_LEN
+                            if u64::from_be_bytes(key[32..40].try_into().unwrap())
+                                > rev_floor.load(Ordering::Relaxed) =>
+                        {
+                            CompactionDecision::Remove
+                        }
+                        // 32-bytes: remove legacy key, if a snapshot has already been taken.
+                        // Legacy keys are only removed after a snapshot of a more recent state has been taken.
+                        LEGACY_KEY_LEN if rev_floor.load(Ordering::Relaxed) != u64::MAX => {
+                            CompactionDecision::Remove
+                        }
+                        // default: keep all other keys
+                        _ => CompactionDecision::Keep,
+                    }
+                },
+            );
+        }
+        (rdb_opts, tag_floor)
+    }
+
     // SQLite performance tweaks
     fn init_connection(
         connection: &mut Connection,
@@ -812,6 +864,10 @@ impl Db {
             self.pool.clone(),
             self.kvdb.clone(),
             self.cache.clone(),
+            self.rev_ceil.clone(),
+            self.rev_floor.clone(),
+            self.tag_view.clone(),
+            self.config.state_prune,
         ))
     }
 
@@ -1643,6 +1699,25 @@ impl Db {
     }
 }
 
+/// Snapshot the state trie.
+///
+/// Promotes the tag of each node to the given view. This process may take a while to complete.
+/// The `tag_lock` ensures that only one snapshot is in progress at a time.
+/// The previous state trie will eventually be pruned during compaction.
+pub fn snapshot_trie(storage: TrieStorage, root_hash: B256, block_number: u64) -> Result<()> {
+    let trie = Arc::new(storage);
+    let tag_lock = trie.tag_view.lock();
+    let new_tag = *tag_lock;
+    tracing::info!(%root_hash, block_number, new_tag, "Snapshot: start");
+    TrieStorage::snapshot(trie.clone(), root_hash)?;
+    let old_tag = trie.set_tag_floor(new_tag)?;
+    if old_tag != 0 {
+        trie.drop_sql_state_trie()?; // delete the legacy SQL table
+    }
+    tracing::info!(%root_hash, block_number, new_tag, old_tag, "Snapshot: done");
+    Ok(()) // not fatal, it can be retried later.
+}
+
 pub fn get_checkpoint_filename<P: AsRef<Path> + Debug>(
     output_dir: P,
     block: &Block,
diff --git a/zilliqa/src/message.rs b/zilliqa/src/message.rs
index b609767682..986306c624 100644
--- a/zilliqa/src/message.rs
+++ b/zilliqa/src/message.rs
@@ -394,6 +394,8 @@ pub enum InternalMessage {
     SubscribeToGossipSubTopic(GossipSubTopic),
     /// Notify p2p cordinator to unsubscribe from a particular gossipsub topic
     UnsubscribeFromGossipSubTopic(GossipSubTopic),
+    /// Snapshot a trie at a given point
+    SnapshotTrie(TrieStorage, B256, u64),
 }
 
 #[derive(Debug, Clone)]
@@ -420,6 +422,9 @@ impl Display for InternalMessage {
             InternalMessage::UnsubscribeFromGossipSubTopic(topic) => {
                 write!(f, "UnsubscribeFromGossipSubTopic({topic:?})")
             }
+            InternalMessage::SnapshotTrie(_storage, hash, view) => {
+                write!(f, "SnapshotTrie({hash:?}, {view})")
+            }
         }
     }
 }
diff --git a/zilliqa/src/node.rs b/zilliqa/src/node.rs
index 0263be10cf..c86a8a4bf0 100644
--- a/zilliqa/src/node.rs
+++ b/zilliqa/src/node.rs
@@ -211,11 +211,16 @@ impl Node {
             .consensus
             .get_forks()?
             .find_height_fork_first_activated(ForkName::ExecutableBlocks);
+
+        // FIXME: This is a hack; we should factor the pruning logic out of sync/db.
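+        // state_prune is derived rather than read from config.toml: pruning is enabled iff
+        // sync.prune_interval is set (NodeConfig validation rejects an explicit db.state_prune).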
+        let mut db_config = config.db.clone();
+        db_config.state_prune = config.sync.prune_interval != u64::MAX;
+
         let db = Arc::new(Db::new(
             config.data_dir.as_ref(),
             config.eth_chain_id,
             executable_blocks_height,
-            config.db.clone(),
+            db_config,
         )?);
 
         let node = Node {
             config: config.clone(),
diff --git a/zilliqa/src/p2p_node.rs b/zilliqa/src/p2p_node.rs
index f6f2b68842..9f639c7d1d 100644
--- a/zilliqa/src/p2p_node.rs
+++ b/zilliqa/src/p2p_node.rs
@@ -484,6 +484,14 @@ impl P2pNode {
                             self.swarm.behaviour_mut().gossipsub.unsubscribe(&Self::validator_topic(shard_id));
                         }
                     }
+                    InternalMessage::SnapshotTrie(trie, hash, view) => {
+                        self.task_threads.spawn(async move {
+                            if let Err(e) = db::snapshot_trie(trie, hash, view) {
+                                tracing::error!(error = %e, "Snapshot failed");
+                            }
+                            Ok(())
+                        });
+                    }
                 }
             },
             message = self.request_responses_receiver.next() => {
diff --git a/zilliqa/src/trie_storage.rs b/zilliqa/src/trie_storage.rs
index a1e02f947f..aba6bb507f 100644
--- a/zilliqa/src/trie_storage.rs
+++ b/zilliqa/src/trie_storage.rs
@@ -1,18 +1,26 @@
-use std::sync::Arc;
+use std::sync::{
+    Arc,
+    atomic::{AtomicU64, Ordering},
+};
 
 use anyhow::Result;
+use eth_trie::{DB, EthTrie, Trie as _};
 use lru::LruCache;
-use parking_lot::RwLock;
+use parking_lot::{Mutex, RwLock};
 use r2d2::Pool;
 use r2d2_sqlite::SqliteConnectionManager;
+use revm::primitives::B256;
 use rocksdb::WriteBatch;
 use rusqlite::OptionalExtension;
 
-use crate::{cfg::Forks, crypto::Hash};
+use crate::{cfg::Forks, crypto::Hash, state::Account};
 
 /// Special storage keys
 const ROCKSDB_MIGRATE_AT: &str = "migrate_at";
 const ROCKSDB_CUTOVER_AT: &str = "cutover_at";
+pub const ROCKSDB_TAGGING_AT: &str = "tagging_at";
+pub const LEGACY_KEY_LEN: usize = 32;
+pub const TAGGED_KEY_LEN: usize = 40;
 
 /// An implementor of [eth_trie::DB] which uses a [rocksdb::DB]/[rusqlite::Connection] to persist data.
 #[derive(Debug, Clone)]
@@ -20,6 +28,11 @@ pub struct TrieStorage {
     pool: Arc<Pool<SqliteConnectionManager>>,
     kvdb: Arc<rocksdb::DB>,
     cache: Arc<RwLock<LruCache<Vec<u8>, Vec<u8>>>>,
+    // the tag_* values are stored in reverse height order i.e. u64::MAX is genesis.
+    rev_ceil: Arc<AtomicU64>, // used to tag every write to the state database; reverse-ordered.
+    rev_floor: Arc<AtomicU64>, // used to mark the compaction boundary; reverse-ordered.
+    pub tag_view: Arc<Mutex<u64>>, // used to lock the snapshot process; forward-ordered.
+    state_prune: bool,
 }
 
 impl TrieStorage {
@@ -27,11 +40,96 @@ impl TrieStorage {
         pool: Arc<Pool<SqliteConnectionManager>>,
         kvdb: Arc<rocksdb::DB>,
         cache: Arc<RwLock<LruCache<Vec<u8>, Vec<u8>>>>,
+        rev_ceil: Arc<AtomicU64>,
+        rev_floor: Arc<AtomicU64>,
+        tag_view: Arc<Mutex<u64>>,
+        state_prune: bool,
     ) -> Self {
-        Self { pool, kvdb, cache }
+        Self {
+            pool,
+            kvdb,
+            cache,
+            rev_ceil,
+            rev_floor,
+            tag_view,
+            state_prune,
+        }
+    }
+
+    /// Drop the legacy SQL state_trie table
+    pub fn drop_sql_state_trie(&self) -> Result<()> {
+        let sql = self.pool.get().unwrap();
+        sql.execute("DROP TABLE IF EXISTS state_trie", [])?; // DROP TABLE is faster than deleting rows
+        Ok(())
+    }
+
+    /// This snapshot promotes the *active* keys to the tag.
+    ///
+    /// It works by duplicating the underlying db-level keys, not the trie-level keys,
+    /// e.g. 500 random trie-level keys take ~150 db-level keys.
+    //
+    // Previously, no deletion of keys was allowed in the state database. So, it is safe to repurpose clear_trie_from_db() to
+    // snapshot-to-promote the trie, rather than delete it.
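+    // The result is two tagged db-level copies of every live key: the old tag (prunable once
+    // the floor rises past it) and the new tag (retained); the snapshot_doubles_the_nodes test
+    // below asserts exactly this doubling.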
+    pub fn snapshot(trie_storage: Arc<TrieStorage>, state_root_hash: B256) -> Result<()> {
+        let mut state_trie = EthTrie::new(trie_storage.clone()).at_root(state_root_hash);
+        for akv in state_trie.iter() {
+            let (_key, serialised_account) = akv?;
+            let account_root = Account::try_from(serialised_account.as_slice())?.storage_root;
+            let mut account_trie = EthTrie::new(trie_storage.clone()).at_root(account_root);
+            // repurpose clear_trie_from_db() to promote the trie; root_hash() forces a commit.
+            let _ = account_trie.root_hash()?;
+            account_trie.clear_trie_from_db()?;
+        }
+        // repurpose clear_trie_from_db() to promote the trie; root_hash() forces a commit.
+        let _ = state_trie.root_hash()?;
+        state_trie.clear_trie_from_db()?;
+        // force flush to disk
+        trie_storage.flush()?;
+        Ok(())
+    }
+
+    /// Set the tag floor to the given view, returning the previous tag.
+    ///
+    /// This ensures that: floor != ceil && new_floor 'increments' old_floor.
+    pub fn set_tag_floor(&self, view: u64) -> Result<u64> {
+        let new_tag = u64::MAX.saturating_sub(view); // reverse-ordered
+        let tag_floor = self.rev_floor.load(Ordering::Relaxed);
+        anyhow::ensure!(new_tag <= tag_floor, "{new_tag} <= {tag_floor}");
+        let tag_ceil = self.rev_ceil.load(Ordering::Relaxed);
+        anyhow::ensure!(new_tag > tag_ceil, "{new_tag} > {tag_ceil}");
+        self.rev_floor.store(new_tag, Ordering::Relaxed);
+        Ok(u64::MAX.saturating_sub(tag_floor))
+    }
 
-    pub fn write_batch(&self, keys: Vec<Vec<u8>>, values: Vec<Vec<u8>>) -> Result<()> {
+    /// Set the tag ceiling to the given view, returning the previous tag.
+    ///
+    /// This ensures that: floor != ceil && new_ceil 'increments' old_ceil.
+    pub fn set_tag_ceil(&self, view: u64) -> Result<u64> {
+        let new_tag = u64::MAX.saturating_sub(view); // reverse-ordered
+        let tag_ceil = self.rev_ceil.load(Ordering::Relaxed);
+        anyhow::ensure!(new_tag <= tag_ceil, "{new_tag} <= {tag_ceil}");
+        let tag_floor = self.rev_floor.load(Ordering::Relaxed);
+        anyhow::ensure!(new_tag < tag_floor, "{new_tag} < {tag_floor}");
+        self.kvdb.put(ROCKSDB_TAGGING_AT, new_tag.to_be_bytes())?;
+        self.rev_ceil.store(new_tag, Ordering::Relaxed);
+        Ok(u64::MAX.saturating_sub(tag_ceil))
+    }
+
+    /// Writes a batch of key-value pairs to the database.
+    ///
+    /// Since tagging is only performed here, only trie keys are tagged, leaving other keys untagged.
+    /// The other keys stored in this database are e.g. internal settings.
+    //
+    // This is the tagging scheme used. Each tag is a u64 in big-endian format.
+    // |user-key + tag|seqno|type|
+    // |<-----internal key------>|
+    fn write_batch(
+        &self,
+        keys: Vec<Vec<u8>>,
+        values: Vec<Vec<u8>>,
+        tag: [u8; 8],
+        is_migration: bool,
+    ) -> Result<()> {
         if keys.is_empty() {
             return Ok(());
         }
@@ -40,13 +138,75 @@ impl TrieStorage {
         let mut batch = WriteBatch::default();
         let mut cache = self.cache.write();
 
-        for (key, value) in keys.into_iter().zip(values.into_iter()) {
-            batch.put(key.as_slice(), value.as_slice());
-            cache.put(key, value);
+        for (key_prefix, value) in keys.into_iter().zip(values.into_iter()) {
+            // tag keys; lexicographically sorted
+            let mut tag_key = key_prefix.clone();
+            tag_key.extend_from_slice(tag.as_slice()); // suffix big-endian tags
+            batch.put(tag_key.as_slice(), value.as_slice());
+
+            // If migrating, bypass the cache and delete the old key
+            if !is_migration {
+                cache.put(key_prefix, value);
+            } else {
+                batch.delete(key_prefix.as_slice());
+            }
         }
 
         Ok(self.kvdb.write(batch)?)
     }
 
+    /// This function retrieves the 'latest' value, at all times.
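+    ///
+    /// e.g. a prefix scan for user-key K over [K (32-byte legacy), K|tag (40 bytes)] yields the
+    /// legacy key first; peeking at the next entry reveals the tagged copy, whose value wins.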
+    ///
+    /// Legacy keys do not have a tag and are ordered first due to the lexicographical sorting used by RocksDB.
+    /// We peek at the next key to determine whether the legacy key is the latest, or a later tagged key is present.
+    //
+    // This is the tagging scheme used. Each tag is a u64 in big-endian format.
+    // |user-key + tag|seqno|type|
+    // |<-----internal key------>|
+    fn get_tag_value(&self, key_prefix: &[u8]) -> Result<Option<Vec<u8>>> {
+        let mut iter = self.kvdb.prefix_iterator(key_prefix);
+        // early return if the user-key/prefix is not found
+        let Some((key, value)) = iter.next().transpose()? else {
+            return Ok(None);
+        };
+        if !key.starts_with(key_prefix) {
+            return Ok(None);
+        }
+
+        // Given that the trie keys are *all* Keccak256 keys, the legacy keys are exactly 32-bytes (256-bits) long,
+        // while the new tagged keys are exactly 40-bytes (320-bits) long. We do not expect any other trie-key lengths.
+        match key.len() {
+            TAGGED_KEY_LEN => {
+                // latest tagged key, naturally the most recent due to lexicographical order.
+                Ok(Some(value.to_vec()))
+            }
+            LEGACY_KEY_LEN => {
+                // legacy key - peek to see if a later tagged key is present
+                if let Some((k, v)) = iter.next().transpose()?
+                    && k.starts_with(key_prefix)
+                {
+                    // Lazily delete the legacy key, if state_prune is false.
+                    if !self.state_prune {
+                        self.kvdb.delete(key)?;
+                    }
+                    return Ok(Some(v.to_vec())); // the tagged key has the newer value
+                }
+                // Migration fall-through.
+                // If state_prune is true, migration will naturally happen during promotion.
+                // Lazily migrate the key, if state_prune is false.
+                if !self.state_prune {
+                    self.write_batch(
+                        vec![key.to_vec()],
+                        vec![value.to_vec()],
+                        self.rev_ceil.load(Ordering::Relaxed).to_be_bytes(),
+                        true,
+                    )?;
+                }
+                Ok(Some(value.to_vec())) // fall-through; return the legacy value
+            }
+            _ => unimplemented!("unsupported trie key length: {} bytes", key.len()),
+        }
+    }
+
     pub fn init_state_trie(&self, _forks: Forks) -> Result<()> {
         let rdb = self.kvdb.clone();
         if rdb.get(ROCKSDB_CUTOVER_AT)?.is_none() {
@@ -70,7 +230,7 @@ impl TrieStorage {
         Ok(self
             .kvdb
             .get(ROCKSDB_MIGRATE_AT)?
-            .map(|v| u64::from_be_bytes(v.try_into().expect("must be 8-bytes")))
+            .map(|v| u64::from_be_bytes(v.try_into().expect("8-bytes")))
             .unwrap_or(u64::MAX)) // default to no state-sync
     }
 
@@ -79,7 +239,7 @@ impl TrieStorage {
         Ok(self
             .kvdb
             .get(ROCKSDB_CUTOVER_AT)?
-            .map(|v| u64::from_be_bytes(v.try_into().expect("must be 8-bytes")))
+            .map(|v| u64::from_be_bytes(v.try_into().expect("8-bytes")))
             .unwrap_or_default())
     }
 
@@ -119,21 +279,29 @@ impl TrieStorage {
         self.kvdb.put(ROCKSDB_MIGRATE_AT, u64::MAX.to_be_bytes())?;
         Ok(())
     }
+
+    #[cfg(test)]
+    // test artifact: force the write tag to a given height
+    fn inc_tag(&self, new_height: u64) -> Result<()> {
+        let new_tag = u64::MAX.saturating_sub(new_height);
+        self.rev_ceil
+            .store(new_tag, std::sync::atomic::Ordering::Relaxed);
+        Ok(())
+    }
 }
 
 impl eth_trie::DB for TrieStorage {
     type Error = eth_trie::TrieError;
 
     fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
-        // L1 - Lru mem
-        if let Some(value) = self.cache.write().get(key) {
+        // L1 - pseudo-LRU: peek avoids taking the write lock on the read path
+        if let Some(value) = self.cache.read().peek(key) {
             return Ok(Some(value.clone()));
         }
 
         // L2 - rocksdb
         if let Some(value) = self
-            .kvdb
-            .get(key)
+            .get_tag_value(key)
             .map_err(|e| eth_trie::TrieError::DB(e.to_string()))?
        {
             self.cache.write().put(key.to_vec(), value.clone());
@@ -152,10 +320,14 @@ impl eth_trie::DB for TrieStorage {
             .map_err(|e| eth_trie::TrieError::DB(e.to_string()))?;
 
         if let Some(value) = value {
-            self.cache.write().put(key.to_vec(), value.clone());
-            self.kvdb
-                .put(key, value.as_slice())
-                .map_err(|e| eth_trie::TrieError::DB(e.to_string()))?;
+            // lazy migration from the SQL table
+            self.write_batch(
+                vec![key.to_vec()],
+                vec![value.clone()],
+                self.rev_ceil.load(Ordering::Relaxed).to_be_bytes(),
+                false,
+            )
+            .map_err(|e| eth_trie::TrieError::DB(e.to_string()))?;
             return Ok(Some(value));
         }
 
@@ -164,29 +336,255 @@ impl TrieStorage {
     #[inline]
     fn insert(&self, key: &[u8], value: Vec<u8>) -> Result<(), Self::Error> {
-        self.write_batch(vec![key.to_vec()], vec![value])
-            .map_err(|e| eth_trie::TrieError::DB(e.to_string()))
+        self.write_batch(
+            vec![key.to_vec()],
+            vec![value],
+            self.rev_ceil
+                .load(std::sync::atomic::Ordering::Relaxed)
+                .to_be_bytes(),
+            false,
+        )
+        .map_err(|e| eth_trie::TrieError::DB(e.to_string()))
     }
 
     #[inline]
     fn insert_batch(&self, keys: Vec<Vec<u8>>, values: Vec<Vec<u8>>) -> Result<(), Self::Error> {
-        self.write_batch(keys, values)
-            .map_err(|e| eth_trie::TrieError::DB(e.to_string()))
+        self.write_batch(
+            keys,
+            values,
+            self.rev_ceil
+                .load(std::sync::atomic::Ordering::Relaxed)
+                .to_be_bytes(),
+            false,
+        )
+        .map_err(|e| eth_trie::TrieError::DB(e.to_string()))
     }
 
+    #[inline]
     fn flush(&self) -> Result<(), Self::Error> {
         self.kvdb
             .flush()
             .map_err(|e| eth_trie::TrieError::DB(e.to_string()))
     }
 
-    fn remove(&self, _key: &[u8]) -> Result<(), Self::Error> {
-        // we keep old state to function as an archive node, therefore no-op
-        Ok(())
+    // Since clear_trie_from_db() iterates over all db-level keys in the trie, this ends up
+    // promoting the trie at the db-level, without iterating the trie-level keys.
+    // ** only called from clear_trie_from_db() **
+    fn remove(&self, promote_key: &[u8]) -> Result<(), Self::Error> {
+        if let Some(value) = self
+            .get_tag_value(promote_key)
+            .map_err(|e| eth_trie::TrieError::DB(e.to_string()))?
+        {
+            self.write_batch(
+                vec![promote_key.to_vec()],
+                vec![value],
+                self.rev_ceil
+                    .load(Ordering::Relaxed)
+                    .saturating_add(1) // push it one down, to keep only 1 snapshot on-disk.
+                    .to_be_bytes(),
+                false,
+            )
+            .map_err(|e| eth_trie::TrieError::DB(e.to_string()))
+        } else {
+            Err(eth_trie::TrieError::DB("clear trie not found".to_string()))
+        }
     }
 
     fn remove_batch(&self, _: &[Vec<u8>]) -> Result<(), Self::Error> {
-        // we keep old state to function as an archive node, therefore no-op
+        // TODO: Possibly remove intermediate keys.
        Ok(())
    }
}

+#[cfg(test)]
+mod tests {
+    use eth_trie::DB;
+    use tempfile::tempdir;
+
+    use super::*;
+    use crate::crypto::SecretKey;
+
+    #[allow(clippy::type_complexity)]
+    fn setup() -> (
+        Arc<Pool<SqliteConnectionManager>>,
+        Arc<rocksdb::DB>,
+        Arc<RwLock<LruCache<Vec<u8>, Vec<u8>>>>,
+        Arc<TrieStorage>,
+    ) {
+        let sql = Arc::new(
+            Pool::builder()
+                .build(SqliteConnectionManager::memory())
+                .unwrap(),
+        );
+        let rdb = Arc::new(rocksdb::DB::open_default(tempdir().unwrap().keep()).unwrap());
+        let tag = Arc::new(AtomicU64::new(u64::MAX));
+        let lock = Arc::new(Mutex::new(u64::MIN));
+        let cache = Arc::new(RwLock::new(LruCache::unbounded()));
+        let trie_storage = TrieStorage::new(
+            sql.clone(),
+            rdb.clone(),
+            cache.clone(),
+            tag.clone(),
+            tag.clone(),
+            lock.clone(),
+            false,
+        );
+        let trie_storage = Arc::new(trie_storage);
+        (sql, rdb, cache, trie_storage)
+    }
+
+    #[test]
+    fn snapshot_doubles_the_nodes() {
+        let (_, rdb, _, trie_storage) = setup();
+
+        let mut pmt = EthTrie::new(trie_storage.clone());
+        pmt.root_hash().unwrap(); // create one 'empty' record
+        assert_eq!(rdb.iterator(rocksdb::IteratorMode::Start).count(), 1);
+
+        // create 100 random accounts
+        let account = Account::default();
+        let value = bincode::serde::encode_to_vec(account, bincode::config::legacy()).unwrap();
+        for _ in 0..100 {
+            let key = SecretKey::new().unwrap().as_bytes();
+            pmt.insert(key.as_slice(), value.as_slice()).unwrap();
+        }
+
+        // write to disk; and count the nodes
+        trie_storage.inc_tag(u64::MIN).unwrap();
+        let root_hash = pmt.root_hash().unwrap(); // write to disk
+        let old_count = rdb.iterator(rocksdb::IteratorMode::Start).count();
+        assert!(old_count > 1);
+
+        // snapshot-to-promote the nodes; and count again
+        trie_storage.inc_tag(u64::MAX).unwrap();
+        TrieStorage::snapshot(trie_storage.clone(), root_hash).unwrap();
+        let new_count = rdb.iterator(rocksdb::IteratorMode::Start).count();
+        assert_eq!(new_count, old_count * 2);
+    }
+
+    #[test]
+    // lazy migration from sqlite to rocksdb works.
+    fn lazy_migration_from_sqlite() {
+        let (sql, rdb, _, trie_storage) = setup();
+
+        let key_prefix = alloy::consensus::EMPTY_ROOT_HASH.0;
+        let conn = sql.get().unwrap();
+        conn.execute("CREATE TABLE IF NOT EXISTS state_trie (key BLOB NOT NULL PRIMARY KEY, value BLOB NOT NULL) WITHOUT ROWID;", []).unwrap();
+
+        // read a missing key
+        let value = trie_storage.get(key_prefix.as_slice()).unwrap();
+        assert_eq!(value, None);
+
+        // migrate a key from sql
+        conn.execute(
+            "INSERT INTO state_trie (key, value) VALUES (?1, ?2)",
+            [key_prefix.as_slice(), b"sql_value".to_vec().as_slice()],
+        )
+        .unwrap();
+        let value = trie_storage.get(key_prefix.as_slice()).unwrap().unwrap();
+        assert_eq!(value, b"sql_value".to_vec());
+
+        // count all keys
+        let iter = rdb.prefix_iterator(key_prefix.as_slice());
+        assert_eq!(iter.count(), 1);
+    }
+
+    #[test]
+    // height-tagged read/write works
+    fn key_tagging_works() {
+        let (_, rdb, cache, trie_storage) = setup();
+
+        let key_prefix = alloy::consensus::EMPTY_ROOT_HASH.0;
+
+        // write/read hi value, cached
+        trie_storage.inc_tag(u64::MAX).unwrap();
+        trie_storage
+            .insert(key_prefix.as_slice(), b"max_value".to_vec())
+            .unwrap();
+        let value = trie_storage.get(key_prefix.as_slice()).unwrap();
+        assert_eq!(value.unwrap(), b"max_value".to_vec());
+
+        // write/read lo value, cached
+        trie_storage.inc_tag(u64::MIN).unwrap();
+        trie_storage
+            .insert(key_prefix.as_slice(), b"min_value".to_vec())
+            .unwrap();
+        let value = trie_storage.get(key_prefix.as_slice()).unwrap();
+        assert_eq!(value.unwrap(), b"min_value".to_vec());
+
+        // read the highest value, bypassing the cache.
+        cache.write().clear();
+        let value = trie_storage.get(key_prefix.as_slice()).unwrap();
+        assert_eq!(value.unwrap(), b"max_value".to_vec());
+
+        // count all keys
+        let iter = rdb.prefix_iterator(key_prefix.as_slice());
+        assert_eq!(iter.count(), 2);
+    }
+
+    #[test]
+    // peek ahead works
+    fn peek_ahead_and_migration() {
+        let (_, rdb, cache, trie_storage) = setup();
+
+        let key_prefix = alloy::consensus::EMPTY_ROOT_HASH.0;
+        rdb.put(key_prefix.as_slice(), b"rdb_value").unwrap();
+
+        // read the legacy value
+        let value = trie_storage.get(key_prefix.as_slice()).unwrap().unwrap();
+        assert_eq!(value, b"rdb_value".to_vec());
+
+        // peek-ahead tests
+        trie_storage.inc_tag(u64::MIN).unwrap();
+        trie_storage
+            .insert(key_prefix.as_slice(), b"min_value".to_vec())
+            .unwrap();
+        cache.write().clear();
+        let value = trie_storage.get(key_prefix.as_slice()).unwrap().unwrap();
+        assert_eq!(value, b"min_value".to_vec());
+
+        // peek-ahead ordering
+        trie_storage.inc_tag(u64::MAX).unwrap();
+        trie_storage
+            .insert(key_prefix.as_slice(), b"max_value".to_vec())
+            .unwrap();
+        cache.write().clear();
+        let value = trie_storage.get(key_prefix.as_slice()).unwrap().unwrap();
+        assert_eq!(value, b"max_value".to_vec());
+
+        // count all keys
+        let iter = rdb.prefix_iterator(key_prefix.as_slice());
+        // The migration should have deleted the legacy key and migrated everything to new keys.
+        assert_eq!(iter.count(), 2);
+    }
+
+    #[test]
+    // writes to disk only happen on commit
+    fn write_on_commit_only() {
+        let (_, rdb, cache, trie_storage) = setup();
+
+        let key_prefix = alloy::consensus::EMPTY_ROOT_HASH.0;
+        let mut pmt = EthTrie::new(trie_storage.clone());
+
+        trie_storage.inc_tag(u64::MIN).unwrap();
+        pmt.insert(key_prefix.as_slice(), b"one_value").unwrap();
+        pmt.insert(key_prefix.as_slice(), b"two_value").unwrap();
+        pmt.insert(key_prefix.as_slice(), b"tri_value").unwrap();
+        cache.write().clear();
+        let value = pmt.get(key_prefix.as_slice()).unwrap().unwrap();
+        assert_eq!(value, b"tri_value".to_vec());
+        pmt.root_hash().unwrap(); // write to disk
+
+        trie_storage.inc_tag(1).unwrap();
+        pmt.insert(key_prefix.as_slice(), b"for_value").unwrap();
+        trie_storage.inc_tag(2).unwrap();
+        pmt.insert(key_prefix.as_slice(), b"fiv_value").unwrap();
+        cache.write().clear();
+        let value = pmt.get(key_prefix.as_slice()).unwrap().unwrap();
+        assert_eq!(value, b"fiv_value".to_vec());
+        pmt.root_hash().unwrap(); // write to disk
+
+        let iter = rdb.iterator(rocksdb::IteratorMode::Start);
+        assert_eq!(iter.count(), 2);
+    }
+}
diff --git a/zilliqa/tests/it/main.rs b/zilliqa/tests/it/main.rs
index 8fb1bf94d8..e01a6a2ca7 100644
--- a/zilliqa/tests/it/main.rs
+++ b/zilliqa/tests/it/main.rs
@@ -1131,6 +1131,9 @@ impl Network {
                 InternalMessage::UnsubscribeFromGossipSubTopic(topic) => {
                     debug!("unsubscribing from topic {:?}", topic);
                 }
+                InternalMessage::SnapshotTrie(storage, hash, view) => {
+                    db::snapshot_trie(storage.clone(), *hash, *view).unwrap();
+                }
             }
         }
         AnyMessage::External(external_message) => {
diff --git a/zilliqa/tests/it/sync.rs b/zilliqa/tests/it/sync.rs
index fe16da2f42..37790aab9a 100644
--- a/zilliqa/tests/it/sync.rs
+++ b/zilliqa/tests/it/sync.rs
@@ -1,6 +1,6 @@
 use alloy::{
     primitives::{Address, U256},
-    providers::Provider as _,
+    providers::Provider,
     rpc::types::TransactionRequest,
 };
 use fs_extra::file::CopyOptions;
@@ -9,25 +9,44 @@ use zilliqa::{cfg::Checkpoint, crypto::Hash, sync::MIN_PRUNE_INTERVAL};
 use crate::{Network, NewNodeOptions};
 
 // Test a pruning node does not hold old blocks.
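+// Also checks that account state written before pruning (a funded balance) is still readable afterwards.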
-#[zilliqa_macros::test]
+#[zilliqa_macros::test(blocks_per_epoch = 3)]
 async fn prune_interval(mut network: Network) {
-    network.run_until_block_finalized(5, 100).await.unwrap();
+    let wallet = network.genesis_wallet().await;
 
-    tracing::info!("Adding pruned node. {}", MIN_PRUNE_INTERVAL);
+    // make sure that pruning preserves the balance.
+    let address = Address::random();
+    let amount = U256::from(1234);
+    let hash = *wallet
+        .send_transaction(TransactionRequest::default().to(address).value(amount))
+        .await
+        .unwrap()
+        .tx_hash();
+    let _ = network.run_until_receipt(&wallet, &hash, 100).await;
+    let balance = wallet.get_balance(address).await.unwrap();
+    assert_eq!(balance, amount);
+
+    tracing::info!(prune_interval = MIN_PRUNE_INTERVAL, "Adding pruned node.");
     let index = network.add_node_with_options(crate::NewNodeOptions {
         prune_interval: Some(MIN_PRUNE_INTERVAL),
         ..Default::default()
     });
     network.run_until_synced(index).await;
+    let number = network.node_at(index).get_finalized_block_number().unwrap();
+
+    tracing::info!(number, "Added pruned node.");
 
+    // run for a bit to allow state pruning to kick in
     network
-        .run_until_block_finalized(MIN_PRUNE_INTERVAL + 5, 1000)
+        .run_until_block_finalized(MIN_PRUNE_INTERVAL * 3, 1000)
         .await
         .unwrap();
 
     let range = network.node_at(index).db.available_range().unwrap();
-    tracing::info!("Pruned range: {range:?}");
+    tracing::info!(?range, "Pruned");
     assert_eq!(range.count() as u64, MIN_PRUNE_INTERVAL);
+
+    let balance = wallet.get_balance(address).await.unwrap();
+    assert_eq!(balance, amount);
 }
 
 #[zilliqa_macros::test(do_checkpoints)]