From 27251ec6d9fae623719c495d5d6c0138b5ab2054 Mon Sep 17 00:00:00 2001 From: Shawn Date: Wed, 4 Feb 2026 16:54:56 +0800 Subject: [PATCH 01/13] feat: fix #3407. feat: fix #3415. docs: added docs/state_pruning.md config: added validations sql: TRUNCATE the state_trie table as well. chore: fmt, clippy --- docs/state_pruning.md | 63 +++++ zilliqa/src/api/admin.rs | 12 + zilliqa/src/cfg.rs | 15 +- zilliqa/src/consensus.rs | 81 ++++--- zilliqa/src/db.rs | 148 ++++++++---- zilliqa/src/message.rs | 5 + zilliqa/src/p2p_node.rs | 8 + zilliqa/src/trie_storage.rs | 451 ++++++++++++++++++++++++++++++++++-- zilliqa/tests/it/main.rs | 3 + zilliqa/tests/it/sync.rs | 30 ++- 10 files changed, 709 insertions(+), 107 deletions(-) create mode 100644 docs/state_pruning.md diff --git a/docs/state_pruning.md b/docs/state_pruning.md new file mode 100644 index 0000000000..f5c76a195b --- /dev/null +++ b/docs/state_pruning.md @@ -0,0 +1,63 @@ +# State Pruning Guide + +With `v0.21.0` the state pruning feature was introduced. +This feature allows users to prune the state database to reduce the size of the database and improve performance. + +There are two parts of the state pruning feature: +1. Pruning the blocks; and +2. Pruning the state. + +## Pruning the Blocks + +To prune the blocks, users can use the `sync.prune_interval` configuration option. +If this option is set to a positive integer, the node will retain only the most recent `sync.prune_interval` blocks. +Any block older than that will be deleted from the SQL database. + +## Pruning the State + +To prune the state, users can use the `db.state_prune` configuration options. +If this option is set to `true`, the node will prune the state database to massively reduce the size of the storage. + +As part of this process, the node will also truncate the SQL `state_trie` table. +But, the node does not `VACUUM` to reclaim the space from the SQL database, as this process can possibly take a long time. 
+If the disk space needs to be reclaimed, node operators can manually run the `VACUUM` command on the SQL database. + +### How it works + +The state pruning feature works by exploiting the RocksDB built-in compaction feature. +We have chosen to implement a feature that is adapted from the [User-defined Time-stamp](https://github.com/facebook/rocksdb/wiki/User-defined-Timestamp) feature of RocksDB. +So, the amount of storage recovered is neither exact nor immediate. +In fact, when it is turned on, you may see an initial increase in storage usage. +But over time, the storage usage will decrease as the compaction process runs. + +- When writing data to the trie-storage in RocksDB, each key is tagged with a *timestamp* suffix that is used to determine the order of the keys. +``` +|user-key + tag|seqno|type| +|<-----internal key------>| +``` + +- When reading trie-storage data from RocksDB, the timestamp is taken into account to ensure that only the most *recent* value is returned, for a specific key. +> Instead of using the block height as the timestamp, we use the block view instead. +This is because there can only be one block per view while there could potentially be more than one block per height. +This ensures that the timestamp is always monotonically increasing per block. + +- When the built-in compaction feature is triggered, any *stale* keys are removed and the disk space is recovered. +> By default RocksDB will ensure that compaction is triggered at least once every 30-days; but you can configure this behaviour by setting the `db.rocksdb_compaction_period` option. + +At each epoch, the node increments its internal timestamp *ceiling*, which will result in any new state being tagged with a higher timestamp. +Also, the node will trigger a background operation to *promote* all pre-existing state that should be retained, by duplicating the entire state-trie with the higher timestamp. 
+This operation may take some time to complete, but the node will only allow one such operation at a time. +After the operation is complete, the node increments its internal timestamp *floor*; and any state with a timestamp below the *floor* will eventually be compacted away. + +### Conditions + +The conditions for the *ceiling* and the *floor* are that: +- The *ceiling* is always incremented i.e. new ceiling > old ceiling. +- The *floor* is always incremented i.e. new floor > old floor. +- The *ceiling* is always greater than the *floor* i.e. ceiling > floor. +- The *floor* always lags the *ceiling* i.e. floor == old ceiling. + +The promotion operation will only be triggered if the lowest block view is greater than the current *ceiling*. +This condition ensures the safety that, the only states pruned are those that are absolutely no longer needed, since the block no longer exists in the SQL database. +This also means that the node may retain slightly more state than it actually needs. +Considering the amount of state that can be pruned, this tiny bit of extra state is negligible. diff --git a/zilliqa/src/api/admin.rs b/zilliqa/src/api/admin.rs index c040ec4d0d..cc98787520 100644 --- a/zilliqa/src/api/admin.rs +++ b/zilliqa/src/api/admin.rs @@ -41,6 +41,7 @@ pub fn rpc_module(node: Arc, enabled_apis: &[EnabledApi]) -> RpcModule) -> Result } Ok(leaders) } + +fn snapshot(params: Params, node: &Arc) -> Result { + let mut params = params.sequence(); + let block_id: BlockId = params.next()?; + let block = node + .get_block(block_id)? 
+ .ok_or(anyhow!("Block {block_id} does not exist"))?; + + // node.consensus.read().snapshot_at(block.number())?; + Ok(block.view()) +} diff --git a/zilliqa/src/cfg.rs b/zilliqa/src/cfg.rs index 18b3fa03ff..6ceaee4099 100644 --- a/zilliqa/src/cfg.rs +++ b/zilliqa/src/cfg.rs @@ -376,31 +376,32 @@ impl NodeConfig { } } + // deprecated settings anyhow::ensure!( self.state_cache_size == state_cache_size_default(), "state_cache_size is deprecated. Use db.rocksdb_cache_size and db.rocksdb_state_cache_size instead." ); - + // sync/prune settings anyhow::ensure!( self.sync.base_height == u64_max() || self.sync.prune_interval == u64_max(), - "base_height and prune_interval cannot be set at the same time" + "sync.base_height and sync.prune_interval cannot be set at the same time" ); - // when set, >> 15 to avoid pruning forks; > 256 to be EVM-safe; arbitrarily picked. anyhow::ensure!( self.sync.prune_interval >= MIN_PRUNE_INTERVAL, - "prune_interval must be at least {MIN_PRUNE_INTERVAL}", + "sync.prune_interval must be at least {MIN_PRUNE_INTERVAL}", + ); + anyhow::ensure!( + self.sync.prune_interval == u64::MAX || !self.do_checkpoints, + "state_prune and do_checkpoints cannot be set at the same time" ); - // 10 is a reasonable minimum for a node to be useful. anyhow::ensure!( self.sync.block_request_batch_size >= 10, "block_request_batch_size must be at least 10" ); - // 1000 would saturate a typical node. 
anyhow::ensure!( self.sync.max_blocks_in_flight <= 1000, "max_blocks_in_flight must be at most 1000" ); - // the minimum required for the next leader selection anyhow::ensure!( self.max_missed_view_age >= MISSED_VIEW_WINDOW, "max_missed_view_age must be at least {MISSED_VIEW_WINDOW}" diff --git a/zilliqa/src/consensus.rs b/zilliqa/src/consensus.rs index c71a0ddb51..e1bbf91004 100644 --- a/zilliqa/src/consensus.rs +++ b/zilliqa/src/consensus.rs @@ -2415,46 +2415,57 @@ impl Consensus { } } - if self.block_is_first_in_epoch(block.number()) - && !block.is_genesis() - && self.config.do_checkpoints - && self.epoch_is_checkpoint(self.epoch_number(block.number())) - && let Some(checkpoint_path) = self.db.get_checkpoint_dir()? - { - let parent = self - .db - .get_block(block.parent_hash().into())? - .ok_or(anyhow!( - "Trying to checkpoint block, but we don't have its parent" - ))?; - let transactions: Vec = block - .transactions - .iter() - .map(|txn_hash| { - let tx = self.db.get_transaction(txn_hash)?.ok_or(anyhow!( - "failed to fetch transaction {} for checkpoint parent {}", - txn_hash, - parent.hash() - ))?; - Ok::<_, anyhow::Error>(tx.tx) - }) - .collect::>>()?; - - self.message_sender.send_message_to_coordinator( - InternalMessage::ExportBlockCheckpoint( - Box::new(block), - transactions, - Box::new(parent), - self.db.state_trie()?.clone(), - self.state.view_history.read().clone(), - checkpoint_path, - ), - )?; + if self.block_is_first_in_epoch(block.number()) && !block.is_genesis() { + // Do snapshots + if self.config.sync.prune_interval != u64::MAX { + // do at block boundaries to avoid state inconsistencies. + // do at epoch boundaries to reduce size amplification. 
+ let range = self.db.available_range()?; + self.snapshot_at(*range.start(), block.view())?; + }; + // Do checkpoints + if self.config.do_checkpoints + && self.db.get_checkpoint_dir()?.is_some() + && self.epoch_is_checkpoint(self.epoch_number(block.number())) + { + self.checkpoint_at(block.number())?; + } } Ok(()) } + /// Trigger a snapshot + pub fn snapshot_at(&self, block_number: u64, new_ceil: u64) -> Result<()> { + // skip if there is a snapshot in progress. + let Some(mut tag_lock) = self.db.tag_lock.try_lock() else { + return Ok(()); + }; + // error if the lowest block does not exist + let Some(block) = self.get_canonical_block_by_number(block_number)? else { + return Err(anyhow::format_err!("Snapshot: missing block")); + }; + // skip if the lowest block unsafe to snapshot + // 'unsafe' means that the block exists in a tag range that could be pruned away during compaction. + if block.view() < *tag_lock { + return Ok(()); + } + + let trie_storage = self.db.state_trie()?; + // raise the ceiling to the new tag, promoting all new state + let old_ceil = trie_storage.set_tag_ceil(new_ceil)?; + // store the previous tag, which is the next floor. + *tag_lock = old_ceil; + + // trigger snapshot + self.message_sender + .send_message_to_coordinator(InternalMessage::PromoteTrie( + trie_storage, + block.state_root_hash().into(), + block_number, + )) + } + /// Trigger a checkpoint, for debugging. /// Returns (file_name, block_hash). 
At some time after you call this function, hopefully a checkpoint will end up in the file pub fn checkpoint_at(&self, block_number: u64) -> Result<(String, String)> { diff --git a/zilliqa/src/db.rs b/zilliqa/src/db.rs index d10531b2ed..2f06c1ae79 100644 --- a/zilliqa/src/db.rs +++ b/zilliqa/src/db.rs @@ -6,7 +6,10 @@ use std::{ num::NonZeroUsize, ops::RangeInclusive, path::{Path, PathBuf}, - sync::Arc, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, time::Duration, }; @@ -15,10 +18,13 @@ use anyhow::{Context, Result, anyhow}; #[allow(unused_imports)] use eth_trie::{DB, EthTrie, MemoryDB, Trie}; use lru::LruCache; -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock}; use r2d2::Pool; use r2d2_sqlite::SqliteConnectionManager; -use rocksdb::{BlockBasedOptions, Cache, DBWithThreadMode, Options, SingleThreaded}; +use revm::primitives::B256; +use rocksdb::{ + BlockBasedOptions, Cache, CompactionDecision, DBWithThreadMode, Options, SingleThreaded, +}; use rusqlite::{ Connection, OptionalExtension, Row, ToSql, named_params, types::{FromSql, FromSqlError, ToSqlOutput}, @@ -34,7 +40,7 @@ use crate::{ precompiles::ViewHistory, time::SystemTime, transaction::{EvmGas, Log, SignedTransaction, TransactionReceipt, VerifiedTransaction}, - trie_storage::TrieStorage, + trie_storage::{ROCKSDB_TAGGING_AT, TrieStorage}, }; const MAX_KEYS_IN_SINGLE_QUERY: usize = 32765; @@ -256,6 +262,10 @@ pub struct Db { executable_blocks_height: Option, /// Clone of DbConfig pub config: DbConfig, + /// State Pruning + tag_ceil: Arc, // always set to the finalised view height; set by Db::set_finalised_view() + tag_floor: Arc, // resets to u64::MAX at startup; gets set during prune.; set by Db::snapshot() + pub tag_lock: Arc>, // used to lock the snapshot process } impl Db { @@ -311,46 +321,14 @@ impl Db { let connection = pool.get()?; Self::ensure_schema(&connection)?; - // RocksDB configuration - let mut block_opts = BlockBasedOptions::default(); - // reduce disk and memory usage - 
https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#ribbon-filter - block_opts.set_ribbon_filter(10.0); - block_opts.set_optimize_filters_for_memory(true); // reduce memory wastage with JeMalloc - // Mitigate OOM - block_opts.set_cache_index_and_filter_blocks(config.rocksdb_cache_index_filters); - // Improve cache utilisation - block_opts.set_pin_top_level_index_and_filter(true); - block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true); - block_opts.set_index_type(rocksdb::BlockBasedIndexType::TwoLevelIndexSearch); - block_opts.set_partition_filters(true); - block_opts.set_block_size(config.rocksdb_block_size); - block_opts.set_metadata_block_size(config.rocksdb_block_size); - - let cache = - Cache::new_hyper_clock_cache(config.rocksdb_cache_size, config.rocksdb_block_size); - block_opts.set_block_cache(&cache); - - let mut rdb_opts = Options::default(); - rdb_opts.create_if_missing(true); - rdb_opts.set_block_based_table_factory(&block_opts); - rdb_opts.set_periodic_compaction_seconds(config.rocksdb_compaction_period); - // Mitigate OOM - prevent opening too many files at a time - rdb_opts.set_max_open_files(config.rocksdb_max_open_files); - // Reduce reads - rdb_opts.set_level_compaction_dynamic_level_bytes(true); - rdb_opts.set_target_file_size_base(config.rocksdb_target_file_size); - rdb_opts.set_max_bytes_for_level_base(config.rocksdb_target_file_size << 2); - // Reduce storage - rdb_opts.set_compression_type(rocksdb::DBCompressionType::Lz4); - rdb_opts.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd); - rdb_opts.set_bottommost_zstd_max_train_bytes(0, true); - + let tag_floor = Arc::new(AtomicU64::new(u64::MAX)); // default to no compaction // Should be safe in single-threaded mode // https://docs.rs/rocksdb/latest/rocksdb/type.DB.html#limited-performance-implication-for-single-threaded-mode let rdb_path = path.as_ref().map_or_else( || tempfile::tempdir().unwrap().path().join("state.rocksdb"), |p| p.join("state.rocksdb"), ); + 
let rdb_opts = Self::init_rocksdb(config.clone(), tag_floor.clone()); let rdb = DBWithThreadMode::::open(&rdb_opts, rdb_path)?; tracing::info!( @@ -363,13 +341,23 @@ impl Db { let cache = LruCache::new(NonZeroUsize::new(config.rocksdb_state_cache_size / 500).unwrap()); + // *** Must use the latest tag for keys. *** + let last_tag = rdb.get(ROCKSDB_TAGGING_AT)?.map_or(u64::MAX, |v| { + u64::from_be_bytes(v.try_into().expect("8-bytes")) + }); + let tag_ceil = Arc::new(AtomicU64::new(last_tag)); // stores the reverse view + let tag_lock = Arc::new(Mutex::new(u64::MAX.saturating_sub(last_tag))); // stores the equivalent view + Ok(Db { pool: Arc::new(pool), - path, - executable_blocks_height, kvdb: Arc::new(rdb), cache: Arc::new(RwLock::new(cache)), + path, + executable_blocks_height, config, + tag_ceil, + tag_floor, + tag_lock, }) } @@ -651,6 +639,65 @@ impl Db { Ok(()) } + fn init_rocksdb(config: DbConfig, tag_floor: Arc) -> rocksdb::Options { + // RocksDB configuration + let mut block_opts = BlockBasedOptions::default(); + // reduce disk and memory usage - https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#ribbon-filter + block_opts.set_ribbon_filter(10.0); + block_opts.set_optimize_filters_for_memory(true); // reduce memory wastage with JeMalloc + // Mitigate OOM + block_opts.set_cache_index_and_filter_blocks(config.rocksdb_cache_index_filters); + // Improve cache utilisation + block_opts.set_pin_top_level_index_and_filter(true); + block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true); + block_opts.set_index_type(rocksdb::BlockBasedIndexType::TwoLevelIndexSearch); + block_opts.set_partition_filters(true); + block_opts.set_block_size(config.rocksdb_block_size); + block_opts.set_metadata_block_size(config.rocksdb_block_size); + + let cache = + Cache::new_hyper_clock_cache(config.rocksdb_cache_size, config.rocksdb_block_size); + block_opts.set_block_cache(&cache); + + let mut rdb_opts = Options::default(); + rdb_opts.create_if_missing(true); + 
rdb_opts.set_block_based_table_factory(&block_opts); + rdb_opts.set_periodic_compaction_seconds(config.rocksdb_compaction_period); + // Mitigate OOM - prevent opening too many files at a time + rdb_opts.set_max_open_files(config.rocksdb_max_open_files); + // Reduce reads + rdb_opts.set_level_compaction_dynamic_level_bytes(true); + rdb_opts.set_target_file_size_base(config.rocksdb_target_file_size); + rdb_opts.set_max_bytes_for_level_base(config.rocksdb_target_file_size << 2); + // Reduce storage + rdb_opts.set_compression_type(rocksdb::DBCompressionType::Lz4); + rdb_opts.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd); + rdb_opts.set_bottommost_zstd_max_train_bytes(0, true); + + // Keys are only removed if they are older than the floor value, which is set during snapshot. + // After pruning, old and legacy keys are eventually removed when the background compaction runs. + let floor = tag_floor.clone(); + rdb_opts.set_compaction_filter( + "StatePruneFilter", + move |_lvl, key, _value| -> CompactionDecision { + match key.len() { + // 40-bytes: remove tagged key, if the key is 'older' than the floor + 40 if u64::from_be_bytes(key[32..40].try_into().unwrap()) + > floor.load(Ordering::Relaxed) => + { + CompactionDecision::Remove + } + // 32-bytes: remove legacy key, if snapshot already taken + 32 if floor.load(Ordering::Relaxed) != u64::MAX => CompactionDecision::Remove, + // default to keep, all other keys + _ => CompactionDecision::Keep, + } + }, + ); + + rdb_opts + } + // SQLite performance tweaks fn init_connection( connection: &mut Connection, @@ -812,6 +859,9 @@ impl Db { self.pool.clone(), self.kvdb.clone(), self.cache.clone(), + self.tag_ceil.clone(), + self.tag_floor.clone(), + self.tag_lock.clone(), )) } @@ -1643,6 +1693,24 @@ impl Db { } } +/// Promote the state trie. +/// +/// Promotes the tag of each node to the given view. This process may take a while to complete. 
+/// The `tag_lock` ensures that only one snapshot is in progress at a time. +/// The previous state trie will be eventually pruned during compaction. +pub fn promote_trie(storage: TrieStorage, root_hash: B256, block_number: u64) -> Result<()> { + let trie = Arc::new(storage); + let tag_lock = trie.tag_lock.lock(); + tracing::info!(%root_hash, block_number, "Promote: start"); + TrieStorage::promote(trie.clone(), root_hash)?; + tracing::info!(%root_hash, block_number, "Promote: done"); + let old_floor = trie.set_tag_floor(*tag_lock)?; + if old_floor != 0 { + trie.drop_sql_state_trie()?; // delete SQL database + } + Ok(()) // not fatal, it can be retried later. +} + pub fn get_checkpoint_filename + Debug>( output_dir: P, block: &Block, diff --git a/zilliqa/src/message.rs b/zilliqa/src/message.rs index b609767682..bd544b3895 100644 --- a/zilliqa/src/message.rs +++ b/zilliqa/src/message.rs @@ -394,6 +394,8 @@ pub enum InternalMessage { SubscribeToGossipSubTopic(GossipSubTopic), /// Notify p2p cordinator to unsubscribe from a particular gossipsub topic UnsubscribeFromGossipSubTopic(GossipSubTopic), + /// Snapshot a trie at a given point + PromoteTrie(TrieStorage, B256, u64), } #[derive(Debug, Clone)] @@ -420,6 +422,9 @@ impl Display for InternalMessage { InternalMessage::UnsubscribeFromGossipSubTopic(topic) => { write!(f, "UnsubscribeFromGossipSubTopic({topic:?})") } + InternalMessage::PromoteTrie(_storage, hash, view) => { + write!(f, "PromoteTrie({hash:?}) {view})") + } } } } diff --git a/zilliqa/src/p2p_node.rs b/zilliqa/src/p2p_node.rs index f6f2b68842..d6295acd02 100644 --- a/zilliqa/src/p2p_node.rs +++ b/zilliqa/src/p2p_node.rs @@ -484,6 +484,14 @@ impl P2pNode { self.swarm.behaviour_mut().gossipsub.unsubscribe(&Self::validator_topic(shard_id)); } } + InternalMessage::PromoteTrie(trie, hash, view) => { + self.task_threads.spawn(async move { + if let Err(e) = db::promote_trie(trie, hash, view) { + tracing::error!("Snapshot failed: {e:?}"); + } + Ok(()) + }); + } } 
}, message = self.request_responses_receiver.next() => { diff --git a/zilliqa/src/trie_storage.rs b/zilliqa/src/trie_storage.rs index a1e02f947f..3de8cd1d09 100644 --- a/zilliqa/src/trie_storage.rs +++ b/zilliqa/src/trie_storage.rs @@ -1,18 +1,24 @@ -use std::sync::Arc; +use std::sync::{ + Arc, + atomic::{AtomicU64, Ordering}, +}; use anyhow::Result; +use eth_trie::{EthTrie, Trie as _}; use lru::LruCache; -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock}; use r2d2::Pool; use r2d2_sqlite::SqliteConnectionManager; +use revm::primitives::B256; use rocksdb::WriteBatch; use rusqlite::OptionalExtension; -use crate::{cfg::Forks, crypto::Hash}; +use crate::{cfg::Forks, crypto::Hash, state::Account}; /// Special storage keys const ROCKSDB_MIGRATE_AT: &str = "migrate_at"; const ROCKSDB_CUTOVER_AT: &str = "cutover_at"; +pub const ROCKSDB_TAGGING_AT: &str = "tagging_at"; /// An implementor of [eth_trie::DB] which uses a [rocksdb::DB]/[rusqlite::Connection] to persist data. #[derive(Debug, Clone)] @@ -20,6 +26,10 @@ pub struct TrieStorage { pool: Arc>, kvdb: Arc, cache: Arc, Vec>>>, + // the tag_* values are stored in the reverse height order i.e. u64::MAX is genesis. + tag_ceil: Arc, // used to tag every write to the state database; reverse-ordered. + tag_floor: Arc, // used to mark the compaction boundary; reverse-ordered. + pub tag_lock: Arc>, // used to lock the snapshot process; forward-ordered. 
} impl TrieStorage { @@ -27,11 +37,84 @@ impl TrieStorage { pool: Arc>, kvdb: Arc, cache: Arc, Vec>>>, + tag_ceil: Arc, + tag_floor: Arc, + tag_lock: Arc>, ) -> Self { - Self { pool, kvdb, cache } + Self { + pool, + kvdb, + cache, + tag_ceil, + tag_floor, + tag_lock, + } } - pub fn write_batch(&self, keys: Vec>, values: Vec>) -> Result<()> { + /// Truncate the state_trie table + pub fn drop_sql_state_trie(&self) -> Result<()> { + let sql = self.pool.get().unwrap(); + sql.execute("DELETE FROM state_trie", [])?; // TRUNCATE + Ok(()) + } + + /// This snapshot promotes the *active* keys to the tag + /// + /// This works on duplicating the underlying db-level keys, not the trie-level keys. + /// e.g. 500 trie-level random keys takes ~150 db-level keys. + // + // Previously, no deletion of keys was allowed in the state database. So, it is safe to repurpose clear_trie_from_db() to + // promote the trie, rather than delete it. + pub fn promote(trie_storage: Arc, state_root_hash: B256) -> Result<()> { + let mut state_trie = EthTrie::new(trie_storage.clone()).at_root(state_root_hash); + for akv in state_trie.iter() { + let (_key, serialised_account) = akv?; + let account_root = Account::try_from(serialised_account.as_slice())?.storage_root; + let mut account_trie = EthTrie::new(trie_storage.clone()).at_root(account_root); + // repurpose clear_trie_from_db() to promote the account trie + account_trie.clear_trie_from_db()?; + } + // repurpose clear_trie_from_db() to promote the state trie + state_trie.clear_trie_from_db()?; + Ok(()) + } + + /// Set the tag floor to the given view, returning previous tag + /// + /// This ensures that: floor != ceil && new_floor 'increments' old_floor. 
+ pub fn set_tag_floor(&self, view: u64) -> Result { + let new_tag = u64::MAX.saturating_sub(view); // reverse-ordered + let tag_floor = self.tag_floor.load(Ordering::Relaxed); + anyhow::ensure!(new_tag <= tag_floor, "{new_tag} <= {tag_floor}"); + let tag_ceil = self.tag_ceil.load(Ordering::Relaxed); + anyhow::ensure!(new_tag > tag_ceil, "{new_tag} > {tag_ceil}"); + self.tag_floor.store(new_tag, Ordering::Relaxed); + Ok(u64::MAX.saturating_sub(tag_floor)) + } + + /// Set the tag to the given view, returning the previous tag + /// + /// This ensures that: floor != ceil && new_ceil 'increments' old_ceil. + pub fn set_tag_ceil(&self, view: u64) -> Result { + let new_tag = u64::MAX.saturating_sub(view); // reverse-ordered + let tag_ceil = self.tag_ceil.load(Ordering::Relaxed); + anyhow::ensure!(new_tag <= tag_ceil, "{new_tag} <= {tag_ceil}"); + let tag_floor = self.tag_floor.load(Ordering::Relaxed); + anyhow::ensure!(new_tag < tag_floor, "{new_tag} < {tag_floor}"); + self.kvdb.put(ROCKSDB_TAGGING_AT, new_tag.to_be_bytes())?; + self.tag_ceil.store(new_tag, Ordering::Relaxed); + Ok(u64::MAX.saturating_sub(tag_ceil)) + } + + /// Writes a batch of key-value pairs to the database. + /// + /// Since the tagging is only performed here, only Trie keys are tagged, leaving other keys untagged. + /// The other keys stored in this database are e.g. internal settings. + // + // This is the tagging scheme used. Each tag is a U64 in big-endian format. 
+ // |user-key + tag|seqno|type| + // |<-----internal key------>| + fn write_batch(&self, keys: Vec>, values: Vec>, tag: [u8; 8]) -> Result<()> { if keys.is_empty() { return Ok(()); } @@ -41,12 +124,56 @@ impl TrieStorage { let mut batch = WriteBatch::default(); let mut cache = self.cache.write(); for (key, value) in keys.into_iter().zip(values.into_iter()) { - batch.put(key.as_slice(), value.as_slice()); + // tag keys; lexicographically sorted + let mut tag_key = key.clone(); + tag_key.extend_from_slice(tag.as_slice()); // suffix big-endian tags + batch.put(tag_key.as_slice(), value.as_slice()); cache.put(key, value); } Ok(self.kvdb.write(batch)?) } + /// This function retrieves the 'latest' value, at all times. + /// + /// Legacy keys do not have a tag and are ordered first due to lexicographical sorting used by rocksdb. + /// We peek the next key to determine if the legacy key is the latest, or if a later tag key is present. + // + // This is the tagging scheme used. Each tag is a U64 in big-endian format. + // |user-key + tag|seqno|type| + // |<-----internal key------>| + fn get_tag_value(&self, key_prefix: &[u8]) -> Result)>> { + let mut iter = self.kvdb.prefix_iterator(key_prefix); + // early return if user-key/prefix is not found + let Some(prefix) = iter.next() else { + return Ok(None); + }; + let (key, value) = prefix?; + if !key.starts_with(key_prefix) { + return Ok(None); + } + + // Given that the trie keys are *all* Keccak256 keys, the legacy keys are exactly 32-bytes (256-bits) long, + // while the new tag keys are exactly 40-bytes (320-bits) long. We do not expect any other trie-key lengths. 
+ if key.len() == 40 { + let tag = key[32..40].try_into().unwrap(); + // latest tag key, return the latest value + Ok(Some((tag, value.to_vec()))) + } else if key.len() == 32 { + // legacy key - peek to see if a later tag key is present + if let Some(peek) = iter.next() { + let peek = peek?; + if peek.0.starts_with(key_prefix) { + let tag = peek.0[32..].try_into().expect("8-bytes"); + return Ok(Some((tag, peek.1.to_vec()))); // tag key has newer value + } + } + // We do not perform lazy migration here to avoid write amplification. + Ok(Some((u64::MAX.to_be_bytes(), value.to_vec()))) // fall-thru value; return legacy value + } else { + unimplemented!("unsupported trie key length {} bytes", key.len()); + } + } + pub fn init_state_trie(&self, _forks: Forks) -> Result<()> { let rdb = self.kvdb.clone(); if rdb.get(ROCKSDB_CUTOVER_AT)?.is_none() { @@ -70,7 +197,7 @@ impl TrieStorage { Ok(self .kvdb .get(ROCKSDB_MIGRATE_AT)? - .map(|v| u64::from_be_bytes(v.try_into().expect("must be 8-bytes"))) + .map(|v| u64::from_be_bytes(v.try_into().expect("8-bytes"))) .unwrap_or(u64::MAX)) // default to no state-sync } @@ -79,7 +206,7 @@ impl TrieStorage { Ok(self .kvdb .get(ROCKSDB_CUTOVER_AT)? 
- .map(|v| u64::from_be_bytes(v.try_into().expect("must be 8-bytes"))) + .map(|v| u64::from_be_bytes(v.try_into().expect("8-bytes"))) .unwrap_or_default()) } @@ -119,6 +246,22 @@ impl TrieStorage { self.kvdb.put(ROCKSDB_MIGRATE_AT, u64::MAX.to_be_bytes())?; Ok(()) } + + #[cfg(test)] + // test artifacts + fn clear_cache(&self) -> Result<()> { + self.cache.write().clear(); + Ok(()) + } + + #[cfg(test)] + // test artifacts + fn inc_tag(&self, new_height: u64) -> Result<()> { + let new_tag = u64::MAX.saturating_sub(new_height); + self.tag_ceil + .store(new_tag, std::sync::atomic::Ordering::Relaxed); + Ok(()) + } } impl eth_trie::DB for TrieStorage { @@ -131,9 +274,8 @@ impl eth_trie::DB for TrieStorage { } // L2 - rocksdb - if let Some(value) = self - .kvdb - .get(key) + if let Some((_tag, value)) = self + .get_tag_value(key) .map_err(|e| eth_trie::TrieError::DB(e.to_string()))? { self.cache.write().put(key.to_vec(), value.clone()); @@ -152,9 +294,12 @@ impl eth_trie::DB for TrieStorage { .map_err(|e| eth_trie::TrieError::DB(e.to_string()))?; if let Some(value) = value { - self.cache.write().put(key.to_vec(), value.clone()); - self.kvdb - .put(key, value.as_slice()) + // lazy migration + let tag = self + .tag_ceil + .load(std::sync::atomic::Ordering::Relaxed) + .to_be_bytes(); + self.write_batch(vec![key.to_vec()], vec![value.clone()], tag) .map_err(|e| eth_trie::TrieError::DB(e.to_string()))?; return Ok(Some(value)); } @@ -164,13 +309,21 @@ impl eth_trie::DB for TrieStorage { #[inline] fn insert(&self, key: &[u8], value: Vec) -> Result<(), Self::Error> { - self.write_batch(vec![key.to_vec()], vec![value]) + let tag = self + .tag_ceil + .load(std::sync::atomic::Ordering::Relaxed) + .to_be_bytes(); + self.write_batch(vec![key.to_vec()], vec![value], tag) .map_err(|e| eth_trie::TrieError::DB(e.to_string())) } #[inline] fn insert_batch(&self, keys: Vec>, values: Vec>) -> Result<(), Self::Error> { - self.write_batch(keys, values) + let tag = self + .tag_ceil + 
.load(std::sync::atomic::Ordering::Relaxed) + .to_be_bytes(); + self.write_batch(keys, values, tag) .map_err(|e| eth_trie::TrieError::DB(e.to_string())) } @@ -180,13 +333,273 @@ impl eth_trie::DB for TrieStorage { .map_err(|e| eth_trie::TrieError::DB(e.to_string())) } - fn remove(&self, _key: &[u8]) -> Result<(), Self::Error> { - // we keep old state to function as an archive node, therefore no-op + // Since clear_trie_from_db() iterates over all db-level keys in the trie; this will end up + // promoting the trie at the db-level, without iterating the trie-level keys. + // ** only called from clear_trie_from_db() ** + fn remove(&self, promote_key: &[u8]) -> Result<(), Self::Error> { + if let Ok(Some((_old_tag, value))) = self.get_tag_value(promote_key) { + self.write_batch( + vec![promote_key.to_vec()], + vec![value], + self.tag_ceil + .load(Ordering::Relaxed) + .saturating_add(1) // push it one-down, to keep only 1 snapshot on-disk. + .to_be_bytes(), + ) + .map_err(|e| eth_trie::TrieError::DB(e.to_string()))? + } Ok(()) } fn remove_batch(&self, _: &[Vec]) -> Result<(), Self::Error> { - // we keep old state to function as an archive node, therefore no-op + // TODO: Possibly remove intermediate keys. 
Ok(()) } } + +#[cfg(test)] +mod tests { + use eth_trie::DB; + use tempfile::tempdir; + + use super::*; + use crate::crypto::SecretKey; + + #[test] + fn basic_snapshot() { + let sql = Arc::new( + Pool::builder() + .build(SqliteConnectionManager::memory()) + .unwrap(), + ); + let rdb = Arc::new(rocksdb::DB::open_default(tempdir().unwrap()).unwrap()); + let tag = Arc::new(AtomicU64::new(u64::MAX)); + let lock = Arc::new(Mutex::new(u64::MIN)); + let cache = Arc::new(RwLock::new(LruCache::unbounded())); + let trie_storage = TrieStorage::new( + sql.clone(), + rdb.clone(), + cache.clone(), + tag.clone(), + tag.clone(), + lock.clone(), + ); + let trie_storage = Arc::new(trie_storage); + + let mut pmt = EthTrie::new(trie_storage.clone()); + pmt.root_hash().unwrap(); // create one 'empty' record + assert_eq!(rdb.iterator(rocksdb::IteratorMode::Start).count(), 1); + + // create 10 random accounts + let account = Account::default(); + let value = bincode::serde::encode_to_vec(account, bincode::config::legacy()).unwrap(); + for _ in 0..100 { + let key = SecretKey::new().unwrap().as_bytes(); + pmt.insert(key.as_slice(), value.as_slice()).unwrap(); + } + + // write-to-disk; and count nodes + trie_storage.inc_tag(u64::MIN).unwrap(); + let root_hash = pmt.root_hash().unwrap(); // write to disk + let old_count = rdb.iterator(rocksdb::IteratorMode::Start).count(); + assert!(old_count > 1); + + // snapshot-to-promote nodes; and count nodes + trie_storage.inc_tag(u64::MAX).unwrap(); + TrieStorage::promote(trie_storage.clone(), root_hash).unwrap(); + assert_eq!( + rdb.iterator(rocksdb::IteratorMode::Start).count(), + old_count * 2 + ); + } + + #[test] + // lazy migration from sqlite to rocksdb, works. 
+ fn lazy_migration() { + let sql = Arc::new( + Pool::builder() + .build(SqliteConnectionManager::memory()) + .unwrap(), + ); + let rdb = Arc::new(rocksdb::DB::open_default(tempdir().unwrap()).unwrap()); + let tag = Arc::new(AtomicU64::new(u64::MAX)); + let lock = Arc::new(Mutex::new(u64::MIN)); + let cache = Arc::new(RwLock::new(LruCache::unbounded())); + let trie_storage = TrieStorage::new( + sql.clone(), + rdb.clone(), + cache.clone(), + tag.clone(), + tag.clone(), + lock.clone(), + ); + + let key_prefix = alloy::consensus::EMPTY_ROOT_HASH.0; + let conn = sql.get().unwrap(); + conn.execute("CREATE TABLE IF NOT EXISTS state_trie (key BLOB NOT NULL PRIMARY KEY, value BLOB NOT NULL) WITHOUT ROWID;", []).unwrap(); + + // read missing key + let value = trie_storage.get(key_prefix.as_slice()).unwrap(); + assert_eq!(value, None); + + // migrate key - from sql + conn.execute( + "INSERT INTO state_trie (key, value) VALUES (?1, ?2)", + [key_prefix.as_slice(), b"sql_value".to_vec().as_slice()], + ) + .unwrap(); + let value = trie_storage.get(key_prefix.as_slice()).unwrap().unwrap(); + assert_eq!(value, b"sql_value".to_vec()); + + // count all keys + let iter = rdb.prefix_iterator(key_prefix.as_slice()); + assert_eq!(iter.count(), 1); + } + + #[test] + // height-tag read/write works + fn tagging() { + let sql = Arc::new( + Pool::builder() + .build(SqliteConnectionManager::memory()) + .unwrap(), + ); + let rdb = Arc::new(rocksdb::DB::open_default(tempdir().unwrap()).unwrap()); + let tag = Arc::new(AtomicU64::new(u64::MAX)); + let lock = Arc::new(Mutex::new(u64::MIN)); + let cache = Arc::new(RwLock::new(LruCache::unbounded())); + let trie_storage = TrieStorage::new( + sql.clone(), + rdb.clone(), + cache.clone(), + tag.clone(), + tag.clone(), + lock.clone(), + ); + + let key_prefix = alloy::consensus::EMPTY_ROOT_HASH.0; + + // write/read hi value, cached + trie_storage.inc_tag(u64::MAX).unwrap(); + trie_storage + .insert(key_prefix.as_slice(), b"max_value".to_vec()) + 
.unwrap(); + let value = trie_storage.get(key_prefix.as_slice()).unwrap(); + assert_eq!(value.unwrap(), b"max_value".to_vec()); + + // write/read lo value, cached + trie_storage.inc_tag(u64::MIN).unwrap(); + trie_storage + .insert(key_prefix.as_slice(), b"min_value".to_vec()) + .unwrap(); + let value = trie_storage.get(key_prefix.as_slice()).unwrap(); + assert_eq!(value.unwrap(), b"min_value".to_vec()); + + // read highest value, bypassing cache. + trie_storage.clear_cache().unwrap(); + let value = trie_storage.get(key_prefix.as_slice()).unwrap(); + assert_eq!(value.unwrap(), b"max_value".to_vec()); + + // count all keys + let iter = rdb.prefix_iterator(key_prefix.as_slice()); + assert_eq!(iter.count(), 2); + } + + #[test] + // peek ahead works + fn peek_ahead() { + let sql = Arc::new( + Pool::builder() + .build(SqliteConnectionManager::memory()) + .unwrap(), + ); + let rdb = Arc::new(rocksdb::DB::open_default(tempdir().unwrap()).unwrap()); + let tag = Arc::new(AtomicU64::new(u64::MAX)); + let lock = Arc::new(Mutex::new(u64::MIN)); + let cache = Arc::new(RwLock::new(LruCache::unbounded())); + let trie_storage = TrieStorage::new( + sql.clone(), + rdb.clone(), + cache.clone(), + tag.clone(), + tag.clone(), + lock.clone(), + ); + + let key_prefix = alloy::consensus::EMPTY_ROOT_HASH.0; + rdb.put(key_prefix.as_slice(), b"rdb_value").unwrap(); + + // read legacy value + let value = trie_storage.get(key_prefix.as_slice()).unwrap().unwrap(); + assert_eq!(value, b"rdb_value".to_vec()); + + // peek ahead tests + trie_storage.inc_tag(u64::MIN).unwrap(); + trie_storage + .insert(key_prefix.as_slice(), b"min_value".to_vec()) + .unwrap(); + trie_storage.clear_cache().unwrap(); + let value = trie_storage.get(key_prefix.as_slice()).unwrap().unwrap(); + assert_eq!(value, b"min_value".to_vec()); + + // peek ahead ordering + trie_storage.inc_tag(u64::MAX).unwrap(); + trie_storage + .insert(key_prefix.as_slice(), b"max_value".to_vec()) + .unwrap(); + 
trie_storage.clear_cache().unwrap(); + let value = trie_storage.get(key_prefix.as_slice()).unwrap().unwrap(); + assert_eq!(value, b"max_value".to_vec()); + + // count all keys + let iter = rdb.prefix_iterator(key_prefix.as_slice()); + assert_eq!(iter.count(), 3); + } + + #[test] + // writes to disk only happens on commit + fn write_on_commit() { + let sql = Arc::new( + Pool::builder() + .build(SqliteConnectionManager::memory()) + .unwrap(), + ); + let rdb = Arc::new(rocksdb::DB::open_default(tempdir().unwrap()).unwrap()); + let tag = Arc::new(AtomicU64::new(u64::MAX)); + let lock = Arc::new(Mutex::new(u64::MIN)); + let cache = Arc::new(RwLock::new(LruCache::unbounded())); + let trie_storage = TrieStorage::new( + sql.clone(), + rdb.clone(), + cache.clone(), + tag.clone(), + tag.clone(), + lock.clone(), + ); + let trie_storage = Arc::new(trie_storage); + + let key_prefix = alloy::consensus::EMPTY_ROOT_HASH.0; + let mut pmt = EthTrie::new(trie_storage.clone()); + + trie_storage.inc_tag(u64::MIN).unwrap(); + pmt.insert(key_prefix.as_slice(), b"one_value").unwrap(); + pmt.insert(key_prefix.as_slice(), b"two_value").unwrap(); + pmt.insert(key_prefix.as_slice(), b"tri_value").unwrap(); + trie_storage.clear_cache().unwrap(); + let value = pmt.get(key_prefix.as_slice()).unwrap().unwrap(); + assert_eq!(value, b"tri_value".to_vec()); + pmt.root_hash().unwrap(); // write to disk + + trie_storage.inc_tag(1).unwrap(); + pmt.insert(key_prefix.as_slice(), b"for_value").unwrap(); + trie_storage.inc_tag(2).unwrap(); + pmt.insert(key_prefix.as_slice(), b"fiv_value").unwrap(); + trie_storage.clear_cache().unwrap(); + let value = pmt.get(key_prefix.as_slice()).unwrap().unwrap(); + assert_eq!(value, b"fiv_value".to_vec()); + pmt.root_hash().unwrap(); // write to disk + + let iter = rdb.iterator(rocksdb::IteratorMode::Start); + assert_eq!(iter.count(), 2); + } +} diff --git a/zilliqa/tests/it/main.rs b/zilliqa/tests/it/main.rs index 8fb1bf94d8..478f381f4f 100644 --- 
a/zilliqa/tests/it/main.rs +++ b/zilliqa/tests/it/main.rs @@ -1131,6 +1131,9 @@ impl Network { InternalMessage::UnsubscribeFromGossipSubTopic(topic) => { debug!("unsubscribing from topic {:?}", topic); } + InternalMessage::PromoteTrie(storage, hash, view) => { + db::promote_trie(storage.clone(), *hash, *view).unwrap(); + } } } AnyMessage::External(external_message) => { diff --git a/zilliqa/tests/it/sync.rs b/zilliqa/tests/it/sync.rs index fe16da2f42..482c21134d 100644 --- a/zilliqa/tests/it/sync.rs +++ b/zilliqa/tests/it/sync.rs @@ -1,6 +1,6 @@ use alloy::{ primitives::{Address, U256}, - providers::Provider as _, + providers::Provider, rpc::types::TransactionRequest, }; use fs_extra::file::CopyOptions; @@ -9,25 +9,43 @@ use zilliqa::{cfg::Checkpoint, crypto::Hash, sync::MIN_PRUNE_INTERVAL}; use crate::{Network, NewNodeOptions}; // Test a pruning node does not hold old blocks. -#[zilliqa_macros::test] +#[zilliqa_macros::test(blocks_per_epoch = 3)] async fn prune_interval(mut network: Network) { - network.run_until_block_finalized(5, 100).await.unwrap(); + let wallet = network.genesis_wallet().await; - tracing::info!("Adding pruned node. {}", MIN_PRUNE_INTERVAL); + // make sure that pruning preserves the balance. 
+ let address = Address::random(); + let amount = U256::from(1234); + let hash = *wallet + .send_transaction(TransactionRequest::default().to(address).value(amount)) + .await + .unwrap() + .tx_hash(); + let _ = network.run_until_receipt(&wallet, &hash, 100).await; + let balance = wallet.get_balance(address).await.unwrap(); + assert_eq!(balance, amount); + + tracing::info!(prune_interval = MIN_PRUNE_INTERVAL, "Adding pruned node."); let index = network.add_node_with_options(crate::NewNodeOptions { prune_interval: Some(MIN_PRUNE_INTERVAL), ..Default::default() }); network.run_until_synced(index).await; + let number = network.node_at(index).get_finalized_block_number().unwrap(); + + tracing::info!(number, "Added pruned node."); network - .run_until_block_finalized(MIN_PRUNE_INTERVAL + 5, 1000) + .run_until_block_finalized(MIN_PRUNE_INTERVAL * 3, 1000) .await .unwrap(); let range = network.node_at(index).db.available_range().unwrap(); - tracing::info!("Pruned range: {range:?}"); + tracing::info!(?range, "Pruned"); assert_eq!(range.count() as u64, MIN_PRUNE_INTERVAL); + + let balance = wallet.get_balance(address).await.unwrap(); + assert_eq!(balance, amount); } #[zilliqa_macros::test(do_checkpoints)] From 3caeccd54886b65fe8816ab4ec360ef144e3f3a1 Mon Sep 17 00:00:00 2001 From: Shawn Date: Thu, 12 Feb 2026 09:31:13 +0800 Subject: [PATCH 02/13] rename tag variables - for clarity. fix tests. --- zilliqa/src/consensus.rs | 2 +- zilliqa/src/db.rs | 50 ++++++----- zilliqa/src/trie_storage.rs | 162 ++++++++++++------------------------ 3 files changed, 84 insertions(+), 130 deletions(-) diff --git a/zilliqa/src/consensus.rs b/zilliqa/src/consensus.rs index e1bbf91004..6af07bc017 100644 --- a/zilliqa/src/consensus.rs +++ b/zilliqa/src/consensus.rs @@ -2438,7 +2438,7 @@ impl Consensus { /// Trigger a snapshot pub fn snapshot_at(&self, block_number: u64, new_ceil: u64) -> Result<()> { // skip if there is a snapshot in progress. 
- let Some(mut tag_lock) = self.db.tag_lock.try_lock() else { + let Some(mut tag_lock) = self.db.tag_view.try_lock() else { return Ok(()); }; // error if the lowest block does not exist diff --git a/zilliqa/src/db.rs b/zilliqa/src/db.rs index 2f06c1ae79..78a342b0df 100644 --- a/zilliqa/src/db.rs +++ b/zilliqa/src/db.rs @@ -8,7 +8,7 @@ use std::{ path::{Path, PathBuf}, sync::{ Arc, - atomic::{AtomicU64, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, }, time::Duration, }; @@ -263,9 +263,10 @@ pub struct Db { /// Clone of DbConfig pub config: DbConfig, /// State Pruning - tag_ceil: Arc, // always set to the finalised view height; set by Db::set_finalised_view() - tag_floor: Arc, // resets to u64::MAX at startup; gets set during prune.; set by Db::snapshot() - pub tag_lock: Arc>, // used to lock the snapshot process + rev_ceil: Arc, // always set to the finalised view height; set by Db::set_finalised_view() + rev_floor: Arc, // resets to u64::MAX at startup; gets set during prune.; set by Db::snapshot() + del_legacy: Arc, + pub tag_view: Arc>, // used to lock the snapshot process } impl Db { @@ -321,14 +322,15 @@ impl Db { let connection = pool.get()?; Self::ensure_schema(&connection)?; - let tag_floor = Arc::new(AtomicU64::new(u64::MAX)); // default to no compaction + let rev_floor = Arc::new(AtomicU64::new(u64::MAX)); + let del_legacy = Arc::new(AtomicBool::new(false)); // Should be safe in single-threaded mode // https://docs.rs/rocksdb/latest/rocksdb/type.DB.html#limited-performance-implication-for-single-threaded-mode let rdb_path = path.as_ref().map_or_else( || tempfile::tempdir().unwrap().path().join("state.rocksdb"), |p| p.join("state.rocksdb"), ); - let rdb_opts = Self::init_rocksdb(config.clone(), tag_floor.clone()); + let rdb_opts = Self::init_rocksdb(config.clone(), rev_floor.clone(), del_legacy.clone()); let rdb = DBWithThreadMode::::open(&rdb_opts, rdb_path)?; tracing::info!( @@ -345,8 +347,8 @@ impl Db { let last_tag = 
rdb.get(ROCKSDB_TAGGING_AT)?.map_or(u64::MAX, |v| { u64::from_be_bytes(v.try_into().expect("8-bytes")) }); - let tag_ceil = Arc::new(AtomicU64::new(last_tag)); // stores the reverse view - let tag_lock = Arc::new(Mutex::new(u64::MAX.saturating_sub(last_tag))); // stores the equivalent view + let rev_ceil = Arc::new(AtomicU64::new(last_tag)); // stores the reverse view + let tag_view = Arc::new(Mutex::new(u64::MAX.saturating_sub(last_tag))); // stores the equivalent view Ok(Db { pool: Arc::new(pool), @@ -355,9 +357,10 @@ impl Db { path, executable_blocks_height, config, - tag_ceil, - tag_floor, - tag_lock, + rev_ceil, + rev_floor, + tag_view, + del_legacy, }) } @@ -639,7 +642,11 @@ impl Db { Ok(()) } - fn init_rocksdb(config: DbConfig, tag_floor: Arc) -> rocksdb::Options { + fn init_rocksdb( + config: DbConfig, + tag_floor: Arc, + del_legacy: Arc, + ) -> rocksdb::Options { // RocksDB configuration let mut block_opts = BlockBasedOptions::default(); // reduce disk and memory usage - https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#ribbon-filter @@ -676,19 +683,20 @@ impl Db { // Keys are only removed if they are older than the floor value, which is set during snapshot. // After pruning, old and legacy keys are eventually removed when the background compaction runs. 
- let floor = tag_floor.clone(); + let tag_floor = tag_floor.clone(); + let del_legacy = del_legacy.clone(); rdb_opts.set_compaction_filter( "StatePruneFilter", move |_lvl, key, _value| -> CompactionDecision { match key.len() { // 40-bytes: remove tagged key, if the key is 'older' than the floor 40 if u64::from_be_bytes(key[32..40].try_into().unwrap()) - > floor.load(Ordering::Relaxed) => + > tag_floor.load(Ordering::Relaxed) => { CompactionDecision::Remove } // 32-bytes: remove legacy key, if snapshot already taken - 32 if floor.load(Ordering::Relaxed) != u64::MAX => CompactionDecision::Remove, + 32 if del_legacy.load(Ordering::Relaxed) => CompactionDecision::Remove, // default to keep, all other keys _ => CompactionDecision::Keep, } @@ -859,9 +867,10 @@ impl Db { self.pool.clone(), self.kvdb.clone(), self.cache.clone(), - self.tag_ceil.clone(), - self.tag_floor.clone(), - self.tag_lock.clone(), + self.rev_ceil.clone(), + self.rev_floor.clone(), + self.tag_view.clone(), + self.del_legacy.clone(), )) } @@ -1700,12 +1709,11 @@ impl Db { /// The previous state trie will be eventually pruned during compaction. pub fn promote_trie(storage: TrieStorage, root_hash: B256, block_number: u64) -> Result<()> { let trie = Arc::new(storage); - let tag_lock = trie.tag_lock.lock(); + let tag_lock = trie.tag_view.lock(); tracing::info!(%root_hash, block_number, "Promote: start"); TrieStorage::promote(trie.clone(), root_hash)?; tracing::info!(%root_hash, block_number, "Promote: done"); - let old_floor = trie.set_tag_floor(*tag_lock)?; - if old_floor != 0 { + if trie.set_tag_floor(*tag_lock)? != 0 { trie.drop_sql_state_trie()?; // delete SQL database } Ok(()) // not fatal, it can be retried later. 
diff --git a/zilliqa/src/trie_storage.rs b/zilliqa/src/trie_storage.rs index 3de8cd1d09..4b80773900 100644 --- a/zilliqa/src/trie_storage.rs +++ b/zilliqa/src/trie_storage.rs @@ -1,6 +1,6 @@ use std::sync::{ Arc, - atomic::{AtomicU64, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, }; use anyhow::Result; @@ -27,9 +27,10 @@ pub struct TrieStorage { kvdb: Arc, cache: Arc, Vec>>>, // the tag_* values are stored in the reverse height order i.e. u64::MAX is genesis. - tag_ceil: Arc, // used to tag every write to the state database; reverse-ordered. - tag_floor: Arc, // used to mark the compaction boundary; reverse-ordered. - pub tag_lock: Arc>, // used to lock the snapshot process; forward-ordered. + rev_ceil: Arc, // used to tag every write to the state database; reverse-ordered. + rev_floor: Arc, // used to mark the compaction boundary; reverse-ordered. + del_legacy: Arc, + pub tag_view: Arc>, // used to lock the snapshot process; forward-ordered. } impl TrieStorage { @@ -37,24 +38,29 @@ impl TrieStorage { pool: Arc>, kvdb: Arc, cache: Arc, Vec>>>, - tag_ceil: Arc, - tag_floor: Arc, - tag_lock: Arc>, + rev_ceil: Arc, + rev_floor: Arc, + tag_view: Arc>, + del_legacy: Arc, ) -> Self { Self { pool, kvdb, cache, - tag_ceil, - tag_floor, - tag_lock, + rev_ceil, + rev_floor, + tag_view, + del_legacy, } } /// Truncate the state_trie table pub fn drop_sql_state_trie(&self) -> Result<()> { - let sql = self.pool.get().unwrap(); - sql.execute("DELETE FROM state_trie", [])?; // TRUNCATE + self.pool + .get() + .unwrap() + .execute("DELETE FROM state_trie", [])?; // TRUNCATE + self.del_legacy.store(true, Ordering::Relaxed); // allow deletion of rocksdb keys Ok(()) } @@ -84,11 +90,11 @@ impl TrieStorage { /// This ensures that: floor != ceil && new_floor 'increments' old_floor. 
pub fn set_tag_floor(&self, view: u64) -> Result { let new_tag = u64::MAX.saturating_sub(view); // reverse-ordered - let tag_floor = self.tag_floor.load(Ordering::Relaxed); + let tag_floor = self.rev_floor.load(Ordering::Relaxed); anyhow::ensure!(new_tag <= tag_floor, "{new_tag} <= {tag_floor}"); - let tag_ceil = self.tag_ceil.load(Ordering::Relaxed); + let tag_ceil = self.rev_ceil.load(Ordering::Relaxed); anyhow::ensure!(new_tag > tag_ceil, "{new_tag} > {tag_ceil}"); - self.tag_floor.store(new_tag, Ordering::Relaxed); + self.rev_floor.store(new_tag, Ordering::Relaxed); Ok(u64::MAX.saturating_sub(tag_floor)) } @@ -97,12 +103,12 @@ impl TrieStorage { /// This ensures that: floor != ceil && new_ceil 'increments' old_ceil. pub fn set_tag_ceil(&self, view: u64) -> Result { let new_tag = u64::MAX.saturating_sub(view); // reverse-ordered - let tag_ceil = self.tag_ceil.load(Ordering::Relaxed); + let tag_ceil = self.rev_ceil.load(Ordering::Relaxed); anyhow::ensure!(new_tag <= tag_ceil, "{new_tag} <= {tag_ceil}"); - let tag_floor = self.tag_floor.load(Ordering::Relaxed); + let tag_floor = self.rev_floor.load(Ordering::Relaxed); anyhow::ensure!(new_tag < tag_floor, "{new_tag} < {tag_floor}"); self.kvdb.put(ROCKSDB_TAGGING_AT, new_tag.to_be_bytes())?; - self.tag_ceil.store(new_tag, Ordering::Relaxed); + self.rev_ceil.store(new_tag, Ordering::Relaxed); Ok(u64::MAX.saturating_sub(tag_ceil)) } @@ -247,18 +253,11 @@ impl TrieStorage { Ok(()) } - #[cfg(test)] - // test artifacts - fn clear_cache(&self) -> Result<()> { - self.cache.write().clear(); - Ok(()) - } - #[cfg(test)] // test artifacts fn inc_tag(&self, new_height: u64) -> Result<()> { let new_tag = u64::MAX.saturating_sub(new_height); - self.tag_ceil + self.rev_ceil .store(new_tag, std::sync::atomic::Ordering::Relaxed); Ok(()) } @@ -268,8 +267,8 @@ impl eth_trie::DB for TrieStorage { type Error = eth_trie::TrieError; fn get(&self, key: &[u8]) -> Result>, Self::Error> { - // L1 - Lru mem - if let Some(value) = 
self.cache.write().get(key) { + // L1 - pseudo-lru + if let Some(value) = self.cache.read().peek(key) { return Ok(Some(value.clone())); } @@ -296,7 +295,7 @@ impl eth_trie::DB for TrieStorage { if let Some(value) = value { // lazy migration let tag = self - .tag_ceil + .rev_ceil .load(std::sync::atomic::Ordering::Relaxed) .to_be_bytes(); self.write_batch(vec![key.to_vec()], vec![value.clone()], tag) @@ -310,7 +309,7 @@ impl eth_trie::DB for TrieStorage { #[inline] fn insert(&self, key: &[u8], value: Vec) -> Result<(), Self::Error> { let tag = self - .tag_ceil + .rev_ceil .load(std::sync::atomic::Ordering::Relaxed) .to_be_bytes(); self.write_batch(vec![key.to_vec()], vec![value], tag) @@ -320,7 +319,7 @@ impl eth_trie::DB for TrieStorage { #[inline] fn insert_batch(&self, keys: Vec>, values: Vec>) -> Result<(), Self::Error> { let tag = self - .tag_ceil + .rev_ceil .load(std::sync::atomic::Ordering::Relaxed) .to_be_bytes(); self.write_batch(keys, values, tag) @@ -341,7 +340,7 @@ impl eth_trie::DB for TrieStorage { self.write_batch( vec![promote_key.to_vec()], vec![value], - self.tag_ceil + self.rev_ceil .load(Ordering::Relaxed) .saturating_add(1) // push it one-down, to keep only 1 snapshot on-disk. 
.to_be_bytes(), @@ -365,8 +364,12 @@ mod tests { use super::*; use crate::crypto::SecretKey; - #[test] - fn basic_snapshot() { + fn setup() -> ( + Arc>, + Arc, + Arc, Vec>>>, + Arc, + ) { let sql = Arc::new( Pool::builder() .build(SqliteConnectionManager::memory()) @@ -376,6 +379,7 @@ mod tests { let tag = Arc::new(AtomicU64::new(u64::MAX)); let lock = Arc::new(Mutex::new(u64::MIN)); let cache = Arc::new(RwLock::new(LruCache::unbounded())); + let del = Arc::new(AtomicBool::new(true)); let trie_storage = TrieStorage::new( sql.clone(), rdb.clone(), @@ -383,8 +387,15 @@ mod tests { tag.clone(), tag.clone(), lock.clone(), + del.clone(), ); let trie_storage = Arc::new(trie_storage); + (sql, rdb, cache, trie_storage) + } + + #[test] + fn basic_snapshot() { + let (_, rdb, _, trie_storage) = setup(); let mut pmt = EthTrie::new(trie_storage.clone()); pmt.root_hash().unwrap(); // create one 'empty' record @@ -416,23 +427,7 @@ mod tests { #[test] // lazy migration from sqlite to rocksdb, works. fn lazy_migration() { - let sql = Arc::new( - Pool::builder() - .build(SqliteConnectionManager::memory()) - .unwrap(), - ); - let rdb = Arc::new(rocksdb::DB::open_default(tempdir().unwrap()).unwrap()); - let tag = Arc::new(AtomicU64::new(u64::MAX)); - let lock = Arc::new(Mutex::new(u64::MIN)); - let cache = Arc::new(RwLock::new(LruCache::unbounded())); - let trie_storage = TrieStorage::new( - sql.clone(), - rdb.clone(), - cache.clone(), - tag.clone(), - tag.clone(), - lock.clone(), - ); + let (sql, rdb, _, trie_storage) = setup(); let key_prefix = alloy::consensus::EMPTY_ROOT_HASH.0; let conn = sql.get().unwrap(); @@ -459,23 +454,7 @@ mod tests { #[test] // height-tag read/write works fn tagging() { - let sql = Arc::new( - Pool::builder() - .build(SqliteConnectionManager::memory()) - .unwrap(), - ); - let rdb = Arc::new(rocksdb::DB::open_default(tempdir().unwrap()).unwrap()); - let tag = Arc::new(AtomicU64::new(u64::MAX)); - let lock = Arc::new(Mutex::new(u64::MIN)); - let cache = 
Arc::new(RwLock::new(LruCache::unbounded())); - let trie_storage = TrieStorage::new( - sql.clone(), - rdb.clone(), - cache.clone(), - tag.clone(), - tag.clone(), - lock.clone(), - ); + let (_, rdb, cache, trie_storage) = setup(); let key_prefix = alloy::consensus::EMPTY_ROOT_HASH.0; @@ -496,7 +475,7 @@ mod tests { assert_eq!(value.unwrap(), b"min_value".to_vec()); // read highest value, bypassing cache. - trie_storage.clear_cache().unwrap(); + cache.write().clear(); let value = trie_storage.get(key_prefix.as_slice()).unwrap(); assert_eq!(value.unwrap(), b"max_value".to_vec()); @@ -508,23 +487,7 @@ mod tests { #[test] // peek ahead works fn peek_ahead() { - let sql = Arc::new( - Pool::builder() - .build(SqliteConnectionManager::memory()) - .unwrap(), - ); - let rdb = Arc::new(rocksdb::DB::open_default(tempdir().unwrap()).unwrap()); - let tag = Arc::new(AtomicU64::new(u64::MAX)); - let lock = Arc::new(Mutex::new(u64::MIN)); - let cache = Arc::new(RwLock::new(LruCache::unbounded())); - let trie_storage = TrieStorage::new( - sql.clone(), - rdb.clone(), - cache.clone(), - tag.clone(), - tag.clone(), - lock.clone(), - ); + let (_, rdb, cache, trie_storage) = setup(); let key_prefix = alloy::consensus::EMPTY_ROOT_HASH.0; rdb.put(key_prefix.as_slice(), b"rdb_value").unwrap(); @@ -538,7 +501,7 @@ mod tests { trie_storage .insert(key_prefix.as_slice(), b"min_value".to_vec()) .unwrap(); - trie_storage.clear_cache().unwrap(); + cache.write().clear(); let value = trie_storage.get(key_prefix.as_slice()).unwrap().unwrap(); assert_eq!(value, b"min_value".to_vec()); @@ -547,7 +510,7 @@ mod tests { trie_storage .insert(key_prefix.as_slice(), b"max_value".to_vec()) .unwrap(); - trie_storage.clear_cache().unwrap(); + cache.write().clear(); let value = trie_storage.get(key_prefix.as_slice()).unwrap().unwrap(); assert_eq!(value, b"max_value".to_vec()); @@ -559,24 +522,7 @@ mod tests { #[test] // writes to disk only happens on commit fn write_on_commit() { - let sql = Arc::new( - 
Pool::builder() - .build(SqliteConnectionManager::memory()) - .unwrap(), - ); - let rdb = Arc::new(rocksdb::DB::open_default(tempdir().unwrap()).unwrap()); - let tag = Arc::new(AtomicU64::new(u64::MAX)); - let lock = Arc::new(Mutex::new(u64::MIN)); - let cache = Arc::new(RwLock::new(LruCache::unbounded())); - let trie_storage = TrieStorage::new( - sql.clone(), - rdb.clone(), - cache.clone(), - tag.clone(), - tag.clone(), - lock.clone(), - ); - let trie_storage = Arc::new(trie_storage); + let (_, rdb, cache, trie_storage) = setup(); let key_prefix = alloy::consensus::EMPTY_ROOT_HASH.0; let mut pmt = EthTrie::new(trie_storage.clone()); @@ -585,7 +531,7 @@ mod tests { pmt.insert(key_prefix.as_slice(), b"one_value").unwrap(); pmt.insert(key_prefix.as_slice(), b"two_value").unwrap(); pmt.insert(key_prefix.as_slice(), b"tri_value").unwrap(); - trie_storage.clear_cache().unwrap(); + cache.write().clear(); let value = pmt.get(key_prefix.as_slice()).unwrap().unwrap(); assert_eq!(value, b"tri_value".to_vec()); pmt.root_hash().unwrap(); // write to disk @@ -594,7 +540,7 @@ mod tests { pmt.insert(key_prefix.as_slice(), b"for_value").unwrap(); trie_storage.inc_tag(2).unwrap(); pmt.insert(key_prefix.as_slice(), b"fiv_value").unwrap(); - trie_storage.clear_cache().unwrap(); + cache.write().clear(); let value = pmt.get(key_prefix.as_slice()).unwrap().unwrap(); assert_eq!(value, b"fiv_value".to_vec()); pmt.root_hash().unwrap(); // write to disk From 0977f41bfb05470c28bba257ca0df228c1f77501 Mon Sep 17 00:00:00 2001 From: Shawn Date: Thu, 12 Feb 2026 12:16:26 +0800 Subject: [PATCH 03/13] added TrieStorage::migrate_legacy() --- zilliqa/src/cfg.rs | 4 ++ zilliqa/src/consensus.rs | 11 +++- zilliqa/src/db.rs | 78 ++++++++++++++-------------- zilliqa/src/message.rs | 5 ++ zilliqa/src/node.rs | 7 ++- zilliqa/src/p2p_node.rs | 10 +++- zilliqa/src/trie_storage.rs | 100 +++++++++++++++++++++++++++++------- zilliqa/tests/it/main.rs | 1 + 8 files changed, 155 insertions(+), 61 deletions(-) diff 
--git a/zilliqa/src/cfg.rs b/zilliqa/src/cfg.rs index 6ceaee4099..9d0fe822c2 100644 --- a/zilliqa/src/cfg.rs +++ b/zilliqa/src/cfg.rs @@ -174,6 +174,9 @@ pub struct DbConfig { /// Whether to enable state-sync/state-migration #[serde(default)] pub state_sync: bool, + /// config.toml setting is ignored. + #[serde(default)] + pub state_prune: bool, /// RocksDB block cache size, in bytes. #[serde(default = "rocksdb_cache_size_default")] pub rocksdb_cache_size: usize, @@ -239,6 +242,7 @@ impl Default for DbConfig { conn_cache_size: sql_cache_size_default(), auto_checkpoint: sql_auto_checkpoint_default(), state_sync: false, + state_prune: false, rocksdb_cache_size: rocksdb_cache_size_default(), rocksdb_compaction_period: rocksdb_compaction_period_default(), rocksdb_max_open_files: rocksdb_max_open_files_default(), diff --git a/zilliqa/src/consensus.rs b/zilliqa/src/consensus.rs index 6af07bc017..dd942978c9 100644 --- a/zilliqa/src/consensus.rs +++ b/zilliqa/src/consensus.rs @@ -386,12 +386,13 @@ impl Consensus { let state_sync = config.db.state_sync; let forks = config.consensus.get_forks()?; let enable_ots_indices = config.enable_ots_indices; + let prune_interval = config.sync.prune_interval; let mut consensus = Consensus { secret_key, config, sync, - message_sender, + message_sender: message_sender.clone(), reset_timeout, votes: DashMap::new(), buffered_votes: DashMap::new(), @@ -579,7 +580,13 @@ impl Consensus { } // Initialize state trie storage - consensus.db.state_trie()?.init_state_trie(forks)?; + let state_trie = consensus.db.state_trie()?; + state_trie.init_state_trie(forks)?; + if prune_interval == u64::MAX { + // If the node is being pruned, let pruning naturally migrate the state. 
+ // Otherwise, trigger migration - the actual migration process will be handled by the coordinator + message_sender.send_message_to_coordinator(InternalMessage::MigrateTrie(state_trie))?; + } // If timestamp of when current high_qc was written exists then use it to estimate the minimum number of blocks the network has moved on since shut down // This is useful in scenarios in which consensus has failed since this node went down diff --git a/zilliqa/src/db.rs b/zilliqa/src/db.rs index 78a342b0df..801aedc2c3 100644 --- a/zilliqa/src/db.rs +++ b/zilliqa/src/db.rs @@ -8,7 +8,7 @@ use std::{ path::{Path, PathBuf}, sync::{ Arc, - atomic::{AtomicBool, AtomicU64, Ordering}, + atomic::{AtomicU64, Ordering}, }, time::Duration, }; @@ -40,7 +40,7 @@ use crate::{ precompiles::ViewHistory, time::SystemTime, transaction::{EvmGas, Log, SignedTransaction, TransactionReceipt, VerifiedTransaction}, - trie_storage::{ROCKSDB_TAGGING_AT, TrieStorage}, + trie_storage::{LATEST_KEY_LEN, LEGACY_KEY_LEN, ROCKSDB_TAGGING_AT, TrieStorage}, }; const MAX_KEYS_IN_SINGLE_QUERY: usize = 32765; @@ -265,7 +265,6 @@ pub struct Db { /// State Pruning rev_ceil: Arc, // always set to the finalised view height; set by Db::set_finalised_view() rev_floor: Arc, // resets to u64::MAX at startup; gets set during prune.; set by Db::snapshot() - del_legacy: Arc, pub tag_view: Arc>, // used to lock the snapshot process } @@ -322,15 +321,13 @@ impl Db { let connection = pool.get()?; Self::ensure_schema(&connection)?; - let rev_floor = Arc::new(AtomicU64::new(u64::MAX)); - let del_legacy = Arc::new(AtomicBool::new(false)); // Should be safe in single-threaded mode // https://docs.rs/rocksdb/latest/rocksdb/type.DB.html#limited-performance-implication-for-single-threaded-mode let rdb_path = path.as_ref().map_or_else( || tempfile::tempdir().unwrap().path().join("state.rocksdb"), |p| p.join("state.rocksdb"), ); - let rdb_opts = Self::init_rocksdb(config.clone(), rev_floor.clone(), del_legacy.clone()); + let (rdb_opts, 
rev_floor) = Self::init_rocksdb(config.clone()); let rdb = DBWithThreadMode::::open(&rdb_opts, rdb_path)?; tracing::info!( @@ -343,10 +340,14 @@ impl Db { let cache = LruCache::new(NonZeroUsize::new(config.rocksdb_state_cache_size / 500).unwrap()); - // *** Must use the latest tag for keys. *** - let last_tag = rdb.get(ROCKSDB_TAGGING_AT)?.map_or(u64::MAX, |v| { - u64::from_be_bytes(v.try_into().expect("8-bytes")) - }); + // Use the last known tag for keys. + // Defaults to MAX - 1, to work well with TrieStorage::migrate_legacy(). + // MAX is reserved for use by the background legacy migration process. + let last_tag = rdb + .get(ROCKSDB_TAGGING_AT)? + .map_or(u64::MAX.saturating_sub(1), |v| { + u64::from_be_bytes(v.try_into().expect("8-bytes")) + }); let rev_ceil = Arc::new(AtomicU64::new(last_tag)); // stores the reverse view let tag_view = Arc::new(Mutex::new(u64::MAX.saturating_sub(last_tag))); // stores the equivalent view @@ -360,7 +361,6 @@ impl Db { rev_ceil, rev_floor, tag_view, - del_legacy, }) } @@ -642,11 +642,7 @@ impl Db { Ok(()) } - fn init_rocksdb( - config: DbConfig, - tag_floor: Arc, - del_legacy: Arc, - ) -> rocksdb::Options { + fn init_rocksdb(config: DbConfig) -> (rocksdb::Options, Arc) { // RocksDB configuration let mut block_opts = BlockBasedOptions::default(); // reduce disk and memory usage - https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#ribbon-filter @@ -681,29 +677,34 @@ impl Db { rdb_opts.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd); rdb_opts.set_bottommost_zstd_max_train_bytes(0, true); - // Keys are only removed if they are older than the floor value, which is set during snapshot. - // After pruning, old and legacy keys are eventually removed when the background compaction runs. 
- let tag_floor = tag_floor.clone(); - let del_legacy = del_legacy.clone(); - rdb_opts.set_compaction_filter( - "StatePruneFilter", - move |_lvl, key, _value| -> CompactionDecision { - match key.len() { - // 40-bytes: remove tagged key, if the key is 'older' than the floor - 40 if u64::from_be_bytes(key[32..40].try_into().unwrap()) - > tag_floor.load(Ordering::Relaxed) => - { - CompactionDecision::Remove + let tag_floor = Arc::new(AtomicU64::new(u64::MAX)); + // Use the default compaction filter, unless pruning is enabled + if config.state_prune { + let rev_floor = tag_floor.clone(); + // Keys are only removed if they are older than the floor value, which is set during snapshot. + // After pruning, old and legacy keys are eventually removed when the background compaction runs. + rdb_opts.set_compaction_filter( + "StatePruneFilter", + move |_lvl, key, _value| -> CompactionDecision { + match key.len() { + // 40-bytes: remove tagged key, if the key is 'older' than the floor + LATEST_KEY_LEN + if u64::from_be_bytes(key[32..40].try_into().unwrap()) + > rev_floor.load(Ordering::Relaxed) => + { + CompactionDecision::Remove + } + // 32-bytes: remove legacy key, if snapshot already taken + LEGACY_KEY_LEN if rev_floor.load(Ordering::Relaxed) != u64::MAX => { + CompactionDecision::Remove + } + // default to keep, all other keys + _ => CompactionDecision::Keep, } - // 32-bytes: remove legacy key, if snapshot already taken - 32 if del_legacy.load(Ordering::Relaxed) => CompactionDecision::Remove, - // default to keep, all other keys - _ => CompactionDecision::Keep, - } - }, - ); - - rdb_opts + }, + ); + } + (rdb_opts, tag_floor) } // SQLite performance tweaks @@ -870,7 +871,6 @@ impl Db { self.rev_ceil.clone(), self.rev_floor.clone(), self.tag_view.clone(), - self.del_legacy.clone(), )) } diff --git a/zilliqa/src/message.rs b/zilliqa/src/message.rs index bd544b3895..4bb0cefd09 100644 --- a/zilliqa/src/message.rs +++ b/zilliqa/src/message.rs @@ -396,6 +396,8 @@ pub enum 
InternalMessage { UnsubscribeFromGossipSubTopic(GossipSubTopic), /// Snapshot a trie at a given point PromoteTrie(TrieStorage, B256, u64), + /// Migrate the legacy trie to the new trie + MigrateTrie(TrieStorage), } #[derive(Debug, Clone)] @@ -425,6 +427,9 @@ impl Display for InternalMessage { InternalMessage::PromoteTrie(_storage, hash, view) => { write!(f, "PromoteTrie({hash:?}) {view})") } + InternalMessage::MigrateTrie(_storage) => { + write!(f, "MigrateTrie") + } } } } diff --git a/zilliqa/src/node.rs b/zilliqa/src/node.rs index 0263be10cf..c86a8a4bf0 100644 --- a/zilliqa/src/node.rs +++ b/zilliqa/src/node.rs @@ -211,11 +211,16 @@ impl Node { .consensus .get_forks()? .find_height_fork_first_activated(ForkName::ExecutableBlocks); + + // FIXME: This is a hack, we should factorise pruning logic out of sync/db. + let mut db_config = config.db.clone(); + db_config.state_prune = config.sync.prune_interval != u64::MAX; + let db = Arc::new(Db::new( config.data_dir.as_ref(), config.eth_chain_id, executable_blocks_height, - config.db.clone(), + db_config, )?); let node = Node { config: config.clone(), diff --git a/zilliqa/src/p2p_node.rs b/zilliqa/src/p2p_node.rs index d6295acd02..dbbd42ecbf 100644 --- a/zilliqa/src/p2p_node.rs +++ b/zilliqa/src/p2p_node.rs @@ -487,7 +487,15 @@ impl P2pNode { InternalMessage::PromoteTrie(trie, hash, view) => { self.task_threads.spawn(async move { if let Err(e) = db::promote_trie(trie, hash, view) { - tracing::error!("Snapshot failed: {e:?}"); + tracing::error!(error = %e, "Snapshot failed"); + } + Ok(()) + }); + } + InternalMessage::MigrateTrie(storage) => { + self.task_threads.spawn(async move { + if let Err(e) = storage.migrate_legacy() { + tracing::error!(error = %e, "Migration failed"); } Ok(()) }); diff --git a/zilliqa/src/trie_storage.rs b/zilliqa/src/trie_storage.rs index 4b80773900..c40b29c2bb 100644 --- a/zilliqa/src/trie_storage.rs +++ b/zilliqa/src/trie_storage.rs @@ -1,6 +1,9 @@ -use std::sync::{ - Arc, - atomic::{AtomicBool, 
AtomicU64, Ordering}, +use std::{ + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + usize, }; use anyhow::Result; @@ -18,7 +21,10 @@ use crate::{cfg::Forks, crypto::Hash, state::Account}; /// Special storage keys const ROCKSDB_MIGRATE_AT: &str = "migrate_at"; const ROCKSDB_CUTOVER_AT: &str = "cutover_at"; +const ROCKSDB_CONTENT_AT: &str = "content_at"; pub const ROCKSDB_TAGGING_AT: &str = "tagging_at"; +pub const LEGACY_KEY_LEN: usize = 32; +pub const LATEST_KEY_LEN: usize = 40; /// An implementor of [eth_trie::DB] which uses a [rocksdb::DB]/[rusqlite::Connection] to persist data. #[derive(Debug, Clone)] @@ -29,7 +35,6 @@ pub struct TrieStorage { // the tag_* values are stored in the reverse height order i.e. u64::MAX is genesis. rev_ceil: Arc, // used to tag every write to the state database; reverse-ordered. rev_floor: Arc, // used to mark the compaction boundary; reverse-ordered. - del_legacy: Arc, pub tag_view: Arc>, // used to lock the snapshot process; forward-ordered. } @@ -41,7 +46,6 @@ impl TrieStorage { rev_ceil: Arc, rev_floor: Arc, tag_view: Arc>, - del_legacy: Arc, ) -> Self { Self { pool, @@ -50,7 +54,6 @@ impl TrieStorage { rev_ceil, rev_floor, tag_view, - del_legacy, } } @@ -60,7 +63,57 @@ impl TrieStorage { .get() .unwrap() .execute("DELETE FROM state_trie", [])?; // TRUNCATE - self.del_legacy.store(true, Ordering::Relaxed); // allow deletion of rocksdb keys + Ok(()) + } + + const CHUNK_SIZE: usize = 1_000; + /// One-off migrate all rocksdb legacy keys to the latest keys. + /// + /// This function migrates legacy keys from RocksDB to the latest format. + /// It iterates through all keys in the database, identifies legacy keys, + /// and migrates them to the new format. + /// + // It is possible that some keys may have been lazily migrated during normal operations. + // This function ensures that any such keys are migrated in a way that does not hide nor + // impact the reading of newer keys. 
+ pub fn migrate_legacy(&self) -> Result<()> { + // skip if done. + if self.kvdb.get(ROCKSDB_CONTENT_AT)?.is_some() { + return Ok(()); + } + + // Since the default database tag is MAX - 1, using MAX here ensures that, + // this key does not hide any newer keys (all < MAX). + let tag = u64::MAX.to_be_bytes(); + + // pre-allocate and re-use memory buffer. + let mut keys = Vec::with_capacity(Self::CHUNK_SIZE); + let mut values = Vec::with_capacity(Self::CHUNK_SIZE); + + // iterate thru all keys, migrating legacy keys. + for kv in self.kvdb.iterator(rocksdb::IteratorMode::Start) { + let (key, value) = kv?; + // migrate legacy keys, overwriting any previous migration. + if key.len() == LEGACY_KEY_LEN { + keys.push(key.to_vec()); + values.push(value.to_vec()); + // chunking + if keys.len() == Self::CHUNK_SIZE { + self.write_batch( + std::mem::take(&mut keys), + std::mem::take(&mut values), + tag, // this will not override any newer values + true, + )?; + } + } + } + // last chunk + if !keys.is_empty() { + self.write_batch(keys, values, tag, true)?; + } + self.kvdb + .put(ROCKSDB_CONTENT_AT, usize::MAX.to_be_bytes())?; Ok(()) } @@ -120,7 +173,13 @@ impl TrieStorage { // This is the tagging scheme used. Each tag is a U64 in big-endian format. // |user-key + tag|seqno|type| // |<-----internal key------>| - fn write_batch(&self, keys: Vec>, values: Vec>, tag: [u8; 8]) -> Result<()> { + fn write_batch( + &self, + keys: Vec>, + values: Vec>, + tag: [u8; 8], + migration: bool, + ) -> Result<()> { if keys.is_empty() { return Ok(()); } @@ -134,7 +193,13 @@ impl TrieStorage { let mut tag_key = key.clone(); tag_key.extend_from_slice(tag.as_slice()); // suffix big-endian tags batch.put(tag_key.as_slice(), value.as_slice()); - cache.put(key, value); + + // for legacy migration skip the cache and tombstone the key. + if !migration { + cache.put(key, value); + } else { + batch.delete(key.as_slice()); + } } Ok(self.kvdb.write(batch)?) 
} @@ -160,16 +225,16 @@ impl TrieStorage { // Given that the trie keys are *all* Keccak256 keys, the legacy keys are exactly 32-bytes (256-bits) long, // while the new tag keys are exactly 40-bytes (320-bits) long. We do not expect any other trie-key lengths. - if key.len() == 40 { - let tag = key[32..40].try_into().unwrap(); + if key.len() == LATEST_KEY_LEN { + let tag = key[LEGACY_KEY_LEN..].try_into().unwrap(); // latest tag key, return the latest value Ok(Some((tag, value.to_vec()))) - } else if key.len() == 32 { + } else if key.len() == LEGACY_KEY_LEN { // legacy key - peek to see if a later tag key is present if let Some(peek) = iter.next() { let peek = peek?; if peek.0.starts_with(key_prefix) { - let tag = peek.0[32..].try_into().expect("8-bytes"); + let tag = peek.0[LEGACY_KEY_LEN..].try_into().expect("8-bytes"); return Ok(Some((tag, peek.1.to_vec()))); // tag key has newer value } } @@ -298,7 +363,7 @@ impl eth_trie::DB for TrieStorage { .rev_ceil .load(std::sync::atomic::Ordering::Relaxed) .to_be_bytes(); - self.write_batch(vec![key.to_vec()], vec![value.clone()], tag) + self.write_batch(vec![key.to_vec()], vec![value.clone()], tag, false) .map_err(|e| eth_trie::TrieError::DB(e.to_string()))?; return Ok(Some(value)); } @@ -312,7 +377,7 @@ impl eth_trie::DB for TrieStorage { .rev_ceil .load(std::sync::atomic::Ordering::Relaxed) .to_be_bytes(); - self.write_batch(vec![key.to_vec()], vec![value], tag) + self.write_batch(vec![key.to_vec()], vec![value], tag, false) .map_err(|e| eth_trie::TrieError::DB(e.to_string())) } @@ -322,7 +387,7 @@ impl eth_trie::DB for TrieStorage { .rev_ceil .load(std::sync::atomic::Ordering::Relaxed) .to_be_bytes(); - self.write_batch(keys, values, tag) + self.write_batch(keys, values, tag, false) .map_err(|e| eth_trie::TrieError::DB(e.to_string())) } @@ -344,6 +409,7 @@ impl eth_trie::DB for TrieStorage { .load(Ordering::Relaxed) .saturating_add(1) // push it one-down, to keep only 1 snapshot on-disk. 
.to_be_bytes(), + false, ) .map_err(|e| eth_trie::TrieError::DB(e.to_string()))? } @@ -379,7 +445,6 @@ mod tests { let tag = Arc::new(AtomicU64::new(u64::MAX)); let lock = Arc::new(Mutex::new(u64::MIN)); let cache = Arc::new(RwLock::new(LruCache::unbounded())); - let del = Arc::new(AtomicBool::new(true)); let trie_storage = TrieStorage::new( sql.clone(), rdb.clone(), @@ -387,7 +452,6 @@ mod tests { tag.clone(), tag.clone(), lock.clone(), - del.clone(), ); let trie_storage = Arc::new(trie_storage); (sql, rdb, cache, trie_storage) diff --git a/zilliqa/tests/it/main.rs b/zilliqa/tests/it/main.rs index 478f381f4f..adf8c3e8ed 100644 --- a/zilliqa/tests/it/main.rs +++ b/zilliqa/tests/it/main.rs @@ -1134,6 +1134,7 @@ impl Network { InternalMessage::PromoteTrie(storage, hash, view) => { db::promote_trie(storage.clone(), *hash, *view).unwrap(); } + InternalMessage::MigrateTrie(storage) => storage.migrate_legacy().unwrap(), } } AnyMessage::External(external_message) => { From e54d75bb3a27c19836f545bed031a7de2ebc233e Mon Sep 17 00:00:00 2001 From: Shawn Date: Fri, 13 Feb 2026 10:34:24 +0800 Subject: [PATCH 04/13] implement lazy migration - better operator ergonomics. --- docs/state_pruning.md | 16 ++- zilliqa/src/api/admin.rs | 12 --- zilliqa/src/consensus.rs | 8 +- zilliqa/src/db.rs | 29 +++--- zilliqa/src/message.rs | 11 +- zilliqa/src/p2p_node.rs | 12 +-- zilliqa/src/trie_storage.rs | 198 +++++++++++++++--------------------- zilliqa/tests/it/main.rs | 5 +- zilliqa/tests/it/sync.rs | 1 + 9 files changed, 117 insertions(+), 175 deletions(-) diff --git a/docs/state_pruning.md b/docs/state_pruning.md index f5c76a195b..94f78976ab 100644 --- a/docs/state_pruning.md +++ b/docs/state_pruning.md @@ -45,11 +45,11 @@ This ensures that the timestamp is always monotonically increasing per block. > By default RocksDB will ensure that compaction is triggered at least once every 30-days; but you can configure this behaviour by setting the `db.rocksdb_compaction_period` option. 
At each epoch, the node increments its internal timestamp *ceiling*, which will result in any new state being tagged with a higher timestamp. -Also, the node will trigger a background operation to *promote* all pre-existing state that should be retained, by duplicating the entire state-trie with the higher timestamp. -This operation may take some time to complete, but the node will only allow one such operation at a time. +Also, the node will trigger a background operation to *promote* all active state that should be retained, by duplicating the entire state-trie with the higher timestamp. +This operation may take some time to complete, possibly several epochs, and the node will only allow one such operation at a time. After the operation is complete, the node increments its internal timestamp *floor*; and any state with a timestamp below the *floor* will eventually be compacted away. -### Conditions +#### Conditions The conditions for the *ceiling* and the *floor* are that: - The *ceiling* is always incremented i.e. new ceiling > old ceiling. @@ -59,5 +59,11 @@ The conditions for the *ceiling* and the *floor* are that: The promotion operation will only be triggered if the lowest block view is greater than the current *ceiling*. This condition ensures the safety that, the only states pruned are those that are absolutely no longer needed, since the block no longer exists in the SQL database. -This also means that the node may retain slightly more state than it actually needs. -Considering the amount of state that can be pruned, this tiny bit of extra state is negligible. +This also means that the node may retain more state than it actually needs. +Considering the amount of state saved through pruning, this bit of extra state is negligible. + +### Key Migration + +For nodes that disable pruning, the existing keys need to be migrated to the new tagged-keys. +This process is done lazily, as nodes are being read from time to time. 
+Whenever a legacy-key node is read from the database, the key is migrated to the new tagged-key format and deleted. diff --git a/zilliqa/src/api/admin.rs b/zilliqa/src/api/admin.rs index cc98787520..c040ec4d0d 100644 --- a/zilliqa/src/api/admin.rs +++ b/zilliqa/src/api/admin.rs @@ -41,7 +41,6 @@ pub fn rpc_module(node: Arc, enabled_apis: &[EnabledApi]) -> RpcModule) -> Result } Ok(leaders) } - -fn snapshot(params: Params, node: &Arc) -> Result { - let mut params = params.sequence(); - let block_id: BlockId = params.next()?; - let block = node - .get_block(block_id)? - .ok_or(anyhow!("Block {block_id} does not exist"))?; - - // node.consensus.read().snapshot_at(block.number())?; - Ok(block.view()) -} diff --git a/zilliqa/src/consensus.rs b/zilliqa/src/consensus.rs index dd942978c9..f4271ebe92 100644 --- a/zilliqa/src/consensus.rs +++ b/zilliqa/src/consensus.rs @@ -386,7 +386,6 @@ impl Consensus { let state_sync = config.db.state_sync; let forks = config.consensus.get_forks()?; let enable_ots_indices = config.enable_ots_indices; - let prune_interval = config.sync.prune_interval; let mut consensus = Consensus { secret_key, @@ -582,11 +581,6 @@ impl Consensus { // Initialize state trie storage let state_trie = consensus.db.state_trie()?; state_trie.init_state_trie(forks)?; - if prune_interval == u64::MAX { - // If the note is being pruned, let pruning naturally migrate the state. 
- // Otherwise, trigger migration - the actual migration process will be handled by the coordinator - message_sender.send_message_to_coordinator(InternalMessage::MigrateTrie(state_trie))?; - } // If timestamp of when current high_qc was written exists then use it to estimate the minimum number of blocks the network has moved on since shut down // This is useful in scenarios in which consensus has failed since this node went down @@ -2466,7 +2460,7 @@ impl Consensus { // trigger snapshot self.message_sender - .send_message_to_coordinator(InternalMessage::PromoteTrie( + .send_message_to_coordinator(InternalMessage::SnapshotTrie( trie_storage, block.state_root_hash().into(), block_number, diff --git a/zilliqa/src/db.rs b/zilliqa/src/db.rs index 801aedc2c3..53a8082a2b 100644 --- a/zilliqa/src/db.rs +++ b/zilliqa/src/db.rs @@ -40,7 +40,7 @@ use crate::{ precompiles::ViewHistory, time::SystemTime, transaction::{EvmGas, Log, SignedTransaction, TransactionReceipt, VerifiedTransaction}, - trie_storage::{LATEST_KEY_LEN, LEGACY_KEY_LEN, ROCKSDB_TAGGING_AT, TrieStorage}, + trie_storage::{LEGACY_KEY_LEN, ROCKSDB_TAGGING_AT, TAGGED_KEY_LEN, TrieStorage}, }; const MAX_KEYS_IN_SINGLE_QUERY: usize = 32765; @@ -264,8 +264,8 @@ pub struct Db { pub config: DbConfig, /// State Pruning rev_ceil: Arc, // always set to the finalised view height; set by Db::set_finalised_view() - rev_floor: Arc, // resets to u64::MAX at startup; gets set during prune.; set by Db::snapshot() - pub tag_view: Arc>, // used to lock the snapshot process + rev_floor: Arc, // resets to u64::MAX at startup; gets set by Db::snapshot() + pub tag_view: Arc>, // used to lock the promotion process } impl Db { @@ -341,13 +341,9 @@ impl Db { LruCache::new(NonZeroUsize::new(config.rocksdb_state_cache_size / 500).unwrap()); // Use the last known tag for keys. - // Defaults to MAX - 1, to work well with TrieStorage::migrate_legacy(). - // MAX is reserved for use by the background legacy migration process. 
- let last_tag = rdb - .get(ROCKSDB_TAGGING_AT)? - .map_or(u64::MAX.saturating_sub(1), |v| { - u64::from_be_bytes(v.try_into().expect("8-bytes")) - }); + let last_tag = rdb.get(ROCKSDB_TAGGING_AT)?.map_or(u64::MAX, |v| { + u64::from_be_bytes(v.try_into().expect("8-bytes")) + }); let rev_ceil = Arc::new(AtomicU64::new(last_tag)); // stores the reverse view let tag_view = Arc::new(Mutex::new(u64::MAX.saturating_sub(last_tag))); // stores the equivalent view @@ -688,7 +684,7 @@ impl Db { move |_lvl, key, _value| -> CompactionDecision { match key.len() { // 40-bytes: remove tagged key, if the key is 'older' than the floor - LATEST_KEY_LEN + TAGGED_KEY_LEN if u64::from_be_bytes(key[32..40].try_into().unwrap()) > rev_floor.load(Ordering::Relaxed) => { @@ -871,6 +867,7 @@ impl Db { self.rev_ceil.clone(), self.rev_floor.clone(), self.tag_view.clone(), + self.config.state_prune, )) } @@ -1702,17 +1699,17 @@ impl Db { } } -/// Promote the state trie. +/// Snapshot the state trie. /// /// Promotes the tag of each node to the given view. This process may take a while to complete. /// The `tag_lock` ensures that only one snapshot is in progress at a time. /// The previous state trie will be eventually pruned during compaction. -pub fn promote_trie(storage: TrieStorage, root_hash: B256, block_number: u64) -> Result<()> { +pub fn snapshot_trie(storage: TrieStorage, root_hash: B256, block_number: u64) -> Result<()> { let trie = Arc::new(storage); let tag_lock = trie.tag_view.lock(); - tracing::info!(%root_hash, block_number, "Promote: start"); - TrieStorage::promote(trie.clone(), root_hash)?; - tracing::info!(%root_hash, block_number, "Promote: done"); + tracing::info!(%root_hash, block_number, "Snapshot: start"); + TrieStorage::snapshot(trie.clone(), root_hash)?; + tracing::info!(%root_hash, block_number, "Snapshot: done"); if trie.set_tag_floor(*tag_lock)? 
!= 0 { trie.drop_sql_state_trie()?; // delete SQL database } diff --git a/zilliqa/src/message.rs b/zilliqa/src/message.rs index 4bb0cefd09..986306c624 100644 --- a/zilliqa/src/message.rs +++ b/zilliqa/src/message.rs @@ -395,9 +395,7 @@ pub enum InternalMessage { /// Notify p2p cordinator to unsubscribe from a particular gossipsub topic UnsubscribeFromGossipSubTopic(GossipSubTopic), /// Snapshot a trie at a given point - PromoteTrie(TrieStorage, B256, u64), - /// Migrate the legacy trie to the new trie - MigrateTrie(TrieStorage), + SnapshotTrie(TrieStorage, B256, u64), } #[derive(Debug, Clone)] @@ -424,11 +422,8 @@ impl Display for InternalMessage { InternalMessage::UnsubscribeFromGossipSubTopic(topic) => { write!(f, "UnsubscribeFromGossipSubTopic({topic:?})") } - InternalMessage::PromoteTrie(_storage, hash, view) => { - write!(f, "PromoteTrie({hash:?}) {view})") - } - InternalMessage::MigrateTrie(_storage) => { - write!(f, "MigrateTrie") + InternalMessage::SnapshotTrie(_storage, hash, view) => { + write!(f, "SnapshotTrie({hash:?}) {view})") } } } diff --git a/zilliqa/src/p2p_node.rs b/zilliqa/src/p2p_node.rs index dbbd42ecbf..9f639c7d1d 100644 --- a/zilliqa/src/p2p_node.rs +++ b/zilliqa/src/p2p_node.rs @@ -484,22 +484,14 @@ impl P2pNode { self.swarm.behaviour_mut().gossipsub.unsubscribe(&Self::validator_topic(shard_id)); } } - InternalMessage::PromoteTrie(trie, hash, view) => { + InternalMessage::SnapshotTrie(trie, hash, view) => { self.task_threads.spawn(async move { - if let Err(e) = db::promote_trie(trie, hash, view) { + if let Err(e) = db::snapshot_trie(trie, hash, view) { tracing::error!(error = %e, "Snapshot failed"); } Ok(()) }); } - InternalMessage::MigrateTrie(storage) => { - self.task_threads.spawn(async move { - if let Err(e) = storage.migrate_legacy() { - tracing::error!(error = %e, "Migration failed"); - } - Ok(()) - }); - } } }, message = self.request_responses_receiver.next() => { diff --git a/zilliqa/src/trie_storage.rs b/zilliqa/src/trie_storage.rs 
index c40b29c2bb..b3928ce00e 100644 --- a/zilliqa/src/trie_storage.rs +++ b/zilliqa/src/trie_storage.rs @@ -1,11 +1,9 @@ -use std::{ - sync::{ - Arc, - atomic::{AtomicU64, Ordering}, - }, - usize, +use std::sync::{ + Arc, + atomic::{AtomicU64, Ordering}, }; +use alloy::primitives::KECCAK256_EMPTY; use anyhow::Result; use eth_trie::{EthTrie, Trie as _}; use lru::LruCache; @@ -21,10 +19,9 @@ use crate::{cfg::Forks, crypto::Hash, state::Account}; /// Special storage keys const ROCKSDB_MIGRATE_AT: &str = "migrate_at"; const ROCKSDB_CUTOVER_AT: &str = "cutover_at"; -const ROCKSDB_CONTENT_AT: &str = "content_at"; pub const ROCKSDB_TAGGING_AT: &str = "tagging_at"; -pub const LEGACY_KEY_LEN: usize = 32; -pub const LATEST_KEY_LEN: usize = 40; +pub const LEGACY_KEY_LEN: usize = KECCAK256_EMPTY.0.len(); +pub const TAGGED_KEY_LEN: usize = KECCAK256_EMPTY.0.len() + std::mem::size_of::(); /// An implementor of [eth_trie::DB] which uses a [rocksdb::DB]/[rusqlite::Connection] to persist data. #[derive(Debug, Clone)] @@ -36,6 +33,7 @@ pub struct TrieStorage { rev_ceil: Arc, // used to tag every write to the state database; reverse-ordered. rev_floor: Arc, // used to mark the compaction boundary; reverse-ordered. pub tag_view: Arc>, // used to lock the snapshot process; forward-ordered. + state_prune: bool, } impl TrieStorage { @@ -46,6 +44,7 @@ impl TrieStorage { rev_ceil: Arc, rev_floor: Arc, tag_view: Arc>, + state_prune: bool, ) -> Self { Self { pool, @@ -54,6 +53,7 @@ impl TrieStorage { rev_ceil, rev_floor, tag_view, + state_prune, } } @@ -66,65 +66,14 @@ impl TrieStorage { Ok(()) } - const CHUNK_SIZE: usize = 1_000; - /// One-off migrate all rocksdb legacy keys to the latest keys. - /// - /// This function migrates legacy keys from RocksDB to the latest format. - /// It iterates through all keys in the database, identifies legacy keys, - /// and migrates them to the new format. - /// - // It is possible that some keys may have been lazily migrated during normal operations. 
- // This function ensures that any such keys are migrated in a way that does not hide nor - // impact the reading of newer keys. - pub fn migrate_legacy(&self) -> Result<()> { - // skip if done. - if self.kvdb.get(ROCKSDB_CONTENT_AT)?.is_some() { - return Ok(()); - } - - // Since the default database tag is MAX - 1, using MAX here ensures that, - // this key does not hide any newer keys (all < MAX). - let tag = u64::MAX.to_be_bytes(); - - // pre-allocate and re-use memory buffer. - let mut keys = Vec::with_capacity(Self::CHUNK_SIZE); - let mut values = Vec::with_capacity(Self::CHUNK_SIZE); - - // iterate thru all keys, migrating legacy keys. - for kv in self.kvdb.iterator(rocksdb::IteratorMode::Start) { - let (key, value) = kv?; - // migrate legacy keys, overwriting any previous migration. - if key.len() == LEGACY_KEY_LEN { - keys.push(key.to_vec()); - values.push(value.to_vec()); - // chunking - if keys.len() == Self::CHUNK_SIZE { - self.write_batch( - std::mem::take(&mut keys), - std::mem::take(&mut values), - tag, // this will not override any newer values - true, - )?; - } - } - } - // last chunk - if !keys.is_empty() { - self.write_batch(keys, values, tag, true)?; - } - self.kvdb - .put(ROCKSDB_CONTENT_AT, usize::MAX.to_be_bytes())?; - Ok(()) - } - /// This snapshot promotes the *active* keys to the tag /// /// This works on duplicating the underlying db-level keys, not the trie-level keys. /// e.g. 500 trie-level random keys takes ~150 db-level keys. // // Previously, no deletion of keys was allowed in the state database. So, it is safe to repurpose clear_trie_from_db() to - // promote the trie, rather than delete it. - pub fn promote(trie_storage: Arc, state_root_hash: B256) -> Result<()> { + // snapshot-to-promote the trie, rather than delete it. 
+ pub fn snapshot(trie_storage: Arc, state_root_hash: B256) -> Result<()> { let mut state_trie = EthTrie::new(trie_storage.clone()).at_root(state_root_hash); for akv in state_trie.iter() { let (_key, serialised_account) = akv?; @@ -178,7 +127,7 @@ impl TrieStorage { keys: Vec>, values: Vec>, tag: [u8; 8], - migration: bool, + is_migration: bool, ) -> Result<()> { if keys.is_empty() { return Ok(()); @@ -188,17 +137,17 @@ impl TrieStorage { let mut batch = WriteBatch::default(); let mut cache = self.cache.write(); - for (key, value) in keys.into_iter().zip(values.into_iter()) { + for (key_prefix, value) in keys.into_iter().zip(values.into_iter()) { // tag keys; lexicographically sorted - let mut tag_key = key.clone(); + let mut tag_key = key_prefix.clone(); tag_key.extend_from_slice(tag.as_slice()); // suffix big-endian tags batch.put(tag_key.as_slice(), value.as_slice()); - // for legacy migration skip the cache and tombstone the key. - if !migration { - cache.put(key, value); + // If migration, bypass cache, and delete the old key + if !is_migration { + cache.put(key_prefix, value); } else { - batch.delete(key.as_slice()); + batch.delete(key_prefix.as_slice()); } } Ok(self.kvdb.write(batch)?) @@ -212,36 +161,48 @@ impl TrieStorage { // This is the tagging scheme used. Each tag is a U64 in big-endian format. // |user-key + tag|seqno|type| // |<-----internal key------>| - fn get_tag_value(&self, key_prefix: &[u8]) -> Result)>> { + fn get_tag_value(&self, key_prefix: &[u8]) -> Result>> { let mut iter = self.kvdb.prefix_iterator(key_prefix); // early return if user-key/prefix is not found - let Some(prefix) = iter.next() else { + let Some((key, value)) = iter.next().transpose()? else { return Ok(None); }; - let (key, value) = prefix?; if !key.starts_with(key_prefix) { return Ok(None); } // Given that the trie keys are *all* Keccak256 keys, the legacy keys are exactly 32-bytes (256-bits) long, // while the new tag keys are exactly 40-bytes (320-bits) long. 
We do not expect any other trie-key lengths. - if key.len() == LATEST_KEY_LEN { - let tag = key[LEGACY_KEY_LEN..].try_into().unwrap(); - // latest tag key, return the latest value - Ok(Some((tag, value.to_vec()))) - } else if key.len() == LEGACY_KEY_LEN { - // legacy key - peek to see if a later tag key is present - if let Some(peek) = iter.next() { - let peek = peek?; - if peek.0.starts_with(key_prefix) { - let tag = peek.0[LEGACY_KEY_LEN..].try_into().expect("8-bytes"); - return Ok(Some((tag, peek.1.to_vec()))); // tag key has newer value + match key.len() { + TAGGED_KEY_LEN => { + // latest tag key, naturally the most recent due to lexicographical order. + Ok(Some(value.to_vec())) + } + LEGACY_KEY_LEN => { + // legacy key - peek to see if a later tag key is present + if let Some((k, v)) = iter.next().transpose()? + && k.starts_with(key_prefix) + { + // Lazily delete the legacy key, if state_prune is false. + if !self.state_prune { + self.kvdb.delete(key)?; + } + return Ok(Some(v.to_vec())); // tag key has newer value } + // Migration fall-thru. + // If state_prune is true, migration will naturally happen during promotion. + // Lazily migrate the key, if state_prune is false. + if !self.state_prune { + self.write_batch( + vec![key.to_vec()], + vec![value.to_vec()], + self.rev_ceil.load(Ordering::Relaxed).to_be_bytes(), + true, + )?; + } + Ok(Some(value.to_vec())) // fall-thru value; return legacy value } - // We do not perform lazy migration here to avoid write amplification. - Ok(Some((u64::MAX.to_be_bytes(), value.to_vec()))) // fall-thru value; return legacy value - } else { - unimplemented!("unsupported trie key length {} bytes", key.len()); + _ => unimplemented!("unsupported trie key length: {} bytes", key.len()), } } @@ -338,7 +299,7 @@ impl eth_trie::DB for TrieStorage { } // L2 - rocksdb - if let Some((_tag, value)) = self + if let Some(value) = self .get_tag_value(key) .map_err(|e| eth_trie::TrieError::DB(e.to_string()))? 
{ @@ -359,12 +320,13 @@ impl eth_trie::DB for TrieStorage { if let Some(value) = value { // lazy migration - let tag = self - .rev_ceil - .load(std::sync::atomic::Ordering::Relaxed) - .to_be_bytes(); - self.write_batch(vec![key.to_vec()], vec![value.clone()], tag, false) - .map_err(|e| eth_trie::TrieError::DB(e.to_string()))?; + self.write_batch( + vec![key.to_vec()], + vec![value.clone()], + self.rev_ceil.load(Ordering::Relaxed).to_be_bytes(), + false, + ) + .map_err(|e| eth_trie::TrieError::DB(e.to_string()))?; return Ok(Some(value)); } @@ -373,22 +335,28 @@ impl eth_trie::DB for TrieStorage { #[inline] fn insert(&self, key: &[u8], value: Vec) -> Result<(), Self::Error> { - let tag = self - .rev_ceil - .load(std::sync::atomic::Ordering::Relaxed) - .to_be_bytes(); - self.write_batch(vec![key.to_vec()], vec![value], tag, false) - .map_err(|e| eth_trie::TrieError::DB(e.to_string())) + self.write_batch( + vec![key.to_vec()], + vec![value], + self.rev_ceil + .load(std::sync::atomic::Ordering::Relaxed) + .to_be_bytes(), + false, + ) + .map_err(|e| eth_trie::TrieError::DB(e.to_string())) } #[inline] fn insert_batch(&self, keys: Vec>, values: Vec>) -> Result<(), Self::Error> { - let tag = self - .rev_ceil - .load(std::sync::atomic::Ordering::Relaxed) - .to_be_bytes(); - self.write_batch(keys, values, tag, false) - .map_err(|e| eth_trie::TrieError::DB(e.to_string())) + self.write_batch( + keys, + values, + self.rev_ceil + .load(std::sync::atomic::Ordering::Relaxed) + .to_be_bytes(), + false, + ) + .map_err(|e| eth_trie::TrieError::DB(e.to_string())) } fn flush(&self) -> Result<(), Self::Error> { @@ -401,7 +369,7 @@ impl eth_trie::DB for TrieStorage { // promoting the trie at the db-level, without iterating the trie-level keys. 
// ** only called from clear_trie_from_db() ** fn remove(&self, promote_key: &[u8]) -> Result<(), Self::Error> { - if let Ok(Some((_old_tag, value))) = self.get_tag_value(promote_key) { + if let Ok(Some(value)) = self.get_tag_value(promote_key) { self.write_batch( vec![promote_key.to_vec()], vec![value], @@ -452,13 +420,14 @@ mod tests { tag.clone(), tag.clone(), lock.clone(), + false, ); let trie_storage = Arc::new(trie_storage); (sql, rdb, cache, trie_storage) } #[test] - fn basic_snapshot() { + fn snapshot_doubles_the_nodes() { let (_, rdb, _, trie_storage) = setup(); let mut pmt = EthTrie::new(trie_storage.clone()); @@ -481,7 +450,7 @@ mod tests { // snapshot-to-promote nodes; and count nodes trie_storage.inc_tag(u64::MAX).unwrap(); - TrieStorage::promote(trie_storage.clone(), root_hash).unwrap(); + TrieStorage::snapshot(trie_storage.clone(), root_hash).unwrap(); assert_eq!( rdb.iterator(rocksdb::IteratorMode::Start).count(), old_count * 2 @@ -490,7 +459,7 @@ mod tests { #[test] // lazy migration from sqlite to rocksdb, works. - fn lazy_migration() { + fn lazy_migration_from_sqlite() { let (sql, rdb, _, trie_storage) = setup(); let key_prefix = alloy::consensus::EMPTY_ROOT_HASH.0; @@ -517,7 +486,7 @@ mod tests { #[test] // height-tag read/write works - fn tagging() { + fn key_tagging_works() { let (_, rdb, cache, trie_storage) = setup(); let key_prefix = alloy::consensus::EMPTY_ROOT_HASH.0; @@ -550,7 +519,7 @@ mod tests { #[test] // peek ahead works - fn peek_ahead() { + fn peek_ahead_and_migration() { let (_, rdb, cache, trie_storage) = setup(); let key_prefix = alloy::consensus::EMPTY_ROOT_HASH.0; @@ -580,12 +549,13 @@ mod tests { // count all keys let iter = rdb.prefix_iterator(key_prefix.as_slice()); - assert_eq!(iter.count(), 3); + // The migration should have deleted the legacy key; and migrated everything to new keys. 
+ assert_eq!(iter.count(), 2); } #[test] // writes to disk only happens on commit - fn write_on_commit() { + fn write_on_commit_only() { let (_, rdb, cache, trie_storage) = setup(); let key_prefix = alloy::consensus::EMPTY_ROOT_HASH.0; diff --git a/zilliqa/tests/it/main.rs b/zilliqa/tests/it/main.rs index adf8c3e8ed..e01a6a2ca7 100644 --- a/zilliqa/tests/it/main.rs +++ b/zilliqa/tests/it/main.rs @@ -1131,10 +1131,9 @@ impl Network { InternalMessage::UnsubscribeFromGossipSubTopic(topic) => { debug!("unsubscribing from topic {:?}", topic); } - InternalMessage::PromoteTrie(storage, hash, view) => { - db::promote_trie(storage.clone(), *hash, *view).unwrap(); + InternalMessage::SnapshotTrie(storage, hash, view) => { + db::snapshot_trie(storage.clone(), *hash, *view).unwrap(); } - InternalMessage::MigrateTrie(storage) => storage.migrate_legacy().unwrap(), } } AnyMessage::External(external_message) => { diff --git a/zilliqa/tests/it/sync.rs b/zilliqa/tests/it/sync.rs index 482c21134d..37790aab9a 100644 --- a/zilliqa/tests/it/sync.rs +++ b/zilliqa/tests/it/sync.rs @@ -35,6 +35,7 @@ async fn prune_interval(mut network: Network) { tracing::info!(number, "Added pruned node."); + // run for a bit to allow state pruning to kick in network .run_until_block_finalized(MIN_PRUNE_INTERVAL * 3, 1000) .await From 5e46e38aa9ab3fb342e059aa860684388bdd7194 Mon Sep 17 00:00:00 2001 From: Shawn Date: Mon, 16 Feb 2026 13:26:43 +0800 Subject: [PATCH 05/13] chore: clippy --- zilliqa/src/trie_storage.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/zilliqa/src/trie_storage.rs b/zilliqa/src/trie_storage.rs index b3928ce00e..f5609a2a86 100644 --- a/zilliqa/src/trie_storage.rs +++ b/zilliqa/src/trie_storage.rs @@ -398,6 +398,7 @@ mod tests { use super::*; use crate::crypto::SecretKey; + #[allow(clippy::type_complexity)] fn setup() -> ( Arc>, Arc, From e62bb454d6f793b04998dde7098c23ec63db87ca Mon Sep 17 00:00:00 2001 From: Shawn Date: Fri, 20 Feb 2026 17:48:04 +0800 Subject: [PATCH 06/13] 
added some logging. --- zilliqa/src/consensus.rs | 1 + zilliqa/src/db.rs | 10 +++++++--- zilliqa/src/trie_storage.rs | 5 ++--- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/zilliqa/src/consensus.rs b/zilliqa/src/consensus.rs index f4271ebe92..a104cbc56a 100644 --- a/zilliqa/src/consensus.rs +++ b/zilliqa/src/consensus.rs @@ -2457,6 +2457,7 @@ impl Consensus { let old_ceil = trie_storage.set_tag_ceil(new_ceil)?; // store the previous tag, which is the next floor. *tag_lock = old_ceil; + tracing::info!(block_number, new_ceil, old_ceil, "Snapshot: trigger"); // trigger snapshot self.message_sender diff --git a/zilliqa/src/db.rs b/zilliqa/src/db.rs index 53a8082a2b..89b6a7dd7e 100644 --- a/zilliqa/src/db.rs +++ b/zilliqa/src/db.rs @@ -677,6 +677,8 @@ impl Db { // Use the default compaction filter, unless pruning is enabled if config.state_prune { let rev_floor = tag_floor.clone(); + let tag_floor = rev_floor.load(Ordering::Relaxed); + tracing::info!(tag_floor, "StatePruneFilter"); // Keys are only removed if they are older than the floor value, which is set during snapshot. // After pruning, old and legacy keys are eventually removed when the background compaction runs. rdb_opts.set_compaction_filter( @@ -1707,12 +1709,14 @@ impl Db { pub fn snapshot_trie(storage: TrieStorage, root_hash: B256, block_number: u64) -> Result<()> { let trie = Arc::new(storage); let tag_lock = trie.tag_view.lock(); - tracing::info!(%root_hash, block_number, "Snapshot: start"); + let new_tag = *tag_lock; + tracing::info!(%root_hash, block_number, new_tag, "Snapshot: start"); TrieStorage::snapshot(trie.clone(), root_hash)?; - tracing::info!(%root_hash, block_number, "Snapshot: done"); - if trie.set_tag_floor(*tag_lock)? != 0 { + let old_tag = trie.set_tag_floor(new_tag)?; + if old_tag != 0 { trie.drop_sql_state_trie()?; // delete SQL database } + tracing::info!(%root_hash, block_number, new_tag, old_tag, "Snapshot: done"); Ok(()) // not fatal, it can be retried later. 
} diff --git a/zilliqa/src/trie_storage.rs b/zilliqa/src/trie_storage.rs index f5609a2a86..129446a135 100644 --- a/zilliqa/src/trie_storage.rs +++ b/zilliqa/src/trie_storage.rs @@ -3,7 +3,6 @@ use std::sync::{ atomic::{AtomicU64, Ordering}, }; -use alloy::primitives::KECCAK256_EMPTY; use anyhow::Result; use eth_trie::{EthTrie, Trie as _}; use lru::LruCache; @@ -20,8 +19,8 @@ use crate::{cfg::Forks, crypto::Hash, state::Account}; const ROCKSDB_MIGRATE_AT: &str = "migrate_at"; const ROCKSDB_CUTOVER_AT: &str = "cutover_at"; pub const ROCKSDB_TAGGING_AT: &str = "tagging_at"; -pub const LEGACY_KEY_LEN: usize = KECCAK256_EMPTY.0.len(); -pub const TAGGED_KEY_LEN: usize = KECCAK256_EMPTY.0.len() + std::mem::size_of::(); +pub const LEGACY_KEY_LEN: usize = 32; +pub const TAGGED_KEY_LEN: usize = 40; /// An implementor of [eth_trie::DB] which uses a [rocksdb::DB]/[rusqlite::Connection] to persist data. #[derive(Debug, Clone)] From 91ce1f51907de0e80896108904e12a85981c006b Mon Sep 17 00:00:00 2001 From: Shawn Date: Mon, 23 Feb 2026 11:18:53 +0800 Subject: [PATCH 07/13] consider multiple prune_interval for snapshots. --- zilliqa/src/consensus.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/zilliqa/src/consensus.rs b/zilliqa/src/consensus.rs index a104cbc56a..51c73b06a4 100644 --- a/zilliqa/src/consensus.rs +++ b/zilliqa/src/consensus.rs @@ -2418,11 +2418,18 @@ impl Consensus { if self.block_is_first_in_epoch(block.number()) && !block.is_genesis() { // Do snapshots + // at epoch/block boundaries to avoid state inconsistencies. if self.config.sync.prune_interval != u64::MAX { - // do at block boundaries to avoid state inconsistencies. - // do at epoch boundaries to reduce size amplification. - let range = self.db.available_range()?; - self.snapshot_at(*range.start(), block.view())?; + let multiple = + self.config.sync.prune_interval / self.config.consensus.blocks_per_epoch; + // gap > prune_interval to reduce size amplification. 
+ if self + .epoch_number(block.number()) + .is_multiple_of(multiple.saturating_add(1)) + { + let range = self.db.available_range()?; + self.snapshot_at(*range.start(), block.view())?; + } }; // Do checkpoints if self.config.do_checkpoints From 1d1182a184c8312fe33f9702eecb645431c9b8a3 Mon Sep 17 00:00:00 2001 From: Shawn Date: Mon, 23 Feb 2026 12:39:28 +0800 Subject: [PATCH 08/13] sql: change TRUNCATE to DROP operation. --- zilliqa/src/trie_storage.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/zilliqa/src/trie_storage.rs b/zilliqa/src/trie_storage.rs index 129446a135..353db4834a 100644 --- a/zilliqa/src/trie_storage.rs +++ b/zilliqa/src/trie_storage.rs @@ -58,10 +58,8 @@ impl TrieStorage { /// Truncate the state_trie table pub fn drop_sql_state_trie(&self) -> Result<()> { - self.pool - .get() - .unwrap() - .execute("DELETE FROM state_trie", [])?; // TRUNCATE + let sql = self.pool.get().unwrap(); + sql.execute("DROP TABLE IF EXISTS state_trie", [])?; // Use DROP TABLE faster Ok(()) } From 2eec87425a9d491809d91b2a2d0ede3180c88008 Mon Sep 17 00:00:00 2001 From: Shawn Date: Mon, 23 Feb 2026 15:14:34 +0800 Subject: [PATCH 09/13] return error in TrieStorage::remove() instead of silent; flush to disk. 
--- zilliqa/src/trie_storage.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/zilliqa/src/trie_storage.rs b/zilliqa/src/trie_storage.rs index 353db4834a..0dbd08160c 100644 --- a/zilliqa/src/trie_storage.rs +++ b/zilliqa/src/trie_storage.rs @@ -4,7 +4,7 @@ use std::sync::{ }; use anyhow::Result; -use eth_trie::{EthTrie, Trie as _}; +use eth_trie::{DB, EthTrie, Trie as _}; use lru::LruCache; use parking_lot::{Mutex, RwLock}; use r2d2::Pool; @@ -81,6 +81,8 @@ impl TrieStorage { } // repurpose clear_trie_from_db() to promote the state trie state_trie.clear_trie_from_db()?; + // force flush to disk + trie_storage.flush()?; Ok(()) } @@ -366,7 +368,10 @@ impl eth_trie::DB for TrieStorage { // promoting the trie at the db-level, without iterating the trie-level keys. // ** only called from clear_trie_from_db() ** fn remove(&self, promote_key: &[u8]) -> Result<(), Self::Error> { - if let Ok(Some(value)) = self.get_tag_value(promote_key) { + if let Some(value) = self + .get_tag_value(promote_key) + .map_err(|e| eth_trie::TrieError::DB(e.to_string()))? + { self.write_batch( vec![promote_key.to_vec()], vec![value], @@ -376,9 +381,10 @@ impl eth_trie::DB for TrieStorage { .to_be_bytes(), false, ) - .map_err(|e| eth_trie::TrieError::DB(e.to_string()))? + .map_err(|e| eth_trie::TrieError::DB(e.to_string())) + } else { + Err(eth_trie::TrieError::DB("trie not found".to_string())) } - Ok(()) } fn remove_batch(&self, _: &[Vec]) -> Result<(), Self::Error> { From 388f6e721f2060f0ecd5a6df641e18492ed8fb01 Mon Sep 17 00:00:00 2001 From: Shawn Date: Mon, 23 Feb 2026 18:31:30 +0800 Subject: [PATCH 10/13] additional commit() to TrieStorage::snapshot() function. 
--- zilliqa/src/trie_storage.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/zilliqa/src/trie_storage.rs b/zilliqa/src/trie_storage.rs index 0dbd08160c..e0f134f6b0 100644 --- a/zilliqa/src/trie_storage.rs +++ b/zilliqa/src/trie_storage.rs @@ -76,11 +76,14 @@ impl TrieStorage { let (_key, serialised_account) = akv?; let account_root = Account::try_from(serialised_account.as_slice())?.storage_root; let mut account_trie = EthTrie::new(trie_storage.clone()).at_root(account_root); - // repurpose clear_trie_from_db() to promote the account trie + // repurpose clear_trie_from_db() to promote the trie; root_hash() forces a commit. + let _ = account_trie.root_hash()?; account_trie.clear_trie_from_db()?; } - // repurpose clear_trie_from_db() to promote the state trie + // repurpose clear_trie_from_db() to promote the trie; root_hash() forces a commit. + let _ = state_trie.root_hash()?; state_trie.clear_trie_from_db()?; + // force commit of root hash // force flush to disk trie_storage.flush()?; Ok(()) From c203311aff5b17a2d8eac2ba4c85b81a6bff5810 Mon Sep 17 00:00:00 2001 From: Shawn Date: Tue, 24 Feb 2026 10:46:50 +0800 Subject: [PATCH 11/13] added comments, documentation, config check. --- docs/state_pruning.md | 56 +++++++++++++++++++++++-------------- zilliqa/src/cfg.rs | 1 + zilliqa/src/db.rs | 14 ++++++---- zilliqa/src/trie_storage.rs | 2 +- 4 files changed, 46 insertions(+), 27 deletions(-) diff --git a/docs/state_pruning.md b/docs/state_pruning.md index 94f78976ab..ffc975417c 100644 --- a/docs/state_pruning.md +++ b/docs/state_pruning.md @@ -1,34 +1,46 @@ # State Pruning Guide -With `v0.21.0` the state pruning feature was introduced. +As of `v0.21.0` the state pruning feature is available. This feature allows users to prune the state database to reduce the size of the database and improve performance. There are two parts of the state pruning feature: 1. Pruning the blocks; and 2. Pruning the state. 
-## Pruning the Blocks +## How to Use It -To prune the blocks, users can use the `sync.prune_interval` configuration option. +If you wish to run a pruned node, the easiest way to do it is to: +1. Restore the node from the most recent checkpoint. +This mitigates the need to migrate any legacy keys. +Since you're going to prune anyway, the older blocks/state are unnecessary overhead. +2. Let the node active-sync and catch up with the head of the chain. +This is just to ensure that the node is up-to-date. +3. Set the `sync.prune_interval` configuration option to a positive integer. +If the number is too small, you may end up storing too many snapshots between compaction runs. +A good number is about a day's worth of blocks/state. +4. Restart the node. +You can monitor the logs for the "Snapshot:" messages. + +### Configuration + +To prune the blocks/state, users can use the `sync.prune_interval` configuration option. If this option is set to a positive integer, the node will retain only the most recent `sync.prune_interval` blocks. Any block older than that will be deleted from the SQL database. +Also, the node will prune the state database to reduce the size of the storage. -## Pruning the State +As part of this process, the node will *drop* the SQL `state_trie` table. +This only impacts nodes that have existing state prior to `v0.18.0`. +But, the node does not `VACUUM` to reclaim the space from the SQL database, as this process can take a long time. +If the SQL disk space needs to be reclaimed, node operators should schedule and manually run the `VACUUM` command on the SQL database. -To prune the state, users can use the `db.state_prune` configuration options. -If this option is set to `true`, the node will prune the state database to massively reduce the size of the storage. +## How it works -As part of this process, the node will also truncate the SQL `state_trie` table. 
-But, the node does not `VACUUM` to reclaim the space from the SQL database, as this process can possibly take a long time. -If the disk space needs to be reclaimed, node operators can manually run the `VACUUM` command on the SQL database. +The state pruning feature works by exploiting the RocksDB compaction feature. +Storage will get recovered as part of its normal background compaction operation that happens periodically and incrementally. +In fact, when it is first turned on, you may see an initial increase in storage usage. +But over time, the storage usage will decrease after several compaction runs. -### How it works - -The state pruning feature works by exploiting the RocksDB built-in compaction feature. We have chosen to implement a feature that is adapted from the [User-defined Time-stamp](https://github.com/facebook/rocksdb/wiki/User-defined-Timestamp) feature of RocksDB. -So, the amount of storage recovered is neither exact nor immediate. -In fact, when it is turned on, you may see an initial increase in storage usage. -But over time, the storage usage will decrease as the compaction process runs. - When writing data to the trie-storage in RocksDB, each key is tagged with a *timestamp* suffix that is used to determine the order of the keys. ``` @@ -44,12 +56,14 @@ This ensures that the timestamp is always monotonically increasing per block. - When the built-in compaction feature is triggered, any *stale* keys are removed and the disk space is recovered. > By default RocksDB will ensure that compaction is triggered at least once every 30-days; but you can configure this behaviour by setting the `db.rocksdb_compaction_period` option. -At each epoch, the node increments its internal timestamp *ceiling*, which will result in any new state being tagged with a higher timestamp. -Also, the node will trigger a background operation to *promote* all active state that should be retained, by duplicating the entire state-trie with the higher timestamp. 
+### Operation + +1. At each epoch, the node increments its internal timestamp *ceiling*, which will result in any new state being tagged with a higher timestamp. +2. The node will trigger a background operation to *snapshot* all active state that should be retained, by duplicating the entire state-trie with the higher timestamp. This operation may take some time to complete, possibly several epochs, and the node will only allow one such operation at a time. -After the operation is complete, the node increments its internal timestamp *floor*; and any state with a timestamp below the *floor* will eventually be compacted away. +3. After the operation is complete, the node increments its internal timestamp *floor*; and any state with a timestamp below the *floor* will eventually be compacted away. -#### Conditions +### Conditions The conditions for the *ceiling* and the *floor* are that: - The *ceiling* is always incremented i.e. new ceiling > old ceiling. @@ -57,7 +71,7 @@ The conditions for the *ceiling* and the *floor* are that: - The *ceiling* is always greater than the *floor* i.e. ceiling > floor. - The *floor* always lags the *ceiling* i.e. floor == old ceiling. -The promotion operation will only be triggered if the lowest block view is greater than the current *ceiling*. +The snapshot operation will only be triggered if the lowest block view is greater than the current *ceiling*. This condition ensures the safety that, the only states pruned are those that are absolutely no longer needed, since the block no longer exists in the SQL database. This also means that the node may retain more state than it actually needs. Considering the amount of state saved through pruning, this bit of extra state is negligible. @@ -66,4 +80,4 @@ Considering the amount of state saved through pruning, this bit of extra state i For nodes that disable pruning, the existing keys need to be migrated to the new tagged-keys. 
This process is done lazily, as nodes are being read from time to time. -Whenever a legacy-key node is read from the database, the key is migrated to the new tagged-key format and deleted. +Whenever a legacy-key node is read from the database, the key is migrated to the new tagged-key format and deleted by compaction. diff --git a/zilliqa/src/cfg.rs b/zilliqa/src/cfg.rs index 9d0fe822c2..f495a56bc0 100644 --- a/zilliqa/src/cfg.rs +++ b/zilliqa/src/cfg.rs @@ -385,6 +385,7 @@ impl NodeConfig { self.state_cache_size == state_cache_size_default(), "state_cache_size is deprecated. Use db.rocksdb_cache_size and db.rocksdb_state_cache_size instead." ); + anyhow::ensure!(!self.db.state_prune, "db.state_prune must not be set"); // sync/prune settings anyhow::ensure!( self.sync.base_height == u64_max() || self.sync.prune_interval == u64_max(), diff --git a/zilliqa/src/db.rs b/zilliqa/src/db.rs index 89b6a7dd7e..5965e369c6 100644 --- a/zilliqa/src/db.rs +++ b/zilliqa/src/db.rs @@ -672,20 +672,23 @@ impl Db { rdb_opts.set_compression_type(rocksdb::DBCompressionType::Lz4); rdb_opts.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd); rdb_opts.set_bottommost_zstd_max_train_bytes(0, true); + // Increase background threads + rdb_opts.increase_parallelism(crate::available_threads() as i32); let tag_floor = Arc::new(AtomicU64::new(u64::MAX)); // Use the default compaction filter, unless pruning is enabled if config.state_prune { + tracing::info!("SnapshotCompactionFilter"); let rev_floor = tag_floor.clone(); - let tag_floor = rev_floor.load(Ordering::Relaxed); - tracing::info!(tag_floor, "StatePruneFilter"); - // Keys are only removed if they are older than the floor value, which is set during snapshot. + // Keys are only removed if they are older than the floor value, which is set during a snapshot. // After pruning, old and legacy keys are eventually removed when the background compaction runs. 
rdb_opts.set_compaction_filter( - "StatePruneFilter", + "SnapshotCompactionFilter", move |_lvl, key, _value| -> CompactionDecision { match key.len() { - // 40-bytes: remove tagged key, if the key is 'older' than the floor + // 40-bytes: remove tagged key, if the key is 'older' than the floor. + // The `rev_floor` is set to u64::MAX during startup; and only changed *after* a snapshot is taken. + // If a snapshot gets interrupted for whatever reason, the floor remains unchanged; partial snapshots are safe. TAGGED_KEY_LEN if u64::from_be_bytes(key[32..40].try_into().unwrap()) > rev_floor.load(Ordering::Relaxed) => @@ -693,6 +696,7 @@ impl Db { CompactionDecision::Remove } // 32-bytes: remove legacy key, if snapshot already taken + // Legacy keys are only removed after a snapshot of a more recent state has been taken. LEGACY_KEY_LEN if rev_floor.load(Ordering::Relaxed) != u64::MAX => { CompactionDecision::Remove } diff --git a/zilliqa/src/trie_storage.rs b/zilliqa/src/trie_storage.rs index e0f134f6b0..a2b8dde800 100644 --- a/zilliqa/src/trie_storage.rs +++ b/zilliqa/src/trie_storage.rs @@ -386,7 +386,7 @@ impl eth_trie::DB for TrieStorage { ) .map_err(|e| eth_trie::TrieError::DB(e.to_string())) } else { - Err(eth_trie::TrieError::DB("trie not found".to_string())) + Err(eth_trie::TrieError::DB("clear trie not found".to_string())) } } From 2b829c7ab7544a8c530edae4b5e02a36a83c8885 Mon Sep 17 00:00:00 2001 From: Shawn Date: Wed, 25 Feb 2026 08:47:07 +0800 Subject: [PATCH 12/13] optimise rocksdb config. --- zilliqa/src/cfg.rs | 36 ++++++------------------------------ zilliqa/src/db.rs | 18 ++++++------------ zilliqa/src/trie_storage.rs | 1 - 3 files changed, 12 insertions(+), 43 deletions(-) diff --git a/zilliqa/src/cfg.rs b/zilliqa/src/cfg.rs index f495a56bc0..3a8b6d76b0 100644 --- a/zilliqa/src/cfg.rs +++ b/zilliqa/src/cfg.rs @@ -180,15 +180,6 @@ pub struct DbConfig { /// RocksDB block cache size, in bytes. 
#[serde(default = "rocksdb_cache_size_default")] pub rocksdb_cache_size: usize, - /// RocksDB periodic compaction seconds. - #[serde(default = "rocksdb_compaction_period_default")] - pub rocksdb_compaction_period: u64, - /// RocksDB max open files. - #[serde(default = "rocksdb_max_open_files_default")] - pub rocksdb_max_open_files: i32, - /// RocksDB cache index/filters - #[serde(default = "rocksdb_cache_index_filters_default")] - pub rocksdb_cache_index_filters: bool, /// State cache size #[serde(default = "rocksdb_state_cache_size_default")] pub rocksdb_state_cache_size: usize, @@ -196,28 +187,16 @@ pub struct DbConfig { #[serde(default = "rocksdb_block_size_default")] pub rocksdb_block_size: usize, /// Target File Size - #[serde(default = "rocksdb_target_file_size_default")] - pub rocksdb_target_file_size: u64, + #[serde(default = "rocksdb_memtable_budget_default")] + pub rocksdb_memtable_budget: usize, } fn rocksdb_block_size_default() -> usize { 1 << 14 // 16KB reduces in-memory indexes } -fn rocksdb_target_file_size_default() -> u64 { - 1 << 28 // 256MB reduces number of open files -} - -fn rocksdb_cache_index_filters_default() -> bool { - false // true: mitigate OOM; false: better performance. -} - -fn rocksdb_max_open_files_default() -> i32 { - -1 // Set max_open_files to -1 to always keep all files open, which avoids expensive table cache calls. 
-} - -fn rocksdb_compaction_period_default() -> u64 { - u64::MAX - 1 // allow rocksdb to decide +fn rocksdb_memtable_budget_default() -> usize { + 1 << 31 // 2GB is a reasonable default } fn rocksdb_cache_size_default() -> usize { @@ -225,7 +204,7 @@ fn rocksdb_cache_size_default() -> usize { } fn rocksdb_state_cache_size_default() -> usize { - 1 << 31 + 1 << 31 // 2GB is a reasonable default } fn sql_cache_size_default() -> usize { @@ -244,12 +223,9 @@ impl Default for DbConfig { state_sync: false, state_prune: false, rocksdb_cache_size: rocksdb_cache_size_default(), - rocksdb_compaction_period: rocksdb_compaction_period_default(), - rocksdb_max_open_files: rocksdb_max_open_files_default(), - rocksdb_cache_index_filters: rocksdb_cache_index_filters_default(), rocksdb_state_cache_size: rocksdb_state_cache_size_default(), rocksdb_block_size: rocksdb_block_size_default(), - rocksdb_target_file_size: rocksdb_target_file_size_default(), + rocksdb_memtable_budget: rocksdb_memtable_budget_default(), } } } diff --git a/zilliqa/src/db.rs b/zilliqa/src/db.rs index 5965e369c6..63a940ae35 100644 --- a/zilliqa/src/db.rs +++ b/zilliqa/src/db.rs @@ -639,13 +639,12 @@ impl Db { } fn init_rocksdb(config: DbConfig) -> (rocksdb::Options, Arc) { - // RocksDB configuration let mut block_opts = BlockBasedOptions::default(); // reduce disk and memory usage - https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#ribbon-filter block_opts.set_ribbon_filter(10.0); block_opts.set_optimize_filters_for_memory(true); // reduce memory wastage with JeMalloc // Mitigate OOM - block_opts.set_cache_index_and_filter_blocks(config.rocksdb_cache_index_filters); + block_opts.set_cache_index_and_filter_blocks(true); // Improve cache utilisation block_opts.set_pin_top_level_index_and_filter(true); block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true); @@ -653,21 +652,16 @@ impl Db { block_opts.set_partition_filters(true); block_opts.set_block_size(config.rocksdb_block_size); 
block_opts.set_metadata_block_size(config.rocksdb_block_size); - - let cache = - Cache::new_hyper_clock_cache(config.rocksdb_cache_size, config.rocksdb_block_size); + // RocksDB cache + let cache = Cache::new_lru_cache(config.rocksdb_cache_size); block_opts.set_block_cache(&cache); + // RocksDB configuration let mut rdb_opts = Options::default(); + // Set optimised defaults + rdb_opts.optimize_level_style_compaction(config.rocksdb_memtable_budget); rdb_opts.create_if_missing(true); rdb_opts.set_block_based_table_factory(&block_opts); - rdb_opts.set_periodic_compaction_seconds(config.rocksdb_compaction_period); - // Mitigate OOM - prevent opening too many files at a time - rdb_opts.set_max_open_files(config.rocksdb_max_open_files); - // Reduce reads - rdb_opts.set_level_compaction_dynamic_level_bytes(true); - rdb_opts.set_target_file_size_base(config.rocksdb_target_file_size); - rdb_opts.set_max_bytes_for_level_base(config.rocksdb_target_file_size << 2); // Reduce storage rdb_opts.set_compression_type(rocksdb::DBCompressionType::Lz4); rdb_opts.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd); diff --git a/zilliqa/src/trie_storage.rs b/zilliqa/src/trie_storage.rs index a2b8dde800..b3e74f34ec 100644 --- a/zilliqa/src/trie_storage.rs +++ b/zilliqa/src/trie_storage.rs @@ -83,7 +83,6 @@ impl TrieStorage { // repurpose clear_trie_from_db() to promote the trie; root_hash() forces a commit. let _ = state_trie.root_hash()?; state_trie.clear_trie_from_db()?; - // force commit of root hash // force flush to disk trie_storage.flush()?; Ok(()) From 908cb625841340b0481071650f7bc561a5b5414e Mon Sep 17 00:00:00 2001 From: Shawn Date: Wed, 25 Feb 2026 09:13:52 +0800 Subject: [PATCH 13/13] fix test. 
--- zilliqa/src/trie_storage.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/zilliqa/src/trie_storage.rs b/zilliqa/src/trie_storage.rs index b3e74f34ec..aba6bb507f 100644 --- a/zilliqa/src/trie_storage.rs +++ b/zilliqa/src/trie_storage.rs @@ -360,6 +360,7 @@ impl eth_trie::DB for TrieStorage { .map_err(|e| eth_trie::TrieError::DB(e.to_string())) } + #[inline] fn flush(&self) -> Result<(), Self::Error> { self.kvdb .flush() @@ -415,7 +416,7 @@ mod tests { .build(SqliteConnectionManager::memory()) .unwrap(), ); - let rdb = Arc::new(rocksdb::DB::open_default(tempdir().unwrap()).unwrap()); + let rdb = Arc::new(rocksdb::DB::open_default(tempdir().unwrap().keep()).unwrap()); let tag = Arc::new(AtomicU64::new(u64::MAX)); let lock = Arc::new(Mutex::new(u64::MIN)); let cache = Arc::new(RwLock::new(LruCache::unbounded())); @@ -457,10 +458,8 @@ mod tests { // snapshot-to-promote nodes; and count nodes trie_storage.inc_tag(u64::MAX).unwrap(); TrieStorage::snapshot(trie_storage.clone(), root_hash).unwrap(); - assert_eq!( - rdb.iterator(rocksdb::IteratorMode::Start).count(), - old_count * 2 - ); + let new_count = rdb.iterator(rocksdb::IteratorMode::Start).count(); + assert_eq!(new_count, old_count * 2); } #[test]