Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 42 additions & 40 deletions crates/apollo_consensus/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use apollo_consensus_config::config::{
use apollo_infra_utils::debug_every_n_ms;
use apollo_network::network_manager::BroadcastTopicClientTrait;
use apollo_network_types::network_types::BroadcastedMessageMetadata;
use apollo_protobuf::consensus::{ProposalInit, Vote, VoteType};
use apollo_protobuf::consensus::{ConsensusBlockInfo, ProposalInit, Vote, VoteType};
use apollo_protobuf::converters::ProtobufConversionError;
use apollo_time::time::{Clock, ClockExt, DefaultClock};
use futures::channel::mpsc;
Expand Down Expand Up @@ -95,7 +95,7 @@ impl std::fmt::Debug for RunConsensusArguments {
/// - `vote_receiver`: The channels to receive votes from the network. These are self contained
/// messages.
/// - `proposals_receiver`: The channel to receive proposals from the network. Proposals are
/// represented as streams (ProposalInit, Content.*, ProposalFin).
/// represented as streams (ConsensusBlockInfo, Content.*, ProposalFin).
// Always print the validator ID since some tests collate multiple consensus logs in a single file.
#[instrument(
skip_all,
Expand Down Expand Up @@ -174,14 +174,14 @@ pub enum RunHeightRes {
Sync,
}

type ProposalReceiverTuple<T> = (ProposalInit, mpsc::Receiver<T>);
type ProposalReceiverTuple<T> = (ConsensusBlockInfo, mpsc::Receiver<T>);

/// Manages votes and proposals for future heights.
#[derive(Debug)]
struct ConsensusCache<ContextT: ConsensusContext> {
// Mapping: { Height : Vec<Vote> }
future_votes: BTreeMap<BlockNumber, Vec<Vote>>,
// Mapping: { Height : { Round : (Init, Receiver)}}
// Mapping: { Height : { Round : (BlockInfo, Receiver)}}
future_proposals_cache:
BTreeMap<BlockNumber, BTreeMap<Round, ProposalReceiverTuple<ContextT::ProposalPart>>>,
/// Configuration for determining which messages should be cached.
Expand Down Expand Up @@ -227,7 +227,7 @@ impl<ContextT: ConsensusContext> ConsensusCache<ContextT> {
fn get_current_height_proposals(
&mut self,
height: BlockNumber,
) -> Vec<(ProposalInit, mpsc::Receiver<ContextT::ProposalPart>)> {
) -> Vec<(ConsensusBlockInfo, mpsc::Receiver<ContextT::ProposalPart>)> {
loop {
let Some(entry) = self.future_proposals_cache.first_entry() else {
return Vec::new();
Expand Down Expand Up @@ -259,14 +259,14 @@ impl<ContextT: ConsensusContext> ConsensusCache<ContextT> {
/// Caches a proposal for a future height.
fn cache_future_proposal(
&mut self,
proposal_init: ProposalInit,
block_info: ConsensusBlockInfo,
content_receiver: mpsc::Receiver<ContextT::ProposalPart>,
) {
self.future_proposals_cache
.entry(proposal_init.height)
.entry(block_info.height)
.or_default()
.entry(proposal_init.round)
.or_insert((proposal_init, content_receiver));
.entry(block_info.round)
.or_insert((block_info, content_receiver));
}

fn report_max_cached_block_number_metric(&self, height: BlockNumber) {
Expand Down Expand Up @@ -320,7 +320,7 @@ impl<ContextT: ConsensusContext> ConsensusCache<ContextT> {
&self,
current_height: &BlockNumber,
current_round: Round,
proposal: &ProposalInit,
proposal: &ConsensusBlockInfo,
) -> bool {
self.should_cache_msg(
current_height,
Expand Down Expand Up @@ -574,9 +574,16 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {

let cached_proposals = self.cache.get_current_height_proposals(height);
trace!("Cached proposals for height {}: {:?}", height, cached_proposals);
for (init, content_receiver) in cached_proposals {
let new_requests =
self.handle_proposal_known_init(context, height, shc, init, content_receiver).await;
for (block_info, content_receiver) in cached_proposals {
let new_requests = self
.handle_proposal_known_block_info(
context,
height,
shc,
block_info,
content_receiver,
)
.await;
pending_requests.extend(new_requests);
}

Expand Down Expand Up @@ -697,18 +704,18 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
return Ok(VecDeque::new());
};

let proposal_init: ProposalInit = match first_part.try_into() {
Ok(proposal_init) => proposal_init,
let block_info: ConsensusBlockInfo = match first_part.try_into() {
Ok(block_info) => block_info,
Err(e) => {
warn!("Failed to parse incoming proposal init. Dropping proposal: {e}");
warn!("Failed to parse incoming block info. Dropping proposal: {e}");
return Ok(VecDeque::new());
}
};

match proposal_init.height.cmp(&height) {
match block_info.height.cmp(&height) {
std::cmp::Ordering::Greater => {
if self.cache.should_cache_proposal(&height, 0, &proposal_init) {
debug!("Received a proposal for a future height. {:?}", proposal_init);
if self.cache.should_cache_proposal(&height, 0, &block_info) {
debug!("Received a proposal for a future height. {:?}", block_info);
// Note: new proposals with the same height/round will be ignored.
//
// TODO(matan): This only work for trusted peers. In the case of possibly
Expand All @@ -717,27 +724,23 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
// "good" nodes can propose).
//
// When moving to version 1.0 make sure this is addressed.
self.cache.cache_future_proposal(proposal_init, content_receiver);
self.cache.cache_future_proposal(block_info, content_receiver);
}
Ok(VecDeque::new())
}
std::cmp::Ordering::Less => {
trace!("Drop proposal from past height. {:?}", proposal_init);
trace!("Drop proposal from past height. {:?}", block_info);
Ok(VecDeque::new())
}
std::cmp::Ordering::Equal => match shc {
Some(shc) => {
if self.cache.should_cache_proposal(
&height,
shc.current_round(),
&proposal_init,
) {
if self.cache.should_cache_proposal(&height, shc.current_round(), &block_info) {
Ok(self
.handle_proposal_known_init(
.handle_proposal_known_block_info(
context,
height,
shc,
proposal_init,
block_info,
content_receiver,
)
.await)
Expand All @@ -746,26 +749,25 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
}
}
None => {
trace!("Drop proposal from just completed height. {:?}", proposal_init);
trace!("Drop proposal from just completed height. {:?}", block_info);
Ok(VecDeque::new())
}
},
}
}

async fn handle_proposal_known_init(
async fn handle_proposal_known_block_info(
&mut self,
context: &mut ContextT,
height: BlockNumber,
shc: &mut SingleHeightConsensus,
proposal_init: ProposalInit,
block_info: ConsensusBlockInfo,
content_receiver: mpsc::Receiver<ContextT::ProposalPart>,
) -> Requests {
// Store the stream; requests will reference it by (height, round)
self.current_height_proposals_streams
.insert((height, proposal_init.round), content_receiver);
self.current_height_proposals_streams.insert((height, block_info.round), content_receiver);
let leader_fn = make_leader_fn(context, height);
shc.handle_proposal(&leader_fn, proposal_init)
shc.handle_proposal(&leader_fn, block_info)
}

// Handle a single consensus message.
Expand Down Expand Up @@ -896,14 +898,14 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
.boxed();
Ok(Some(fut))
}
SMRequest::StartValidateProposal(init) => {
SMRequest::StartValidateProposal(block_info) => {
// Look up the stored stream.
let key = (height, init.round);
let round = block_info.round;
let valid_round = block_info.valid_round;
let key = (height, round);
if let Some(stream) = self.current_height_proposals_streams.remove(&key) {
let timeout = timeouts.get_proposal_timeout(init.round);
let receiver = context.validate_proposal(init, timeout, stream).await;
let round = init.round;
let valid_round = init.valid_round;
let timeout = timeouts.get_proposal_timeout(round);
let receiver = context.validate_proposal(block_info, timeout, stream).await;
let fut = async move {
let proposal_id = receiver.await.ok();
StateMachineEvent::FinishedValidation(proposal_id, round, valid_round)
Expand Down
34 changes: 17 additions & 17 deletions crates/apollo_consensus/src/manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ use starknet_types_core::felt::Felt;
use super::{run_consensus, MultiHeightManager, RunHeightRes};
use crate::storage::MockHeightVotedStorageTrait;
use crate::test_utils::{
block_info,
precommit,
prevote,
proposal_init,
MockTestContext,
NoOpHeightVotedStorage,
TestProposalPart,
Expand Down Expand Up @@ -148,15 +148,15 @@ async fn manager_multiple_heights_unordered(consensus_config: ConsensusConfig) {
// Send messages for height 2 followed by those for height 1.
send_proposal(
&mut proposal_receiver_sender,
vec![TestProposalPart::Init(proposal_init(HEIGHT_2, ROUND_0, *PROPOSER_ID))],
vec![TestProposalPart::BlockInfo(block_info(HEIGHT_2, ROUND_0, *PROPOSER_ID))],
)
.await;
send(&mut sender, prevote(Some(Felt::TWO), HEIGHT_2, ROUND_0, *PROPOSER_ID)).await;
send(&mut sender, precommit(Some(Felt::TWO), HEIGHT_2, ROUND_0, *PROPOSER_ID)).await;

send_proposal(
&mut proposal_receiver_sender,
vec![TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_0, *PROPOSER_ID))],
vec![TestProposalPart::BlockInfo(block_info(HEIGHT_1, ROUND_0, *PROPOSER_ID))],
)
.await;
send(&mut sender, prevote(Some(Felt::ONE), HEIGHT_1, ROUND_0, *PROPOSER_ID)).await;
Expand Down Expand Up @@ -237,7 +237,7 @@ async fn run_consensus_sync(consensus_config: ConsensusConfig) {
// Send messages for height 2.
send_proposal(
&mut proposal_receiver_sender,
vec![TestProposalPart::Init(proposal_init(HEIGHT_2, ROUND_0, *PROPOSER_ID))],
vec![TestProposalPart::BlockInfo(block_info(HEIGHT_2, ROUND_0, *PROPOSER_ID))],
)
.await;
let TestSubscriberChannels { mock_network, subscriber_channels } =
Expand Down Expand Up @@ -279,7 +279,7 @@ async fn test_timeouts(consensus_config: ConsensusConfig) {

send_proposal(
&mut proposal_receiver_sender,
vec![TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_0, *PROPOSER_ID))],
vec![TestProposalPart::BlockInfo(block_info(HEIGHT_1, ROUND_0, *PROPOSER_ID))],
)
.await;
send(&mut sender, prevote(None, HEIGHT_1, ROUND_0, *VALIDATOR_ID_2)).await;
Expand Down Expand Up @@ -337,7 +337,7 @@ async fn test_timeouts(consensus_config: ConsensusConfig) {
// reach a decision.
send_proposal(
&mut proposal_receiver_sender,
vec![TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_1, *PROPOSER_ID))],
vec![TestProposalPart::BlockInfo(block_info(HEIGHT_1, ROUND_1, *PROPOSER_ID))],
)
.await;
send(&mut sender, prevote(Some(Felt::ONE), HEIGHT_1, ROUND_1, *PROPOSER_ID)).await;
Expand All @@ -361,7 +361,7 @@ async fn timely_message_handling(consensus_config: ConsensusConfig) {
let (mut proposal_receiver_sender, mut proposal_receiver_receiver) = mpsc::channel(0);
let (mut content_sender, content_receiver) = mpsc::channel(0);
content_sender
.try_send(TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_0, *PROPOSER_ID)))
.try_send(TestProposalPart::BlockInfo(block_info(HEIGHT_1, ROUND_0, *PROPOSER_ID)))
.unwrap();
proposal_receiver_sender.try_send(content_receiver).unwrap();

Expand Down Expand Up @@ -416,7 +416,7 @@ async fn future_height_limit_caching_and_dropping(mut consensus_config: Consensu
// Send proposal and votes for height 2 (should be dropped when processing height 0).
send_proposal(
&mut proposal_receiver_sender,
vec![TestProposalPart::Init(proposal_init(HEIGHT_2, ROUND_0, *PROPOSER_ID))],
vec![TestProposalPart::BlockInfo(block_info(HEIGHT_2, ROUND_0, *PROPOSER_ID))],
)
.await;
send(&mut sender, prevote(Some(Felt::TWO), HEIGHT_2, ROUND_0, *PROPOSER_ID)).await;
Expand All @@ -425,7 +425,7 @@ async fn future_height_limit_caching_and_dropping(mut consensus_config: Consensu
// Send proposal and votes for height 1 (should be cached when processing height 0).
send_proposal(
&mut proposal_receiver_sender,
vec![TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_0, *PROPOSER_ID))],
vec![TestProposalPart::BlockInfo(block_info(HEIGHT_1, ROUND_0, *PROPOSER_ID))],
)
.await;
send(&mut sender, prevote(Some(Felt::ONE), HEIGHT_1, ROUND_0, *PROPOSER_ID)).await;
Expand All @@ -434,7 +434,7 @@ async fn future_height_limit_caching_and_dropping(mut consensus_config: Consensu
// Send proposal and votes for height 0 (current height - needed to reach consensus).
send_proposal(
&mut proposal_receiver_sender,
vec![TestProposalPart::Init(proposal_init(HEIGHT_0, ROUND_0, *PROPOSER_ID))],
vec![TestProposalPart::BlockInfo(block_info(HEIGHT_0, ROUND_0, *PROPOSER_ID))],
)
.await;
send(&mut sender, prevote(Some(Felt::ZERO), HEIGHT_0, ROUND_0, *PROPOSER_ID)).await;
Expand Down Expand Up @@ -543,12 +543,12 @@ async fn current_height_round_limit_caching_and_dropping(mut consensus_config: C
// Send proposals for rounds 0 and 1, proposal for round 1 should be dropped.
send_proposal(
&mut proposal_receiver_sender,
vec![TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_0, *PROPOSER_ID))],
vec![TestProposalPart::BlockInfo(block_info(HEIGHT_1, ROUND_0, *PROPOSER_ID))],
)
.await;
send_proposal(
&mut proposal_receiver_sender,
vec![TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_1, *PROPOSER_ID))],
vec![TestProposalPart::BlockInfo(block_info(HEIGHT_1, ROUND_1, *PROPOSER_ID))],
)
.await;

Expand Down Expand Up @@ -627,7 +627,7 @@ async fn current_height_round_limit_caching_and_dropping(mut consensus_config: C
// Send proposal for round 2.
send_proposal(
&mut proposal_sender_clone,
vec![TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_2, *PROPOSER_ID))],
vec![TestProposalPart::BlockInfo(block_info(HEIGHT_1, ROUND_2, *PROPOSER_ID))],
)
.await;
// Send votes for round 2.
Expand Down Expand Up @@ -809,7 +809,7 @@ async fn manager_runs_normally_when_height_is_greater_than_last_voted_height(
// Send a proposal for the height we already voted on:
send_proposal(
&mut proposal_receiver_sender,
vec![TestProposalPart::Init(proposal_init(CURRENT_HEIGHT, ROUND_0, *PROPOSER_ID))],
vec![TestProposalPart::BlockInfo(block_info(CURRENT_HEIGHT, ROUND_0, *PROPOSER_ID))],
)
.await;
send(&mut sender, prevote(Some(Felt::ONE), CURRENT_HEIGHT, ROUND_0, *PROPOSER_ID)).await;
Expand Down Expand Up @@ -868,7 +868,7 @@ async fn manager_waits_until_height_passes_last_voted_height(consensus_config: C
// Send a proposal for the height we already voted on:
send_proposal(
&mut proposal_receiver_sender,
vec![TestProposalPart::Init(proposal_init(LAST_VOTED_HEIGHT, ROUND_0, *PROPOSER_ID))],
vec![TestProposalPart::BlockInfo(block_info(LAST_VOTED_HEIGHT, ROUND_0, *PROPOSER_ID))],
)
.await;
send(&mut sender, prevote(Some(Felt::ONE), LAST_VOTED_HEIGHT, ROUND_0, *PROPOSER_ID)).await;
Expand Down Expand Up @@ -990,7 +990,7 @@ async fn writes_voted_height_to_storage(consensus_config: ConsensusConfig) {
// Send proposal first
send_proposal(
&mut proposal_receiver_sender,
vec![TestProposalPart::Init(proposal_init(HEIGHT, ROUND_0, *PROPOSER_ID))],
vec![TestProposalPart::BlockInfo(block_info(HEIGHT, ROUND_0, *PROPOSER_ID))],
)
.await;

Expand Down Expand Up @@ -1118,7 +1118,7 @@ async fn manager_ignores_invalid_network_messages(consensus_config: ConsensusCon
// Send a valid proposal and valid votes.
send_proposal(
&mut proposal_receiver_sender,
vec![TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_0, *PROPOSER_ID))],
vec![TestProposalPart::BlockInfo(block_info(HEIGHT_1, ROUND_0, *PROPOSER_ID))],
)
.await;
send(&mut sender, prevote(Some(Felt::ONE), HEIGHT_1, ROUND_0, *PROPOSER_ID)).await;
Expand Down
Loading
Loading