diff --git a/crates/apollo_consensus/src/manager.rs b/crates/apollo_consensus/src/manager.rs index 7f8a4cda0fb..b6e846f8c5a 100644 --- a/crates/apollo_consensus/src/manager.rs +++ b/crates/apollo_consensus/src/manager.rs @@ -21,7 +21,7 @@ use apollo_consensus_config::config::{ use apollo_infra_utils::debug_every_n_ms; use apollo_network::network_manager::BroadcastTopicClientTrait; use apollo_network_types::network_types::BroadcastedMessageMetadata; -use apollo_protobuf::consensus::{ProposalInit, Vote, VoteType}; +use apollo_protobuf::consensus::{ConsensusBlockInfo, ProposalInit, Vote, VoteType}; use apollo_protobuf::converters::ProtobufConversionError; use apollo_time::time::{Clock, ClockExt, DefaultClock}; use futures::channel::mpsc; @@ -95,7 +95,7 @@ impl std::fmt::Debug for RunConsensusArguments { /// - `vote_receiver`: The channels to receive votes from the network. These are self contained /// messages. /// - `proposals_receiver`: The channel to receive proposals from the network. Proposals are -/// represented as streams (ProposalInit, Content.*, ProposalFin). +/// represented as streams (ConsensusBlockInfo, Content.*, ProposalFin). // Always print the validator ID since some tests collate multiple consensus logs in a single file. #[instrument( skip_all, @@ -174,14 +174,14 @@ pub enum RunHeightRes { Sync, } -type ProposalReceiverTuple = (ProposalInit, mpsc::Receiver); +type ProposalReceiverTuple = (ConsensusBlockInfo, mpsc::Receiver); /// Manages votes and proposals for future heights. #[derive(Debug)] struct ConsensusCache { // Mapping: { Height : Vec } future_votes: BTreeMap>, - // Mapping: { Height : { Round : (Init, Receiver)}} + // Mapping: { Height : { Round : (BlockInfo, Receiver)}} future_proposals_cache: BTreeMap>>, /// Configuration for determining which messages should be cached. @@ -227,7 +227,7 @@ impl ConsensusCache { fn get_current_height_proposals( &mut self, height: BlockNumber, - ) -> Vec<(ProposalInit, mpsc::Receiver)> { + ) -> Vec<(ConsensusBlockInfo, mpsc::Receiver)> { loop { let Some(entry) = self.future_proposals_cache.first_entry() else { return Vec::new(); @@ -259,14 +259,14 @@ impl ConsensusCache { /// Caches a proposal for a future height. fn cache_future_proposal( &mut self, - proposal_init: ProposalInit, + block_info: ConsensusBlockInfo, content_receiver: mpsc::Receiver, ) { self.future_proposals_cache - .entry(proposal_init.height) + .entry(block_info.height) .or_default() - .entry(proposal_init.round) - .or_insert((proposal_init, content_receiver)); + .entry(block_info.round) + .or_insert((block_info, content_receiver)); } fn report_max_cached_block_number_metric(&self, height: BlockNumber) { @@ -320,7 +320,7 @@ impl ConsensusCache { &self, current_height: &BlockNumber, current_round: Round, - proposal: &ProposalInit, + proposal: &ConsensusBlockInfo, ) -> bool { self.should_cache_msg( current_height, @@ -574,9 +574,16 @@ impl MultiHeightManager { let cached_proposals = self.cache.get_current_height_proposals(height); trace!("Cached proposals for height {}: {:?}", height, cached_proposals); - for (init, content_receiver) in cached_proposals { - let new_requests = - self.handle_proposal_known_init(context, height, shc, init, content_receiver).await; + for (block_info, content_receiver) in cached_proposals { + let new_requests = self + .handle_proposal_known_block_info( + context, + height, + shc, + block_info, + content_receiver, + ) + .await; pending_requests.extend(new_requests); } @@ -697,18 +704,18 @@ impl MultiHeightManager { return Ok(VecDeque::new()); }; - let proposal_init: ProposalInit = match first_part.try_into() { - Ok(proposal_init) => proposal_init, + let block_info: ConsensusBlockInfo = match first_part.try_into() { + Ok(block_info) => block_info, Err(e) => { - warn!("Failed to parse incoming proposal init. Dropping proposal: {e}"); + warn!("Failed to parse incoming block info. Dropping proposal: {e}"); return Ok(VecDeque::new()); } }; - match proposal_init.height.cmp(&height) { + match block_info.height.cmp(&height) { std::cmp::Ordering::Greater => { - if self.cache.should_cache_proposal(&height, 0, &proposal_init) { - debug!("Received a proposal for a future height. {:?}", proposal_init); + if self.cache.should_cache_proposal(&height, 0, &block_info) { + debug!("Received a proposal for a future height. {:?}", block_info); // Note: new proposals with the same height/round will be ignored. // // TODO(matan): This only work for trusted peers. In the case of possibly @@ -717,27 +724,23 @@ impl MultiHeightManager { // "good" nodes can propose). // // When moving to version 1.0 make sure this is addressed. - self.cache.cache_future_proposal(proposal_init, content_receiver); + self.cache.cache_future_proposal(block_info, content_receiver); } Ok(VecDeque::new()) } std::cmp::Ordering::Less => { - trace!("Drop proposal from past height. {:?}", proposal_init); + trace!("Drop proposal from past height. {:?}", block_info); Ok(VecDeque::new()) } std::cmp::Ordering::Equal => match shc { Some(shc) => { - if self.cache.should_cache_proposal( - &height, - shc.current_round(), - &proposal_init, - ) { + if self.cache.should_cache_proposal(&height, shc.current_round(), &block_info) { Ok(self - .handle_proposal_known_init( + .handle_proposal_known_block_info( context, height, shc, - proposal_init, + block_info, content_receiver, ) .await) @@ -746,26 +749,25 @@ impl MultiHeightManager { } } None => { - trace!("Drop proposal from just completed height. {:?}", proposal_init); + trace!("Drop proposal from just completed height. {:?}", block_info); Ok(VecDeque::new()) } }, } } - async fn handle_proposal_known_init( + async fn handle_proposal_known_block_info( &mut self, context: &mut ContextT, height: BlockNumber, shc: &mut SingleHeightConsensus, - proposal_init: ProposalInit, + block_info: ConsensusBlockInfo, content_receiver: mpsc::Receiver, ) -> Requests { // Store the stream; requests will reference it by (height, round) - self.current_height_proposals_streams - .insert((height, proposal_init.round), content_receiver); + self.current_height_proposals_streams.insert((height, block_info.round), content_receiver); let leader_fn = make_leader_fn(context, height); - shc.handle_proposal(&leader_fn, proposal_init) + shc.handle_proposal(&leader_fn, block_info) } // Handle a single consensus message. @@ -896,14 +898,14 @@ impl MultiHeightManager { .boxed(); Ok(Some(fut)) } - SMRequest::StartValidateProposal(init) => { + SMRequest::StartValidateProposal(block_info) => { // Look up the stored stream. - let key = (height, init.round); + let round = block_info.round; + let valid_round = block_info.valid_round; + let key = (height, round); if let Some(stream) = self.current_height_proposals_streams.remove(&key) { - let timeout = timeouts.get_proposal_timeout(init.round); - let receiver = context.validate_proposal(init, timeout, stream).await; - let round = init.round; - let valid_round = init.valid_round; + let timeout = timeouts.get_proposal_timeout(round); + let receiver = context.validate_proposal(block_info, timeout, stream).await; let fut = async move { let proposal_id = receiver.await.ok(); StateMachineEvent::FinishedValidation(proposal_id, round, valid_round) diff --git a/crates/apollo_consensus/src/manager_test.rs b/crates/apollo_consensus/src/manager_test.rs index f7167a9d39d..da2a17083bf 100644 --- a/crates/apollo_consensus/src/manager_test.rs +++ b/crates/apollo_consensus/src/manager_test.rs @@ -34,9 +34,9 @@ use starknet_types_core::felt::Felt; use super::{run_consensus, MultiHeightManager, RunHeightRes}; use crate::storage::MockHeightVotedStorageTrait; use crate::test_utils::{ + block_info, precommit, prevote, - proposal_init, MockTestContext, NoOpHeightVotedStorage, TestProposalPart, @@ -148,7 +148,7 @@ async fn manager_multiple_heights_unordered(consensus_config: ConsensusConfig) { // Send messages for height 2 followed by those for height 1. send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(HEIGHT_2, ROUND_0, *PROPOSER_ID))], + vec![TestProposalPart::BlockInfo(block_info(HEIGHT_2, ROUND_0, *PROPOSER_ID))], ) .await; send(&mut sender, prevote(Some(Felt::TWO), HEIGHT_2, ROUND_0, *PROPOSER_ID)).await; @@ -156,7 +156,7 @@ async fn manager_multiple_heights_unordered(consensus_config: ConsensusConfig) { send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_0, *PROPOSER_ID))], + vec![TestProposalPart::BlockInfo(block_info(HEIGHT_1, ROUND_0, *PROPOSER_ID))], ) .await; send(&mut sender, prevote(Some(Felt::ONE), HEIGHT_1, ROUND_0, *PROPOSER_ID)).await; @@ -237,7 +237,7 @@ async fn run_consensus_sync(consensus_config: ConsensusConfig) { // Send messages for height 2. send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(HEIGHT_2, ROUND_0, *PROPOSER_ID))], + vec![TestProposalPart::BlockInfo(block_info(HEIGHT_2, ROUND_0, *PROPOSER_ID))], ) .await; let TestSubscriberChannels { mock_network, subscriber_channels } = @@ -279,7 +279,7 @@ async fn test_timeouts(consensus_config: ConsensusConfig) { send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_0, *PROPOSER_ID))], + vec![TestProposalPart::BlockInfo(block_info(HEIGHT_1, ROUND_0, *PROPOSER_ID))], ) .await; send(&mut sender, prevote(None, HEIGHT_1, ROUND_0, *VALIDATOR_ID_2)).await; @@ -337,7 +337,7 @@ async fn test_timeouts(consensus_config: ConsensusConfig) { // reach a decision. send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_1, *PROPOSER_ID))], + vec![TestProposalPart::BlockInfo(block_info(HEIGHT_1, ROUND_1, *PROPOSER_ID))], ) .await; send(&mut sender, prevote(Some(Felt::ONE), HEIGHT_1, ROUND_1, *PROPOSER_ID)).await; @@ -361,7 +361,7 @@ async fn timely_message_handling(consensus_config: ConsensusConfig) { let (mut proposal_receiver_sender, mut proposal_receiver_receiver) = mpsc::channel(0); let (mut content_sender, content_receiver) = mpsc::channel(0); content_sender - .try_send(TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_0, *PROPOSER_ID))) + .try_send(TestProposalPart::BlockInfo(block_info(HEIGHT_1, ROUND_0, *PROPOSER_ID))) .unwrap(); proposal_receiver_sender.try_send(content_receiver).unwrap(); @@ -416,7 +416,7 @@ async fn future_height_limit_caching_and_dropping(mut consensus_config: Consensu // Send proposal and votes for height 2 (should be dropped when processing height 0). send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(HEIGHT_2, ROUND_0, *PROPOSER_ID))], + vec![TestProposalPart::BlockInfo(block_info(HEIGHT_2, ROUND_0, *PROPOSER_ID))], ) .await; send(&mut sender, prevote(Some(Felt::TWO), HEIGHT_2, ROUND_0, *PROPOSER_ID)).await; @@ -425,7 +425,7 @@ async fn future_height_limit_caching_and_dropping(mut consensus_config: Consensu // Send proposal and votes for height 1 (should be cached when processing height 0). send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_0, *PROPOSER_ID))], + vec![TestProposalPart::BlockInfo(block_info(HEIGHT_1, ROUND_0, *PROPOSER_ID))], ) .await; send(&mut sender, prevote(Some(Felt::ONE), HEIGHT_1, ROUND_0, *PROPOSER_ID)).await; @@ -434,7 +434,7 @@ async fn future_height_limit_caching_and_dropping(mut consensus_config: Consensu // Send proposal and votes for height 0 (current height - needed to reach consensus). send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(HEIGHT_0, ROUND_0, *PROPOSER_ID))], + vec![TestProposalPart::BlockInfo(block_info(HEIGHT_0, ROUND_0, *PROPOSER_ID))], ) .await; send(&mut sender, prevote(Some(Felt::ZERO), HEIGHT_0, ROUND_0, *PROPOSER_ID)).await; @@ -543,12 +543,12 @@ async fn current_height_round_limit_caching_and_dropping(mut consensus_config: C // Send proposals for rounds 0 and 1, proposal for round 1 should be dropped. send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_0, *PROPOSER_ID))], + vec![TestProposalPart::BlockInfo(block_info(HEIGHT_1, ROUND_0, *PROPOSER_ID))], ) .await; send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_1, *PROPOSER_ID))], + vec![TestProposalPart::BlockInfo(block_info(HEIGHT_1, ROUND_1, *PROPOSER_ID))], ) .await; @@ -627,7 +627,7 @@ async fn current_height_round_limit_caching_and_dropping(mut consensus_config: C // Send proposal for round 2. send_proposal( &mut proposal_sender_clone, - vec![TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_2, *PROPOSER_ID))], + vec![TestProposalPart::BlockInfo(block_info(HEIGHT_1, ROUND_2, *PROPOSER_ID))], ) .await; // Send votes for round 2. @@ -809,7 +809,7 @@ async fn manager_runs_normally_when_height_is_greater_than_last_voted_height( // Send a proposal for the height we already voted on: send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(CURRENT_HEIGHT, ROUND_0, *PROPOSER_ID))], + vec![TestProposalPart::BlockInfo(block_info(CURRENT_HEIGHT, ROUND_0, *PROPOSER_ID))], ) .await; send(&mut sender, prevote(Some(Felt::ONE), CURRENT_HEIGHT, ROUND_0, *PROPOSER_ID)).await; @@ -868,7 +868,7 @@ async fn manager_waits_until_height_passes_last_voted_height(consensus_config: C // Send a proposal for the height we already voted on: send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(LAST_VOTED_HEIGHT, ROUND_0, *PROPOSER_ID))], + vec![TestProposalPart::BlockInfo(block_info(LAST_VOTED_HEIGHT, ROUND_0, *PROPOSER_ID))], ) .await; send(&mut sender, prevote(Some(Felt::ONE), LAST_VOTED_HEIGHT, ROUND_0, *PROPOSER_ID)).await; @@ -990,7 +990,7 @@ async fn writes_voted_height_to_storage(consensus_config: ConsensusConfig) { // Send proposal first send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(HEIGHT, ROUND_0, *PROPOSER_ID))], + vec![TestProposalPart::BlockInfo(block_info(HEIGHT, ROUND_0, *PROPOSER_ID))], ) .await; @@ -1118,7 +1118,7 @@ async fn manager_ignores_invalid_network_messages(consensus_config: ConsensusCon // Send a valid proposal and valid votes. send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_0, *PROPOSER_ID))], + vec![TestProposalPart::BlockInfo(block_info(HEIGHT_1, ROUND_0, *PROPOSER_ID))], ) .await; send(&mut sender, prevote(Some(Felt::ONE), HEIGHT_1, ROUND_0, *PROPOSER_ID)).await; diff --git a/crates/apollo_consensus/src/simulation_test.rs b/crates/apollo_consensus/src/simulation_test.rs index 51bcc020cd2..9a898c1eac7 100644 --- a/crates/apollo_consensus/src/simulation_test.rs +++ b/crates/apollo_consensus/src/simulation_test.rs @@ -10,7 +10,7 @@ use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque}; use std::ops::Range; use apollo_consensus_config::config::TimeoutsConfig; -use apollo_protobuf::consensus::{ProposalInit, Vote, VoteType}; +use apollo_protobuf::consensus::{ConsensusBlockInfo, Vote, VoteType}; use lazy_static::lazy_static; use rand::rngs::StdRng; use rand::seq::SliceRandom; @@ -85,7 +85,7 @@ enum InputEvent { /// A vote message from peer node. Vote(Vote), /// A proposal message. - Proposal(ProposalInit), + Proposal(ConsensusBlockInfo), /// An internal event. Internal(StateMachineEvent), } @@ -320,11 +320,12 @@ impl DiscreteEventSimulation { .min(round_start_tick + ROUND_DURATION); self.schedule_at_tick( proposal_tick, - InputEvent::Proposal(ProposalInit { + InputEvent::Proposal(ConsensusBlockInfo { height: HEIGHT_0, round, proposer: leader_id, valid_round: None, + ..Default::default() }), ); } @@ -397,11 +398,12 @@ impl DiscreteEventSimulation { // Send a proposal even when not the leader self.schedule_at_tick( round_start_tick + 1, - InputEvent::Proposal(ProposalInit { + InputEvent::Proposal(ConsensusBlockInfo { height: HEIGHT_0, round, proposer: node_id, valid_round: None, + ..Default::default() }), ); } @@ -524,14 +526,14 @@ impl DiscreteEventSimulation { fn handle_requests(&mut self, reqs: VecDeque) -> Option { for req in reqs { match req { - SMRequest::StartValidateProposal(init) => { + SMRequest::StartValidateProposal(block_info) => { let delay = self.rng.gen_range(VALIDATION_DELAY_RANGE); let validate_finish_tick = self.current_tick + delay; let proposal_commitment = - Some(proposal_commitment_for_round(init.round, false)); + Some(proposal_commitment_for_round(block_info.round, false)); let result = StateMachineEvent::FinishedValidation( proposal_commitment, - init.round, + block_info.round, None, ); self.schedule_at_tick(validate_finish_tick, InputEvent::Internal(result)); diff --git a/crates/apollo_consensus/src/single_height_consensus.rs b/crates/apollo_consensus/src/single_height_consensus.rs index 8c2fb8b966d..190011f2cb4 100644 --- a/crates/apollo_consensus/src/single_height_consensus.rs +++ b/crates/apollo_consensus/src/single_height_consensus.rs @@ -20,7 +20,7 @@ const DUPLICATE_VOTE_LOG_PERIOD_MS: u64 = 10_000; use apollo_consensus_config::config::TimeoutsConfig; use apollo_infra_utils::trace_every_n_ms; -use apollo_protobuf::consensus::{ProposalInit, Vote, VoteType}; +use apollo_protobuf::consensus::{ConsensusBlockInfo, Vote, VoteType}; use starknet_api::block::BlockNumber; use tracing::{debug, info, instrument, trace, warn}; @@ -89,49 +89,50 @@ impl SingleHeightConsensus { self.state_machine.start(leader_fn) } - /// Process the proposal init and initiate block validation by returning + /// Process the proposal block info message and initiate block validation by returning /// `SMRequest::StartValidateProposal` to the manager. #[instrument(skip_all)] pub(crate) fn handle_proposal( &mut self, leader_fn: &LeaderFn, - init: ProposalInit, + block_info: ConsensusBlockInfo, ) -> Requests where LeaderFn: Fn(Round) -> ValidatorId, { - debug!("Received {init:?}"); + debug!("Received {block_info:?}"); let height = self.state_machine.height(); - if init.height != height { - warn!("Invalid proposal height: expected {:?}, got {:?}", height, init.height); + if block_info.height != height { + warn!("Invalid proposal height: expected {:?}, got {:?}", height, block_info.height); return VecDeque::new(); } - let proposer_id = leader_fn(init.round); - if init.proposer != proposer_id { - warn!("Invalid proposer: expected {:?}, got {:?}", proposer_id, init.proposer); + let proposer_id = leader_fn(block_info.round); + if block_info.proposer != proposer_id { + warn!("Invalid proposer: expected {:?}, got {:?}", proposer_id, block_info.proposer); return VecDeque::new(); } // Avoid duplicate validations: // - If SM already has an entry for this round, a (re)proposal was already recorded. // - If we already started validating this round, ignore repeats. - if self.state_machine.has_proposal_for_round(init.round) - || self.pending_validation_rounds.contains(&init.round) + if self.state_machine.has_proposal_for_round(block_info.round) + || self.pending_validation_rounds.contains(&block_info.round) { - warn!("Round {} already handled a proposal, ignoring", init.round); + warn!("Round {} already handled a proposal, ignoring", block_info.round); return VecDeque::new(); } - let timeout = self.timeouts.get_proposal_timeout(init.round); + let timeout = self.timeouts.get_proposal_timeout(block_info.round); info!( - "Accepting {init:?}. node_round: {}, timeout: {timeout:?}", + "Accepting {block_info:?}. node_round: {}, timeout: {timeout:?}", self.state_machine.round() ); + // TODO(Asmaa): rename the metric. CONSENSUS_PROPOSALS_VALID_INIT.increment(1); // Since validating the proposal is non-blocking, avoid validating the same round twice in // parallel (e.g., due to repeats or spam). - self.pending_validation_rounds.insert(init.round); + self.pending_validation_rounds.insert(block_info.round); // Ask the manager to start validation. - VecDeque::from([SMRequest::StartValidateProposal(init)]) + VecDeque::from([SMRequest::StartValidateProposal(block_info)]) } #[instrument(skip_all)] diff --git a/crates/apollo_consensus/src/single_height_consensus_test.rs b/crates/apollo_consensus/src/single_height_consensus_test.rs index 34a90a8f157..2a838e9bbde 100644 --- a/crates/apollo_consensus/src/single_height_consensus_test.rs +++ b/crates/apollo_consensus/src/single_height_consensus_test.rs @@ -8,7 +8,7 @@ use test_case::test_case; use super::SingleHeightConsensus; use crate::state_machine::{SMRequest, StateMachineEvent, Step}; -use crate::test_utils::{precommit, prevote, TestBlock}; +use crate::test_utils::{block_info, precommit, prevote, TestBlock}; use crate::types::{ProposalCommitment, Round, ValidatorId}; use crate::votes_threshold::QuorumType; @@ -29,10 +29,6 @@ const HEIGHT_0: BlockNumber = BlockNumber(0); const ROUND_0: Round = 0; const ROUND_1: Round = 1; -fn get_proposal_init_for_height(height: BlockNumber) -> ProposalInit { - ProposalInit { height, ..*PROPOSAL_INIT } -} - #[test] fn proposer() { let mut shc = SingleHeightConsensus::new( @@ -92,7 +88,7 @@ fn proposer() { #[test_case(false; "single_proposal")] #[test_case(true; "repeat_proposal")] fn validator(repeat_proposal: bool) { - let proposal_init = get_proposal_init_for_height(HEIGHT_0); + let block_info = block_info(HEIGHT_0, ROUND_0, *PROPOSER_ID); let mut shc = SingleHeightConsensus::new( HEIGHT_0, false, @@ -103,17 +99,18 @@ fn validator(repeat_proposal: bool) { ); let leader_fn = |_round| -> ValidatorId { *PROPOSER_ID }; - // Accept init -> should request validation. - let ret = shc.handle_proposal(&leader_fn, proposal_init); + // Accept block info -> should request validation. + let round = block_info.round; + let ret = shc.handle_proposal(&leader_fn, block_info.clone()); assert_matches!(ret, mut reqs => { - assert_matches!(reqs.pop_front(), Some(SMRequest::StartValidateProposal(init)) if init == proposal_init); + assert_matches!(reqs.pop_front(), Some(SMRequest::StartValidateProposal(info)) if info == block_info); assert!(reqs.is_empty()); }); // After validation finished -> expect prevote broadcast request. let ret = shc.handle_event( &leader_fn, - StateMachineEvent::FinishedValidation(Some(BLOCK.id), proposal_init.round, None), + StateMachineEvent::FinishedValidation(Some(BLOCK.id), round, None), ); assert_matches!(ret, mut reqs => { assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Prevote); @@ -121,8 +118,8 @@ fn validator(repeat_proposal: bool) { }); if repeat_proposal { - // Duplicate proposal init should be ignored. - let ret = shc.handle_proposal(&leader_fn, proposal_init); + // Duplicate block info should be ignored. + let ret = shc.handle_proposal(&leader_fn, block_info.clone()); assert!(matches!(ret, rs if rs.is_empty())); } @@ -156,7 +153,7 @@ fn validator(repeat_proposal: bool) { #[test_case(true; "repeat")] #[test_case(false; "equivocation")] fn vote_twice(same_vote: bool) { - let proposal_init = get_proposal_init_for_height(HEIGHT_0); + let block_info = block_info(HEIGHT_0, ROUND_0, *PROPOSER_ID); let mut shc = SingleHeightConsensus::new( HEIGHT_0, false, @@ -167,10 +164,11 @@ fn vote_twice(same_vote: bool) { ); let leader_fn = |_round| -> ValidatorId { *PROPOSER_ID }; // Validate a proposal so the SM is ready to prevote. - shc.handle_proposal(&leader_fn, proposal_init); + let round = block_info.round; + shc.handle_proposal(&leader_fn, block_info); shc.handle_event( &leader_fn, - StateMachineEvent::FinishedValidation(Some(BLOCK.id), proposal_init.round, None), + StateMachineEvent::FinishedValidation(Some(BLOCK.id), round, None), ); let _ = shc.handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), HEIGHT_0, ROUND_0, *PROPOSER_ID)); @@ -398,7 +396,7 @@ async fn duplicate_votes_during_awaiting_finished_building_are_ignored() { #[test] fn broadcast_vote_before_decision_on_validation_finish() { - let proposal_init = get_proposal_init_for_height(HEIGHT_0); + let block_info = block_info(HEIGHT_0, ROUND_0, *PROPOSER_ID); let mut shc = SingleHeightConsensus::new( HEIGHT_0, false, @@ -410,9 +408,10 @@ fn broadcast_vote_before_decision_on_validation_finish() { let leader_fn = |_round| -> ValidatorId { *PROPOSER_ID }; // 1. Accept proposal -> should request validation - let ret = shc.handle_proposal(&leader_fn, proposal_init); + let round = block_info.round; + let ret = shc.handle_proposal(&leader_fn, block_info); assert_matches!(ret, mut reqs => { - assert_matches!(reqs.pop_front(), Some(SMRequest::StartValidateProposal(init)) if init == proposal_init); + assert_matches!(reqs.pop_front(), Some(SMRequest::StartValidateProposal(_block_info))); assert!(reqs.is_empty()); }); @@ -443,7 +442,7 @@ fn broadcast_vote_before_decision_on_validation_finish() { // 6. Should return BOTH BroadcastVote (precommit) and DecisionReached let ret = shc.handle_event( &leader_fn, - StateMachineEvent::FinishedValidation(Some(BLOCK.id), proposal_init.round, None), + StateMachineEvent::FinishedValidation(Some(BLOCK.id), round, None), ); assert_matches!(ret, mut reqs => { assert_matches!( diff --git a/crates/apollo_consensus/src/state_machine.rs b/crates/apollo_consensus/src/state_machine.rs index ba8173821d7..26588334e59 100644 --- a/crates/apollo_consensus/src/state_machine.rs +++ b/crates/apollo_consensus/src/state_machine.rs @@ -9,7 +9,7 @@ mod state_machine_test; use std::collections::{HashMap, HashSet, VecDeque}; -use apollo_protobuf::consensus::{ProposalInit, Vote, VoteType}; +use apollo_protobuf::consensus::{ConsensusBlockInfo, ProposalInit, Vote, VoteType}; use serde::{Deserialize, Serialize}; use starknet_api::block::BlockNumber; use tracing::{debug, info, trace, warn}; @@ -64,7 +64,7 @@ pub(crate) enum SMRequest { /// Request to build a proposal for a new round. StartBuildProposal(Round), /// Request to validate a received proposal from the network. - StartValidateProposal(ProposalInit), + StartValidateProposal(ConsensusBlockInfo), /// Request to broadcast a Prevote or Precommit vote. BroadcastVote(Vote), /// Request to schedule a timeout for a specific step and round. diff --git a/crates/apollo_consensus/src/stream_handler_test.rs b/crates/apollo_consensus/src/stream_handler_test.rs index dc2c6dc9895..618c516a374 100644 --- a/crates/apollo_consensus/src/stream_handler_test.rs +++ b/crates/apollo_consensus/src/stream_handler_test.rs @@ -4,7 +4,7 @@ use std::fmt::Display; use apollo_consensus_config::config::StreamHandlerConfig; use apollo_network::network_manager::{BroadcastTopicClientTrait, ReceivedBroadcastedMessage}; use apollo_network_types::network_types::BroadcastedMessageMetadata; -use apollo_protobuf::consensus::{ProposalInit, ProposalPart, StreamMessageBody}; +use apollo_protobuf::consensus::{ConsensusBlockInfo, ProposalPart, StreamMessageBody}; use apollo_protobuf::converters::ProtobufConversionError; use apollo_test_utils::{get_rng, GetTestInstance}; use futures::channel::mpsc::{self, Receiver, SendError, Sender}; @@ -119,7 +119,7 @@ fn setup() -> ( fn build_init_message(round: u32, stream_id: u64, message_id: u32) -> StreamMessage { StreamMessage { - message: StreamMessageBody::Content(ProposalPart::Init(ProposalInit { + message: StreamMessageBody::Content(ProposalPart::BlockInfo(ConsensusBlockInfo { round, ..Default::default() })), @@ -162,8 +162,9 @@ async fn outbound_single() { // Send the content of the stream. for i in 0..num_messages { - let init = ProposalPart::Init(ProposalInit { round: i, ..Default::default() }); - sender.send(init).await.unwrap(); + let block_info = + ProposalPart::BlockInfo(ConsensusBlockInfo { round: i, ..Default::default() }); + sender.send(block_info).await.unwrap(); } // Check the content is sent to the network in order. @@ -210,8 +211,9 @@ async fn outbound_multiple() { for stream_id in 0..num_streams { let sender = stream_senders.get_mut(as_usize(stream_id)).unwrap(); for i in 0..num_messages { - let init = ProposalPart::Init(ProposalInit { round: i, ..Default::default() }); - sender.send(init).await.unwrap(); + let block_info = + ProposalPart::BlockInfo(ConsensusBlockInfo { round: i, ..Default::default() }); + sender.send(block_info).await.unwrap(); } } @@ -267,7 +269,10 @@ async fn inbound_in_order() { let mut receiver = streamhandler_to_client_receiver.next().now_or_never().unwrap().unwrap(); for i in 0..num_messages { let message = receiver.next().await.unwrap(); - assert_eq!(message, ProposalPart::Init(ProposalInit { round: i, ..Default::default() })); + assert_eq!( + message, + ProposalPart::BlockInfo(ConsensusBlockInfo { round: i, ..Default::default() }) + ); } // Check that the receiver was closed: assert!(matches!(receiver.try_next(), Ok(None))); @@ -302,7 +307,10 @@ async fn lru_cache_for_inbound_streams() { let message = receiver.next().await.unwrap(); assert_eq!( message, - ProposalPart::Init(ProposalInit { round: i.try_into().unwrap(), ..Default::default() }) + ProposalPart::BlockInfo(ConsensusBlockInfo { + round: i.try_into().unwrap(), + ..Default::default() + }) ); if i == 0 { // This stream was reopened, but it should only have one message, and left open. @@ -348,10 +356,9 @@ async fn inbound_multiple() { for i in 0..num_messages { let message = receiver.next().await.unwrap(); actual_msgs.get_mut(as_usize(sid)).unwrap().push(message); - expected_msgs - .get_mut(as_usize(sid)) - .unwrap() - .push(ProposalPart::Init(ProposalInit { round: i, ..Default::default() })); + expected_msgs.get_mut(as_usize(sid)).unwrap().push(ProposalPart::BlockInfo( + ConsensusBlockInfo { round: i, ..Default::default() }, + )); } // Check that the receiver was closed: assert!(matches!(receiver.try_next(), Ok(None))); @@ -396,7 +403,10 @@ async fn inbound_delayed_first() { // Fin is communicated by dropping the sender, hence `..num_message` not `..=num_messages` for i in 0..num_messages { let message = receiver.next().await.unwrap(); - assert_eq!(message, ProposalPart::Init(ProposalInit { round: i, ..Default::default() })); + assert_eq!( + message, + ProposalPart::BlockInfo(ConsensusBlockInfo { round: i, ..Default::default() }) + ); } // Check that the receiver was closed: assert!(matches!(receiver.try_next(), Ok(None))); @@ -433,7 +443,10 @@ async fn inbound_delayed_middle() { let mut receiver = streamhandler_to_client_receiver.next().now_or_never().unwrap().unwrap(); for i in 0..missing_message_id { let message = receiver.next().await.unwrap(); - assert_eq!(message, ProposalPart::Init(ProposalInit { round: i, ..Default::default() })); + assert_eq!( + message, + ProposalPart::BlockInfo(ConsensusBlockInfo { round: i, ..Default::default() }) + ); } // Send the missing message now. @@ -446,7 +459,10 @@ async fn inbound_delayed_middle() { // Fin is communicated by dropping the sender, hence `..num_message` not `..=num_messages` for i in missing_message_id..num_messages { let message = receiver.next().await.unwrap(); - assert_eq!(message, ProposalPart::Init(ProposalInit { round: i, ..Default::default() })); + assert_eq!( + message, + ProposalPart::BlockInfo(ConsensusBlockInfo { round: i, ..Default::default() }) + ); } // Check that the receiver was closed: assert!(matches!(receiver.try_next(), Ok(None))); diff --git a/crates/apollo_consensus/src/test_utils.rs b/crates/apollo_consensus/src/test_utils.rs index 144bf19ff20..d0b0a3e01f1 100644 --- a/crates/apollo_consensus/src/test_utils.rs +++ b/crates/apollo_consensus/src/test_utils.rs @@ -2,7 +2,13 @@ use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; -use apollo_protobuf::consensus::{ProposalCommitment, ProposalInit, Vote, VoteType}; +use apollo_protobuf::consensus::{ + ConsensusBlockInfo, + ProposalCommitment, + ProposalInit, + Vote, + VoteType, +}; use apollo_protobuf::converters::ProtobufConversionError; use apollo_storage::db::DbConfig; use apollo_storage::StorageConfig; @@ -24,21 +30,21 @@ pub struct TestBlock { #[derive(Debug, PartialEq, Clone)] pub enum TestProposalPart { - Init(ProposalInit), + BlockInfo(ConsensusBlockInfo), Invalid, } -impl From for TestProposalPart { - fn from(init: ProposalInit) -> Self { - TestProposalPart::Init(init) +impl From for TestProposalPart { + fn from(block_info: ConsensusBlockInfo) -> Self { + TestProposalPart::BlockInfo(block_info) } } -impl TryFrom for ProposalInit { +impl TryFrom for ConsensusBlockInfo { type Error = ProtobufConversionError; fn try_from(part: TestProposalPart) -> Result { - if let TestProposalPart::Init(init) = part { - return Ok(init); + if let TestProposalPart::BlockInfo(block_info) = part { + return Ok(block_info); } Err(ProtobufConversionError::SerdeJsonError("Invalid proposal part".to_string())) } @@ -46,8 +52,8 @@ impl TryFrom for ProposalInit { impl From for Vec { fn from(part: TestProposalPart) -> Vec { - if let TestProposalPart::Init(init) = part { - return init.into(); + if let TestProposalPart::BlockInfo(block_info) = part { + return block_info.into(); } vec![] } @@ -57,7 +63,7 @@ impl TryFrom> for TestProposalPart { type Error = ProtobufConversionError; fn try_from(value: Vec) -> Result { - Ok(TestProposalPart::Init(value.try_into()?)) + Ok(TestProposalPart::BlockInfo(value.try_into()?)) } } @@ -77,7 +83,7 @@ mock! { async fn validate_proposal( &mut self, - init: ProposalInit, + block_info: ConsensusBlockInfo, timeout: Duration, content: mpsc::Receiver ) -> oneshot::Receiver; @@ -130,6 +136,10 @@ pub fn proposal_init(height: BlockNumber, round: Round, proposer: ValidatorId) - ProposalInit { height, round, proposer, ..Default::default() } } +pub fn block_info(height: BlockNumber, round: Round, proposer: ValidatorId) -> ConsensusBlockInfo { + ConsensusBlockInfo { height, round, proposer, ..Default::default() } +} + #[derive(Debug)] pub struct NoOpHeightVotedStorage; diff --git a/crates/apollo_consensus/src/types.rs b/crates/apollo_consensus/src/types.rs index 9e8dddee9bd..7f140ba6139 100644 --- a/crates/apollo_consensus/src/types.rs +++ b/crates/apollo_consensus/src/types.rs @@ -9,8 +9,8 @@ use apollo_network::network_manager::{ GenericReceiver, }; use apollo_network_types::network_types::BroadcastedMessageMetadata; +use apollo_protobuf::consensus::{ConsensusBlockInfo, ProposalInit, Vote}; pub use apollo_protobuf::consensus::{ProposalCommitment, Round}; -use apollo_protobuf::consensus::{ProposalInit, Vote}; use apollo_protobuf::converters::ProtobufConversionError; use async_trait::async_trait; use futures::channel::{mpsc, oneshot}; @@ -30,11 +30,11 @@ pub type ValidatorId = ContractAddress; #[async_trait] pub trait ConsensusContext { /// The parts of the proposal that are streamed in. - /// Must contain at least the ProposalInit and ProposalFin. + /// Must contain at least the ConsensusBlockInfo and ProposalFin. type ProposalPart: TryFrom, Error = ProtobufConversionError> + Into> - + TryInto - + From + + TryInto + + From + Clone + Send + Debug; @@ -76,7 +76,7 @@ pub trait ConsensusContext { /// by ConsensusContext. async fn validate_proposal( &mut self, - init: ProposalInit, + block_info: ConsensusBlockInfo, timeout: Duration, content: mpsc::Receiver, ) -> oneshot::Receiver; @@ -86,7 +86,7 @@ pub trait ConsensusContext { /// /// Params: /// - `id`: The `ProposalCommitment` associated with the block's content. - /// - `init`: The `ProposalInit` that is broadcast to the network. + /// - `init`: The consensus metadata for reproposing. async fn repropose(&mut self, id: ProposalCommitment, init: ProposalInit); /// Get the set of validators for a given height. These are the nodes that can propose and vote diff --git a/crates/apollo_consensus_orchestrator/src/build_proposal.rs b/crates/apollo_consensus_orchestrator/src/build_proposal.rs index f87349db79c..928ac5f99d2 100644 --- a/crates/apollo_consensus_orchestrator/src/build_proposal.rs +++ b/crates/apollo_consensus_orchestrator/src/build_proposal.rs @@ -105,10 +105,9 @@ pub(crate) enum BuildProposalError { pub(crate) async fn build_proposal( mut args: ProposalBuildArguments, ) -> BuildProposalResult { - let block_info = initiate_build(&args).await?; - args.stream_sender.send(ProposalPart::Init(args.proposal_init)).await.map_err(|e| { - BuildProposalError::SendError(format!("Failed to send proposal init: {e:?}")) - })?; + let block_info = initiate_build(&mut args).await?; + let height = block_info.height; + args.stream_sender .send(ProposalPart::BlockInfo(block_info.clone())) .await @@ -120,7 +119,7 @@ pub(crate) async fn build_proposal( // with `repropose` being called before `valid_proposals` is updated. let mut valid_proposals = args.valid_proposals.lock().expect("Lock was poisoned"); valid_proposals.insert_proposal_for_height( - &args.proposal_init.height, + &height, &proposal_commitment, block_info, content, @@ -143,7 +142,9 @@ async fn get_proposal_timestamp( clock.unix_now() } -async fn initiate_build(args: &ProposalBuildArguments) -> BuildProposalResult { +async fn initiate_build( + args: &mut ProposalBuildArguments, +) -> BuildProposalResult { let timestamp = get_proposal_timestamp( args.use_state_sync_block_timestamp, &args.deps.state_sync_client, @@ -159,8 +160,11 @@ async fn initiate_build(args: &ProposalBuildArguments) -> BuildProposalResult); +type ValidationParams = (ConsensusBlockInfo, Duration, mpsc::Receiver); type HeightToIdToContent = BTreeMap< BlockNumber, @@ -509,6 +509,7 @@ impl ConsensusContext for SequencerConsensusContext { let use_state_sync_block_timestamp = self.config.static_config.deployment_mode.use_state_sync_block_timestamp(); + let round = proposal_init.round; let args = ProposalBuildArguments { deps: self.deps.clone(), batcher_deadline, @@ -552,9 +553,7 @@ impl ConsensusContext for SequencerConsensusContext { } } } - .instrument( - error_span!("consensus_build_proposal", %proposal_id, round=proposal_init.round), - ), + .instrument(error_span!("consensus_build_proposal", %proposal_id, round)), ); assert!(self.active_proposal.is_none()); self.active_proposal = Some((cancel_token_clone, handle)); @@ -565,13 +564,13 @@ impl ConsensusContext for SequencerConsensusContext { #[instrument(skip_all)] async fn validate_proposal( &mut self, - proposal_init: ProposalInit, + block_info: ConsensusBlockInfo, timeout: Duration, content_receiver: mpsc::Receiver, ) -> oneshot::Receiver { - assert_eq!(Some(proposal_init.height), self.current_height); + assert_eq!(Some(block_info.height), self.current_height); let (fin_sender, fin_receiver) = oneshot::channel(); - match proposal_init.round.cmp(&self.current_round) { + match block_info.round.cmp(&self.current_round) { std::cmp::Ordering::Less => { trace!("Dropping proposal from past round"); fin_receiver @@ -579,17 +578,14 @@ impl ConsensusContext for SequencerConsensusContext { std::cmp::Ordering::Greater => { trace!("Queueing proposal for future round."); self.queued_proposals.insert( - proposal_init.round, - ( - (proposal_init.height, proposal_init.proposer, timeout, content_receiver), - fin_sender, - ), + block_info.round, + ((block_info, timeout, content_receiver), fin_sender), ); fin_receiver } std::cmp::Ordering::Equal => { let block_info_validation = BlockInfoValidation { - height: proposal_init.height, + height: block_info.height, block_timestamp_window_seconds: self .config .static_config @@ -604,8 +600,8 @@ impl ConsensusContext for SequencerConsensusContext { .unwrap_or(self.l2_gas_price), }; self.validate_current_round_proposal( + block_info, block_info_validation, - proposal_init.proposer, timeout, self.config.static_config.validate_proposal_margin_millis, content_receiver, @@ -631,15 +627,9 @@ impl ConsensusContext for SequencerConsensusContext { let mut stream_sender = self.start_stream(HeightAndRound(height.0, init.round)).await; tokio::spawn( async move { - let res = send_reproposal( - id, - init, - block_info, - txs, - &mut stream_sender, - transaction_converter, - ) - .await; + let res = + send_reproposal(id, block_info, txs, &mut stream_sender, transaction_converter) + .await; match res { Ok(()) => { info!(?id, ?init, "Reproposal succeeded."); @@ -753,8 +743,13 @@ impl ConsensusContext for SequencerConsensusContext { ); return false; } + // TODO(Asmaa): Consider storing only the necessary fields from the previous block + // (L1 gas prices, timestamp) instead of the full ConsensusBlockInfo. self.previous_block_info = Some(ConsensusBlockInfo { height, + round: 0, + valid_round: None, + proposer: sync_block.block_header_without_hash.sequencer.0, timestamp: timestamp.0, builder: sync_block.block_header_without_hash.sequencer.0, l1_da_mode: sync_block.block_header_without_hash.l1_da_mode, @@ -835,11 +830,11 @@ impl ConsensusContext for SequencerConsensusContext { } } // Validate the proposal for the current round if exists. - let Some(((height, validator, timeout, content), fin_sender)) = to_process else { + let Some(((block_info, timeout, content), fin_sender)) = to_process else { return Ok(()); }; let block_info_validation = BlockInfoValidation { - height, + height: block_info.height, block_timestamp_window_seconds: self .config .static_config @@ -854,8 +849,8 @@ impl ConsensusContext for SequencerConsensusContext { .unwrap_or(self.l2_gas_price), }; self.validate_current_round_proposal( + block_info, block_info_validation, - validator, timeout, self.config.static_config.validate_proposal_margin_millis, content, @@ -869,8 +864,8 @@ impl ConsensusContext for SequencerConsensusContext { impl SequencerConsensusContext { async fn validate_current_round_proposal( &mut self, + block_info: ConsensusBlockInfo, block_info_validation: BlockInfoValidation, - proposer: ValidatorId, timeout: Duration, batcher_timeout_margin: Duration, content_receiver: mpsc::Receiver, @@ -878,13 +873,14 @@ impl SequencerConsensusContext { ) { let proposal_id = ProposalId(self.proposal_id); self.proposal_id += 1; - info!(?timeout, %proposal_id, %proposer, round=self.current_round, "Start validating proposal"); + info!(?timeout, %proposal_id, proposer=%block_info.proposer, round=self.current_round, "Start validating proposal"); let cancel_token = CancellationToken::new(); let cancel_token_clone = cancel_token.clone(); let gas_price_params = make_gas_price_params(&self.config.dynamic_config); let args = ProposalValidateArguments { deps: self.deps.clone(), + block_info, block_info_validation, proposal_id, timeout, @@ -954,13 +950,11 @@ async fn validate_and_send( async fn send_reproposal( id: ProposalCommitment, - init: ProposalInit, block_info: ConsensusBlockInfo, txs: Vec>, stream_sender: &mut StreamSender, transaction_converter: Arc, ) -> Result<(), ReproposeError> { - stream_sender.send(ProposalPart::Init(init)).await?; stream_sender.send(ProposalPart::BlockInfo(block_info)).await?; let mut n_executed_txs: usize = 0; for batch in txs.iter() { diff --git a/crates/apollo_consensus_orchestrator/src/sequencer_consensus_context_test.rs b/crates/apollo_consensus_orchestrator/src/sequencer_consensus_context_test.rs index dcb83760b46..f39df4890ff 100644 --- a/crates/apollo_consensus_orchestrator/src/sequencer_consensus_context_test.rs +++ b/crates/apollo_consensus_orchestrator/src/sequencer_consensus_context_test.rs @@ -1,7 +1,14 @@ use std::future::ready; use std::sync::Arc; -use apollo_batcher_types::batcher_types::{CentralObjects, DecisionReachedResponse}; +use apollo_batcher_types::batcher_types::{ + CentralObjects, + DecisionReachedResponse, + ProposalCommitment as BatcherProposalCommitment, + ProposalStatus, + SendProposalContent, + SendProposalContentResponse, +}; use apollo_batcher_types::communication::BatcherClientError; use apollo_batcher_types::errors::BatcherError; use apollo_config_manager_types::communication::MockConfigManagerClient; @@ -83,37 +90,12 @@ async fn validate_proposal_success() { // Initialize the context for a specific height, starting with round 0. context.set_height_and_round(BlockNumber(0), 0).await.unwrap(); - - let content_receiver = - send_proposal_to_validator_context(&mut context, block_info(BlockNumber(0))).await; + let content_receiver = send_proposal_to_validator_context(&mut context).await; let fin_receiver = - context.validate_proposal(ProposalInit::default(), TIMEOUT, content_receiver).await; + context.validate_proposal(block_info(BlockNumber(0), 0), TIMEOUT, content_receiver).await; assert_eq!(fin_receiver.await.unwrap().0, STATE_DIFF_COMMITMENT.0.0); } -#[tokio::test] -async fn dont_send_block_info() { - let (mut deps, _network) = create_test_and_network_deps(); - - deps.batcher - .expect_start_height() - .times(1) - .withf(|input| input.height == BlockNumber(0)) - .return_const(Ok(())); - let mut context = deps.build_context(); - - // Initialize the context for a specific height, starting with round 0. - context.set_height_and_round(BlockNumber(0), 0).await.unwrap(); - - let (mut content_sender, content_receiver) = - mpsc::channel(context.config.static_config.proposal_buffer_size); - let fin_receiver = - context.validate_proposal(ProposalInit::default(), TIMEOUT, content_receiver).await; - content_sender.close_channel(); - // No block info was sent, the proposal is invalid. - assert!(fin_receiver.await.is_err()); -} - #[rstest] #[case::execute_all_txs(true)] #[case::dont_execute_last_tx(false)] @@ -135,8 +117,7 @@ async fn validate_then_repropose(#[case] execute_all_txs: bool) { // Receive a valid proposal. let (mut content_sender, content_receiver) = mpsc::channel(context.config.static_config.proposal_buffer_size); - let block_info = ProposalPart::BlockInfo(block_info(BlockNumber(0))); - content_sender.send(block_info.clone()).await.unwrap(); + let block_info = block_info(BlockNumber(0), 0); let transactions = ProposalPart::Transactions(TransactionBatch { transactions: TX_BATCH.to_vec() }); content_sender.send(transactions.clone()).await.unwrap(); @@ -146,15 +127,14 @@ async fn validate_then_repropose(#[case] execute_all_txs: bool) { }); content_sender.send(fin.clone()).await.unwrap(); let fin_receiver = - context.validate_proposal(ProposalInit::default(), TIMEOUT, content_receiver).await; + context.validate_proposal(block_info.clone(), TIMEOUT, content_receiver).await; content_sender.close_channel(); assert_eq!(fin_receiver.await.unwrap().0, STATE_DIFF_COMMITMENT.0.0); let init = ProposalInit { round: 1, ..Default::default() }; context.repropose(ProposalCommitment(STATE_DIFF_COMMITMENT.0.0), init).await; let (_, mut receiver) = network.outbound_proposal_receiver.next().await.unwrap(); - assert_eq!(receiver.next().await.unwrap(), ProposalPart::Init(init)); - assert_eq!(receiver.next().await.unwrap(), block_info); + assert_eq!(receiver.next().await.unwrap(), ProposalPart::BlockInfo(block_info)); assert_eq!( receiver.next().await.unwrap(), ProposalPart::Transactions(TransactionBatch { transactions: executed_transactions }) @@ -183,37 +163,29 @@ async fn proposals_from_different_rounds() { // The proposal from the past round is ignored. let (mut content_sender, content_receiver) = mpsc::channel(context.config.static_config.proposal_buffer_size); - content_sender.send(ProposalPart::BlockInfo(block_info(BlockNumber(0)))).await.unwrap(); content_sender.send(prop_part_txs.clone()).await.unwrap(); - let mut init = ProposalInit { round: 0, ..Default::default() }; - let fin_receiver_past_round = context.validate_proposal(init, TIMEOUT, content_receiver).await; + let fin_receiver_past_round = + context.validate_proposal(block_info(BlockNumber(0), 0), TIMEOUT, content_receiver).await; // No fin was sent, channel remains open. assert!(fin_receiver_past_round.await.is_err()); // The proposal from the current round should be validated. let (mut content_sender, content_receiver) = mpsc::channel(context.config.static_config.proposal_buffer_size); - content_sender.send(ProposalPart::BlockInfo(block_info(BlockNumber(0)))).await.unwrap(); content_sender.send(prop_part_txs.clone()).await.unwrap(); content_sender.send(prop_part_fin.clone()).await.unwrap(); - init.round = 1; - let fin_receiver_curr_round = context.validate_proposal(init, TIMEOUT, content_receiver).await; + let fin_receiver_curr_round = + context.validate_proposal(block_info(BlockNumber(0), 1), TIMEOUT, content_receiver).await; assert_eq!(fin_receiver_curr_round.await.unwrap().0, STATE_DIFF_COMMITMENT.0.0); // The proposal from the future round should not be processed. let (mut content_sender, content_receiver) = mpsc::channel(context.config.static_config.proposal_buffer_size); - content_sender.send(ProposalPart::BlockInfo(block_info(BlockNumber(0)))).await.unwrap(); content_sender.send(prop_part_txs.clone()).await.unwrap(); content_sender.send(prop_part_fin.clone()).await.unwrap(); - let fin_receiver_future_round = context - .validate_proposal( - ProposalInit { round: 2, ..Default::default() }, - TIMEOUT, - content_receiver, - ) - .await; + let fin_receiver_future_round = + context.validate_proposal(block_info(BlockNumber(0), 2), TIMEOUT, content_receiver).await; content_sender.close_channel(); // Even with sending fin and closing the channel. assert!(fin_receiver_future_round.now_or_never().is_none()); @@ -222,7 +194,39 @@ async fn proposals_from_different_rounds() { #[tokio::test] async fn interrupt_active_proposal() { let (mut deps, _network) = create_test_and_network_deps(); - deps.setup_deps_for_validate(SetupDepsArgs::default()); + // This test validates two proposals: round 0 (interrupted) and round 1 (successful) + deps.setup_default_expectations(); + + // Expect 2 validate_block calls (one for each round) + deps.batcher.expect_validate_block().times(2).returning(|_| Ok(())); + deps.batcher + .expect_start_height() + .withf(|input| input.height == BlockNumber(0)) + .return_const(Ok(())); + + // Round 0: Will be interrupted and send Abort + deps.batcher.expect_send_proposal_content().times(1).returning(|input| { + assert!(matches!(input.content, SendProposalContent::Abort)); + Ok(SendProposalContentResponse { response: ProposalStatus::Processing }) + }); + + // Round 1: Will send Txs then Finish + deps.batcher.expect_send_proposal_content().times(1).returning(|input| { + let SendProposalContent::Txs(txs) = input.content else { + panic!("Expected Txs"); + }; + assert_eq!(txs, *INTERNAL_TX_BATCH); + Ok(SendProposalContentResponse { response: ProposalStatus::Processing }) + }); + deps.batcher.expect_send_proposal_content().times(1).returning(|input| { + assert!(matches!(input.content, SendProposalContent::Finish(_))); + Ok(SendProposalContentResponse { + response: ProposalStatus::Finished(BatcherProposalCommitment { + state_diff_commitment: STATE_DIFF_COMMITMENT, + }), + }) + }); + let mut context = deps.build_context(); // Initialize the context for a specific height, starting with round 0. context.set_height_and_round(BlockNumber(0), 0).await.unwrap(); @@ -232,17 +236,11 @@ async fn interrupt_active_proposal() { let (mut _content_sender_0, content_receiver) = mpsc::channel(context.config.static_config.proposal_buffer_size); let fin_receiver_0 = - context.validate_proposal(ProposalInit::default(), TIMEOUT, content_receiver).await; + context.validate_proposal(block_info(BlockNumber(0), 0), TIMEOUT, content_receiver).await; - let content_receiver = - send_proposal_to_validator_context(&mut context, block_info(BlockNumber(0))).await; - let fin_receiver_1 = context - .validate_proposal( - ProposalInit { round: 1, ..Default::default() }, - TIMEOUT, - content_receiver, - ) - .await; + let content_receiver = send_proposal_to_validator_context(&mut context).await; + let fin_receiver_1 = + context.validate_proposal(block_info(BlockNumber(0), 1), TIMEOUT, content_receiver).await; // Move the context to the next round. context.set_height_and_round(BlockNumber(0), 1).await.unwrap(); @@ -261,7 +259,6 @@ async fn build_proposal() { let fin_receiver = context.build_proposal(ProposalInit::default(), TIMEOUT).await.unwrap(); // Test proposal parts. let (_, mut receiver) = network.outbound_proposal_receiver.next().await.unwrap(); - assert_eq!(receiver.next().await.unwrap(), ProposalPart::Init(ProposalInit::default())); let block_info = receiver.next().await.unwrap(); let after: u64 = chrono::Utc::now().timestamp().try_into().expect("Timestamp conversion failed"); @@ -448,12 +445,12 @@ async fn batcher_not_ready(#[case] proposer: bool) { let fin_receiver = context.build_proposal(ProposalInit::default(), TIMEOUT).await.unwrap(); assert_eq!(fin_receiver.await, Err(Canceled)); } else { - let (mut content_sender, content_receiver) = + let (_content_sender, content_receiver) = mpsc::channel(context.config.static_config.proposal_buffer_size); - content_sender.send(ProposalPart::BlockInfo(block_info(BlockNumber(0)))).await.unwrap(); - let fin_receiver = - context.validate_proposal(ProposalInit::default(), TIMEOUT, content_receiver).await; + let fin_receiver = context + .validate_proposal(block_info(BlockNumber(0), 0), TIMEOUT, content_receiver) + .await; assert_eq!(fin_receiver.await, Err(Canceled)); } } @@ -477,7 +474,6 @@ async fn propose_then_repropose(#[case] execute_all_txs: bool) { let fin_receiver = context.build_proposal(ProposalInit::default(), TIMEOUT).await.unwrap(); let (_, mut receiver) = network.outbound_proposal_receiver.next().await.unwrap(); // Receive the proposal parts. - let _init = receiver.next().await.unwrap(); let block_info = receiver.next().await.unwrap(); let _txs = receiver.next().await.unwrap(); let fin = receiver.next().await.unwrap(); @@ -492,7 +488,6 @@ async fn propose_then_repropose(#[case] execute_all_txs: bool) { .await; // Re-propose sends the same proposal. let (_, mut receiver) = network.outbound_proposal_receiver.next().await.unwrap(); - let _init = receiver.next().await.unwrap(); assert_eq!(receiver.next().await.unwrap(), block_info); let reproposed_txs = ProposalPart::Transactions(TransactionBatch { transactions }); @@ -514,29 +509,29 @@ async fn gas_price_fri_out_of_range() { .return_const(Ok(())); let mut context = deps.build_context(); context.set_height_and_round(BlockNumber(0), 0).await.unwrap(); - let (mut content_sender, content_receiver) = + let (_content_sender, content_receiver) = mpsc::channel(context.config.static_config.proposal_buffer_size); - // Send a block info with l1_gas_price_fri that is outside the margin of error. - let mut block_info_1 = block_info(BlockNumber(0)); + // Receive a block info with l1_gas_price_fri that is outside the margin of error. + let mut block_info_1 = block_info(BlockNumber(0), 0); block_info_1.l1_gas_price_fri = block_info_1.l1_gas_price_fri.checked_mul_u128(2).unwrap(); - content_sender.send(ProposalPart::BlockInfo(block_info_1).clone()).await.unwrap(); // Use a large enough timeout to ensure fin_receiver was canceled due to invalid block_info, // not due to a timeout. let fin_receiver = - context.validate_proposal(ProposalInit::default(), TIMEOUT * 100, content_receiver).await; + context.validate_proposal(block_info_1, TIMEOUT * 100, content_receiver).await; assert_eq!(fin_receiver.await, Err(Canceled)); // Do the same for data gas price. let (mut content_sender, content_receiver) = mpsc::channel(context.config.static_config.proposal_buffer_size); - let mut block_info_2 = block_info(BlockNumber(0)); + let mut block_info_2 = block_info(BlockNumber(0), 0); block_info_2.l1_data_gas_price_fri = block_info_2.l1_data_gas_price_fri.checked_mul_u128(2).unwrap(); content_sender.send(ProposalPart::BlockInfo(block_info_2).clone()).await.unwrap(); // Use a large enough timeout to ensure fin_receiver was canceled due to invalid block_info, // not due to a timeout. - let fin_receiver = - context.validate_proposal(ProposalInit::default(), TIMEOUT * 100, content_receiver).await; + let fin_receiver = context + .validate_proposal(block_info(BlockNumber(0), 0), TIMEOUT * 100, content_receiver) + .await; assert_eq!(fin_receiver.await, Err(Canceled)); // TODO(guyn): How to check that the rejection is due to the l1_gas_price_fri mismatch? } @@ -576,7 +571,7 @@ async fn gas_price_limits(#[case] maximum: bool) { context.set_height_and_round(BlockNumber(0), 0).await.unwrap(); - let mut block_info = block_info(BlockNumber(0)); + let mut block_info = block_info(BlockNumber(0), 0); if maximum { // Set the gas price to the maximum value. @@ -596,13 +591,12 @@ async fn gas_price_limits(#[case] maximum: bool) { block_info.l1_data_gas_price_wei.wei_to_fri(ETH_TO_FRI_RATE).unwrap(); } - // Send the block info, some transactions and then fin. - let content_receiver = send_proposal_to_validator_context(&mut context, block_info).await; + // Send transactions and then fin. + let content_receiver = send_proposal_to_validator_context(&mut context).await; // Even though we used the minimum/maximum gas price, not the values we gave the provider, // the proposal should be still be valid due to the clamping of limit prices. - let fin_receiver = - context.validate_proposal(ProposalInit::default(), TIMEOUT, content_receiver).await; + let fin_receiver = context.validate_proposal(block_info, TIMEOUT, content_receiver).await; assert_eq!(fin_receiver.await, Ok(ProposalCommitment(STATE_DIFF_COMMITMENT.0.0))); } @@ -701,7 +695,6 @@ async fn oracle_fails_on_startup(#[case] l1_oracle_failure: bool) { let (_, mut receiver) = network.outbound_proposal_receiver.next().await.unwrap(); - assert_eq!(receiver.next().await.unwrap(), ProposalPart::Init(ProposalInit::default())); let block_info = receiver.next().await.unwrap(); let ProposalPart::BlockInfo(info) = block_info else { panic!("Expected ProposalPart::BlockInfo"); @@ -803,10 +796,9 @@ async fn oracle_fails_on_second_block(#[case] l1_oracle_failure: bool) { // Initialize the context for a specific height, starting with round 0. context.set_height_and_round(BlockNumber(0), 0).await.unwrap(); - let content_receiver = - send_proposal_to_validator_context(&mut context, block_info(BlockNumber(0))).await; + let content_receiver = send_proposal_to_validator_context(&mut context).await; let fin_receiver = - context.validate_proposal(ProposalInit::default(), TIMEOUT, content_receiver).await; + context.validate_proposal(block_info(BlockNumber(0), 0), TIMEOUT, content_receiver).await; let proposal_commitment = fin_receiver.await.unwrap(); assert_eq!(proposal_commitment.0, STATE_DIFF_COMMITMENT.0.0); @@ -821,16 +813,13 @@ async fn oracle_fails_on_second_block(#[case] l1_oracle_failure: bool) { let (_, mut receiver) = network.outbound_proposal_receiver.next().await.unwrap(); - assert_eq!( - receiver.next().await.unwrap(), - ProposalPart::Init(ProposalInit { height: BlockNumber(1), ..Default::default() }) - ); let info = receiver.next().await.unwrap(); let ProposalPart::BlockInfo(info) = info else { panic!("Expected ProposalPart::BlockInfo"); }; + assert_eq!(info.height, BlockNumber(1)); - let previous_block_info = block_info(BlockNumber(0)); + let previous_block_info = block_info(BlockNumber(0), 0); assert_eq!(info.l1_gas_price_wei, previous_block_info.l1_gas_price_wei); assert_eq!(info.l1_data_gas_price_wei, previous_block_info.l1_data_gas_price_wei); @@ -1111,10 +1100,9 @@ async fn change_gas_price_overrides() { // Validate block number 0. context.set_height_and_round(BlockNumber(0), 0).await.unwrap(); - let content_receiver = - send_proposal_to_validator_context(&mut context, block_info(BlockNumber(0))).await; + let content_receiver = send_proposal_to_validator_context(&mut context).await; let fin_receiver = - context.validate_proposal(ProposalInit::default(), TIMEOUT, content_receiver).await; + context.validate_proposal(block_info(BlockNumber(0), 0), TIMEOUT, content_receiver).await; let proposal_commitment = fin_receiver.await.unwrap(); assert_eq!(proposal_commitment.0, STATE_DIFF_COMMITMENT.0.0); @@ -1130,22 +1118,21 @@ async fn change_gas_price_overrides() { // Validate block number 1, round 0. context.set_height_and_round(BlockNumber(1), 0).await.unwrap(); - let init = ProposalInit { height: BlockNumber(1), ..Default::default() }; // This should fail, since the gas price is different from the input block info. - let content_receiver = - send_proposal_to_validator_context(&mut context, block_info(BlockNumber(1))).await; - let fin_receiver = context.validate_proposal(init, TIMEOUT, content_receiver).await; + let content_receiver = send_proposal_to_validator_context(&mut context).await; + let fin_receiver = + context.validate_proposal(block_info(BlockNumber(1), 0), TIMEOUT, content_receiver).await; let proposal_commitment = fin_receiver.await.unwrap_err(); assert!(matches!(proposal_commitment, Canceled)); // Modify the incoming block info to make sure it matches the overrides. Now it passes. - let mut modified_block_info = block_info(BlockNumber(1)); + let mut modified_block_info = block_info(BlockNumber(1), 0); modified_block_info.l2_gas_price_fri = GasPrice(ODDLY_SPECIFIC_L2_GAS_PRICE); - let content_receiver = - send_proposal_to_validator_context(&mut context, modified_block_info.clone()).await; - let fin_receiver = context.validate_proposal(init, TIMEOUT, content_receiver).await; + let content_receiver = send_proposal_to_validator_context(&mut context).await; + let fin_receiver = + context.validate_proposal(modified_block_info, TIMEOUT, content_receiver).await; let proposal_commitment = fin_receiver.await.unwrap(); assert_eq!(proposal_commitment.0, STATE_DIFF_COMMITMENT.0.0); @@ -1159,26 +1146,23 @@ async fn change_gas_price_overrides() { // This should fail, as we have changed the config, without updating the block info. context.set_height_and_round(BlockNumber(1), 1).await.unwrap(); - let init = ProposalInit { height: BlockNumber(1), round: 1, ..Default::default() }; - - let content_receiver = - send_proposal_to_validator_context(&mut context, block_info(BlockNumber(1))).await; - let fin_receiver = context.validate_proposal(init, TIMEOUT, content_receiver).await; + let content_receiver = send_proposal_to_validator_context(&mut context).await; + let fin_receiver = + context.validate_proposal(block_info(BlockNumber(1), 1), TIMEOUT, content_receiver).await; let proposal_commitment = fin_receiver.await.unwrap_err(); assert!(matches!(proposal_commitment, Canceled)); // Add the new overrides so validation passes. - let mut modified_block_info = block_info(BlockNumber(1)); + let mut modified_block_info = block_info(BlockNumber(1), 1); modified_block_info.l1_data_gas_price_fri = GasPrice(ODDLY_SPECIFIC_L1_DATA_GAS_PRICE); // Note that the eth to fri conversion rate by default is 10^18 so we can just replace wei to // fri 1:1. modified_block_info.l1_data_gas_price_fri = GasPrice(ODDLY_SPECIFIC_L1_DATA_GAS_PRICE); - let content_receiver = - send_proposal_to_validator_context(&mut context, modified_block_info).await; - - let fin_receiver = context.validate_proposal(init, TIMEOUT, content_receiver).await; + let content_receiver = send_proposal_to_validator_context(&mut context).await; + let fin_receiver = + context.validate_proposal(modified_block_info, TIMEOUT, content_receiver).await; let proposal_commitment = fin_receiver.await.unwrap(); assert_eq!(proposal_commitment.0, STATE_DIFF_COMMITMENT.0.0); @@ -1192,15 +1176,20 @@ async fn change_gas_price_overrides() { let config_manager_client = make_config_manager_client(new_dynamic_config); context.deps.config_manager_client = Some(Arc::new(config_manager_client)); - let init = ProposalInit { height: BlockNumber(2), ..Default::default() }; - - let fin_receiver = context.build_proposal(init, TIMEOUT).await.unwrap().await.unwrap(); + let fin_receiver = context + .build_proposal(ProposalInit { height: BlockNumber(2), ..Default::default() }, TIMEOUT) + .await + .unwrap() + .await + .unwrap(); assert_eq!(fin_receiver.0, STATE_DIFF_COMMITMENT.0.0); let (_, mut receiver) = network.outbound_proposal_receiver.next().await.unwrap(); - assert_eq!(receiver.next().await.unwrap(), init.into()); - let _info = receiver.next().await.unwrap(); + let info = receiver.next().await.unwrap(); + let ProposalPart::BlockInfo(_) = info else { + panic!("Expected ProposalPart::BlockInfo"); + }; } fn make_config_manager_client(provider_config: ContextDynamicConfig) -> MockConfigManagerClient { diff --git a/crates/apollo_consensus_orchestrator/src/test_utils.rs b/crates/apollo_consensus_orchestrator/src/test_utils.rs index f6218851e52..253335ea0e8 100644 --- a/crates/apollo_consensus_orchestrator/src/test_utils.rs +++ b/crates/apollo_consensus_orchestrator/src/test_utils.rs @@ -349,7 +349,7 @@ pub(crate) fn generate_invoke_tx(nonce: u8) -> ConsensusTransaction { })) } -pub(crate) fn block_info(height: BlockNumber) -> ConsensusBlockInfo { +pub(crate) fn block_info(height: BlockNumber, round: u32) -> ConsensusBlockInfo { let context_config = ContextConfig::default(); let l1_gas_price_wei = GasPrice(TEMP_ETH_GAS_FEE_IN_WEI + context_config.dynamic_config.l1_gas_tip_wei); @@ -365,6 +365,9 @@ pub(crate) fn block_info(height: BlockNumber) -> ConsensusBlockInfo { .expect("L1 data gas price must be non-zero"); ConsensusBlockInfo { height, + round, + valid_round: None, + proposer: Default::default(), timestamp: chrono::Utc::now().timestamp().try_into().expect("Timestamp conversion failed"), builder: Default::default(), l1_da_mode: L1DataAvailabilityMode::Blob, @@ -380,11 +383,9 @@ pub(crate) fn block_info(height: BlockNumber) -> ConsensusBlockInfo { // content_receiver. pub(crate) async fn send_proposal_to_validator_context( context: &mut SequencerConsensusContext, - block_info: ConsensusBlockInfo, ) -> mpsc::Receiver { let (mut content_sender, content_receiver) = mpsc::channel(context.get_config().static_config.proposal_buffer_size); - content_sender.send(ProposalPart::BlockInfo(block_info)).await.unwrap(); content_sender .send(ProposalPart::Transactions(TransactionBatch { transactions: TX_BATCH.to_vec() })) .await diff --git a/crates/apollo_consensus_orchestrator/src/utils_test.rs b/crates/apollo_consensus_orchestrator/src/utils_test.rs index b8cbd71d9ae..0a763eade45 100644 --- a/crates/apollo_consensus_orchestrator/src/utils_test.rs +++ b/crates/apollo_consensus_orchestrator/src/utils_test.rs @@ -36,6 +36,9 @@ async fn get_block_info(args: &ProposalBuildArguments) -> ConsensusBlockInfo { ConsensusBlockInfo { height: args.proposal_init.height, + round: args.proposal_init.round, + valid_round: args.proposal_init.valid_round, + proposer: args.proposal_init.proposer, timestamp, builder: args.builder_address, l1_da_mode: args.l1_da_mode, diff --git a/crates/apollo_consensus_orchestrator/src/validate_proposal.rs b/crates/apollo_consensus_orchestrator/src/validate_proposal.rs index 5ad7f3e2cee..f82538775a8 100644 --- a/crates/apollo_consensus_orchestrator/src/validate_proposal.rs +++ b/crates/apollo_consensus_orchestrator/src/validate_proposal.rs @@ -54,6 +54,7 @@ const GAS_PRICE_ABS_DIFF_MARGIN: u128 = 1; pub(crate) struct ProposalValidateArguments { pub deps: SequencerConsensusContextDeps, + pub block_info: ConsensusBlockInfo, pub block_info_validation: BlockInfoValidation, pub proposal_id: ProposalId, pub timeout: Duration, @@ -81,11 +82,6 @@ enum HandledProposalPart { Failed(String), } -enum SecondProposalPart { - BlockInfo(ConsensusBlockInfo), - Fin(ProposalFin), -} - type ValidateProposalResult = Result; #[derive(Debug, thiserror::Error, EnumDiscriminants)] @@ -114,8 +110,6 @@ pub(crate) enum ValidateProposalError { ValidationTimeout(String), #[error("Proposal interrupted while {0}")] ProposalInterrupted(String), - #[error("Got an invalid second proposal part: {0:?}.")] - InvalidSecondProposalPart(Option), #[error("Batcher returned Invalid status: {0}.")] InvalidProposal(String), #[error("Proposal part {1:?} failed validation: {0}.")] @@ -137,25 +131,9 @@ pub(crate) async fn validate_proposal( return Err(ValidateProposalError::CannotCalculateDeadline { timeout: args.timeout, now }); }; - let block_info = match await_second_proposal_part( - &args.cancel_token, - deadline, - &mut args.content_receiver, - args.deps.clock.as_ref(), - ) - .await? - { - SecondProposalPart::BlockInfo(block_info) => block_info, - SecondProposalPart::Fin(ProposalFin { - proposal_commitment, - executed_transaction_count: _, - }) => { - return Ok(proposal_commitment); - } - }; is_block_info_valid( - args.block_info_validation.clone(), - block_info.clone(), + &args.block_info_validation, + &args.block_info, args.deps.clock.as_ref(), args.deps.l1_gas_price_provider, &args.gas_price_params, @@ -165,7 +143,7 @@ pub(crate) async fn validate_proposal( initiate_validation( args.deps.batcher.clone(), args.deps.state_sync_client, - block_info.clone(), + &args.block_info, args.proposal_id, args.timeout + args.batcher_timeout_margin, args.deps.clock.as_ref(), @@ -224,7 +202,7 @@ pub(crate) async fn validate_proposal( valid_proposals.insert_proposal_for_height( &args.block_info_validation.height, &built_block, - block_info, + args.block_info, content, &args.proposal_id, ); @@ -240,8 +218,8 @@ pub(crate) async fn validate_proposal( #[instrument(level = "warn", skip_all, fields(?block_info_validation, ?block_info_proposed))] async fn is_block_info_valid( - block_info_validation: BlockInfoValidation, - block_info_proposed: ConsensusBlockInfo, + block_info_validation: &BlockInfoValidation, + block_info_proposed: &ConsensusBlockInfo, clock: &dyn Clock, l1_gas_price_provider: Arc, gas_price_params: &GasPriceParams, @@ -306,8 +284,8 @@ async fn is_block_info_valid( )) { return Err(ValidateProposalError::InvalidBlockInfo( - block_info_proposed, - block_info_validation, + block_info_proposed.clone(), + block_info_validation.clone(), format!( "L1 gas price mismatch: expected L1 gas price FRI={l1_gas_price_fri}, \ proposed={l1_gas_price_fri_proposed}, expected L1 data gas price \ @@ -334,45 +312,10 @@ fn within_margin(number1: GasPrice, number2: GasPrice, margin_percent: u128) -> // The second proposal part when validating a proposal must be: // 1. Fin - empty proposal. // 2. BlockInfo - required to begin executing TX batches. -async fn await_second_proposal_part( - cancel_token: &CancellationToken, - deadline: DateTime, - content_receiver: &mut mpsc::Receiver, - clock: &dyn Clock, -) -> ValidateProposalResult { - tokio::select! { - _ = cancel_token.cancelled() => { - Err(ValidateProposalError::ProposalInterrupted( - "waiting for second proposal part".to_string(), - )) - } - _ = clock.sleep_until(deadline) => { - Err(ValidateProposalError::ValidationTimeout( - "waiting for second proposal part".to_string(), - )) - } - proposal_part = content_receiver.next() => { - match proposal_part { - Some(ProposalPart::BlockInfo(block_info)) => { - Ok(SecondProposalPart::BlockInfo(block_info)) - } - Some(ProposalPart::Fin(ProposalFin { proposal_commitment, executed_transaction_count })) => { - warn!("Received an empty proposal."); - Ok(SecondProposalPart::Fin(ProposalFin { proposal_commitment, executed_transaction_count })) - } - x => { - Err(ValidateProposalError::InvalidSecondProposalPart(x - )) - } - } - } - } -} - async fn initiate_validation( batcher: Arc, state_sync_client: Arc, - block_info: ConsensusBlockInfo, + block_info: &ConsensusBlockInfo, proposal_id: ProposalId, timeout_plus_margin: Duration, clock: &dyn Clock, @@ -386,11 +329,11 @@ async fn initiate_validation( retrospective_block_hash: retrospective_block_hash( batcher.clone(), state_sync_client, - &block_info, + block_info, ) .await .map_err(ValidateProposalError::from)?, - block_info: convert_to_sn_api_block_info(&block_info)?, + block_info: convert_to_sn_api_block_info(block_info)?, }; debug!("Initiating validate proposal: input={input:?}"); batcher.validate_block(input.clone()).await.map_err(|err| { diff --git a/crates/apollo_consensus_orchestrator/src/validate_proposal_test.rs b/crates/apollo_consensus_orchestrator/src/validate_proposal_test.rs index 240f0f713c1..a64d8cc1208 100644 --- a/crates/apollo_consensus_orchestrator/src/validate_proposal_test.rs +++ b/crates/apollo_consensus_orchestrator/src/validate_proposal_test.rs @@ -8,12 +8,12 @@ use apollo_batcher_types::batcher_types::{ SendProposalContent, SendProposalContentInput, SendProposalContentResponse, - StartHeightInput, }; -use apollo_batcher_types::communication::{BatcherClient, BatcherClientError}; +use apollo_batcher_types::communication::BatcherClientError; use apollo_consensus_orchestrator_config::config::ContextConfig; use apollo_infra::component_client::ClientError; use apollo_protobuf::consensus::{ + ConsensusBlockInfo, ProposalCommitment as ConsensusProposalCommitment, ProposalFin, ProposalPart, @@ -53,6 +53,7 @@ use crate::validate_proposal::{ struct TestProposalValidateArguments { pub deps: TestDeps, + pub block_info: ConsensusBlockInfo, pub block_info_validation: BlockInfoValidation, pub proposal_id: ProposalId, pub timeout: Duration, @@ -67,6 +68,7 @@ impl From for ProposalValidateArguments { fn from(args: TestProposalValidateArguments) -> Self { ProposalValidateArguments { deps: args.deps.into(), + block_info: args.block_info, block_info_validation: args.block_info_validation, proposal_id: args.proposal_id, timeout: args.timeout, @@ -83,6 +85,7 @@ fn create_proposal_validate_arguments() -> (TestProposalValidateArguments, mpsc::Sender) { let (mut deps, _) = create_test_and_network_deps(); deps.setup_default_expectations(); + let block_info = block_info(BlockNumber(0), 0); let block_info_validation = BlockInfoValidation { height: BlockNumber(0), block_timestamp_window_seconds: 60, @@ -102,6 +105,7 @@ fn create_proposal_validate_arguments() ( TestProposalValidateArguments { deps, + block_info, block_info_validation, proposal_id, timeout, @@ -117,7 +121,22 @@ fn create_proposal_validate_arguments() #[tokio::test] async fn validate_empty_proposal() { - let (proposal_args, mut content_sender) = create_proposal_validate_arguments(); + let (mut proposal_args, mut content_sender) = create_proposal_validate_arguments(); + // Empty proposals call validate_block and send Finish (no Txs) + proposal_args.deps.batcher.expect_validate_block().times(1).returning(|_| Ok(())); + proposal_args + .deps + .batcher + .expect_start_height() + .withf(|input| input.height == BlockNumber(0)) + .return_const(Ok(())); + proposal_args.deps.batcher.expect_send_proposal_content().times(1).returning(|input| { + assert!(matches!(input.content, SendProposalContent::Finish(_))); + Ok(SendProposalContentResponse { + response: ProposalStatus::Finished(ProposalCommitment::default()), + }) + }); + // Send an empty proposal. content_sender .send(ProposalPart::Fin(ProposalFin { @@ -136,21 +155,11 @@ async fn validate_proposal_success() { let (mut proposal_args, mut content_sender) = create_proposal_validate_arguments(); let n_executed_txs_count = 1; // Setup deps to validate the block. - proposal_args - .deps - .setup_deps_for_validate(SetupDepsArgs { n_executed_txs_count, ..Default::default() }); - - // Batcher will expect a "start height" due to setup_deps_for_validate. - proposal_args - .deps - .batcher - .start_height(StartHeightInput { height: BlockNumber(0) }) - .await - .unwrap(); - - // Send a valid block info. - let block_info = block_info(BlockNumber(0)); - content_sender.send(ProposalPart::BlockInfo(block_info)).await.unwrap(); + proposal_args.deps.setup_deps_for_validate(SetupDepsArgs { + n_executed_txs_count, + expect_start_height: false, + ..Default::default() + }); // Send transactions and finally Fin part with executed transaction count. content_sender .send(ProposalPart::Transactions(TransactionBatch { transactions: TX_BATCH.clone() })) @@ -170,7 +179,20 @@ async fn validate_proposal_success() { #[tokio::test] async fn interrupt_proposal() { - let (proposal_args, _content_sender) = create_proposal_validate_arguments(); + let (mut proposal_args, _content_sender) = create_proposal_validate_arguments(); + // Interrupted proposals call validate_block and send Abort + proposal_args.deps.batcher.expect_validate_block().times(1).returning(|_| Ok(())); + proposal_args + .deps + .batcher + .expect_start_height() + .withf(|input| input.height == BlockNumber(0)) + .return_const(Ok(())); + proposal_args.deps.batcher.expect_send_proposal_content().times(1).returning(|input| { + assert!(matches!(input.content, SendProposalContent::Abort)); + Ok(SendProposalContentResponse { response: ProposalStatus::Processing }) + }); + // Interrupt the proposal. proposal_args.cancel_token.cancel(); @@ -181,6 +203,19 @@ async fn interrupt_proposal() { #[tokio::test] async fn validation_timeout() { let (mut proposal_args, _content_sender) = create_proposal_validate_arguments(); + // Timed out proposals call validate_block and send Abort + proposal_args.deps.batcher.expect_validate_block().times(1).returning(|_| Ok(())); + proposal_args + .deps + .batcher + .expect_start_height() + .withf(|input| input.height == BlockNumber(0)) + .return_const(Ok(())); + proposal_args.deps.batcher.expect_send_proposal_content().times(1).returning(|input| { + assert!(matches!(input.content, SendProposalContent::Abort)); + Ok(SendProposalContentResponse { response: ProposalStatus::Processing }) + }); + // Set a very short timeout to trigger a timeout error. proposal_args.timeout = Duration::from_micros(1); @@ -188,27 +223,13 @@ async fn validation_timeout() { assert!(matches!(res, Err(ValidateProposalError::ValidationTimeout(_)))); } -#[tokio::test] -async fn invalid_second_proposal_part() { - let (proposal_args, mut content_sender) = create_proposal_validate_arguments(); - // Send an invalid proposal part (not BlockInfo or Fin). Send Transactions as 2nd part. - content_sender - .send(ProposalPart::Transactions(TransactionBatch { transactions: vec![] })) - .await - .unwrap(); - - let res = validate_proposal(proposal_args.into()).await; - assert!(matches!(res, Err(ValidateProposalError::InvalidSecondProposalPart(_)))); -} - #[tokio::test] async fn invalid_block_info() { - let (proposal_args, mut content_sender) = create_proposal_validate_arguments(); + let (mut proposal_args, mut content_sender) = create_proposal_validate_arguments(); - let mut block_info = block_info(BlockNumber(0)); - block_info.l2_gas_price_fri = + proposal_args.block_info.l2_gas_price_fri = GasPrice(proposal_args.block_info_validation.l2_gas_price_fri.0 + 1); - content_sender.send(ProposalPart::BlockInfo(block_info)).await.unwrap(); + content_sender.send(ProposalPart::BlockInfo(proposal_args.block_info.clone())).await.unwrap(); let res = validate_proposal(proposal_args.into()).await; assert!(matches!(res, Err(ValidateProposalError::InvalidBlockInfo(_, _, _)))); @@ -216,14 +237,11 @@ async fn invalid_block_info() { #[tokio::test] async fn validate_block_fail() { - let (mut proposal_args, mut content_sender) = create_proposal_validate_arguments(); + let (mut proposal_args, _content_sender) = create_proposal_validate_arguments(); // Setup batcher to return an error when validating the block. proposal_args.deps.batcher.expect_validate_block().returning(|_| { Err(BatcherClientError::ClientError(ClientError::CommunicationFailure("".to_string()))) }); - // Send a valid block info. - let block_info = block_info(BlockNumber(0)); - content_sender.send(ProposalPart::BlockInfo(block_info)).await.unwrap(); let res = validate_proposal(proposal_args.into()).await; assert_matches!(res, Err(ValidateProposalError::Batcher(msg,_ )) @@ -253,9 +271,6 @@ async fn proposal_fin_mismatch() { }), }) }); - // Send a valid block info. - let block_info = block_info(BlockNumber(0)); - content_sender.send(ProposalPart::BlockInfo(block_info)).await.unwrap(); let received_fin = ConsensusProposalCommitment::default(); content_sender .send(ProposalPart::Fin(ProposalFin { @@ -289,9 +304,6 @@ async fn batcher_returns_invalid_proposal() { response: ProposalStatus::InvalidProposal("test error".to_string()), }) }); - // Send a valid block info. - let block_info = block_info(BlockNumber(0)); - content_sender.send(ProposalPart::BlockInfo(block_info)).await.unwrap(); content_sender .send(ProposalPart::Fin(ProposalFin { proposal_commitment: ConsensusProposalCommitment::default(), diff --git a/crates/apollo_integration_tests/src/flow_test_setup.rs b/crates/apollo_integration_tests/src/flow_test_setup.rs index e27eaf3adef..6b9579c2064 100644 --- a/crates/apollo_integration_tests/src/flow_test_setup.rs +++ b/crates/apollo_integration_tests/src/flow_test_setup.rs @@ -393,7 +393,7 @@ impl TxCollector { let StreamMessage { stream_id: first_stream_id, - message: init_message, + message: block_info_message, message_id: incoming_message_id, } = messages_cache.remove(&0).expect("Stream is missing its first message"); @@ -401,15 +401,16 @@ impl TxCollector { incoming_message_id, 0, "Expected the first message in the stream to have id 0, got {incoming_message_id}" ); - let StreamMessageBody::Content(ProposalPart::Init(incoming_proposal_init)) = init_message + let StreamMessageBody::Content(ProposalPart::BlockInfo(incoming_block_info)) = + block_info_message else { - panic!("Expected an init message. Got: {init_message:?}") + panic!("Expected a block info message. Got: {block_info_message:?}") }; self.accumulated_txs .lock() .await - .start_round(incoming_proposal_init.height, incoming_proposal_init.round); + .start_round(incoming_block_info.height, incoming_block_info.round); let mut got_proposal_fin = false; let mut got_channel_fin = false; @@ -418,15 +419,12 @@ impl TxCollector { messages_cache.remove(&i).expect("Stream should have all consecutive messages"); assert_eq!(stream_id, first_stream_id, "Expected the same stream id for all messages"); match message { - StreamMessageBody::Content(ProposalPart::Init(init)) => { - panic!("Unexpected init: {init:?}") + StreamMessageBody::Content(ProposalPart::BlockInfo(block_info)) => { + panic!("Unexpected block info: {block_info:?}") } StreamMessageBody::Content(ProposalPart::Fin(..)) => { got_proposal_fin = true; } - StreamMessageBody::Content(ProposalPart::BlockInfo(_)) => { - // TODO(Asmaa): Add validation for block info. - } StreamMessageBody::Content(ProposalPart::Transactions(transactions)) => { // TODO(Arni): add calculate_transaction_hash to consensus transaction and use // it here. diff --git a/crates/apollo_protobuf/src/consensus.rs b/crates/apollo_protobuf/src/consensus.rs index 324224249e0..fdafd75a3b8 100644 --- a/crates/apollo_protobuf/src/consensus.rs +++ b/crates/apollo_protobuf/src/consensus.rs @@ -69,7 +69,7 @@ pub struct StreamMessage { pub message_id: u64, } -/// This message must be sent first when proposing a new block. +/// Contains the minimal information needed to start building a proposal. #[derive(Clone, Copy, Debug, PartialEq)] pub struct ProposalInit { /// The height of the consensus (block number). @@ -82,21 +82,37 @@ pub struct ProposalInit { pub proposer: ContractAddress, } +/// This message must be sent first when proposing a new block. /// This struct differs from `BlockInfo` in `starknet_api` because we send L1 gas prices in ETH and /// include the ETH to STRK conversion rate. This allows for more informative validations, as we can /// distinguish whether an issue comes from the L1 price reading or the conversion rate instead of /// comparing after multiplication. #[derive(Clone, Debug, PartialEq)] pub struct ConsensusBlockInfo { + /// The height of the consensus (block number). pub height: BlockNumber, + /// The current round of the consensus. + pub round: Round, + /// The last round that was valid. + pub valid_round: Option, + /// Address of the one who proposed the block in consensus. + pub proposer: ContractAddress, + /// Block timestamp. pub timestamp: u64, + /// Address of the one who builds/sequences the block. pub builder: ContractAddress, + /// L1 data availability mode. pub l1_da_mode: L1DataAvailabilityMode, + /// L2 gas price in FRI. pub l2_gas_price_fri: GasPrice, + /// L1 gas price in FRI. pub l1_gas_price_fri: GasPrice, + /// L1 data gas price in FRI. pub l1_data_gas_price_fri: GasPrice, // Keeping the wei prices for now, to use with L1 transactions. + /// L1 gas price in WEI. pub l1_gas_price_wei: GasPrice, + /// L1 data gas price in WEI. pub l1_data_gas_price_wei: GasPrice, } @@ -115,6 +131,25 @@ impl Default for ProposalInit { } } +impl Default for ConsensusBlockInfo { + fn default() -> Self { + ConsensusBlockInfo { + height: Default::default(), + round: Default::default(), + valid_round: Default::default(), + proposer: ContractAddress::from(DEFAULT_VALIDATOR_ID), + builder: ContractAddress::from(DEFAULT_VALIDATOR_ID), + timestamp: Default::default(), + l1_da_mode: L1DataAvailabilityMode::Calldata, + l2_gas_price_fri: Default::default(), + l1_gas_price_fri: Default::default(), + l1_data_gas_price_fri: Default::default(), + l1_gas_price_wei: Default::default(), + l1_data_gas_price_wei: Default::default(), + } + } +} + /// There is one or more batches of transactions in a proposed block. #[derive(Debug, Clone, PartialEq)] pub struct TransactionBatch { @@ -135,34 +170,32 @@ pub struct ProposalFin { /// A part of the proposal. #[derive(Debug, Clone, PartialEq)] pub enum ProposalPart { - /// The initialization part of the proposal. - Init(ProposalInit), - /// Identifies the content of the proposal; contains `id(v)` in Tendermint terms. - Fin(ProposalFin), /// The block info part of the proposal. BlockInfo(ConsensusBlockInfo), + /// Identifies the content of the proposal; contains `id(v)` in Tendermint terms. + Fin(ProposalFin), /// A part of the proposal that contains one or more transactions. Transactions(TransactionBatch), } -impl TryInto for ProposalPart { +impl TryInto for ProposalPart { type Error = ProtobufConversionError; - fn try_into(self: ProposalPart) -> Result { + fn try_into(self: ProposalPart) -> Result { match self { - ProposalPart::Init(init) => Ok(init), + ProposalPart::BlockInfo(block_info) => Ok(block_info), _ => Err(ProtobufConversionError::WrongEnumVariant { type_description: "ProposalPart", - expected: "Init", + expected: "BlockInfo", value_as_str: format!("{self:?}"), }), } } } -impl From for ProposalPart { - fn from(value: ProposalInit) -> Self { - ProposalPart::Init(value) +impl From for ProposalPart { + fn from(value: ConsensusBlockInfo) -> Self { + ProposalPart::BlockInfo(value) } } diff --git a/crates/apollo_protobuf/src/converters/consensus.rs b/crates/apollo_protobuf/src/converters/consensus.rs index 0e4a309e812..a771d186219 100644 --- a/crates/apollo_protobuf/src/converters/consensus.rs +++ b/crates/apollo_protobuf/src/converters/consensus.rs @@ -19,7 +19,6 @@ use crate::consensus::{ IntoFromProto, ProposalCommitment, ProposalFin, - ProposalInit, ProposalPart, StreamMessage, StreamMessageBody, @@ -185,34 +184,13 @@ where } } -impl TryFrom for ProposalInit { +impl TryFrom for ConsensusBlockInfo { type Error = ProtobufConversionError; - fn try_from(value: protobuf::ProposalInit) -> Result { - let height = value.height; + fn try_from(value: protobuf::BlockInfo) -> Result { + let height = BlockNumber(value.height); let round = value.round; let valid_round = value.valid_round; let proposer = value.proposer.ok_or(missing("proposer"))?.try_into()?; - Ok(ProposalInit { height: BlockNumber(height), round, valid_round, proposer }) - } -} - -impl From for protobuf::ProposalInit { - fn from(value: ProposalInit) -> Self { - protobuf::ProposalInit { - height: value.height.0, - round: value.round, - valid_round: value.valid_round, - proposer: Some(value.proposer.into()), - } - } -} - -auto_impl_into_and_try_from_vec_u8!(ProposalInit, protobuf::ProposalInit); - -impl TryFrom for ConsensusBlockInfo { - type Error = ProtobufConversionError; - fn try_from(value: protobuf::BlockInfo) -> Result { - let height = value.height; let timestamp = value.timestamp; let builder = value.builder.ok_or(missing("builder"))?.try_into()?; let l1_da_mode = enum_int_to_l1_data_availability_mode(value.l1_da_mode)?; @@ -227,7 +205,10 @@ impl TryFrom for ConsensusBlockInfo { let l1_data_gas_price_wei = GasPrice(value.l1_data_gas_price_wei.ok_or(missing("l1_data_gas_price_wei"))?.into()); Ok(ConsensusBlockInfo { - height: BlockNumber(height), + height, + round, + valid_round, + proposer, timestamp, builder, l1_da_mode, @@ -244,6 +225,9 @@ impl From for protobuf::BlockInfo { fn from(value: ConsensusBlockInfo) -> Self { protobuf::BlockInfo { height: value.height.0, + round: value.round, + valid_round: value.valid_round, + proposer: Some(value.proposer.into()), timestamp: value.timestamp, builder: Some(value.builder.into()), l1_da_mode: l1_data_availability_mode_to_enum_int(value.l1_da_mode), @@ -310,9 +294,8 @@ impl TryFrom for ProposalPart { }; match part { - Message::Init(init) => Ok(ProposalPart::Init(init.try_into()?)), - Message::Fin(fin) => Ok(ProposalPart::Fin(fin.try_into()?)), Message::BlockInfo(block_info) => Ok(ProposalPart::BlockInfo(block_info.try_into()?)), + Message::Fin(fin) => Ok(ProposalPart::Fin(fin.try_into()?)), Message::Transactions(content) => Ok(ProposalPart::Transactions(content.try_into()?)), } } @@ -321,15 +304,12 @@ impl TryFrom for ProposalPart { impl From for protobuf::ProposalPart { fn from(value: ProposalPart) -> Self { match value { - ProposalPart::Init(init) => protobuf::ProposalPart { - message: Some(protobuf::proposal_part::Message::Init(init.into())), + ProposalPart::BlockInfo(block_info) => protobuf::ProposalPart { + message: Some(protobuf::proposal_part::Message::BlockInfo(block_info.into())), }, ProposalPart::Fin(fin) => protobuf::ProposalPart { message: Some(protobuf::proposal_part::Message::Fin(fin.into())), }, - ProposalPart::BlockInfo(block_info) => protobuf::ProposalPart { - message: Some(protobuf::proposal_part::Message::BlockInfo(block_info.into())), - }, ProposalPart::Transactions(content) => protobuf::ProposalPart { message: Some(protobuf::proposal_part::Message::Transactions(content.into())), }, diff --git a/crates/apollo_protobuf/src/converters/consensus_test.rs b/crates/apollo_protobuf/src/converters/consensus_test.rs index fd441502c53..b8969c3bf6e 100644 --- a/crates/apollo_protobuf/src/converters/consensus_test.rs +++ b/crates/apollo_protobuf/src/converters/consensus_test.rs @@ -14,7 +14,6 @@ use starknet_api::rpc_transaction::{ use crate::consensus::{ ConsensusBlockInfo, ProposalFin, - ProposalInit, ProposalPart, StreamMessage, StreamMessageBody, @@ -77,17 +76,6 @@ fn convert_vote_to_vec_u8_and_back() { assert_eq!(vote, res_data); } -#[test] -fn convert_proposal_init_to_vec_u8_and_back() { - let mut rng = get_rng(); - - let proposal_init = ProposalInit::get_test_instance(&mut rng); - - let bytes_data: Vec = proposal_init.into(); - let res_data = ProposalInit::try_from(bytes_data).unwrap(); - assert_eq!(proposal_init, res_data); -} - #[test] fn convert_block_info_to_vec_u8_and_back() { let mut rng = get_rng(); diff --git a/crates/apollo_protobuf/src/converters/test_instances.rs b/crates/apollo_protobuf/src/converters/test_instances.rs index 792c28a9ae6..b7884fb575b 100644 --- a/crates/apollo_protobuf/src/converters/test_instances.rs +++ b/crates/apollo_protobuf/src/converters/test_instances.rs @@ -14,7 +14,6 @@ use crate::consensus::{ ConsensusBlockInfo, ProposalCommitment, ProposalFin, - ProposalInit, ProposalPart, StreamMessage, StreamMessageBody, @@ -35,12 +34,6 @@ auto_impl_get_test_instance! { Prevote = 0, Precommit = 1, } - pub struct ProposalInit { - pub height: BlockNumber, - pub round: u32, - pub valid_round: Option, - pub proposer: ContractAddress, - } pub struct ProposalCommitment(pub StarkHash); pub struct ProposalFin { pub proposal_commitment: ProposalCommitment, @@ -51,6 +44,9 @@ auto_impl_get_test_instance! { } pub struct ConsensusBlockInfo { pub height: BlockNumber, + pub round: u32, + pub valid_round: Option, + pub proposer: ContractAddress, pub timestamp: u64, pub builder: ContractAddress, pub l1_da_mode: L1DataAvailabilityMode, @@ -61,10 +57,9 @@ auto_impl_get_test_instance! { pub l1_data_gas_price_wei: GasPrice, } pub enum ProposalPart { - Init(ProposalInit) = 0, + BlockInfo(ConsensusBlockInfo) = 0, Fin(ProposalFin) = 1, - BlockInfo(ConsensusBlockInfo) = 2, - Transactions(TransactionBatch) = 3, + Transactions(TransactionBatch) = 2, } } diff --git a/crates/apollo_protobuf/src/proto/p2p/proto/consensus/consensus.proto b/crates/apollo_protobuf/src/proto/p2p/proto/consensus/consensus.proto index 43481ea2119..3ee4229cb99 100644 --- a/crates/apollo_protobuf/src/proto/p2p/proto/consensus/consensus.proto +++ b/crates/apollo_protobuf/src/proto/p2p/proto/consensus/consensus.proto @@ -42,23 +42,19 @@ message StreamMessage { uint64 message_id = 4; } -message ProposalInit { - uint64 height = 1; - uint32 round = 2; - optional uint32 valid_round = 3; - Address proposer = 4; -} - message BlockInfo { uint64 height = 1; - uint64 timestamp = 2; - Address builder = 3; - L1DataAvailabilityMode l1_da_mode = 4; - Uint128 l2_gas_price_fri = 5; - Uint128 l1_gas_price_fri = 6; - Uint128 l1_data_gas_price_fri = 7; - Uint128 l1_gas_price_wei = 8; - Uint128 l1_data_gas_price_wei = 9; + uint32 round = 2; + optional uint32 valid_round = 3; + Address proposer = 4; + uint64 timestamp = 5; + Address builder = 6; + L1DataAvailabilityMode l1_da_mode = 7; + Uint128 l2_gas_price_fri = 8; + Uint128 l1_gas_price_fri = 9; + Uint128 l1_data_gas_price_fri = 10; + Uint128 l1_gas_price_wei = 11; + Uint128 l1_data_gas_price_wei = 12; } message TransactionBatch { @@ -73,19 +69,13 @@ message ProposalFin { } // Network format: -// 1. First message is ProposalInit -// 2. block_info is sent once +// 1. First message is BlockInfo (includes all block metadata) +// 2. transactions is sent repeatedly (for non-empty blocks) // 3. Last message is ProposalFin -// -// Empty block - no other messages sent. -// -// Block with transactions: -// 4. transactions is sent repeatedly message ProposalPart { oneof message { - ProposalInit init = 1; - ProposalFin fin = 2; - BlockInfo block_info = 3; - TransactionBatch transactions = 4; + BlockInfo block_info = 1; + ProposalFin fin = 2; + TransactionBatch transactions = 3; } } diff --git a/crates/apollo_protobuf/src/protobuf/protoc_output.rs b/crates/apollo_protobuf/src/protobuf/protoc_output.rs index f3fd7d81c34..b89da57c52b 100644 --- a/crates/apollo_protobuf/src/protobuf/protoc_output.rs +++ b/crates/apollo_protobuf/src/protobuf/protoc_output.rs @@ -434,7 +434,7 @@ pub mod stream_message { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct ProposalInit { +pub struct BlockInfo { #[prost(uint64, tag = "1")] pub height: u64, #[prost(uint32, tag = "2")] @@ -443,27 +443,21 @@ pub struct ProposalInit { pub valid_round: ::core::option::Option, #[prost(message, optional, tag = "4")] pub proposer: ::core::option::Option
, -} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct BlockInfo { - #[prost(uint64, tag = "1")] - pub height: u64, - #[prost(uint64, tag = "2")] + #[prost(uint64, tag = "5")] pub timestamp: u64, - #[prost(message, optional, tag = "3")] + #[prost(message, optional, tag = "6")] pub builder: ::core::option::Option
, - #[prost(enumeration = "L1DataAvailabilityMode", tag = "4")] + #[prost(enumeration = "L1DataAvailabilityMode", tag = "7")] pub l1_da_mode: i32, - #[prost(message, optional, tag = "5")] + #[prost(message, optional, tag = "8")] pub l2_gas_price_fri: ::core::option::Option, - #[prost(message, optional, tag = "6")] + #[prost(message, optional, tag = "9")] pub l1_gas_price_fri: ::core::option::Option, - #[prost(message, optional, tag = "7")] + #[prost(message, optional, tag = "10")] pub l1_data_gas_price_fri: ::core::option::Option, - #[prost(message, optional, tag = "8")] + #[prost(message, optional, tag = "11")] pub l1_gas_price_wei: ::core::option::Option, - #[prost(message, optional, tag = "9")] + #[prost(message, optional, tag = "12")] pub l1_data_gas_price_wei: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] @@ -483,18 +477,13 @@ pub struct ProposalFin { pub executed_transaction_count: u64, } /// Network format: -/// 1. First message is ProposalInit -/// 2. block_info is sent once +/// 1. First message is BlockInfo (includes all block metadata) +/// 2. transactions is sent repeatedly (for non-empty blocks) /// 3. Last message is ProposalFin -/// -/// Empty block - no other messages sent. -/// -/// Block with transactions: -/// 4. transactions is sent repeatedly #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ProposalPart { - #[prost(oneof = "proposal_part::Message", tags = "1, 2, 3, 4")] + #[prost(oneof = "proposal_part::Message", tags = "1, 2, 3")] pub message: ::core::option::Option, } /// Nested message and enum types in `ProposalPart`. @@ -503,12 +492,10 @@ pub mod proposal_part { #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Message { #[prost(message, tag = "1")] - Init(super::ProposalInit), + BlockInfo(super::BlockInfo), #[prost(message, tag = "2")] Fin(super::ProposalFin), #[prost(message, tag = "3")] - BlockInfo(super::BlockInfo), - #[prost(message, tag = "4")] Transactions(super::TransactionBatch), } }