Commit cb30828

feat: migrate storage to populate batch queue from submissions
Recently, the indexing pallet imposed a bound on batches finding quorum with the help of a new BatchQueue in storage. This adds a storage migration that populates the initial contents of this BatchQueue by analyzing the submissions storage. The implementation is simple but naive: there is no way to know from current storage what order the batches were originally submitted in, so they are inserted into the batch queue in their iteration order (lexicographic ordering of the hashed batch ids). Furthermore, the migration does not do any pruning, so ideally the batch queue capacity should exceed the number of batches finding quorum in all live networks, so that excess batches are pruned gradually rather than all at once.
1 parent 942620a commit cb30828
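As a rough sketch of the approach described in the commit message, the migration added here is conceptually equivalent to the following eager, non-stepped version (a hypothetical helper for illustration only, using the `SubmissionsV1` and `BatchQueue` storage items from the diff below; the real implementation is cursor-based so the work can be split across blocks under a weight budget):

// Hypothetical, non-stepped sketch of what the new LazyMigrationV2 does.
fn populate_batch_queue_eagerly<T: Config<I>, I: 'static>() {
    let mut last_batch: Option<BatchId> = None;
    // Iteration order is lexicographic over the hashed keys, not submission order.
    for (batch_id, _scope, _submitter) in SubmissionsV1::<T, I>::iter_keys() {
        // All submissions for a batch share the same hashed key prefix, so they are
        // adjacent in iteration order; a "last seen" check de-duplicates them.
        if last_batch.as_ref() != Some(&batch_id) {
            let index = crate::BatchQueue::<T, I>::count() as u64;
            crate::BatchQueue::<T, I>::insert(index, batch_id.clone());
            last_batch = Some(batch_id);
        }
    }
}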

File tree

8 files changed: +295 -1 lines changed


pallets/indexing/src/benchmarking.rs

Lines changed: 40 additions & 0 deletions
@@ -226,6 +226,46 @@ mod benchmarks {
         );
     }
 
+    #[benchmark]
+    fn migration_v1_v2_step() {
+        let hash = <<T as frame_system::Config>::Hashing as Hasher>::hash(&[]);
+        let batch_id = BatchId::default();
+        (0..MAX_SUBMITTERS * 2)
+            .map(|submitter_num| {
+                let scope = if submitter_num % 2 == 0 {
+                    QuorumScope::Public
+                } else {
+                    QuorumScope::Privileged
+                };
+
+                (
+                    account::<T::AccountId>("submitter", submitter_num, 0),
+                    scope,
+                )
+            })
+            .for_each(|(account_id, scope)| {
+                crate::SubmissionsV1::<T, I>::insert((&batch_id, scope, account_id), hash);
+            });
+
+        let mut meter = WeightMeter::new();
+
+        #[block]
+        {
+            crate::migrations::v2::LazyMigrationV2::<T, weights::SubstrateWeight<T>, I>::step(
+                None, &mut meter,
+            )
+            .unwrap();
+        }
+
+        // Check that the new storage is decodable:
+        assert_eq!(crate::BatchQueue::<T, I>::get(0).unwrap(), batch_id);
+        // Consumes twice the step weight: once for the migration, once to find there is no further key.
+        assert_eq!(
+            meter.consumed(),
+            weights::SubstrateWeight::<T>::migration_v1_v2_step() * 2
+        );
+    }
+
     impl_benchmark_test_suite!(
         PalletWithApi,
         crate::mock::new_test_ext(),

pallets/indexing/src/lib.rs

Lines changed: 1 addition & 1 deletion
@@ -586,7 +586,7 @@ pub mod pallet {
             .next()
             .is_some()
         {
-            return vec![];
+            return Vec::new();
         }
 
         let batches_to_prune = push_batch_to_queue_and_prune::<T, I>(batch_id);

pallets/indexing/src/migrations/mod.rs

Lines changed: 2 additions & 0 deletions
@@ -2,6 +2,8 @@
 
 pub mod v1;
 
+pub mod v2;
+
 /// A unique identifier across all pallets.
 ///
 /// This constant represents a unique identifier for the migrations of this pallet.
Lines changed: 92 additions & 0 deletions
@@ -0,0 +1,92 @@
+//! Migration from pallet-indexing storage v1 to v2
+//!
+//! This migration populates the new `BatchQueue` storage item from the existing
+//! `SubmissionsV1` submission entries.
+
+use frame_support::migrations::{MigrationId, SteppedMigration, SteppedMigrationError};
+use frame_support::pallet_prelude::PhantomData;
+use frame_support::weights::WeightMeter;
+use sxt_core::indexing::BatchId;
+use sxt_core::tables::QuorumScope;
+
+use super::PALLET_MIGRATIONS_ID;
+use crate::pallet::{Config, SubmissionsV1};
+
+mod tests;
+
+/// Populates the `BatchQueue` from existing `Submissions`.
+///
+/// As we cannot know the order in which the batches were originally submitted from the current
+/// storage information, the batches are inserted into the queue in their `SubmissionsV1`
+/// iteration order (lexicographic over the hashed keys).
+///
+/// This will not do any pruning, even if the current submissions exceed the
+/// `BATCH_QUEUE_CAPACITY`. It is therefore recommended to start using the batch queue with a
+/// capacity higher than any live network's current submission count, to avoid a large number of
+/// batches getting pruned on the very next submission.
+pub struct LazyMigrationV2<T: Config<I>, W: crate::weights::WeightInfo, I: 'static = ()>(
+    PhantomData<(T, W, I)>,
+);
+
+impl<T: Config<I>, W: crate::weights::WeightInfo, I: 'static> SteppedMigration
+    for LazyMigrationV2<T, W, I>
+{
+    type Cursor = (BatchId, QuorumScope, T::AccountId);
+    type Identifier = MigrationId<26>;
+
+    fn id() -> Self::Identifier {
+        MigrationId {
+            pallet_id: *PALLET_MIGRATIONS_ID,
+            version_from: 1,
+            version_to: 2,
+        }
+    }
+
+    fn step(
+        mut cursor: Option<Self::Cursor>,
+        meter: &mut WeightMeter,
+    ) -> Result<Option<Self::Cursor>, SteppedMigrationError> {
+        let required = W::migration_v1_v2_step();
+
+        if meter.remaining().any_lt(required) {
+            return Err(SteppedMigrationError::InsufficientWeight { required });
+        }
+
+        // We loop here to make as much progress as possible per step.
+        loop {
+            if meter.try_consume(required).is_err() {
+                break;
+            }
+
+            let mut iter = if let Some(key) = &cursor {
+                // If a cursor is provided, start iterating from the stored key.
+                SubmissionsV1::<T, I>::iter_keys_from(SubmissionsV1::<T, I>::hashed_key_for(key))
+            } else {
+                // If no cursor is provided, start iterating from the beginning.
+                SubmissionsV1::<T, I>::iter_keys()
+            };
+
+            // If there's a next item in the iterator, perform the migration.
+            if let Some((batch_id, scope, account)) = iter.next() {
+                let batch_index = crate::BatchQueue::<T, I>::count();
+                crate::BatchQueue::<T, I>::insert(batch_index as u64, batch_id.clone());
+
+                // Always update the cursor.
+                cursor = Some((batch_id.clone(), scope, account));
+
+                // Advance the cursor past the remaining keys with this batch id, skipping them.
+                for key in iter {
+                    if key.0 == batch_id {
+                        cursor = Some(key);
+                    } else {
+                        break;
+                    }
+                }
+            } else {
+                cursor = None; // Signal that the migration is complete (no more items to process).
+                break;
+            }
+        }
+        Ok(cursor)
+    }
+}
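For context, a `SteppedMigration` like the one above is driven by handing it a weight-budgeted meter and the cursor from the previous call until it returns `Ok(None)`. A minimal, hypothetical driver loop (not the actual pallet-migrations scheduler, and assuming the current `WeightMeter::with_limit` constructor) looks like this:

use frame_support::migrations::SteppedMigration;
use frame_support::weights::{Weight, WeightMeter};

// Illustration only: on-chain, pallet-migrations drives the migration across blocks
// and persists the cursor between them.
fn drive_to_completion<M: SteppedMigration>(weight_per_block: Weight) {
    let mut cursor: Option<M::Cursor> = None;
    loop {
        let mut meter = WeightMeter::with_limit(weight_per_block);
        cursor = match M::step(cursor, &mut meter) {
            Ok(None) => break,              // every batch id has been queued
            Ok(next_cursor) => next_cursor, // resume from this key on the next call
            Err(err) => panic!("unexpected migration error: {err:?}"),
        };
    }
}

In the benchmark earlier in this commit, a single call with an unbounded `WeightMeter::new()` completes the whole one-batch migration at once, which is why it consumes exactly two steps' worth of weight.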
Lines changed: 131 additions & 0 deletions
@@ -0,0 +1,131 @@
+#![cfg(all(test, not(feature = "runtime-benchmarks")))]
+
+use codec::Decode;
+use frame_support::traits::OnRuntimeUpgrade;
+use frame_system::ensure_signed;
+use native_api::Api;
+use pallet_migrations::WeightInfo as _;
+use sp_core::Hasher;
+use sxt_core::indexing::{BatchId, SubmittersByScope};
+use sxt_core::tables::QuorumScope;
+
+use crate::migrations::v1;
+use crate::mock::{
+    new_test_ext,
+    run_to_block,
+    AllPalletsWithSystem,
+    MigratorServiceWeight,
+    RuntimeOrigin,
+    System,
+    Test,
+};
+use crate::weights::WeightInfo as _;
+
+fn submissions_from_seed(
+    seed: u64,
+) -> (
+    BatchId,
+    <Test as frame_system::Config>::Hash,
+    SubmittersByScope<<Test as frame_system::Config>::AccountId>,
+) {
+    let batch_id = BatchId::try_from(seed.to_le_bytes().to_vec()).unwrap();
+    let data_hash = <<Test as frame_system::Config>::Hashing as Hasher>::hash(&seed.to_le_bytes());
+
+    let num_privileged_submissions = seed % 32;
+
+    // This math is chosen to:
+    // - have some variety with num_privileged_submissions
+    // - always have at least 1 public submission (simplifies test logic)
+    let num_public_submissions = ((seed * 2) % 31) + 1;
+
+    let privileged_submitters_by_scope = (0..num_privileged_submissions)
+        .map(|submitter_seed_index| {
+            let submitter_index = seed * 32 + submitter_seed_index;
+            ensure_signed(RuntimeOrigin::signed(submitter_index)).unwrap()
+        })
+        .fold(SubmittersByScope::default(), |acc, submitter| {
+            acc.with_submitter(submitter, &QuorumScope::Privileged)
+                .unwrap()
+        });
+    let submitters_by_scope = (0..num_public_submissions)
+        .map(|submitter_seed_index| {
+            let submitter_index = seed * 32 + submitter_seed_index;
+            ensure_signed(RuntimeOrigin::signed(submitter_index)).unwrap()
+        })
+        .fold(privileged_submitters_by_scope, |acc, submitter| {
+            acc.with_submitter(submitter, &QuorumScope::Public).unwrap()
+        });
+
+    (batch_id, data_hash, submitters_by_scope)
+}
+
+#[test]
+fn lazy_migration_works() {
+    new_test_ext().execute_with(|| {
+        (0..64u64)
+            .map(submissions_from_seed)
+            .for_each(|(batch_id, data_hash, submitters)| {
+                submitters
+                    .iter_scope(&QuorumScope::Public)
+                    .for_each(|submitter| {
+                        crate::SubmissionsV1::<Test, Api>::insert(
+                            (&batch_id, QuorumScope::Public, &submitter),
+                            &data_hash,
+                        );
+                    });
+                submitters
+                    .iter_scope(&QuorumScope::Privileged)
+                    .for_each(|submitter| {
+                        crate::SubmissionsV1::<Test, Api>::insert(
+                            (&batch_id, QuorumScope::Privileged, &submitter),
+                            &data_hash,
+                        );
+                    });
+            });
+
+        // Give it enough weight to do exactly 8 iterations:
+        let limit = <Test as pallet_migrations::Config>::WeightInfo::progress_mbms_none()
+            + pallet_migrations::Pallet::<Test>::exec_migration_max_weight()
+            + crate::weights::SubstrateWeight::<Test>::migration_v1_v2_step() * 8;
+        MigratorServiceWeight::set(&limit);
+
+        System::set_block_number(1);
+        AllPalletsWithSystem::on_runtime_upgrade(); // onboard MBMs
+
+        // Check migration progress across the blocks.
+        let mut last_num_migrated = 0;
+        for block in 2..=10 {
+            run_to_block(block);
+
+            let num_migrated = (0..64)
+                .map(submissions_from_seed)
+                .map(|(seeded_batch_id, _, _)| {
+                    if crate::BatchQueue::<Test, Api>::iter()
+                        .any(|(_, batch_id)| seeded_batch_id == batch_id)
+                    {
+                        1usize
+                    } else {
+                        0usize
+                    }
+                })
+                .sum();
+
+            // Part of the first migration block is spent completing lazy migration v1 (a no-op), so
+            // we cannot fit 8 batches in block 2; block 10 only has the last 4 batches left.
+            let expected_migrated_number = if block == 2 || block == 10 { 4 } else { 8 };
+
+            assert_eq!(num_migrated, last_num_migrated + expected_migrated_number);
+            last_num_migrated = num_migrated;
+        }
+
+        // Check that everything is migrated now
+        (0..64u64)
+            .map(submissions_from_seed)
+            .for_each(|(seeded_batch_id, _, _)| {
+                assert!(crate::BatchQueue::<Test, Api>::iter()
+                    .any(|(_, batch_id)| seeded_batch_id == batch_id))
+            });
+
+        assert_eq!(crate::BatchQueue::<Test, Api>::count(), 64);
+    });
+}
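A quick back-of-the-envelope check of the schedule the test asserts (hypothetical helper, simply restating the test's own expected values for the 64 seeded batches):

// Not part of the pallet: mirrors the per-block progress the test asserts.
fn expected_batches_migrated_in(block: u64) -> usize {
    match block {
        2 => 4,     // the first MBM block also finishes the v1 no-op, leaving room for only 4 steps
        3..=9 => 8, // 7 full blocks at 8 batches each = 56
        10 => 4,    // 64 - 4 - 56 = 4 batches remaining
        _ => 0,
    }
}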

pallets/indexing/src/mock.rs

Lines changed: 1 addition & 0 deletions
@@ -54,6 +54,7 @@ impl pallet_migrations::Config for Test {
     #[cfg(not(feature = "runtime-benchmarks"))]
     type Migrations = (
         crate::migrations::v1::LazyMigrationV1<Test, crate::weights::SubstrateWeight<Test>, Api>,
+        crate::migrations::v2::LazyMigrationV2<Test, crate::weights::SubstrateWeight<Test>, Api>,
     );
     #[cfg(feature = "runtime-benchmarks")]
     type Migrations = pallet_migrations::mock_helpers::MockedMigrations;

pallets/indexing/src/weights.rs

Lines changed: 23 additions & 0 deletions
@@ -59,6 +59,13 @@ pub trait WeightInfo {
     /// Storage: `Indexing::SubmissionsV1` (r:0 w:64)
     /// Proof: `Indexing::SubmissionsV1` (`max_values`: None, `max_size`: Some(150), added: 2625, mode: `MaxEncodedLen`)
     fn migration_v0_v1_step() -> Weight;
+    /// Storage: `Indexing::SubmissionsV1` (r:65 w:0)
+    /// Proof: `Indexing::SubmissionsV1` (`max_values`: None, `max_size`: Some(150), added: 2625, mode: `MaxEncodedLen`)
+    /// Storage: `Indexing::CounterForBatchQueue` (r:1 w:1)
+    /// Proof: `Indexing::CounterForBatchQueue` (`max_values`: Some(1), `max_size`: Some(4), added: 499, mode: `MaxEncodedLen`)
+    /// Storage: `Indexing::BatchQueue` (r:1 w:1)
+    /// Proof: `Indexing::BatchQueue` (`max_values`: None, `max_size`: Some(45), added: 2520, mode: `MaxEncodedLen`)
+    fn migration_v1_v2_step() -> Weight;
 }
 
 pub struct SubstrateWeight<T>(PhantomData<T>);
@@ -133,4 +140,20 @@ impl<T: frame_system::Config> WeightInfo for SubstrateWeight<T> {
             .saturating_add(T::DbWeight::get().reads(2))
             .saturating_add(T::DbWeight::get().writes(65))
     }
+    /// Storage: `Indexing::SubmissionsV1` (r:65 w:0)
+    /// Proof: `Indexing::SubmissionsV1` (`max_values`: None, `max_size`: Some(150), added: 2625, mode: `MaxEncodedLen`)
+    /// Storage: `Indexing::CounterForBatchQueue` (r:1 w:1)
+    /// Proof: `Indexing::CounterForBatchQueue` (`max_values`: Some(1), `max_size`: Some(4), added: 499, mode: `MaxEncodedLen`)
+    /// Storage: `Indexing::BatchQueue` (r:1 w:1)
+    /// Proof: `Indexing::BatchQueue` (`max_values`: None, `max_size`: Some(45), added: 2520, mode: `MaxEncodedLen`)
+    fn migration_v1_v2_step() -> Weight {
+        // Proof Size summary in bytes:
+        //  Measured:  `6277`
+        //  Estimated: `171615`
+        // Minimum execution time: 133_010_000 picoseconds.
+        Weight::from_parts(135_916_000, 0)
+            .saturating_add(Weight::from_parts(0, 171615))
+            .saturating_add(T::DbWeight::get().reads(67))
+            .saturating_add(T::DbWeight::get().writes(2))
+    }
 }
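For a rough sense of scale, the generated step weight composes with the runtime's configured `DbWeight`. A sanity-check sketch, assuming the stock `RocksDbWeight` constants (25 µs per storage read, 100 µs per write; the actual runtime may configure different values), puts one step at roughly 2 ms of ref-time:

use frame_support::weights::constants::RocksDbWeight;
use frame_support::weights::Weight;

// Illustrative only; the real figure depends on the runtime's DbWeight.
fn approx_migration_v1_v2_step() -> Weight {
    Weight::from_parts(135_916_000, 171_615)              // ~0.14 ms base, 171_615 bytes PoV
        .saturating_add(RocksDbWeight::get().reads(67))   // 67 reads * 25 µs ≈ 1.68 ms
        .saturating_add(RocksDbWeight::get().writes(2))   // 2 writes * 100 µs = 0.2 ms
}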

runtime/src/lib.rs

Lines changed: 5 additions & 0 deletions
@@ -341,6 +341,11 @@ impl pallet_migrations::Config for Runtime {
         pallet_indexing::SubstrateWeight<Runtime>,
         native_api::Api,
     >,
+    pallet_indexing::migrations::v2::LazyMigrationV2<
+        Runtime,
+        pallet_indexing::SubstrateWeight<Runtime>,
+        native_api::Api,
+    >,
 );
 // Benchmarks need mocked migrations to guarantee that they succeed.
 #[cfg(feature = "runtime-benchmarks")]
