83 changes: 83 additions & 0 deletions docs/state_pruning.md
@@ -0,0 +1,83 @@
# State Pruning Guide

As of `v0.21.0`, the state pruning feature is available.
It allows node operators to prune old blocks and state, reducing the size of the database on disk and improving performance.

There are two parts of the state pruning feature:
1. Pruning the blocks; and
2. Pruning the state.

## How to Use It

If you wish to run a pruned node, the easiest way is to:
1. Restore the node from the most recent checkpoint.
This avoids the need to migrate any legacy keys.
Since you're going to prune anyway, the older blocks/state are unnecessary overhead.
2. Let the node active-sync and catch up with the head of the chain.
This is just to ensure that the node is up-to-date.
3. Set the `sync.prune_interval` configuration option to a positive integer.
If the number is too small, you may end up storing too many snapshots between compaction runs.
A good number is about a day's worth of blocks/state.
4. Restart the node.
You can monitor the logs for the "Snapshot:" messages.

### Configuration

To prune the blocks/state, users can use the `sync.prune_interval` configuration option.
If this option is set to a positive integer, the node will retain only the most recent `sync.prune_interval` blocks.
Any block older than that will be deleted from the SQL database.
The node will also prune the state database to reduce storage usage.

As part of this process, the node *drops* the SQL `state_trie` table.
This only impacts nodes with state that predates `v0.18.0`.
However, the node does not `VACUUM` the SQL database to reclaim the freed space, as this can take a long time.
If the SQL disk space needs to be reclaimed, node operators should schedule and manually run the `VACUUM` command on the SQL database.
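Putting the options above together, a hypothetical `config.toml` fragment might look like this (the table name follows the option path used in this guide; the retention value is illustrative only):

```toml
[sync]
# Retain roughly a day's worth of blocks; anything older, along with its
# state, becomes eligible for pruning. Example value only -- pick a number
# based on your network's block time.
prune_interval = 86400
```

The `db.rocksdb_compaction_period` option described below can be tuned alongside this to control how quickly the freed space is actually reclaimed.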

## How It Works

The state pruning feature exploits RocksDB's compaction mechanism.
Storage is recovered as part of the normal background compaction that happens periodically and incrementally.
In fact, when pruning is first enabled, you may see an initial *increase* in storage usage.
Over time, after several compaction runs, storage usage will decrease.

Our implementation is adapted from the [User-defined Timestamp](https://github.com/facebook/rocksdb/wiki/User-defined-Timestamp) feature of RocksDB.

- When writing data to the trie-storage in RocksDB, each key is tagged with a *timestamp* suffix that is used to determine the order of the keys.
```
|user-key + tag|seqno|type|
|<-----internal key------>|
```

- When reading trie-storage data from RocksDB, the timestamp is taken into account to ensure that only the most *recent* value is returned for a specific key.
> Instead of using the block height as the timestamp, we use the block view.
> This is because there can only be one block per view, while there could potentially be more than one block per height.
> This ensures that the timestamp is always monotonically increasing per block.

- When the built-in compaction feature is triggered, any *stale* keys are removed and the disk space is recovered.
> By default, RocksDB ensures that compaction is triggered at least once every 30 days; you can configure this behaviour by setting the `db.rocksdb_compaction_period` option.
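The behaviour described in the bullets above can be sketched with a toy store (illustrative only; the actual RocksDB internal key layout and the project's encoding are not shown here, and `TaggedStore` is our name):

```rust
use std::collections::BTreeMap;

// Toy tagged-key store: maps (user_key, view_tag) -> value. The ordered
// map mimics how a timestamp suffix orders versions of the same key.
struct TaggedStore {
    data: BTreeMap<(Vec<u8>, u64), Vec<u8>>,
    floor: u64, // versions tagged below this may be compacted away
}

impl TaggedStore {
    fn new() -> Self {
        Self { data: BTreeMap::new(), floor: 0 }
    }

    // Write a value tagged with the block view at which it was produced.
    fn put(&mut self, key: &[u8], view: u64, value: &[u8]) {
        self.data.insert((key.to_vec(), view), value.to_vec());
    }

    // Read returns the most *recent* version of a key.
    fn get(&self, key: &[u8]) -> Option<&Vec<u8>> {
        self.data
            .range((key.to_vec(), 0)..=(key.to_vec(), u64::MAX))
            .next_back()
            .map(|(_, v)| v)
    }

    // "Compaction": drop stale versions, i.e. anything tagged below the
    // floor that is shadowed by a newer version of the same key.
    fn compact(&mut self) {
        let floor = self.floor;
        let keys: Vec<_> = self.data.keys().cloned().collect();
        for (key, tag) in keys {
            let has_newer = self
                .data
                .range((key.clone(), tag.saturating_add(1))..=(key.clone(), u64::MAX))
                .next()
                .is_some();
            if tag < floor && has_newer {
                self.data.remove(&(key, tag));
            }
        }
    }
}
```

Note that reads never observe the pruning: the newest version always wins, and compaction only removes versions that are both old and shadowed.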

### Operation

1. At each epoch, the node increments its internal timestamp *ceiling*, which will result in any new state being tagged with a higher timestamp.
2. The node will trigger a background operation to *snapshot* all active state that should be retained, by duplicating the entire state-trie with the higher timestamp.
This operation may take some time to complete, possibly several epochs, and the node will only allow one such operation at a time.
3. After the operation is complete, the node increments its internal timestamp *floor*; and any state with a timestamp below the *floor* will eventually be compacted away.
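The three steps above can be modelled as a small state machine (a sketch; the names `Tags`, `on_epoch` and `on_snapshot_done` are ours, not the codebase's):

```rust
// Illustrative model of the timestamp *ceiling*/*floor* lifecycle.
struct Tags {
    floor: u64,
    ceiling: u64,
    // Set while a snapshot is in flight; holds the next floor value.
    pending_floor: Option<u64>,
}

impl Tags {
    // Step 1: at an epoch boundary, raise the ceiling so that new state
    // is tagged above every existing version. Refuse to start another
    // snapshot while one is still running.
    fn on_epoch(&mut self, new_ceiling: u64) -> bool {
        if self.pending_floor.is_some() || new_ceiling <= self.ceiling {
            return false;
        }
        self.pending_floor = Some(self.ceiling);
        self.ceiling = new_ceiling;
        // Step 2 would run here in the background: duplicate all retained
        // state under the new, higher timestamp.
        true
    }

    // Step 3: once the snapshot completes, the floor catches up to the
    // old ceiling; versions tagged below it become compactable.
    fn on_snapshot_done(&mut self) {
        if let Some(f) = self.pending_floor.take() {
            self.floor = f;
        }
    }
}
```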

### Conditions

The invariants for the *ceiling* and the *floor* are:
- The *ceiling* only increases, i.e. new ceiling > old ceiling.
- The *floor* only increases, i.e. new floor > old floor.
- The *ceiling* is always above the *floor*, i.e. ceiling > floor.
- The *floor* always lags the *ceiling* by one step, i.e. new floor == old ceiling.

The snapshot operation is only triggered if the lowest block view in the database is greater than the current *ceiling*.
This condition guarantees that the only states pruned are those that are definitely no longer needed, because the corresponding blocks no longer exist in the SQL database.
It also means the node may retain slightly more state than it strictly needs.
Compared to the amount of state saved through pruning, this extra state is negligible.
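The trigger condition reduces to a single comparison (a sketch; the function name is ours):

```rust
// Snapshot only when even the lowest (oldest) block still in the SQL
// database sits above the current ceiling: then every version tagged at
// or below the ceiling belongs to blocks that no longer exist.
fn should_snapshot(lowest_block_view: u64, ceiling: u64) -> bool {
    lowest_block_view > ceiling
}
```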

### Key Migration

For nodes that do not prune, existing keys still need to be migrated to the new tagged-key format.
This is done lazily, as trie nodes are read over time.
Whenever a node stored under a legacy key is read from the database, it is rewritten under the new tagged-key format and the legacy key is deleted by compaction.
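The lazy read-through migration can be sketched as follows (illustrative; `read_node`, the store layout and the key formats are stand-ins, not the project's actual API):

```rust
use std::collections::HashMap;

// Stand-in store: legacy keys are bare, migrated keys carry a tag suffix.
struct Store {
    kv: HashMap<Vec<u8>, Vec<u8>>,
    current_tag: u64,
}

impl Store {
    // Hypothetical tagged-key format: user key + big-endian view tag.
    fn tagged(key: &[u8], tag: u64) -> Vec<u8> {
        let mut k = key.to_vec();
        k.extend_from_slice(&tag.to_be_bytes());
        k
    }

    // Read-through migration: prefer the tagged key; on a legacy hit,
    // rewrite the value under the tagged format and drop the legacy key
    // (in RocksDB the actual removal happens later, during compaction).
    fn read_node(&mut self, key: &[u8]) -> Option<Vec<u8>> {
        let tk = Self::tagged(key, self.current_tag);
        if let Some(v) = self.kv.get(&tk) {
            return Some(v.clone());
        }
        let v = self.kv.remove(key)?; // legacy hit: migrate
        self.kv.insert(tk, v.clone());
        Some(v)
    }
}
```

Because migration piggybacks on ordinary reads, rarely-touched nodes are migrated last, and the cost is amortised over normal operation.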
20 changes: 13 additions & 7 deletions zilliqa/src/cfg.rs
@@ -174,6 +174,9 @@ pub struct DbConfig {
/// Whether to enable state-sync/state-migration
#[serde(default)]
pub state_sync: bool,
/// Deprecated: the config.toml setting is ignored and must not be set.
#[serde(default)]
pub state_prune: bool,
/// RocksDB block cache size, in bytes.
#[serde(default = "rocksdb_cache_size_default")]
pub rocksdb_cache_size: usize,
@@ -239,6 +242,7 @@ impl Default for DbConfig {
conn_cache_size: sql_cache_size_default(),
auto_checkpoint: sql_auto_checkpoint_default(),
state_sync: false,
state_prune: false,
rocksdb_cache_size: rocksdb_cache_size_default(),
rocksdb_compaction_period: rocksdb_compaction_period_default(),
rocksdb_max_open_files: rocksdb_max_open_files_default(),
@@ -376,31 +380,33 @@ impl NodeConfig {
}
}

// deprecated settings
anyhow::ensure!(
self.state_cache_size == state_cache_size_default(),
"state_cache_size is deprecated. Use db.rocksdb_cache_size and db.rocksdb_state_cache_size instead."
);

anyhow::ensure!(!self.db.state_prune, "db.state_prune must not be set");
// sync/prune settings
anyhow::ensure!(
self.sync.base_height == u64_max() || self.sync.prune_interval == u64_max(),
"base_height and prune_interval cannot be set at the same time"
"sync.base_height and sync.prune_interval cannot be set at the same time"
);
// when set, >> 15 to avoid pruning forks; > 256 to be EVM-safe; arbitrarily picked.
anyhow::ensure!(
self.sync.prune_interval >= MIN_PRUNE_INTERVAL,
"prune_interval must be at least {MIN_PRUNE_INTERVAL}",
"sync.prune_interval must be at least {MIN_PRUNE_INTERVAL}",
);
anyhow::ensure!(
self.sync.prune_interval == u64::MAX || !self.do_checkpoints,
"sync.prune_interval and do_checkpoints cannot be set at the same time"
);
// 10 is a reasonable minimum for a node to be useful.
anyhow::ensure!(
self.sync.block_request_batch_size >= 10,
"block_request_batch_size must be at least 10"
);
// 1000 would saturate a typical node.
anyhow::ensure!(
self.sync.max_blocks_in_flight <= 1000,
"max_blocks_in_flight must be at most 1000"
);
// the minimum required for the next leader selection
anyhow::ensure!(
self.max_missed_view_age >= MISSED_VIEW_WINDOW,
"max_missed_view_age must be at least {MISSED_VIEW_WINDOW}"
94 changes: 57 additions & 37 deletions zilliqa/src/consensus.rs
@@ -391,7 +391,7 @@ impl Consensus {
secret_key,
config,
sync,
message_sender,
message_sender: message_sender.clone(),
reset_timeout,
votes: DashMap::new(),
buffered_votes: DashMap::new(),
@@ -579,7 +579,8 @@ impl Consensus {
}

// Initialize state trie storage
consensus.db.state_trie()?.init_state_trie(forks)?;
let state_trie = consensus.db.state_trie()?;
state_trie.init_state_trie(forks)?;

// If timestamp of when current high_qc was written exists then use it to estimate the minimum number of blocks the network has moved on since shut down
// This is useful in scenarios in which consensus has failed since this node went down
@@ -2415,46 +2416,65 @@ impl Consensus {
}
}

if self.block_is_first_in_epoch(block.number())
&& !block.is_genesis()
&& self.config.do_checkpoints
&& self.epoch_is_checkpoint(self.epoch_number(block.number()))
&& let Some(checkpoint_path) = self.db.get_checkpoint_dir()?
{
let parent = self
.db
.get_block(block.parent_hash().into())?
.ok_or(anyhow!(
"Trying to checkpoint block, but we don't have its parent"
))?;
let transactions: Vec<SignedTransaction> = block
.transactions
.iter()
.map(|txn_hash| {
let tx = self.db.get_transaction(txn_hash)?.ok_or(anyhow!(
"failed to fetch transaction {} for checkpoint parent {}",
txn_hash,
parent.hash()
))?;
Ok::<_, anyhow::Error>(tx.tx)
})
.collect::<Result<Vec<SignedTransaction>>>()?;

self.message_sender.send_message_to_coordinator(
InternalMessage::ExportBlockCheckpoint(
Box::new(block),
transactions,
Box::new(parent),
self.db.state_trie()?.clone(),
self.state.view_history.read().clone(),
checkpoint_path,
),
)?;
if self.block_is_first_in_epoch(block.number()) && !block.is_genesis() {
// Do snapshots at epoch/block boundaries to avoid state inconsistencies.
if self.config.sync.prune_interval != u64::MAX {
let multiple =
self.config.sync.prune_interval / self.config.consensus.blocks_per_epoch;
// gap > prune_interval to reduce size amplification.
if self
.epoch_number(block.number())
.is_multiple_of(multiple.saturating_add(1))
{
let range = self.db.available_range()?;
self.snapshot_at(*range.start(), block.view())?;
}
};
// Do checkpoints
if self.config.do_checkpoints
&& self.db.get_checkpoint_dir()?.is_some()
&& self.epoch_is_checkpoint(self.epoch_number(block.number()))
{
self.checkpoint_at(block.number())?;
}
}

Ok(())
}

/// Trigger a snapshot
pub fn snapshot_at(&self, block_number: u64, new_ceil: u64) -> Result<()> {
// skip if there is a snapshot in progress.
let Some(mut tag_lock) = self.db.tag_view.try_lock() else {
return Ok(());
};
// error if the lowest block does not exist
let Some(block) = self.get_canonical_block_by_number(block_number)? else {
return Err(anyhow::format_err!("Snapshot: missing block"));
};
// skip if the lowest block is unsafe to snapshot
// 'unsafe' means that the block exists in a tag range that could be pruned away during compaction.
if block.view() < *tag_lock {
return Ok(());
}

let trie_storage = self.db.state_trie()?;
// raise the ceiling to the new tag, promoting all new state
let old_ceil = trie_storage.set_tag_ceil(new_ceil)?;
// store the previous tag, which is the next floor.
*tag_lock = old_ceil;
tracing::info!(block_number, new_ceil, old_ceil, "Snapshot: trigger");

// trigger snapshot
self.message_sender
.send_message_to_coordinator(InternalMessage::SnapshotTrie(
trie_storage,
block.state_root_hash().into(),
block_number,
))
}

/// Trigger a checkpoint, for debugging.
/// Returns (file_name, block_hash). At some time after you call this function, hopefully a checkpoint will end up in the file
pub fn checkpoint_at(&self, block_number: u64) -> Result<(String, String)> {