diff --git a/ledger/src/shred/merkle.rs b/ledger/src/shred/merkle.rs index 09b4274cd92815..819f07132ff065 100644 --- a/ledger/src/shred/merkle.rs +++ b/ledger/src/shred/merkle.rs @@ -1037,18 +1037,30 @@ pub(super) fn make_shreds_from_data( ) -> Result<Vec<Shred>, Error> { let now = Instant::now(); let chained = chained_merkle_root.is_some(); - let resigned = chained && is_last_in_slot; + // let resigned = chained && is_last_in_slot; + // only sign if last batch in slot and is chained + let sign_last_batch = chained && is_last_in_slot; let proof_size = PROOF_ENTRIES_FOR_32_32_BATCH; - let data_buffer_per_shred_size = ShredData::capacity(proof_size, chained, resigned)?; + + // unsigned data_buffer size + let data_buffer_per_shred_size = ShredData::capacity(proof_size, chained, false)?; let data_buffer_total_size = DATA_SHREDS_PER_FEC_BLOCK * data_buffer_per_shred_size; + // signed data_buffer size + let data_buffer_per_shred_size_signed = if sign_last_batch { + ShredData::capacity(proof_size, chained, true)? + } else { + 0 + }; + let data_buffer_total_size_signed = DATA_SHREDS_PER_FEC_BLOCK * data_buffer_per_shred_size_signed; + // Common header for the data shreds. let mut common_header_data = ShredCommonHeader { signature: Signature::default(), shred_variant: ShredVariant::MerkleData { proof_size, chained, - resigned, + resigned: false, }, slot, index: next_shred_index, @@ -1061,7 +1073,7 @@ pub(super) fn make_shreds_from_data( shred_variant: ShredVariant::MerkleCode { proof_size, chained, - resigned, + resigned: false, }, index: next_code_index, ..common_header_data @@ -1082,13 +1094,33 @@ pub(super) fn make_shreds_from_data( }; // Pre-allocate shreds to avoid reallocations. 
+ let mut sig_data: &[u8] = &[]; let mut shreds = { - let number_of_batches = data.len().div_ceil(data_buffer_total_size); + let number_of_batches = if sign_last_batch { + if data.len() > data_buffer_total_size_signed { + let remainder_to_be_signed = data.len() % data_buffer_total_size; + if remainder_to_be_signed <= data_buffer_total_size_signed { + (data, sig_data) = data.split_at(data.len() - remainder_to_be_signed); + } + else { + (data, sig_data) = data.split_at(data.len()); + assert_eq!(sig_data.len(), 0); + } + data.len().div_ceil(data_buffer_total_size) + 1 + } else { + sig_data = data; + data = &[]; + 1 + } + } else { + data.len().div_ceil(data_buffer_total_size) + }; let total_num_shreds = SHREDS_PER_FEC_BLOCK * number_of_batches; Vec::<Shred>::with_capacity(total_num_shreds) }; stats.data_bytes += data.len(); + + // Split the data into full erasure batches and initialize data and coding // shreds for each batch. while data.len() >= data_buffer_total_size { @@ -1118,17 +1150,17 @@ pub(super) fn make_shreds_from_data( // 2.) Shreds is_empty, which only happens when we entered w/ zero data. // // In either case, we want to generate empty data shreds. 
- if !data.is_empty() || shreds.is_empty() { + if !data.is_empty() || (shreds.is_empty() && !sign_last_batch) { stats.padding_bytes += data_buffer_total_size - data.len(); common_header_data.shred_variant = ShredVariant::MerkleData { proof_size, chained, - resigned, + resigned: false, }; common_header_code.shred_variant = ShredVariant::MerkleCode { proof_size, chained, - resigned, + resigned: false, }; common_header_data.fec_set_index = common_header_data.index; common_header_code.fec_set_index = common_header_data.fec_set_index; @@ -1142,6 +1174,31 @@ pub(super) fn make_shreds_from_data( }); shreds.extend(make_shreds_code_header_only(&mut common_header_code).map(Shred::ShredCode)); } + if !sig_data.is_empty() || (shreds.is_empty() && sign_last_batch) { + // send signed last batch + stats.padding_bytes += data_buffer_total_size_signed - sig_data.len(); + common_header_data.shred_variant = ShredVariant::MerkleData { + proof_size, + chained, + resigned: true, + }; + common_header_code.shred_variant = ShredVariant::MerkleCode { + proof_size, + chained, + resigned: true, + }; + common_header_data.fec_set_index = common_header_data.index; + common_header_code.fec_set_index = common_header_data.fec_set_index; + shreds.extend({ + // Create data chunks out of remaining data + padding. + let chunks = sig_data + .chunks(data_buffer_per_shred_size_signed) + .chain(std::iter::repeat(&[][..])) // possible padding + .take(DATA_SHREDS_PER_FEC_BLOCK); + make_shreds_data(&mut common_header_data, data_header, chunks).map(Shred::ShredData) + }); + shreds.extend(make_shreds_code_header_only(&mut common_header_code).map(Shred::ShredCode)); + } // Adjust flags for the very last data shred. 
if let Some(Shred::ShredData(shred)) = shreds @@ -1691,7 +1748,11 @@ mod test { let thread_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap(); let keypair = Keypair::new(); let chained_merkle_root = chained.then(|| Hash::new_from_array(rng.gen())); - let resigned = chained && is_last_in_slot; + + // only sign last batch if it is chained and is the last in slot + // let resigned = chained && is_last_in_slot; + let sign_last_batch = chained && is_last_in_slot; + let slot = 149_745_689; let parent_slot = slot - rng.gen_range(1..65536); let shred_version = rng.gen(); @@ -1724,13 +1785,14 @@ mod test { }) .collect(); // Assert that the input data can be recovered from data shreds. + let data2 = data_shreds + .iter() + .flat_map(|shred| shred.data().unwrap()) + .copied() + .collect::<Vec<u8>>(); assert_eq!( data, - data_shreds - .iter() - .flat_map(|shred| shred.data().unwrap()) - .copied() - .collect::<Vec<u8>>() + data2 ); // Assert that shreds sanitize and verify. let pubkey = keypair.pubkey(); @@ -1780,8 +1842,10 @@ mod test { // Verify common, data and coding headers. 
let mut num_data_shreds = 0; let mut num_coding_shreds = 0; - for shred in &shreds { + for (index, shred) in shreds.iter().enumerate() { let common_header = shred.common_header(); + let resigned = sign_last_batch && index >= shreds.len() - 64; + assert_eq!(common_header.slot, slot); assert_eq!(common_header.version, shred_version); let proof_size = shred.proof_size().unwrap(); diff --git a/ledger/src/shred/wire.rs b/ledger/src/shred/wire.rs index 47eee42124a977..6a8e6ae13a0ecb 100644 --- a/ledger/src/shred/wire.rs +++ b/ledger/src/shred/wire.rs @@ -498,9 +498,11 @@ mod tests { let mut shreds = make_merkle_shreds_for_tests(&mut rng, slot, data_size, chained, is_last_in_slot) .unwrap(); - for shred in &mut shreds { + let shreds_len = shreds.len(); + for (index, shred) in shreds.iter_mut().enumerate() { let signature = make_dummy_signature(&mut rng); - if chained && is_last_in_slot { + let is_last_batch = index >= shreds_len - 64; + if chained && is_last_in_slot && is_last_batch { shred.set_retransmitter_signature(&signature).unwrap(); } else { assert_matches!( @@ -509,7 +511,9 @@ mod tests { ); } } - for shred in &shreds { + + for (index, shred) in shreds.iter().enumerate() { + let is_last_batch = index >= shreds_len - 64; let mut packet = Packet::default(); if repaired { packet.meta_mut().flags |= PacketFlags::REPAIR; @@ -578,9 +582,9 @@ mod tests { } assert_eq!( is_retransmitter_signed_variant(bytes).unwrap(), - chained && is_last_in_slot + chained && is_last_in_slot && is_last_batch, ); - if chained && is_last_in_slot { + if chained && is_last_in_slot && is_last_batch { assert_eq!( get_retransmitter_signature_offset(bytes).unwrap(), shred.retransmitter_signature_offset().unwrap(),