94 changes: 79 additions & 15 deletions ledger/src/shred/merkle.rs
@@ -1037,18 +1037,30 @@ pub(super) fn make_shreds_from_data(
) -> Result<Vec<Shred>, Error> {
let now = Instant::now();
let chained = chained_merkle_root.is_some();
let resigned = chained && is_last_in_slot;
// let resigned = chained && is_last_in_slot;
// only sign if last batch in slot and is chained
let sign_last_batch = chained && is_last_in_slot;
Owner Author:
Avoid the term "batch" here, because we are calling make_shreds_from_data() on batches; I would call this sign_last_fec_set.

let proof_size = PROOF_ENTRIES_FOR_32_32_BATCH;
let data_buffer_per_shred_size = ShredData::capacity(proof_size, chained, resigned)?;

// unsigned data_buffer size
let data_buffer_per_shred_size = ShredData::capacity(proof_size, chained, false)?;
let data_buffer_total_size = DATA_SHREDS_PER_FEC_BLOCK * data_buffer_per_shred_size;

// signed data_buffer size
let data_buffer_per_shred_size_signed = if sign_last_batch {
ShredData::capacity(proof_size, chained, true)?
} else {
0
};
let data_buffer_total_size_signed = DATA_SHREDS_PER_FEC_BLOCK * data_buffer_per_shred_size_signed;

// Common header for the data shreds.
let mut common_header_data = ShredCommonHeader {
signature: Signature::default(),
shred_variant: ShredVariant::MerkleData {
proof_size,
chained,
resigned,
resigned: false,
},
slot,
index: next_shred_index,
@@ -1061,7 +1073,7 @@ pub(super) fn make_shreds_from_data(
shred_variant: ShredVariant::MerkleCode {
proof_size,
chained,
resigned,
resigned: false,
},
index: next_code_index,
..common_header_data
@@ -1082,13 +1094,33 @@
};

// Pre-allocate shreds to avoid reallocations.
let mut sig_data: &[u8] = &[];
let mut shreds = {
let number_of_batches = data.len().div_ceil(data_buffer_total_size);
let number_of_batches = if sign_last_batch {
if data.len() > data_buffer_total_size_signed {
let remainder_to_be_signed = data.len() % data_buffer_total_size;
if remainder_to_be_signed <= data_buffer_total_size_signed {
(data, sig_data) = data.split_at(data.len() - remainder_to_be_signed);
}
Comment on lines +1101 to +1104
Owner Author:
When remainder_to_be_signed is 0, sig_data ends up empty, so we don't actually sign anything.

Comment on lines +1101 to +1104
Owner Author:
I think there is a bug here: when remainder_to_be_signed is 0, sig_data is empty, so nothing ends up getting signed.
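
A minimal, self-contained repro of the edge case described above (a hypothetical sketch; the capacities are stand-ins, not the real shred constants):

```rust
fn main() {
    // Stand-in capacities, not the real shred constants.
    let data_buffer_total_size = 1024; // unsigned batch capacity
    let data_buffer_total_size_signed = 960; // signed batch capacity (smaller)

    // Data whose length is an exact multiple of the unsigned batch size.
    let data = vec![0u8; data_buffer_total_size * 3];
    assert!(data.len() > data_buffer_total_size_signed);

    // remainder_to_be_signed is 0, so the split leaves sig_data empty and
    // no trailing FEC set gets signed -- the bug these comments point out.
    let remainder_to_be_signed = data.len() % data_buffer_total_size;
    assert_eq!(remainder_to_be_signed, 0);
    let (_unsigned, sig_data) = data.split_at(data.len() - remainder_to_be_signed);
    assert!(sig_data.is_empty());
}
```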

else {
(data, sig_data) = data.split_at(data.len());
assert_eq!(sig_data.len(), 0);
}
data.len().div_ceil(data_buffer_total_size) + 1
} else {
sig_data = data;
data = &[];
1
}
} else {
data.len().div_ceil(data_buffer_total_size)
};
let total_num_shreds = SHREDS_PER_FEC_BLOCK * number_of_batches;
Vec::<Shred>::with_capacity(total_num_shreds)
};
stats.data_bytes += data.len();
Comment on lines +1097 to 1121
Owner Author:
To be honest, this is pretty hard to follow. What if we just did something like:

```rust
let (unsigned_data, signed_data) = if sign_last_fec_set {
    // Reserve at least one signed batch (may be empty) at the end.
    if data.len() > data_buffer_total_size_signed {
        let split_at = data.len() - data_buffer_total_size_signed; // everything before the trailing signed batch
        data.split_at(split_at)
    } else {
        (&[][..], data) // only enough data for one fec set, sign the whole thing
    }
} else {
    (data, &[][..]) // not last batch, so don't sign
};
stats.data_bytes += unsigned_data.len() + signed_data.len();
```

Owner Author:
And then do:

```rust
let unsigned_sets = unsigned_data.len().div_ceil(data_buffer_total_size);
let number_of_batches = if sign_last_fec_set {
    unsigned_sets + 1
} else {
    unsigned_sets
};
let mut shreds = Vec::<Shred>::with_capacity(SHREDS_PER_FEC_BLOCK * number_of_batches);
```

Reply:
This is very elegant. Love it!



// Split the data into full erasure batches and initialize data and coding
// shreds for each batch.
while data.len() >= data_buffer_total_size {
@@ -1118,17 +1150,17 @@ pub(super) fn make_shreds_from_data(
// 2.) Shreds is_empty, which only happens when we entered w/ zero data.
//
// In either case, we want to generate empty data shreds.
if !data.is_empty() || shreds.is_empty() {
Owner Author:
Refactor this all the way down to line 1201 into a function, passing in sign_last_batch and a chunk_size, where chunk_size is data_buffer_per_shred_size or data_buffer_per_shred_size_signed, or something like this.
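
A rough sketch of the helper this comment is asking for (the function name, the data_header/stats parameter types, and the exact signature are my assumptions, not code from this PR):

```rust
// Hypothetical extraction of the two near-identical blocks below: build one
// final (possibly padded) FEC set from `data` and append it to `shreds`.
// `chunk_size` is data_buffer_per_shred_size for the unsigned case or
// data_buffer_per_shred_size_signed for the resigned trailing set.
#[allow(clippy::too_many_arguments)]
fn extend_with_trailing_fec_set(
    shreds: &mut Vec<Shred>,
    data: &[u8],
    chunk_size: usize,
    resigned: bool,
    proof_size: u8,
    chained: bool,
    common_header_data: &mut ShredCommonHeader,
    common_header_code: &mut ShredCommonHeader,
    data_header: DataShredHeader,   // assumed type of `data_header`
    stats: &mut ProcessShredsStats, // assumed stats type
) {
    stats.padding_bytes += chunk_size * DATA_SHREDS_PER_FEC_BLOCK - data.len();
    common_header_data.shred_variant = ShredVariant::MerkleData { proof_size, chained, resigned };
    common_header_code.shred_variant = ShredVariant::MerkleCode { proof_size, chained, resigned };
    common_header_data.fec_set_index = common_header_data.index;
    common_header_code.fec_set_index = common_header_data.fec_set_index;
    let chunks = data
        .chunks(chunk_size)
        .chain(std::iter::repeat(&[][..])) // possible padding
        .take(DATA_SHREDS_PER_FEC_BLOCK);
    shreds.extend(make_shreds_data(common_header_data, data_header, chunks).map(Shred::ShredData));
    shreds.extend(make_shreds_code_header_only(common_header_code).map(Shred::ShredCode));
}
```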

if !data.is_empty() || (shreds.is_empty() && !sign_last_batch) {
stats.padding_bytes += data_buffer_total_size - data.len();
common_header_data.shred_variant = ShredVariant::MerkleData {
proof_size,
chained,
resigned,
resigned: false,
};
common_header_code.shred_variant = ShredVariant::MerkleCode {
proof_size,
chained,
resigned,
resigned: false,
};
common_header_data.fec_set_index = common_header_data.index;
common_header_code.fec_set_index = common_header_data.fec_set_index;
@@ -1142,6 +1174,31 @@ pub(super) fn make_shreds_from_data(
});
shreds.extend(make_shreds_code_header_only(&mut common_header_code).map(Shred::ShredCode));
}
if !sig_data.is_empty() || (shreds.is_empty() && sign_last_batch) {
// send signed last batch
stats.padding_bytes += data_buffer_total_size_signed - sig_data.len();
common_header_data.shred_variant = ShredVariant::MerkleData {
proof_size,
chained,
resigned: true,
};
common_header_code.shred_variant = ShredVariant::MerkleCode {
proof_size,
chained,
resigned: true,
};
common_header_data.fec_set_index = common_header_data.index;
common_header_code.fec_set_index = common_header_data.fec_set_index;
shreds.extend({
// Create data chunks out of remaining data + padding.
let chunks = sig_data
.chunks(data_buffer_per_shred_size_signed)
Owner Author:
If we don't want to resign the data, this is going to chunk at the wrong size.

Reply:

On this: we will only get here if there is something to sign (sig_data holds at most one fec_set worth of data), so it will only pull one chunk. I could probably assert that.

Owner Author:
Yes, you are right; I messed that up. No need to assert.
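
For reference, the assertion discussed above (and ultimately deemed unnecessary) would amount to something like this, placed just before the chunking; the exact form is my guess:

```rust
// sig_data holds at most one FEC set's worth of signed payload, so
// .chunks(data_buffer_per_shred_size_signed) yields at most
// DATA_SHREDS_PER_FEC_BLOCK chunks.
debug_assert!(sig_data.len() <= data_buffer_total_size_signed);
```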

.chain(std::iter::repeat(&[][..])) // possible padding
.take(DATA_SHREDS_PER_FEC_BLOCK);
make_shreds_data(&mut common_header_data, data_header, chunks).map(Shred::ShredData)
});
shreds.extend(make_shreds_code_header_only(&mut common_header_code).map(Shred::ShredCode));
}

// Adjust flags for the very last data shred.
if let Some(Shred::ShredData(shred)) = shreds
@@ -1691,7 +1748,11 @@ mod test {
let thread_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
let keypair = Keypair::new();
let chained_merkle_root = chained.then(|| Hash::new_from_array(rng.gen()));
let resigned = chained && is_last_in_slot;

// only sign last batch if it is chained and is the last in slot
// let resigned = chained && is_last_in_slot;
let sign_last_batch = chained && is_last_in_slot;

let slot = 149_745_689;
let parent_slot = slot - rng.gen_range(1..65536);
let shred_version = rng.gen();
@@ -1724,13 +1785,14 @@ pub(super) fn make_shreds_from_data(
})
.collect();
// Assert that the input data can be recovered from data shreds.
let data2 = data_shreds
.iter()
.flat_map(|shred| shred.data().unwrap())
.copied()
.collect::<Vec<_>>();
assert_eq!(
data,
data_shreds
.iter()
.flat_map(|shred| shred.data().unwrap())
.copied()
.collect::<Vec<_>>()
data2
);
// Assert that shreds sanitize and verify.
let pubkey = keypair.pubkey();
@@ -1780,8 +1842,10 @@ pub(super) fn make_shreds_from_data(
// Verify common, data and coding headers.
let mut num_data_shreds = 0;
let mut num_coding_shreds = 0;
for shred in &shreds {
for (index, shred) in shreds.iter().enumerate() {
let common_header = shred.common_header();
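// With sign_last_batch, only the trailing FEC set (32 data + 32 coding shreds) should be resigned.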
let resigned = sign_last_batch && index >= shreds.len() - 64;

assert_eq!(common_header.slot, slot);
assert_eq!(common_header.version, shred_version);
let proof_size = shred.proof_size().unwrap();
14 changes: 9 additions & 5 deletions ledger/src/shred/wire.rs
@@ -498,9 +498,11 @@ mod tests {
let mut shreds =
make_merkle_shreds_for_tests(&mut rng, slot, data_size, chained, is_last_in_slot)
.unwrap();
for shred in &mut shreds {
let shreds_len = shreds.len();
for (index, shred) in shreds.iter_mut().enumerate() {
let signature = make_dummy_signature(&mut rng);
if chained && is_last_in_slot {
let is_last_batch = index >= shreds_len - 64;
if chained && is_last_in_slot && is_last_batch {
shred.set_retransmitter_signature(&signature).unwrap();
} else {
assert_matches!(
Expand All @@ -509,7 +511,9 @@ mod tests {
);
}
}
for shred in &shreds {

for (index, shred) in shreds.iter().enumerate() {
let is_last_batch = index >= shreds_len - 64;
let mut packet = Packet::default();
if repaired {
packet.meta_mut().flags |= PacketFlags::REPAIR;
@@ -578,9 +582,9 @@
}
assert_eq!(
is_retransmitter_signed_variant(bytes).unwrap(),
chained && is_last_in_slot
chained && is_last_in_slot && is_last_batch,
);
if chained && is_last_in_slot {
if chained && is_last_in_slot && is_last_batch {
assert_eq!(
get_retransmitter_signature_offset(bytes).unwrap(),
shred.retransmitter_signature_offset().unwrap(),