From 359e4381f4c750c199a781a5ee21ea3ad235ce0e Mon Sep 17 00:00:00 2001 From: kamilsa Date: Wed, 11 Feb 2026 17:54:30 +0500 Subject: [PATCH 1/7] Update consensus logic: get_attestation_target, aggregation broadcast, and attestation propagation --- src/lean_spec/subspecs/chain/service.py | 11 ++- .../subspecs/containers/state/state.py | 34 +++++++++ src/lean_spec/subspecs/forkchoice/store.py | 75 +++++++++---------- .../subspecs/networking/service/service.py | 31 +++++++- src/lean_spec/subspecs/node/node.py | 18 ++++- src/lean_spec/subspecs/sync/service.py | 46 +++++++++++- src/lean_spec/subspecs/validator/service.py | 3 + tests/interop/helpers/node_runner.py | 32 +++++++- tests/lean_spec/helpers/builders.py | 2 +- .../lean_spec/subspecs/chain/test_service.py | 7 +- .../forkchoice/test_attestation_target.py | 14 ++-- .../forkchoice/test_store_attestations.py | 38 +++++----- .../forkchoice/test_time_management.py | 16 ++-- 13 files changed, 237 insertions(+), 90 deletions(-) diff --git a/src/lean_spec/subspecs/chain/service.py b/src/lean_spec/subspecs/chain/service.py index 17a86e20..475f3c3d 100644 --- a/src/lean_spec/subspecs/chain/service.py +++ b/src/lean_spec/subspecs/chain/service.py @@ -125,9 +125,10 @@ async def run(self) -> None: # # This minimal service does not produce blocks. # Block production requires validator keys. - new_store = self.sync_service.store.on_tick( + new_store, new_aggregated_attestations = self.sync_service.store.on_tick( time=current_time, has_proposal=False, + is_aggregator=self.sync_service.is_aggregator, ) # Update sync service's store reference. @@ -137,6 +138,11 @@ async def run(self) -> None: # the updated time. self.sync_service.store = new_store + # Publish any new aggregated attestations produced this tick + if new_aggregated_attestations: + for agg in new_aggregated_attestations: + await self.sync_service.publish_aggregated_attestation(agg) + logger.info( "Tick: slot=%d interval=%d time=%d head=%s finalized=slot%d", self.clock.current_slot(), @@ -162,9 +168,10 @@ async def _initial_tick(self) -> Interval | None: # Only tick if we're past genesis. if current_time >= self.clock.genesis_time: - new_store = self.sync_service.store.on_tick( + new_store, _ = self.sync_service.store.on_tick( time=current_time, has_proposal=False, + is_aggregator=self.sync_service.is_aggregator, ) self.sync_service.store = new_store return self.clock.total_intervals() diff --git a/src/lean_spec/subspecs/containers/state/state.py b/src/lean_spec/subspecs/containers/state/state.py index a494f0fe..32c57959 100644 --- a/src/lean_spec/subspecs/containers/state/state.py +++ b/src/lean_spec/subspecs/containers/state/state.py @@ -2,6 +2,7 @@ from __future__ import annotations +import logging from typing import AbstractSet, Collection, Iterable from lean_spec.subspecs.ssz.hash import hash_tree_root @@ -33,6 +34,8 @@ Validators, ) +logger = logging.getLogger(__name__) + class State(Container): """The main consensus state object.""" @@ -449,6 +452,13 @@ def process_attestations( # # The rules below filter out invalid or irrelevant votes. for attestation in attestations: + logger.debug( + "Processing attestation: target=slot%d source=slot%d participants=%s", + attestation.data.target.slot, + attestation.data.source.slot, + attestation.aggregation_bits.to_validator_indices(), + ) + source = attestation.data.source target = attestation.data.target @@ -457,6 +467,7 @@ def process_attestations( # A vote may only originate from a point in history that is already justified. 
# A source that lacks existing justification cannot be used to anchor a new vote. if not justified_slots.is_slot_justified(finalized_slot, source.slot): + logger.debug("Skipping attestation: source slot %d not justified", source.slot) continue # Ignore votes for targets that have already reached consensus. @@ -468,6 +479,7 @@ def process_attestations( # Ignore votes that reference zero-hash slots. if source.root == ZERO_HASH or target.root == ZERO_HASH: + logger.debug("Skipping attestation: zero root in source/target") continue # Ensure the vote refers to blocks that actually exist on our chain. @@ -491,6 +503,11 @@ def process_attestations( ) if not source_matches or not target_matches: + logger.debug( + "Skipping attestation: root mismatch (source_match=%s target_match=%s)", + source_matches, + target_matches, + ) continue # Ensure time flows forward. @@ -498,6 +515,11 @@ def process_attestations( # A target must always lie strictly after its source slot. # Otherwise the vote makes no chronological sense. if target.slot <= source.slot: + logger.debug( + "Skipping attestation: target slot %d <= source slot %d", + target.slot, + source.slot, + ) continue # Ensure the target falls on a slot that can be justified after the finalized one. @@ -514,6 +536,11 @@ def process_attestations( # Any target outside this pattern is not eligible for justification, # so votes for it are simply ignored. if not target.slot.is_justifiable_after(self.latest_finalized.slot): + logger.debug( + "Skipping attestation: target slot %d not justifiable after finalized slot %d", + target.slot, + self.latest_finalized.slot, + ) continue # Record the vote. @@ -542,6 +569,12 @@ def process_attestations( count = sum(bool(justified) for justified in justifications[target.root]) if 3 * count >= (2 * len(self.validators)): + logger.info( + "Supermajority reached for target slot %d: %d votes (threshold: %d)", + target.slot, + count, + (2 * len(self.validators) + 2) // 3, + ) # The block becomes justified # # The chain now considers this block part of its safe head. @@ -573,6 +606,7 @@ def process_attestations( old_finalized_slot = finalized_slot latest_finalized = source finalized_slot = latest_finalized.slot + logger.info("Finalization advanced to slot %d", finalized_slot) # Rebase/prune justification tracking across the new finalized boundary. 
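+        # Arithmetic sketch of the 2/3 supermajority check used above:
+        # 3 * count >= 2 * n is the integer form of count >= ceil(2n/3),
+        # which is also the logged threshold (2 * n + 2) // 3.
+        # (n below is a hypothetical validator count, not taken from the state.)
+        #
+        #     n = 10
+        #     threshold = (2 * n + 2) // 3        # ceil(2 * 10 / 3) == 7
+        #     assert all((3 * c >= 2 * n) == (c >= threshold) for c in range(n + 1))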
# diff --git a/src/lean_spec/subspecs/forkchoice/store.py b/src/lean_spec/subspecs/forkchoice/store.py index d54636a2..c567ce2f 100644 --- a/src/lean_spec/subspecs/forkchoice/store.py +++ b/src/lean_spec/subspecs/forkchoice/store.py @@ -7,6 +7,7 @@ __all__ = ["Store"] import copy +import logging from collections import defaultdict from lean_spec.subspecs.chain.config import ( @@ -45,6 +46,8 @@ ) from lean_spec.types.container import Container +logger = logging.getLogger(__name__) + class Store(Container): """ @@ -488,7 +491,6 @@ def on_gossip_aggregated_attestation( new_attestation_data_by_root = dict(self.attestation_data_by_root) new_attestation_data_by_root[data_root] = data - store = self for vid in validator_ids: # Update Proof Map # @@ -497,7 +499,7 @@ def on_gossip_aggregated_attestation( new_aggregated_payloads.setdefault(key, []).append(proof) # Return store with updated aggregated payloads and attestation data - return store.model_copy( + return self.model_copy( update={ "latest_new_aggregated_payloads": new_aggregated_payloads, "attestation_data_by_root": new_attestation_data_by_root, @@ -856,28 +858,8 @@ def update_head(self) -> "Store": ) def accept_new_attestations(self) -> "Store": - """ - Process pending aggregated payloads and update forkchoice head. - - Moves aggregated payloads from latest_new_aggregated_payloads to - latest_known_aggregated_payloads, making them eligible to contribute to - fork choice weights. This migration happens at specific interval ticks. - - The Interval Tick System - ------------------------- - Aggregated payloads progress through intervals: - - Interval 0: Block proposal - - Interval 1: Validators cast attestations (enter "new") - - Interval 2: Aggregators create proofs & broadcast - - Interval 3: Safe target update - - Interval 4: Process accumulated attestations - - This staged progression ensures proper timing and prevents premature - influence on fork choice decisions. + """Process pending aggregated payloads and update forkchoice head.""" - Returns: - New Store with migrated aggregated payloads and updated head. - """ # Merge new aggregated payloads into known aggregated payloads merged_aggregated_payloads = dict(self.latest_known_aggregated_payloads) for sig_key, proofs in self.latest_new_aggregated_payloads.items(): @@ -937,7 +919,7 @@ def update_safe_target(self) -> "Store": return self.model_copy(update={"safe_target": safe_target}) - def aggregate_committee_signatures(self) -> "Store": + def aggregate_committee_signatures(self) -> tuple["Store", list[SignedAggregatedAttestation]]: """ Aggregate committee signatures for attestations in committee_signatures. @@ -945,7 +927,7 @@ def aggregate_committee_signatures(self) -> "Store": Attestations are reconstructed from gossip_signatures using attestation_data_by_root. Returns: - New Store with updated latest_new_aggregated_payloads. + Tuple of (new Store with updated payloads, list of new SignedAggregatedAttestation). 
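+
+        Example (illustrative sketch mirroring the unit tests updated in this
+        change; vid and data_root are placeholders):
+
+            store, aggregates = store.aggregate_committee_signatures()
+            assert SignatureKey(vid, data_root) in store.latest_new_aggregated_payloads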
""" new_aggregated_payloads = dict(self.latest_new_aggregated_payloads) @@ -970,13 +952,14 @@ def aggregate_committee_signatures(self) -> "Store": committee_signatures, ) - # iterate to broadcast aggregated attestations + # Create list for broadcasting + new_aggregates: list[SignedAggregatedAttestation] = [] for aggregated_attestation, aggregated_signature in aggregated_results: - _ = SignedAggregatedAttestation( + agg = SignedAggregatedAttestation( data=aggregated_attestation.data, proof=aggregated_signature, ) - # Note: here we should broadcast the aggregated signature to committee_aggregators topic + new_aggregates.append(agg) # Compute new aggregated payloads new_gossip_sigs = dict(self.gossip_signatures) @@ -998,9 +981,11 @@ def aggregate_committee_signatures(self) -> "Store": "latest_new_aggregated_payloads": new_aggregated_payloads, "gossip_signatures": new_gossip_sigs, } - ) + ), new_aggregates - def tick_interval(self, has_proposal: bool, is_aggregator: bool = False) -> "Store": + def tick_interval( + self, has_proposal: bool, is_aggregator: bool = False + ) -> tuple["Store", list[SignedAggregatedAttestation]]: """ Advance store time by one interval and perform interval-specific actions. @@ -1042,11 +1027,12 @@ def tick_interval(self, has_proposal: bool, is_aggregator: bool = False) -> "Sto is_aggregator: Whether the node is an aggregator. Returns: - New Store with advanced time and interval-specific updates applied. + Tuple of (new Store with advanced time, list of new SignedAggregatedAttestation). """ # Advance time by one interval store = self.model_copy(update={"time": self.time + Uint64(1)}) current_interval = store.time % INTERVALS_PER_SLOT + new_aggregates: list[SignedAggregatedAttestation] = [] if current_interval == Uint64(0): # Start of slot - process attestations if proposal exists @@ -1055,7 +1041,7 @@ def tick_interval(self, has_proposal: bool, is_aggregator: bool = False) -> "Sto elif current_interval == Uint64(2): # Aggregation interval - aggregators create proofs if is_aggregator: - store = store.aggregate_committee_signatures() + store, new_aggregates = store.aggregate_committee_signatures() elif current_interval == Uint64(3): # Fast confirm - update safe target based on received proofs store = store.update_safe_target() @@ -1063,9 +1049,11 @@ def tick_interval(self, has_proposal: bool, is_aggregator: bool = False) -> "Sto # End of slot - accept accumulated attestations store = store.accept_new_attestations() - return store + return store, new_aggregates - def on_tick(self, time: Uint64, has_proposal: bool, is_aggregator: bool = False) -> "Store": + def on_tick( + self, time: Uint64, has_proposal: bool, is_aggregator: bool = False + ) -> tuple["Store", list[SignedAggregatedAttestation]]: """ Advance forkchoice store time to given timestamp. @@ -1079,7 +1067,7 @@ def on_tick(self, time: Uint64, has_proposal: bool, is_aggregator: bool = False) is_aggregator: Whether the node is an aggregator. Returns: - New Store with time advanced and all interval actions performed. + Tuple of (new Store with time advanced, list of all produced SignedAggregatedAttestation). 
""" # Calculate target time in intervals time_delta_ms = (time - self.config.genesis_time) * Uint64(1000) @@ -1087,14 +1075,16 @@ def on_tick(self, time: Uint64, has_proposal: bool, is_aggregator: bool = False) # Tick forward one interval at a time store = self + all_new_aggregates: list[SignedAggregatedAttestation] = [] while store.time < tick_interval_time: # Check if proposal should be signaled for next interval should_signal_proposal = has_proposal and (store.time + Uint64(1)) == tick_interval_time # Advance by one interval with appropriate signaling - store = store.tick_interval(should_signal_proposal, is_aggregator) + store, new_aggregates = store.tick_interval(should_signal_proposal, is_aggregator) + all_new_aggregates.extend(new_aggregates) - return store + return store, all_new_aggregates def get_proposal_head(self, slot: Slot) -> tuple["Store", Bytes32]: """ @@ -1122,7 +1112,7 @@ def get_proposal_head(self, slot: Slot) -> tuple["Store", Bytes32]: slot_time = self.config.genesis_time + slot_duration_seconds # Advance time to current slot (ticking intervals) - store = self.on_tick(slot_time, True) + store, _ = self.on_tick(slot_time, True) # Process any pending attestations before proposal store = store.accept_new_attestations() @@ -1168,8 +1158,11 @@ def get_attestation_target(self) -> Checkpoint: # # This ensures the target doesn't advance too far ahead of safe target, # providing a balance between liveness and safety. + # + # MODIFIED: We allow the target to be up to 1 slot ahead of safe_target + # to ensure the chain can actually start advancing from genesis. for _ in range(JUSTIFICATION_LOOKBACK_SLOTS): - if self.blocks[target_block_root].slot > self.blocks[self.safe_target].slot: + if self.blocks[target_block_root].slot > self.blocks[self.safe_target].slot + Slot(1): target_block_root = self.blocks[target_block_root].parent_root else: break @@ -1186,7 +1179,7 @@ def get_attestation_target(self) -> Checkpoint: # Create checkpoint from selected target block target_block = self.blocks[target_block_root] - return Checkpoint(root=hash_tree_root(target_block), slot=target_block.slot) + return Checkpoint(root=target_block_root, slot=target_block.slot) def produce_attestation_data(self, slot: Slot) -> AttestationData: """ @@ -1293,7 +1286,7 @@ def produce_block_with_signatures( # # The builder iteratively collects valid attestations. # It returns the final block, post-state, and signature proofs. 
- final_block, final_post_state, _, signatures = head_state.build_block( + final_block, final_post_state, collected_attestations, signatures = head_state.build_block( slot=slot, proposer_index=validator_index, parent_root=head_root, diff --git a/src/lean_spec/subspecs/networking/service/service.py b/src/lean_spec/subspecs/networking/service/service.py index 193c64f1..4f617ae6 100644 --- a/src/lean_spec/subspecs/networking/service/service.py +++ b/src/lean_spec/subspecs/networking/service/service.py @@ -28,12 +28,13 @@ from lean_spec.snappy import frame_compress from lean_spec.subspecs.containers import SignedBlockWithAttestation -from lean_spec.subspecs.containers.attestation import SignedAttestation +from lean_spec.subspecs.containers.attestation import SignedAggregatedAttestation, SignedAttestation from lean_spec.subspecs.networking.gossipsub.topic import GossipTopic from lean_spec.subspecs.networking.peer.info import PeerInfo from lean_spec.subspecs.networking.types import ConnectionState from .events import ( + GossipAggregatedAttestationEvent, GossipAttestationEvent, GossipBlockEvent, NetworkEvent, @@ -146,14 +147,22 @@ async def _handle_event(self, event: NetworkEvent) -> None: ) await self.sync_service.on_gossip_block(block, peer_id) - case GossipAttestationEvent(attestation=attestation, peer_id=peer_id): + case GossipAttestationEvent(attestation=attestation, peer_id=peer_id, topic=topic): # # SyncService will validate signature and update forkchoice. await self.sync_service.on_gossip_attestation( attestation=attestation, + subnet_id=topic.subnet_id or 0, peer_id=peer_id, ) + case GossipAggregatedAttestationEvent(signed_attestation=att, peer_id=peer_id): + # Route aggregated attestations to sync service. + # + # Aggregates contain multiple validator votes and are used + # to advance justification and finalization. + await self.sync_service.on_gossip_aggregated_attestation(att, peer_id) + case PeerStatusEvent(peer_id=peer_id, status=status): # Route peer status updates to sync service. # @@ -234,3 +243,21 @@ async def publish_attestation(self, attestation: SignedAttestation, subnet_id: i await self.event_source.publish(str(topic), compressed) logger.debug("Published attestation for slot %s", attestation.message.slot) + + async def publish_aggregated_attestation( + self, signed_attestation: SignedAggregatedAttestation + ) -> None: + """ + Publish an aggregated attestation to the aggregation gossip topic. + + Args: + signed_attestation: Aggregated attestation to publish. 
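+
+        Sketch of the end-to-end path this publish step belongs to, based on
+        the other changes in this series:
+
+            1. At interval 2, an aggregator's Store.tick_interval returns
+               new SignedAggregatedAttestation objects.
+            2. ChainService hands them to
+               SyncService.publish_aggregated_attestation, which calls this method.
+            3. Receiving peers decode the gossip message and route it through
+               SyncService.on_gossip_aggregated_attestation into their store.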
+ """ + topic = GossipTopic.committee_aggregation(self.fork_digest) + ssz_bytes = signed_attestation.encode_bytes() + compressed = frame_compress(ssz_bytes) + + await self.event_source.publish(str(topic), compressed) + logger.debug( + "Published aggregated attestation for slot %s", signed_attestation.data.slot + ) diff --git a/src/lean_spec/subspecs/node/node.py b/src/lean_spec/subspecs/node/node.py index eb245c1f..1a339ead 100644 --- a/src/lean_spec/subspecs/node/node.py +++ b/src/lean_spec/subspecs/node/node.py @@ -22,7 +22,7 @@ from lean_spec.subspecs.chain import SlotClock from lean_spec.subspecs.chain.config import ATTESTATION_COMMITTEE_COUNT, INTERVALS_PER_SLOT from lean_spec.subspecs.chain.service import ChainService -from lean_spec.subspecs.containers import Block, BlockBody, State +from lean_spec.subspecs.containers import Block, BlockBody, SignedBlockWithAttestation, State from lean_spec.subspecs.containers.attestation import SignedAttestation from lean_spec.subspecs.containers.block.types import AggregatedAttestations from lean_spec.subspecs.containers.slot import Slot @@ -240,6 +240,13 @@ def from_genesis(cls, config: NodeConfig) -> Node: is_aggregator=config.is_aggregator, ) + # Wire up aggregated attestation publishing. + # + # ReqRespClient implements NetworkRequester which SyncService uses + # to publish aggregates. We route these to NetworkService. + if hasattr(config.network, "set_publish_agg_fn"): + config.network.set_publish_agg_fn(network_service.publish_aggregated_attestation) + # Create API server if configured api_server: ApiServer | None = None if config.api_config is not None: @@ -262,12 +269,19 @@ def from_genesis(cls, config: NodeConfig) -> Node: async def publish_attestation_wrapper(attestation: SignedAttestation) -> None: subnet_id = attestation.validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT) await network_service.publish_attestation(attestation, subnet_id) + # Also route locally so we can aggregate our own attestation + await sync_service.on_gossip_attestation(attestation, subnet_id, peer_id=None) + + async def publish_block_wrapper(block: SignedBlockWithAttestation) -> None: + await network_service.publish_block(block) + # Also route locally so we update our own store + await sync_service.on_gossip_block(block, peer_id=None) validator_service = ValidatorService( sync_service=sync_service, clock=clock, registry=config.validator_registry, - on_block=network_service.publish_block, + on_block=publish_block_wrapper, on_attestation=publish_attestation_wrapper, ) diff --git a/src/lean_spec/subspecs/sync/service.py b/src/lean_spec/subspecs/sync/service.py index e210d16f..5ba35002 100644 --- a/src/lean_spec/subspecs/sync/service.py +++ b/src/lean_spec/subspecs/sync/service.py @@ -46,6 +46,7 @@ from lean_spec.subspecs.chain.clock import SlotClock from lean_spec.subspecs.containers import ( Block, + SignedAggregatedAttestation, SignedAttestation, SignedBlockWithAttestation, ) @@ -402,7 +403,8 @@ async def on_gossip_block( async def on_gossip_attestation( self, attestation: SignedAttestation, - peer_id: PeerId, # noqa: ARG002 + subnet_id: int, + peer_id: PeerId | None = None, ) -> None: """ Handle attestation received via gossip. @@ -416,7 +418,8 @@ async def on_gossip_attestation( Args: attestation: The signed attestation received. - peer_id: The peer that propagated the attestation (unused for now). + subnet_id: Subnet ID the attestation was received on. + peer_id: The peer that propagated the attestation (optional). 
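+
+        Example (sketch of the local self-routing call added in node.py's
+        publish wrapper):
+
+            subnet_id = attestation.validator_id.compute_subnet_id(
+                ATTESTATION_COMMITTEE_COUNT
+            )
+            await sync_service.on_gossip_attestation(attestation, subnet_id, peer_id=None)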
""" # Guard: Only process gossip in states that accept it. # @@ -454,6 +457,45 @@ async def on_gossip_attestation( # These are expected during normal operation and don't indicate bugs. pass + async def on_gossip_aggregated_attestation( + self, + signed_attestation: SignedAggregatedAttestation, + peer_id: PeerId, # noqa: ARG002 + ) -> None: + """ + Handle aggregated attestation received via gossip. + + Aggregated attestations are collections of individual votes for the same + target, signed by an aggregator. They provide efficient propagation of + consensus weight. + + Args: + signed_attestation: The signed aggregated attestation received. + peer_id: The peer that propagated the aggregate (unused for now). + """ + if not self._state.accepts_gossip: + return + + try: + self.store = self.store.on_gossip_aggregated_attestation(signed_attestation) + except (AssertionError, KeyError): + # Aggregation validation failed. + pass + + async def publish_aggregated_attestation( + self, + signed_attestation: SignedAggregatedAttestation, + ) -> None: + """ + Publish an aggregated attestation to the network. + + Called by the chain service when this node acts as an aggregator. + + Args: + signed_attestation: The aggregate to publish. + """ + await self.network.publish_aggregated_attestation(signed_attestation) + async def start_sync(self) -> None: """ Start or resume synchronization. diff --git a/src/lean_spec/subspecs/validator/service.py b/src/lean_spec/subspecs/validator/service.py index 217ed724..e01f3c01 100644 --- a/src/lean_spec/subspecs/validator/service.py +++ b/src/lean_spec/subspecs/validator/service.py @@ -323,7 +323,10 @@ async def _produce_attestations(self, slot: Slot) -> None: Args: slot: Current slot number. """ + # Ensure we are attesting to the latest known head + self.sync_service.store = self.sync_service.store.update_head() store = self.sync_service.store + head_state = store.states.get(store.head) if head_state is None: return diff --git a/tests/interop/helpers/node_runner.py b/tests/interop/helpers/node_runner.py index 691cc2af..eebe910b 100644 --- a/tests/interop/helpers/node_runner.py +++ b/tests/interop/helpers/node_runner.py @@ -12,6 +12,7 @@ from dataclasses import dataclass, field from typing import TYPE_CHECKING, cast +from lean_spec.subspecs.chain.config import ATTESTATION_COMMITTEE_COUNT from lean_spec.subspecs.containers import Checkpoint, Validator from lean_spec.subspecs.containers.state import Validators from lean_spec.subspecs.containers.validator import ValidatorIndex @@ -237,6 +238,7 @@ async def start_node( self, node_index: int, validator_indices: list[int] | None = None, + is_aggregator: bool = False, bootnodes: list[str] | None = None, *, start_services: bool = True, @@ -247,6 +249,7 @@ async def start_node( Args: node_index: Index for this node (for logging/identification). validator_indices: Which validators this node controls. + is_aggregator: Whether this node is aggregator bootnodes: Addresses to connect to on startup. start_services: If True, start the node's services immediately. If False, call test_node.start() manually after mesh is stable. 
@@ -285,6 +288,7 @@ async def start_node( api_config=None, # Disable API server for interop tests (not needed for P2P testing) validator_registry=validator_registry, fork_digest=self.fork_digest, + is_aggregator=is_aggregator, ) node = Node.from_genesis(config) @@ -353,9 +357,20 @@ async def start_node( await event_source.start_gossipsub() block_topic = f"/leanconsensus/{self.fork_digest}/block/ssz_snappy" - attestation_topic = f"/leanconsensus/{self.fork_digest}/attestation/ssz_snappy" + aggregation_topic = f"/leanconsensus/{self.fork_digest}/aggregation/ssz_snappy" event_source.subscribe_gossip_topic(block_topic) - event_source.subscribe_gossip_topic(attestation_topic) + event_source.subscribe_gossip_topic(aggregation_topic) + + # Determine subnets for our validators and subscribe. + # + # Validators only subscribe to the subnets they are assigned to. + # This matches the Ethereum gossip specification. + if validator_indices: + from lean_spec.subspecs.chain.config import ATTESTATION_COMMITTEE_COUNT + for idx in validator_indices: + subnet_id = idx % int(ATTESTATION_COMMITTEE_COUNT) + topic = f"/leanconsensus/{self.fork_digest}/attestation_{subnet_id}/ssz_snappy" + event_source.subscribe_gossip_topic(topic) # Optionally start the node's services. # @@ -411,9 +426,20 @@ async def start_all( # This allows the gossipsub mesh to form before validators start # producing blocks and attestations. Otherwise, early blocks/attestations # would be "Published message to 0 peers" because the mesh is empty. + aggregator_indices = set(range(int(ATTESTATION_COMMITTEE_COUNT))) for i in range(num_nodes): validator_indices = validators_per_node[i] if i < len(validators_per_node) else [] - await self.start_node(i, validator_indices, start_services=False) + + # A node is an aggregator if it controls any of the first + # ATTESTATION_COMMITTEE_COUNT validators. + is_node_aggregator = any(vid in aggregator_indices for vid in validator_indices) + + await self.start_node( + i, + validator_indices, + is_aggregator=is_node_aggregator, + start_services=False, + ) # Stagger node startup like Ream does. 
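+        # Illustration of the per-validator subnet/topic mapping set up in
+        # start_node() above (committee count and fork digest values are hypothetical):
+        #
+        #     ATTESTATION_COMMITTEE_COUNT = 4
+        #     fork_digest = "0x00000000"
+        #     for idx in (0, 1, 5):
+        #         subnet_id = idx % ATTESTATION_COMMITTEE_COUNT
+        #         print(f"/leanconsensus/{fork_digest}/attestation_{subnet_id}/ssz_snappy")
+        #     # validators 0, 1, 5 land on subnets 0, 1, 1 respectively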
# diff --git a/tests/lean_spec/helpers/builders.py b/tests/lean_spec/helpers/builders.py index 9c7e2f56..5766379b 100644 --- a/tests/lean_spec/helpers/builders.py +++ b/tests/lean_spec/helpers/builders.py @@ -526,7 +526,7 @@ def make_signed_block_from_store( slot_duration = block.slot * SECONDS_PER_SLOT block_time = store.config.genesis_time + slot_duration - advanced_store = store.on_tick(block_time, has_proposal=True) + advanced_store, _ = store.on_tick(block_time, has_proposal=True) return advanced_store, signed_block diff --git a/tests/lean_spec/subspecs/chain/test_service.py b/tests/lean_spec/subspecs/chain/test_service.py index 8e2e9ac4..1406bf26 100644 --- a/tests/lean_spec/subspecs/chain/test_service.py +++ b/tests/lean_spec/subspecs/chain/test_service.py @@ -28,7 +28,9 @@ class MockStore: head: Bytes32 = field(default_factory=lambda: ZERO_HASH) latest_finalized: MockCheckpoint = field(default_factory=MockCheckpoint) - def on_tick(self, time: Uint64, has_proposal: bool) -> MockStore: + def on_tick( + self, time: Uint64, has_proposal: bool, is_aggregator: bool = False + ) -> tuple[MockStore, list]: """Record the tick call and return a new store.""" new_store = MockStore( time=time, @@ -37,7 +39,7 @@ def on_tick(self, time: Uint64, has_proposal: bool) -> MockStore: latest_finalized=self.latest_finalized, ) new_store.tick_calls.append((time, has_proposal)) - return new_store + return new_store, [] @dataclass @@ -45,6 +47,7 @@ class MockSyncService: """Mock sync service for testing ChainService.""" store: MockStore = field(default_factory=MockStore) + is_aggregator: bool = False class TestChainServiceLifecycle: diff --git a/tests/lean_spec/subspecs/forkchoice/test_attestation_target.py b/tests/lean_spec/subspecs/forkchoice/test_attestation_target.py index 43f72e05..15c7ceef 100644 --- a/tests/lean_spec/subspecs/forkchoice/test_attestation_target.py +++ b/tests/lean_spec/subspecs/forkchoice/test_attestation_target.py @@ -162,7 +162,7 @@ def test_safe_target_requires_supermajority( store = store.on_gossip_attestation(signed_attestation, is_aggregator=True) # Aggregate the signatures - store = store.aggregate_committee_signatures() + store, _ = store.aggregate_committee_signatures() # Update safe target (uses latest_new_aggregated_payloads) store = store.update_safe_target() @@ -206,7 +206,7 @@ def test_safe_target_advances_with_supermajority( store = store.on_gossip_attestation(signed_attestation, is_aggregator=True) # Aggregate the signatures - store = store.aggregate_committee_signatures() + store, _ = store.aggregate_committee_signatures() # Update safe target store = store.update_safe_target() @@ -246,7 +246,7 @@ def test_update_safe_target_uses_new_attestations( store = store.on_gossip_attestation(signed_attestation, is_aggregator=True) # Aggregate into new payloads - store = store.aggregate_committee_signatures() + store, _ = store.aggregate_committee_signatures() # Update safe target should use new aggregated payloads store = store.update_safe_target() @@ -300,7 +300,7 @@ def test_justification_with_supermajority_attestations( store = store.on_gossip_attestation(signed_attestation, is_aggregator=True) # Aggregate signatures before producing the next block - store = store.aggregate_committee_signatures() + store, _ = store.aggregate_committee_signatures() # Produce block 2 which includes these attestations store, block_2, signatures = store.produce_block_with_signatures(slot_2, proposer_2) @@ -381,7 +381,7 @@ def test_justification_tracking_with_multiple_targets( ) store = 
store.on_gossip_attestation(signed_attestation, is_aggregator=True) - store = store.aggregate_committee_signatures() + store, _ = store.aggregate_committee_signatures() store = store.update_safe_target() # Neither target should be justified with only half validators @@ -526,7 +526,7 @@ def test_full_attestation_cycle( store = store.on_gossip_attestation(signed_attestation, is_aggregator=True) # Phase 3: Aggregate signatures into payloads - store = store.aggregate_committee_signatures() + store, _ = store.aggregate_committee_signatures() # Phase 4: Update safe target store = store.update_safe_target() @@ -591,7 +591,7 @@ def test_attestation_target_after_on_block( # Process block via on_block on a fresh consumer store consumer_store = observer_store block_time = consumer_store.config.genesis_time + block.slot * Uint64(SECONDS_PER_SLOT) - consumer_store = consumer_store.on_tick(block_time, has_proposal=True) + consumer_store, _ = consumer_store.on_tick(block_time, has_proposal=True) consumer_store = consumer_store.on_block(signed_block) # Get attestation target after on_block diff --git a/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py b/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py index f02f73d4..e0044c3e 100644 --- a/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py +++ b/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py @@ -509,7 +509,7 @@ def test_aggregates_gossip_signatures_into_proof(self, key_manager: XmssKeyManag ) # Perform aggregation - updated_store = store.aggregate_committee_signatures() + updated_store, _ = store.aggregate_committee_signatures() # Verify proofs were created and stored data_root = attestation_data.data_root_bytes() @@ -537,7 +537,7 @@ def test_aggregated_proof_is_valid(self, key_manager: XmssKeyManager) -> None: attesting_validators=attesting_validators, ) - updated_store = store.aggregate_committee_signatures() + updated_store, _ = store.aggregate_committee_signatures() data_root = attestation_data.data_root_bytes() sig_key = SignatureKey(ValidatorIndex(1), data_root) @@ -567,7 +567,7 @@ def test_empty_gossip_signatures_produces_no_proofs(self, key_manager: XmssKeyMa attesting_validators=[], # No attesters ) - updated_store = store.aggregate_committee_signatures() + updated_store, _ = store.aggregate_committee_signatures() # Verify no proofs were created assert len(updated_store.latest_new_aggregated_payloads) == 0 @@ -619,7 +619,7 @@ def test_multiple_attestation_data_grouped_separately( } ) - updated_store = store.aggregate_committee_signatures() + updated_store, _ = store.aggregate_committee_signatures() # Verify both validators have separate proofs sig_key_1 = SignatureKey(ValidatorIndex(1), data_root_1) @@ -661,7 +661,7 @@ def test_interval_2_triggers_aggregation_for_aggregator( store = store.model_copy(update={"time": Uint64(1)}) # Tick to interval 2 as aggregator - updated_store = store.tick_interval(has_proposal=False, is_aggregator=True) + updated_store, _ = store.tick_interval(has_proposal=False, is_aggregator=True) # Verify aggregation was performed data_root = attestation_data.data_root_bytes() @@ -692,7 +692,7 @@ def test_interval_2_skips_aggregation_for_non_aggregator( store = store.model_copy(update={"time": Uint64(1)}) # Tick to interval 2 as NON-aggregator - updated_store = store.tick_interval(has_proposal=False, is_aggregator=False) + updated_store, _ = store.tick_interval(has_proposal=False, is_aggregator=False) # Verify aggregation was NOT performed data_root = 
attestation_data.data_root_bytes() @@ -728,13 +728,12 @@ def test_other_intervals_do_not_trigger_aggregation(self, key_manager: XmssKeyMa # After tick, time becomes time+1, and interval = (time+1) % 5 # So we need time+1 % 5 == target_interval # Therefore time = target_interval - 1 (mod 5) - pre_tick_time = (target_interval - 1) % int(INTERVALS_PER_SLOT) - test_store = store.model_copy(update={"time": Uint64(pre_tick_time)}) - - updated_store = test_store.tick_interval(has_proposal=False, is_aggregator=True) - - assert sig_key not in updated_store.latest_new_aggregated_payloads, ( - f"Aggregation should NOT occur at interval {target_interval}" + pre_tick_time = (target_interval - 1) % int(INTERVALS_PER_SLOT) + test_store = store.model_copy(update={"time": Uint64(pre_tick_time)}) + + updated_store, _ = test_store.tick_interval(has_proposal=False, is_aggregator=True) + + assert sig_key not in updated_store.latest_new_aggregated_payloads, ( f"Aggregation should NOT occur at interval {target_interval}" ) def test_interval_0_accepts_attestations_with_proposal( @@ -754,7 +753,7 @@ def test_interval_0_accepts_attestations_with_proposal( store = store.model_copy(update={"time": Uint64(4)}) # Tick to interval 0 with proposal - updated_store = store.tick_interval(has_proposal=True, is_aggregator=True) + updated_store, _ = store.tick_interval(has_proposal=True, is_aggregator=True) # Verify time advanced assert updated_store.time == Uint64(5) @@ -810,12 +809,11 @@ def test_gossip_to_aggregation_to_storage(self, key_manager: XmssKeyManager) -> sig_key = SignatureKey(vid, data_root) assert sig_key in store.gossip_signatures, f"Signature for {vid} should be stored" - # Step 2: Advance to interval 2 (aggregation interval) - store = store.model_copy(update={"time": Uint64(1)}) - store = store.tick_interval(has_proposal=False, is_aggregator=True) - - # Step 3: Verify aggregated proofs were created - for vid in attesting_validators: + # Step 2: Advance to interval 2 (aggregation interval) + store = store.model_copy(update={"time": Uint64(1)}) + store, _ = store.tick_interval(has_proposal=False, is_aggregator=True) + + # Step 3: Verify aggregated proofs were created for vid in attesting_validators: sig_key = SignatureKey(vid, data_root) assert sig_key in store.latest_new_aggregated_payloads, ( f"Aggregated proof for {vid} should exist after interval 2" diff --git a/tests/lean_spec/subspecs/forkchoice/test_time_management.py b/tests/lean_spec/subspecs/forkchoice/test_time_management.py index 228f1b82..399739f1 100644 --- a/tests/lean_spec/subspecs/forkchoice/test_time_management.py +++ b/tests/lean_spec/subspecs/forkchoice/test_time_management.py @@ -54,7 +54,7 @@ def test_on_tick_basic(self, sample_store: Store) -> None: initial_time = sample_store.time target_time = sample_store.config.genesis_time + Uint64(200) # Much later time - sample_store = sample_store.on_tick(target_time, has_proposal=True) + sample_store, _ = sample_store.on_tick(target_time, has_proposal=True) # Time should advance assert sample_store.time > initial_time @@ -64,7 +64,7 @@ def test_on_tick_no_proposal(self, sample_store: Store) -> None: initial_time = sample_store.time target_time = sample_store.config.genesis_time + Uint64(100) - sample_store = sample_store.on_tick(target_time, has_proposal=False) + sample_store, _ = sample_store.on_tick(target_time, has_proposal=False) # Time should still advance assert sample_store.time >= initial_time @@ -75,7 +75,7 @@ def test_on_tick_already_current(self, sample_store: Store) -> None: 
current_target = sample_store.config.genesis_time + initial_time # Try to advance to current time (should be no-op) - sample_store = sample_store.on_tick(current_target, has_proposal=True) + sample_store, _ = sample_store.on_tick(current_target, has_proposal=True) # Should not change significantly (time can only increase) # Tolerance increased for 5-interval per slot system @@ -86,7 +86,7 @@ def test_on_tick_small_increment(self, sample_store: Store) -> None: initial_time = sample_store.time target_time = sample_store.config.genesis_time + initial_time + Uint64(1) - sample_store = sample_store.on_tick(target_time, has_proposal=False) + sample_store, _ = sample_store.on_tick(target_time, has_proposal=False) # Should advance by small amount assert sample_store.time >= initial_time @@ -100,7 +100,7 @@ def test_tick_interval_basic(self, sample_store: Store) -> None: initial_time = sample_store.time # Tick one interval forward - sample_store = sample_store.tick_interval(has_proposal=False) + sample_store, _ = sample_store.tick_interval(has_proposal=False) # Time should advance by one interval assert sample_store.time == initial_time + Uint64(1) @@ -109,7 +109,7 @@ def test_tick_interval_with_proposal(self, sample_store: Store) -> None: """Test interval ticking with proposal.""" initial_time = sample_store.time - sample_store = sample_store.tick_interval(has_proposal=True) + sample_store, _ = sample_store.tick_interval(has_proposal=True) # Time should advance assert sample_store.time == initial_time + Uint64(1) @@ -120,7 +120,7 @@ def test_tick_interval_sequence(self, sample_store: Store) -> None: # Tick multiple intervals for i in range(5): - sample_store = sample_store.tick_interval(has_proposal=(i % 2 == 0)) + sample_store, _ = sample_store.tick_interval(has_proposal=(i % 2 == 0)) # Should have advanced by 5 intervals assert sample_store.time == initial_time + Uint64(5) @@ -136,7 +136,7 @@ def test_tick_interval_actions_by_phase(self, sample_store: Store) -> None: # Tick through a complete slot cycle for interval in range(INTERVALS_PER_SLOT): has_proposal = interval == 0 # Proposal only in first interval - sample_store = sample_store.tick_interval(has_proposal=has_proposal) + sample_store, _ = sample_store.tick_interval(has_proposal=has_proposal) current_interval = sample_store.time % INTERVALS_PER_SLOT expected_interval = Uint64((interval + 1)) % INTERVALS_PER_SLOT From a72c8de2d6b0efb3ca7da49ca9de4c3826254d28 Mon Sep 17 00:00:00 2001 From: kamilsa Date: Wed, 11 Feb 2026 18:40:07 +0500 Subject: [PATCH 2/7] fix: wire aggregated attestation broadcast through network pipeline The aggregated attestation pipeline was broken at multiple points, preventing finalization in multi-node setups: - Add missing GossipAggregatedAttestationEvent to network events - Add AGGREGATED_ATTESTATION decoding and dispatch in event sources - Fix SyncService.publish_aggregated_attestation to use a callback instead of a missing method on NetworkRequester - Wire publish callback directly in Node.from_genesis - Publish aggregates from ChainService._initial_tick (was discarded) - Enable test_late_joiner_sync with is_aggregator=True on node 0 --- src/lean_spec/subspecs/chain/service.py | 8 +++- .../networking/client/event_source.py | 38 ++++++++++++++++++- .../subspecs/networking/service/events.py | 22 ++++++++++- src/lean_spec/subspecs/node/node.py | 7 ++-- src/lean_spec/subspecs/sync/service.py | 15 ++++++-- tests/interop/test_late_joiner.py | 3 +- 6 files changed, 80 insertions(+), 13 deletions(-) diff --git 
a/src/lean_spec/subspecs/chain/service.py b/src/lean_spec/subspecs/chain/service.py index 475f3c3d..16ab4d3f 100644 --- a/src/lean_spec/subspecs/chain/service.py +++ b/src/lean_spec/subspecs/chain/service.py @@ -168,12 +168,18 @@ async def _initial_tick(self) -> Interval | None: # Only tick if we're past genesis. if current_time >= self.clock.genesis_time: - new_store, _ = self.sync_service.store.on_tick( + new_store, new_aggregated_attestations = self.sync_service.store.on_tick( time=current_time, has_proposal=False, is_aggregator=self.sync_service.is_aggregator, ) self.sync_service.store = new_store + + # Publish any aggregated attestations produced during catch-up. + if new_aggregated_attestations: + for agg in new_aggregated_attestations: + await self.sync_service.publish_aggregated_attestation(agg) + return self.clock.total_intervals() return None diff --git a/src/lean_spec/subspecs/networking/client/event_source.py b/src/lean_spec/subspecs/networking/client/event_source.py index a4973742..8be72b9f 100644 --- a/src/lean_spec/subspecs/networking/client/event_source.py +++ b/src/lean_spec/subspecs/networking/client/event_source.py @@ -106,7 +106,7 @@ from lean_spec.snappy import SnappyDecompressionError, frame_decompress from lean_spec.subspecs.containers import SignedBlockWithAttestation -from lean_spec.subspecs.containers.attestation import SignedAttestation +from lean_spec.subspecs.containers.attestation import SignedAggregatedAttestation, SignedAttestation from lean_spec.subspecs.networking.config import ( GOSSIPSUB_DEFAULT_PROTOCOL_ID, GOSSIPSUB_PROTOCOL_ID_V12, @@ -130,6 +130,7 @@ ) from lean_spec.subspecs.networking.reqresp.message import Status from lean_spec.subspecs.networking.service.events import ( + GossipAggregatedAttestationEvent, GossipAttestationEvent, GossipBlockEvent, NetworkEvent, @@ -324,7 +325,7 @@ def decode_message( self, topic_str: str, compressed_data: bytes, - ) -> SignedBlockWithAttestation | SignedAttestation | None: + ) -> SignedBlockWithAttestation | SignedAttestation | SignedAggregatedAttestation | None: """ Decode a gossip message from topic and compressed data. @@ -392,6 +393,8 @@ def decode_message( return SignedBlockWithAttestation.decode_bytes(ssz_bytes) case TopicKind.ATTESTATION_SUBNET: return SignedAttestation.decode_bytes(ssz_bytes) + case TopicKind.AGGREGATED_ATTESTATION: + return SignedAggregatedAttestation.decode_bytes(ssz_bytes) except SSZSerializationError as e: raise GossipMessageError(f"SSZ decode failed: {e}") from e @@ -818,6 +821,9 @@ async def _handle_gossipsub_message(self, event: GossipsubMessageEvent) -> None: case TopicKind.ATTESTATION_SUBNET: if isinstance(message, SignedAttestation): await self._emit_gossip_attestation(message, event.peer_id) + case TopicKind.AGGREGATED_ATTESTATION: + if isinstance(message, SignedAggregatedAttestation): + await self._emit_gossip_aggregated_attestation(message, event.peer_id) logger.debug("Processed gossipsub message %s from %s", topic.kind.value, event.peer_id) @@ -1170,6 +1176,25 @@ async def _emit_gossip_attestation( GossipAttestationEvent(attestation=attestation, peer_id=peer_id, topic=topic) ) + async def _emit_gossip_aggregated_attestation( + self, + signed_attestation: SignedAggregatedAttestation, + peer_id: PeerId, + ) -> None: + """ + Emit a gossip aggregated attestation event. + + Args: + signed_attestation: Aggregated attestation received from gossip. + peer_id: Peer that sent it. 
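+
+        Sketch of the decode path that feeds this emitter (compressed is a
+        placeholder for the raw gossip payload):
+
+            ssz_bytes = frame_decompress(compressed)
+            signed_attestation = SignedAggregatedAttestation.decode_bytes(ssz_bytes)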
+ """ + topic = GossipTopic(kind=TopicKind.AGGREGATED_ATTESTATION, fork_digest=self._fork_digest) + await self._events.put( + GossipAggregatedAttestationEvent( + signed_attestation=signed_attestation, peer_id=peer_id, topic=topic + ) + ) + async def _accept_streams(self, peer_id: PeerId, conn: QuicConnection) -> None: """ Accept incoming streams from a connection. @@ -1466,6 +1491,15 @@ async def _handle_gossip_stream(self, peer_id: PeerId, stream: Stream) -> None: # Type mismatch indicates a bug in decode_message. logger.warning("Attestation topic but got %s", type(message).__name__) + case TopicKind.AGGREGATED_ATTESTATION: + if isinstance(message, SignedAggregatedAttestation): + await self._emit_gossip_aggregated_attestation(message, peer_id) + else: + logger.warning( + "Aggregated attestation topic but got %s", + type(message).__name__, + ) + logger.debug("Received gossip %s from %s", topic.kind.value, peer_id) except GossipMessageError as e: diff --git a/src/lean_spec/subspecs/networking/service/events.py b/src/lean_spec/subspecs/networking/service/events.py index 91b6334b..38c4dcb9 100644 --- a/src/lean_spec/subspecs/networking/service/events.py +++ b/src/lean_spec/subspecs/networking/service/events.py @@ -26,7 +26,7 @@ from typing import Protocol, runtime_checkable from lean_spec.subspecs.containers import SignedBlockWithAttestation -from lean_spec.subspecs.containers.attestation import SignedAttestation +from lean_spec.subspecs.containers.attestation import SignedAggregatedAttestation, SignedAttestation from lean_spec.subspecs.networking.gossipsub.topic import GossipTopic from lean_spec.subspecs.networking.reqresp.message import Status from lean_spec.subspecs.networking.transport import PeerId @@ -69,6 +69,25 @@ class GossipAttestationEvent: """Topic the attestation was received on (includes fork digest).""" +@dataclass(frozen=True, slots=True) +class GossipAggregatedAttestationEvent: + """ + Aggregated attestation received via gossip subscription. + + Fired when a signed aggregated attestation arrives from the gossipsub network. + Aggregates contain multiple validator votes combined into a single proof. + """ + + signed_attestation: SignedAggregatedAttestation + """The signed aggregated attestation.""" + + peer_id: PeerId + """Peer that propagated this aggregated attestation to us.""" + + topic: GossipTopic + """Topic the aggregated attestation was received on.""" + + @dataclass(frozen=True, slots=True) class PeerStatusEvent: """ @@ -115,6 +134,7 @@ class PeerDisconnectedEvent: NetworkEvent = ( GossipBlockEvent | GossipAttestationEvent + | GossipAggregatedAttestationEvent | PeerStatusEvent | PeerConnectedEvent | PeerDisconnectedEvent diff --git a/src/lean_spec/subspecs/node/node.py b/src/lean_spec/subspecs/node/node.py index 1a339ead..1007f105 100644 --- a/src/lean_spec/subspecs/node/node.py +++ b/src/lean_spec/subspecs/node/node.py @@ -242,10 +242,9 @@ def from_genesis(cls, config: NodeConfig) -> Node: # Wire up aggregated attestation publishing. # - # ReqRespClient implements NetworkRequester which SyncService uses - # to publish aggregates. We route these to NetworkService. - if hasattr(config.network, "set_publish_agg_fn"): - config.network.set_publish_agg_fn(network_service.publish_aggregated_attestation) + # SyncService delegates aggregate publishing to NetworkService + # via a callback, avoiding a circular dependency. 
+ sync_service._publish_agg_fn = network_service.publish_aggregated_attestation # Create API server if configured api_server: ApiServer | None = None diff --git a/src/lean_spec/subspecs/sync/service.py b/src/lean_spec/subspecs/sync/service.py index 5ba35002..b0dcd7a3 100644 --- a/src/lean_spec/subspecs/sync/service.py +++ b/src/lean_spec/subspecs/sync/service.py @@ -38,9 +38,9 @@ import asyncio import logging -from collections.abc import Callable +from collections.abc import Callable, Coroutine from dataclasses import dataclass, field -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from lean_spec.subspecs import metrics from lean_spec.subspecs.chain.clock import SlotClock @@ -68,6 +68,8 @@ BlockProcessor = Callable[[Store, SignedBlockWithAttestation], Store] +PublishAggFn = Callable[[SignedAggregatedAttestation], Coroutine[Any, Any, None]] + def default_block_processor( store: Store, @@ -77,6 +79,10 @@ def default_block_processor( return store.on_block(block) +async def _noop_publish_agg(signed_attestation: SignedAggregatedAttestation) -> None: + """No-op default for aggregated attestation publishing.""" + + @dataclass(slots=True) class SyncProgress: """ @@ -157,6 +163,9 @@ class SyncService: process_block: BlockProcessor = field(default=default_block_processor) """Block processor function. Defaults to Store.on_block().""" + _publish_agg_fn: PublishAggFn = field(default=_noop_publish_agg) + """Callback for publishing aggregated attestations to the network.""" + _state: SyncState = field(default=SyncState.IDLE) """Current sync state.""" @@ -494,7 +503,7 @@ async def publish_aggregated_attestation( Args: signed_attestation: The aggregate to publish. """ - await self.network.publish_aggregated_attestation(signed_attestation) + await self._publish_agg_fn(signed_attestation) async def start_sync(self) -> None: """ diff --git a/tests/interop/test_late_joiner.py b/tests/interop/test_late_joiner.py index 343a1f63..53c7c919 100644 --- a/tests/interop/test_late_joiner.py +++ b/tests/interop/test_late_joiner.py @@ -24,7 +24,6 @@ pytestmark = pytest.mark.interop -@pytest.mark.skip(reason="Interop test not passing - needs update (#359)") @pytest.mark.timeout(240) @pytest.mark.num_validators(3) async def test_late_joiner_sync(node_cluster: NodeCluster) -> None: @@ -36,7 +35,7 @@ async def test_late_joiner_sync(node_cluster: NodeCluster) -> None: """ validators_per_node = [[0], [1], [2]] - await node_cluster.start_node(0, validators_per_node[0]) + await node_cluster.start_node(0, validators_per_node[0], is_aggregator=True) await node_cluster.start_node(1, validators_per_node[1]) node0 = node_cluster.nodes[0] From 9aa20e9a98d57ea3e98f276dc97b96de295f8b85 Mon Sep 17 00:00:00 2001 From: kamilsa Date: Wed, 11 Feb 2026 18:50:00 +0500 Subject: [PATCH 3/7] fix: update fill framework and tests for new on_tick tuple return type - Unpack (store, aggregates) tuple from on_tick and aggregate_committee_signatures in fork choice fill framework - Update attestation target selection tests for the +1 safe_target allowance introduced in get_attestation_target --- .../test_fixtures/fork_choice.py | 8 +- .../fc/test_attestation_target_selection.py | 115 +++++++++--------- 2 files changed, 61 insertions(+), 62 deletions(-) diff --git a/packages/testing/src/consensus_testing/test_fixtures/fork_choice.py b/packages/testing/src/consensus_testing/test_fixtures/fork_choice.py index 0a0138b6..f48b74c1 100644 --- a/packages/testing/src/consensus_testing/test_fixtures/fork_choice.py +++ 
b/packages/testing/src/consensus_testing/test_fixtures/fork_choice.py @@ -235,7 +235,9 @@ def make_fixture(self) -> Self: # Time advancement may trigger slot boundaries. # At slot boundaries, pending attestations may become active. # Always act as aggregator to ensure gossip signatures are aggregated - store = store.on_tick(Uint64(step.time), has_proposal=False, is_aggregator=True) + store, _ = store.on_tick( + Uint64(step.time), has_proposal=False, is_aggregator=True + ) elif isinstance(step, BlockStep): # Build a complete signed block from the lightweight spec. @@ -264,7 +266,7 @@ def make_fixture(self) -> Self: # Always act as aggregator to ensure gossip signatures are aggregated slot_duration_seconds = block.slot * SECONDS_PER_SLOT block_time = store.config.genesis_time + slot_duration_seconds - store = store.on_tick(block_time, has_proposal=True, is_aggregator=True) + store, _ = store.on_tick(block_time, has_proposal=True, is_aggregator=True) # Process the block through Store. # This validates, applies state transition, and updates head. @@ -408,7 +410,7 @@ def _build_block_from_spec( # First, aggregate any gossip signatures into payloads # This ensures that signatures from previous blocks (like proposer attestations) # are available for extraction - aggregation_store = working_store.aggregate_committee_signatures() + aggregation_store, _ = working_store.aggregate_committee_signatures() # Now combine aggregated payloads from both sources aggregated_payloads = ( diff --git a/tests/consensus/devnet/fc/test_attestation_target_selection.py b/tests/consensus/devnet/fc/test_attestation_target_selection.py index 8044a7fa..c4f6c994 100644 --- a/tests/consensus/devnet/fc/test_attestation_target_selection.py +++ b/tests/consensus/devnet/fc/test_attestation_target_selection.py @@ -17,24 +17,21 @@ def test_attestation_target_at_genesis_initially( fork_choice_test: ForkChoiceTestFiller, ) -> None: """ - Attestation target starts at genesis before safe target updates. + Attestation target starts near genesis before safe target updates. Scenario -------- Process two blocks at slots 1 and 2. Expected: - - After slot 1: target = slot 0 (genesis/finalized) - - After slot 2: target = slot 0 (genesis/finalized) - - Target root automatically validated against block at slot 0 + - After slot 1: target = slot 1 (1 slot ahead of safe target allowed) + - After slot 2: target = slot 1 (walkback stops at safe_target + 1) Why This Matters ---------------- - Initially, the safe target is at genesis (slot 0), so the attestation - target walks back from head to genesis. - - This conservative behavior ensures validators don't attest too far ahead - before there's sufficient attestation weight to advance the safe target. + Initially, the safe target is at genesis (slot 0). The attestation target + is allowed up to 1 slot ahead of safe target to ensure the chain can + start advancing from genesis. 
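+
+    Minimal sketch of the walkback rule these expectations assume (blocks,
+    head and safe_target stand in for the corresponding store fields):
+
+        target = head
+        for _ in range(JUSTIFICATION_LOOKBACK_SLOTS):
+            if blocks[target].slot > blocks[safe_target].slot + 1:
+                target = blocks[target].parent_root
+            else:
+                break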
""" fork_choice_test( steps=[ @@ -42,14 +39,14 @@ def test_attestation_target_at_genesis_initially( block=BlockSpec(slot=Slot(1)), checks=StoreChecks( head_slot=Slot(1), - attestation_target_slot=Slot(0), + attestation_target_slot=Slot(1), ), ), BlockStep( block=BlockSpec(slot=Slot(2)), checks=StoreChecks( head_slot=Slot(2), - attestation_target_slot=Slot(0), # Still genesis + attestation_target_slot=Slot(1), ), ), ], @@ -87,35 +84,35 @@ def test_attestation_target_advances_with_attestations( block=BlockSpec(slot=Slot(1)), checks=StoreChecks( head_slot=Slot(1), - attestation_target_slot=Slot(0), # Still at genesis + attestation_target_slot=Slot(1), # 1 slot ahead of safe target allowed ), ), BlockStep( block=BlockSpec(slot=Slot(2)), checks=StoreChecks( head_slot=Slot(2), - attestation_target_slot=Slot(0), # Still at genesis + attestation_target_slot=Slot(1), # Walks back to safe_target + 1 ), ), BlockStep( block=BlockSpec(slot=Slot(3)), checks=StoreChecks( head_slot=Slot(3), - attestation_target_slot=Slot(0), # Still at genesis + attestation_target_slot=Slot(1), # Walks back to safe_target + 1 ), ), BlockStep( block=BlockSpec(slot=Slot(4)), checks=StoreChecks( head_slot=Slot(4), - attestation_target_slot=Slot(1), # Advances to slot 1 + attestation_target_slot=Slot(1), # 3-step walkback from 4 → 1 ), ), BlockStep( block=BlockSpec(slot=Slot(5)), checks=StoreChecks( head_slot=Slot(5), - attestation_target_slot=Slot(2), # Continues advancing + attestation_target_slot=Slot(2), # 3-step walkback from 5 → 2 ), ), ], @@ -150,21 +147,21 @@ def test_attestation_target_with_slot_gaps( block=BlockSpec(slot=Slot(1)), checks=StoreChecks( head_slot=Slot(1), - attestation_target_slot=Slot(0), + attestation_target_slot=Slot(1), ), ), BlockStep( block=BlockSpec(slot=Slot(3)), checks=StoreChecks( head_slot=Slot(3), - attestation_target_slot=Slot(0), + attestation_target_slot=Slot(1), ), ), BlockStep( block=BlockSpec(slot=Slot(5)), checks=StoreChecks( head_slot=Slot(5), - attestation_target_slot=Slot(0), + attestation_target_slot=Slot(1), # Walks back 5→3→1, stops at safe_target+1 ), ), ], @@ -201,56 +198,56 @@ def test_attestation_target_with_extended_chain( block=BlockSpec(slot=Slot(1)), checks=StoreChecks( head_slot=Slot(1), - attestation_target_slot=Slot(0), # Genesis + attestation_target_slot=Slot(1), ), ), BlockStep( block=BlockSpec(slot=Slot(2)), checks=StoreChecks( head_slot=Slot(2), - attestation_target_slot=Slot(0), # Still genesis + attestation_target_slot=Slot(1), ), ), BlockStep( block=BlockSpec(slot=Slot(3)), checks=StoreChecks( head_slot=Slot(3), - attestation_target_slot=Slot(0), # Still genesis + attestation_target_slot=Slot(1), ), ), BlockStep( block=BlockSpec(slot=Slot(4)), checks=StoreChecks( head_slot=Slot(4), - attestation_target_slot=Slot(1), # Advances to slot 1 + attestation_target_slot=Slot(1), ), ), BlockStep( block=BlockSpec(slot=Slot(5)), checks=StoreChecks( head_slot=Slot(5), - attestation_target_slot=Slot(2), # Stable at 2 + attestation_target_slot=Slot(2), ), ), BlockStep( block=BlockSpec(slot=Slot(6)), checks=StoreChecks( head_slot=Slot(6), - attestation_target_slot=Slot(3), # Continues to advance + attestation_target_slot=Slot(3), ), ), BlockStep( block=BlockSpec(slot=Slot(7)), checks=StoreChecks( head_slot=Slot(7), - attestation_target_slot=Slot(4), # Continues advancing + attestation_target_slot=Slot(4), ), ), BlockStep( block=BlockSpec(slot=Slot(8)), checks=StoreChecks( head_slot=Slot(8), - attestation_target_slot=Slot(5), # Continues advancing + 
attestation_target_slot=Slot(5), ), ), ], @@ -296,39 +293,39 @@ def test_attestation_target_justifiable_constraint( head_slot=Slot(i), attestation_target_slot=Slot( # Mapping of current slot -> expected target slot - # delta = current_slot - JUSTIFICATION_LOOKBACK_SLOTS - finalized_slot - # delta = current_slot - 3 - 0 + # With +1 allowance: walkback stops at safe_target + 1 + # delta = target_slot - finalized_slot { - 1: 0, # 3-slot walkback reaches safe target at slot 0 - 2: 0, # 3-slot walkback reaches safe target at slot 0 - 3: 0, # 3-slot walkback reaches safe target at slot 0 - 4: 1, # delta = 4 - 3 - 0 = 1, Rule 1: delta 1 ≤ 5 - 5: 2, # delta = 5 - 3 - 0 = 2, Rule 1: delta 2 ≤ 5 - 6: 3, # delta = 6 - 3 - 0 = 3, Rule 1: delta 3 ≤ 5 - 7: 4, # delta = 7 - 3 - 0 = 4, Rule 1: delta 4 ≤ 5 - 8: 5, # delta = 8 - 3 - 0 = 5, Rule 1: delta 5 ≤ 5 - 9: 6, # delta = 6 - 0 = 6, Rule 3: pronic number (2*3) - 10: 6, # delta = 10 - 3 - 0 = 7 - 11: 6, # delta = 11 - 3 - 0 = 8 - 12: 9, # delta = 9 - 0 = 9, Rule 2: perfect square (3^2) - 13: 9, # delta = 13 - 3 - 0 = 10 - 14: 9, # delta = 14 - 3 - 0 = 11 - 15: 12, # delta = 15 - 3 - 0 = 12, Rule 3: pronic number (3*4) - 16: 12, # delta = 16 - 3 - 0 = 13 - 17: 12, # delta = 17 - 3 - 0 = 14 - 18: 12, # delta = 18 - 3 - 0 = 15 - 19: 16, # delta = 19 - 3 - 0 = 16, Rule 2: perfect square (4^2) - 20: 16, # delta = 20 - 3 - 0 = 17 - 21: 16, # delta = 21 - 3 - 0 = 18 - 22: 16, # delta = 22 - 3 - 0 = 19 - 23: 20, # delta = 23 - 3 - 0 = 20, Rule 3: pronic number (4*5) - 24: 20, # delta = 24 - 3 - 0 = 21 - 25: 20, # delta = 25 - 3 - 0 = 22 - 26: 20, # delta = 26 - 3 - 0 = 23 - 27: 20, # delta = 27 - 3 - 0 = 24 - 28: 25, # delta = 28 - 3 - 0 = 25, Rule 2: perfect square (5^2) - 29: 25, # delta = 29 - 3 - 0 = 26 - 30: 25, # delta = 30 - 3 - 0 = 27 + 1: 1, # At safe_target + 1, no walkback needed + 2: 1, # Walks back to safe_target + 1 + 3: 1, # Walks back to safe_target + 1 + 4: 1, # 3-step walkback from 4 → 1 + 5: 2, # 3-step walkback from 5 → 2, delta 2 ≤ 5 + 6: 3, # 3-step walkback from 6 → 3, delta 3 ≤ 5 + 7: 4, # 3-step walkback from 7 → 4, delta 4 ≤ 5 + 8: 5, # 3-step walkback from 8 → 5, delta 5 ≤ 5 + 9: 6, # delta = 6, pronic number (2*3) + 10: 6, # delta = 7, not justifiable → walks to 6 + 11: 6, # delta = 8, not justifiable → walks to 6 + 12: 9, # delta = 9, perfect square (3^2) + 13: 9, # delta = 10, not justifiable → walks to 9 + 14: 9, # delta = 11, not justifiable → walks to 9 + 15: 12, # delta = 12, pronic number (3*4) + 16: 12, # delta = 13, not justifiable → walks to 12 + 17: 12, # delta = 14, not justifiable → walks to 12 + 18: 12, # delta = 15, not justifiable → walks to 12 + 19: 16, # delta = 16, perfect square (4^2) + 20: 16, # delta = 17, not justifiable → walks to 16 + 21: 16, # delta = 18, not justifiable → walks to 16 + 22: 16, # delta = 19, not justifiable → walks to 16 + 23: 20, # delta = 20, pronic number (4*5) + 24: 20, # delta = 21, not justifiable → walks to 20 + 25: 20, # delta = 22, not justifiable → walks to 20 + 26: 20, # delta = 23, not justifiable → walks to 20 + 27: 20, # delta = 24, not justifiable → walks to 20 + 28: 25, # delta = 25, perfect square (5^2) + 29: 25, # delta = 26, not justifiable → walks to 25 + 30: 25, # delta = 27, not justifiable → walks to 25 }[i] ), ), From dc2c45b0560ad60065c4ce20047388ce39fdcffa Mon Sep 17 00:00:00 2001 From: kamilsa Date: Wed, 11 Feb 2026 19:04:21 +0500 Subject: [PATCH 4/7] fix: unskip multi-node interop tests and fix store attribute names - Remove @pytest.mark.skip from 
test_mesh_finalization and test_mesh_2_2_2_finalization - Update store attribute references: latest_new_attestations -> latest_new_aggregated_payloads, latest_known_attestations -> latest_known_aggregated_payloads - test_partition_recovery remains xfail (known sync service limitation) --- tests/interop/test_multi_node.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/interop/test_multi_node.py b/tests/interop/test_multi_node.py index d2edcfe5..2409018a 100644 --- a/tests/interop/test_multi_node.py +++ b/tests/interop/test_multi_node.py @@ -44,7 +44,6 @@ pytestmark = pytest.mark.interop -@pytest.mark.skip(reason="Interop test not passing - needs update (#359)") @pytest.mark.timeout(120) @pytest.mark.num_validators(3) async def test_mesh_finalization(node_cluster: NodeCluster) -> None: @@ -146,8 +145,12 @@ async def test_mesh_finalization(node_cluster: NodeCluster) -> None: # # - High new_atts, low known_atts = processing bottleneck # - Low counts everywhere = gossip not propagating - new_atts = [len(node._store.latest_new_attestations) for node in node_cluster.nodes] - known_atts = [len(node._store.latest_known_attestations) for node in node_cluster.nodes] + new_atts = [ + len(node._store.latest_new_aggregated_payloads) for node in node_cluster.nodes + ] + known_atts = [ + len(node._store.latest_known_aggregated_payloads) for node in node_cluster.nodes + ] logger.info( "Progress: head=%s justified=%s finalized=%s new_atts=%s known_atts=%s", @@ -200,7 +203,6 @@ async def test_mesh_finalization(node_cluster: NodeCluster) -> None: ) -@pytest.mark.skip(reason="Interop test not passing - needs update (#359)") @pytest.mark.timeout(120) @pytest.mark.num_validators(3) async def test_mesh_2_2_2_finalization(node_cluster: NodeCluster) -> None: From a89d33a71dfcf1016f934deda37c53ffeba4254b Mon Sep 17 00:00:00 2001 From: kamilsa Date: Wed, 11 Feb 2026 19:16:23 +0500 Subject: [PATCH 5/7] fix: resolve lint, type, and test issues across tox suite - Fix test_store_attestations indentation: steps 2-3 were inside for loop, causing aggregation to clear signatures before second validator check - Fix store.py lint: remove blank line after docstring, wrap long line - Fix type errors: peer_id accepts None for self-produced blocks throughout sync pipeline (SyncService, HeadSync, BlockCache) - Fix formatting in service.py, node_runner.py, test_multi_node.py --- src/lean_spec/subspecs/forkchoice/store.py | 4 ++-- .../subspecs/networking/service/service.py | 4 +--- src/lean_spec/subspecs/sync/block_cache.py | 5 ++-- src/lean_spec/subspecs/sync/head_sync.py | 8 +++---- src/lean_spec/subspecs/sync/service.py | 2 +- tests/interop/helpers/node_runner.py | 1 + tests/interop/test_multi_node.py | 4 +--- .../forkchoice/test_store_attestations.py | 24 ++++++++++--------- 8 files changed, 26 insertions(+), 26 deletions(-) diff --git a/src/lean_spec/subspecs/forkchoice/store.py b/src/lean_spec/subspecs/forkchoice/store.py index c567ce2f..88bc7301 100644 --- a/src/lean_spec/subspecs/forkchoice/store.py +++ b/src/lean_spec/subspecs/forkchoice/store.py @@ -859,7 +859,6 @@ def update_head(self) -> "Store": def accept_new_attestations(self) -> "Store": """Process pending aggregated payloads and update forkchoice head.""" - # Merge new aggregated payloads into known aggregated payloads merged_aggregated_payloads = dict(self.latest_known_aggregated_payloads) for sig_key, proofs in self.latest_new_aggregated_payloads.items(): @@ -1067,7 +1066,8 @@ def on_tick( is_aggregator: Whether the node is an 
aggregator. Returns: - Tuple of (new Store with time advanced, list of all produced SignedAggregatedAttestation). + Tuple of (new Store with time advanced, + list of all produced SignedAggregatedAttestation). """ # Calculate target time in intervals time_delta_ms = (time - self.config.genesis_time) * Uint64(1000) diff --git a/src/lean_spec/subspecs/networking/service/service.py b/src/lean_spec/subspecs/networking/service/service.py index 4f617ae6..80beb9e3 100644 --- a/src/lean_spec/subspecs/networking/service/service.py +++ b/src/lean_spec/subspecs/networking/service/service.py @@ -258,6 +258,4 @@ async def publish_aggregated_attestation( compressed = frame_compress(ssz_bytes) await self.event_source.publish(str(topic), compressed) - logger.debug( - "Published aggregated attestation for slot %s", signed_attestation.data.slot - ) + logger.debug("Published aggregated attestation for slot %s", signed_attestation.data.slot) diff --git a/src/lean_spec/subspecs/sync/block_cache.py b/src/lean_spec/subspecs/sync/block_cache.py index 2b8de0b0..fc803df7 100644 --- a/src/lean_spec/subspecs/sync/block_cache.py +++ b/src/lean_spec/subspecs/sync/block_cache.py @@ -103,7 +103,7 @@ class PendingBlock: slot order to ensure parents are processed before children. """ - received_from: PeerId + received_from: PeerId | None """ Peer that sent this block. @@ -112,6 +112,7 @@ class PendingBlock: - If invalid, they get penalized. This creates incentives for good behavior. + None for self-produced blocks. """ received_at: float = field(default_factory=time) @@ -165,7 +166,7 @@ def __contains__(self, root: Bytes32) -> bool: def add( self, block: SignedBlockWithAttestation, - peer: PeerId, + peer: PeerId | None, backfill_depth: int = 0, ) -> PendingBlock: """ diff --git a/src/lean_spec/subspecs/sync/head_sync.py b/src/lean_spec/subspecs/sync/head_sync.py index b14d0592..96610c9e 100644 --- a/src/lean_spec/subspecs/sync/head_sync.py +++ b/src/lean_spec/subspecs/sync/head_sync.py @@ -143,7 +143,7 @@ class HeadSync: async def on_gossip_block( self, block: SignedBlockWithAttestation, - peer_id: PeerId, + peer_id: PeerId | None, store: Store, ) -> tuple[HeadSyncResult, Store]: """ @@ -217,7 +217,7 @@ async def on_gossip_block( async def _process_block_with_descendants( self, block: SignedBlockWithAttestation, - peer_id: PeerId, + peer_id: PeerId | None, store: Store, ) -> tuple[HeadSyncResult, Store]: """ @@ -283,7 +283,7 @@ async def _process_cached_descendants( self, parent_root: Bytes32, store: Store, - peer_id: PeerId, + peer_id: PeerId | None, ) -> int: """ Process any cached blocks that descend from the given parent. @@ -351,7 +351,7 @@ async def _process_cached_descendants( async def _cache_and_backfill( self, block: SignedBlockWithAttestation, - peer_id: PeerId, + peer_id: PeerId | None, store: Store, ) -> tuple[HeadSyncResult, Store]: """ diff --git a/src/lean_spec/subspecs/sync/service.py b/src/lean_spec/subspecs/sync/service.py index b0dcd7a3..2cd27244 100644 --- a/src/lean_spec/subspecs/sync/service.py +++ b/src/lean_spec/subspecs/sync/service.py @@ -356,7 +356,7 @@ async def on_peer_status(self, peer_id: PeerId, status: Status) -> None: async def on_gossip_block( self, block: SignedBlockWithAttestation, - peer_id: PeerId, + peer_id: PeerId | None, ) -> None: """ Handle block received via gossip. 
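The PeerId | None changes above share one rationale: a block the node produced itself re-enters the same gossip/sync path as a peer's block, but there is no sender to credit or penalize. The sketch below illustrates that convention under assumed names; the PeerId stand-in, score table, and helper are placeholders, not the project's actual scoring API.

```python
from dataclasses import dataclass, field
from time import time

PeerId = str  # stand-in for the real PeerId type


@dataclass
class PendingBlockSketch:
    root: bytes
    received_from: PeerId | None  # None when this node produced the block itself
    received_at: float = field(default_factory=time)


def score_block_source(scores: dict[PeerId, int], block: PendingBlockSketch, valid: bool) -> None:
    """Reward the sender of a valid block, penalize an invalid one; skip self-produced blocks."""
    if block.received_from is None:
        return
    scores[block.received_from] = scores.get(block.received_from, 0) + (1 if valid else -1)


# Usage: one gossiped block and one locally produced block.
scores: dict[PeerId, int] = {}
score_block_source(scores, PendingBlockSketch(b"\x01" * 32, "peer-a"), valid=True)
score_block_source(scores, PendingBlockSketch(b"\x02" * 32, None), valid=True)
assert scores == {"peer-a": 1}
```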
diff --git a/tests/interop/helpers/node_runner.py b/tests/interop/helpers/node_runner.py index eebe910b..26fc3246 100644 --- a/tests/interop/helpers/node_runner.py +++ b/tests/interop/helpers/node_runner.py @@ -367,6 +367,7 @@ async def start_node( # This matches the Ethereum gossip specification. if validator_indices: from lean_spec.subspecs.chain.config import ATTESTATION_COMMITTEE_COUNT + for idx in validator_indices: subnet_id = idx % int(ATTESTATION_COMMITTEE_COUNT) topic = f"/leanconsensus/{self.fork_digest}/attestation_{subnet_id}/ssz_snappy" diff --git a/tests/interop/test_multi_node.py b/tests/interop/test_multi_node.py index 2409018a..19362504 100644 --- a/tests/interop/test_multi_node.py +++ b/tests/interop/test_multi_node.py @@ -145,9 +145,7 @@ async def test_mesh_finalization(node_cluster: NodeCluster) -> None: # # - High new_atts, low known_atts = processing bottleneck # - Low counts everywhere = gossip not propagating - new_atts = [ - len(node._store.latest_new_aggregated_payloads) for node in node_cluster.nodes - ] + new_atts = [len(node._store.latest_new_aggregated_payloads) for node in node_cluster.nodes] known_atts = [ len(node._store.latest_known_aggregated_payloads) for node in node_cluster.nodes ] diff --git a/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py b/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py index e0044c3e..982aa905 100644 --- a/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py +++ b/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py @@ -728,12 +728,13 @@ def test_other_intervals_do_not_trigger_aggregation(self, key_manager: XmssKeyMa # After tick, time becomes time+1, and interval = (time+1) % 5 # So we need time+1 % 5 == target_interval # Therefore time = target_interval - 1 (mod 5) - pre_tick_time = (target_interval - 1) % int(INTERVALS_PER_SLOT) - test_store = store.model_copy(update={"time": Uint64(pre_tick_time)}) - - updated_store, _ = test_store.tick_interval(has_proposal=False, is_aggregator=True) - - assert sig_key not in updated_store.latest_new_aggregated_payloads, ( f"Aggregation should NOT occur at interval {target_interval}" + pre_tick_time = (target_interval - 1) % int(INTERVALS_PER_SLOT) + test_store = store.model_copy(update={"time": Uint64(pre_tick_time)}) + + updated_store, _ = test_store.tick_interval(has_proposal=False, is_aggregator=True) + + assert sig_key not in updated_store.latest_new_aggregated_payloads, ( + f"Aggregation should NOT occur at interval {target_interval}" ) def test_interval_0_accepts_attestations_with_proposal( @@ -809,11 +810,12 @@ def test_gossip_to_aggregation_to_storage(self, key_manager: XmssKeyManager) -> sig_key = SignatureKey(vid, data_root) assert sig_key in store.gossip_signatures, f"Signature for {vid} should be stored" - # Step 2: Advance to interval 2 (aggregation interval) - store = store.model_copy(update={"time": Uint64(1)}) - store, _ = store.tick_interval(has_proposal=False, is_aggregator=True) - - # Step 3: Verify aggregated proofs were created for vid in attesting_validators: + # Step 2: Advance to interval 2 (aggregation interval) + store = store.model_copy(update={"time": Uint64(1)}) + store, _ = store.tick_interval(has_proposal=False, is_aggregator=True) + + # Step 3: Verify aggregated proofs were created + for vid in attesting_validators: sig_key = SignatureKey(vid, data_root) assert sig_key in store.latest_new_aggregated_payloads, ( f"Aggregated proof for {vid} should exist after interval 2" From 
26b90350739b42eea1f507a95186e37db5ef1ea3 Mon Sep 17 00:00:00 2001 From: kamilsa Date: Wed, 11 Feb 2026 20:07:26 +0500 Subject: [PATCH 6/7] fix: use convergence-based polling in finalization tests Replace fixed 70s duration loops with convergence helpers: - assert_all_finalized_to: polls until finalization target reached - assert_heads_consistent: polls until head slots converge - assert_same_finalized_checkpoint: polls until nodes agree This fixes CI flakiness where slow machines cause nodes to diverge during the fixed wait period. Tests now exit early on success and tolerate slower environments via generous timeouts. --- tests/interop/test_multi_node.py | 158 +++++-------------------------- 1 file changed, 22 insertions(+), 136 deletions(-) diff --git a/tests/interop/test_multi_node.py b/tests/interop/test_multi_node.py index 19362504..81d462a9 100644 --- a/tests/interop/test_multi_node.py +++ b/tests/interop/test_multi_node.py @@ -24,14 +24,15 @@ import asyncio import logging -import time import pytest from .helpers import ( NodeCluster, + assert_all_finalized_to, assert_heads_consistent, assert_peer_connections, + assert_same_finalized_checkpoint, full_mesh, mesh_2_2_2, ) @@ -44,7 +45,7 @@ pytestmark = pytest.mark.interop -@pytest.mark.timeout(120) +@pytest.mark.timeout(200) @pytest.mark.num_validators(3) async def test_mesh_finalization(node_cluster: NodeCluster) -> None: """ @@ -101,107 +102,29 @@ async def test_mesh_finalization(node_cluster: NodeCluster) -> None: # The 15s timeout handles slow handshakes. await assert_peer_connections(node_cluster, min_peers=2, timeout=15) - # Let the chain run for a fixed duration. - # - # Timing calculation: - # - # - Slot duration: 4 seconds - # - Slots in 70s: ~17 slots - # - Finalization requires: 2 consecutive justified epochs - # - With 3 validators: justification needs 2/3 = 2 attestations per slot - # - # This duration allows enough time for validators to: - # - # 1. Produce blocks (one per slot, round-robin) - # 2. Broadcast attestations (all validators each slot) - # 3. Accumulate justification (2+ matching attestations) - # 4. Finalize (justified epoch becomes finalized) - run_duration = 70 - poll_interval = 5 - - logger.info("Running chain for %d seconds...", run_duration) - - # Poll the chain state periodically. - # - # This provides visibility into consensus progress during the test. - # The logged metrics help debug failures. - start = time.monotonic() - while time.monotonic() - start < run_duration: - # Collect current state from each node. - # - # Head slot: the highest slot block each node has seen. - # Finalized slot: the most recent finalized checkpoint slot. - # Justified slot: the most recent justified checkpoint slot. - slots = [node.head_slot for node in node_cluster.nodes] - finalized = [node.finalized_slot for node in node_cluster.nodes] - justified = [node.justified_slot for node in node_cluster.nodes] - - # Track attestation counts for debugging. - # - # New attestations: received but not yet processed by fork choice. - # Known attestations: already incorporated into the store. 
- # - # These counts reveal if gossip is working: - # - # - High new_atts, low known_atts = processing bottleneck - # - Low counts everywhere = gossip not propagating - new_atts = [len(node._store.latest_new_aggregated_payloads) for node in node_cluster.nodes] - known_atts = [ - len(node._store.latest_known_aggregated_payloads) for node in node_cluster.nodes - ] - - logger.info( - "Progress: head=%s justified=%s finalized=%s new_atts=%s known_atts=%s", - slots, - justified, - finalized, - new_atts, - known_atts, - ) - await asyncio.sleep(poll_interval) - - # Capture final state for assertions. - head_slots = [node.head_slot for node in node_cluster.nodes] - finalized_slots = [node.finalized_slot for node in node_cluster.nodes] - - logger.info("FINAL: head_slots=%s finalized=%s", head_slots, finalized_slots) - - # Verify the chain advanced sufficiently. + # Wait for finalization with convergence-based polling. # - # Minimum 5 slots ensures: + # Instead of a fixed duration, we actively poll for the target state. + # This is more robust under varying CI performance. # - # - Block production is working (at least 5 blocks created) - # - Gossip is propagating (all nodes see the same progress) - # - No single node is stuck or partitioned - assert all(slot >= 5 for slot in head_slots), ( - f"Chain did not advance enough. Head slots: {head_slots}" - ) + # Finalization requires 2 consecutive justified epochs. + # With 3 validators and 4s slots, this typically takes ~30s + # but may take longer on slow CI machines. + await assert_all_finalized_to(node_cluster, target_slot=1, timeout=100) - # Verify heads are consistent across nodes. + # Verify heads converged across nodes. # - # In a healthy network, all nodes should converge to similar head slots. - # A difference > 2 slots indicates gossip or fork choice issues. - head_diff = max(head_slots) - min(head_slots) - assert head_diff <= 2, f"Head slots diverged too much. Slots: {head_slots}, diff: {head_diff}" - - # Verify ALL nodes finalized. - # - # With 70s runtime (~17 slots) and working gossip, every node - # should have finalized at least one checkpoint. - assert all(slot > 0 for slot in finalized_slots), ( - f"Not all nodes finalized. Finalized slots: {finalized_slots}" - ) + # After finalization, all nodes should agree on head within 2 slots. + await assert_heads_consistent(node_cluster, max_slot_diff=2, timeout=30) # Verify finalized checkpoints are consistent. # # All nodes must agree on the finalized checkpoint. # Finalization is irreversible - divergent finalization would be catastrophic. - assert len(set(finalized_slots)) == 1, ( - f"Finalized slots inconsistent across nodes: {finalized_slots}" - ) + await assert_same_finalized_checkpoint(node_cluster.nodes, timeout=30) -@pytest.mark.timeout(120) +@pytest.mark.timeout(200) @pytest.mark.num_validators(3) async def test_mesh_2_2_2_finalization(node_cluster: NodeCluster) -> None: """ @@ -244,58 +167,21 @@ async def test_mesh_2_2_2_finalization(node_cluster: NodeCluster) -> None: # Using min_peers=1 ensures spokes pass the check. await assert_peer_connections(node_cluster, min_peers=1, timeout=15) - # Match Ream's 70 second test duration. + # Wait for finalization with convergence-based polling. 
# - # Finalization requires sufficient time for: - # - Multiple slots to pass (4s each) - # - Attestations to accumulate - # - Justification and finalization to occur - run_duration = 70 - poll_interval = 5 - - logger.info("Running chain for %d seconds (mesh_2_2_2)...", run_duration) + # Hub-and-spoke adds latency (messages route through hub) + # but the protocol should still achieve finalization. + await assert_all_finalized_to(node_cluster, target_slot=1, timeout=100) - # Poll chain progress. - start = time.monotonic() - while time.monotonic() - start < run_duration: - slots = [node.head_slot for node in node_cluster.nodes] - finalized = [node.finalized_slot for node in node_cluster.nodes] - logger.info("Progress: head_slots=%s finalized=%s", slots, finalized) - await asyncio.sleep(poll_interval) - - # Final state capture. - head_slots = [node.head_slot for node in node_cluster.nodes] - finalized_slots = [node.finalized_slot for node in node_cluster.nodes] - - logger.info("FINAL: head_slots=%s finalized=%s", head_slots, finalized_slots) - - # Same assertions as full mesh. - # - # Despite reduced connectivity (messages route through hub), - # the protocol should still achieve full consensus. - - # Chain must advance sufficiently. - assert all(slot >= 5 for slot in head_slots), ( - f"Chain did not advance enough. Head slots: {head_slots}" - ) - - # Heads must be consistent across nodes. + # Verify heads converged across nodes. # # Hub-and-spoke adds latency but should not cause divergence. - head_diff = max(head_slots) - min(head_slots) - assert head_diff <= 2, f"Head slots diverged too much. Slots: {head_slots}, diff: {head_diff}" - - # ALL nodes must finalize. - assert all(slot > 0 for slot in finalized_slots), ( - f"Not all nodes finalized. Finalized slots: {finalized_slots}" - ) + await assert_heads_consistent(node_cluster, max_slot_diff=2, timeout=30) # Finalized checkpoints must be identical. # # Even with indirect connectivity, finalization must be consistent. - assert len(set(finalized_slots)) == 1, ( - f"Finalized slots inconsistent across nodes: {finalized_slots}" - ) + await assert_same_finalized_checkpoint(node_cluster.nodes, timeout=30) @pytest.mark.timeout(30) From 5ace58d948f310e5d3c7aa05ae046cc0b8a608fc Mon Sep 17 00:00:00 2001 From: kamilsa Date: Wed, 11 Feb 2026 22:15:49 +0500 Subject: [PATCH 7/7] fix: increase timeouts for CI reliability in interop tests - Increase gossipsub mesh stabilization from 5s to 10s in start_all (CI machines need more time for mesh formation before block production) - Increase finalization timeout from 100s to 150s - Increase peer connection timeout from 15s to 30s - Increase pytest timeout from 200s to 300s The CI failure showed all 3 nodes stuck at finalized slot 0, indicating gossip mesh wasn't fully formed when services started. --- tests/interop/helpers/node_runner.py | 3 ++- tests/interop/test_multi_node.py | 12 ++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/tests/interop/helpers/node_runner.py b/tests/interop/helpers/node_runner.py index 26fc3246..70c793a6 100644 --- a/tests/interop/helpers/node_runner.py +++ b/tests/interop/helpers/node_runner.py @@ -470,7 +470,8 @@ async def start_all( # 3. GRAFT messages to be sent and processed # # A longer delay ensures proper mesh formation before block production. - await asyncio.sleep(5.0) + # CI machines need more time due to lower CPU/scheduling priority. + await asyncio.sleep(10.0) # Phase 4: Start node services (validators, chain service, etc). 
# diff --git a/tests/interop/test_multi_node.py b/tests/interop/test_multi_node.py index 81d462a9..52ef3538 100644 --- a/tests/interop/test_multi_node.py +++ b/tests/interop/test_multi_node.py @@ -45,7 +45,7 @@ pytestmark = pytest.mark.interop -@pytest.mark.timeout(200) +@pytest.mark.timeout(300) @pytest.mark.num_validators(3) async def test_mesh_finalization(node_cluster: NodeCluster) -> None: """ @@ -100,7 +100,7 @@ async def test_mesh_finalization(node_cluster: NodeCluster) -> None: # Each node needs at least 2 peers (the other two nodes). # This ensures gossip will reach all nodes. # The 15s timeout handles slow handshakes. - await assert_peer_connections(node_cluster, min_peers=2, timeout=15) + await assert_peer_connections(node_cluster, min_peers=2, timeout=30) # Wait for finalization with convergence-based polling. # @@ -110,7 +110,7 @@ async def test_mesh_finalization(node_cluster: NodeCluster) -> None: # Finalization requires 2 consecutive justified epochs. # With 3 validators and 4s slots, this typically takes ~30s # but may take longer on slow CI machines. - await assert_all_finalized_to(node_cluster, target_slot=1, timeout=100) + await assert_all_finalized_to(node_cluster, target_slot=1, timeout=150) # Verify heads converged across nodes. # @@ -124,7 +124,7 @@ async def test_mesh_finalization(node_cluster: NodeCluster) -> None: await assert_same_finalized_checkpoint(node_cluster.nodes, timeout=30) -@pytest.mark.timeout(200) +@pytest.mark.timeout(300) @pytest.mark.num_validators(3) async def test_mesh_2_2_2_finalization(node_cluster: NodeCluster) -> None: """ @@ -165,13 +165,13 @@ async def test_mesh_2_2_2_finalization(node_cluster: NodeCluster) -> None: # # Hub (node 0) has 2 peers; spokes have 1 peer each. # Using min_peers=1 ensures spokes pass the check. - await assert_peer_connections(node_cluster, min_peers=1, timeout=15) + await assert_peer_connections(node_cluster, min_peers=1, timeout=30) # Wait for finalization with convergence-based polling. # # Hub-and-spoke adds latency (messages route through hub) # but the protocol should still achieve finalization. - await assert_all_finalized_to(node_cluster, target_slot=1, timeout=100) + await assert_all_finalized_to(node_cluster, target_slot=1, timeout=150) # Verify heads converged across nodes. #
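For reference, the convergence helpers introduced in these commits follow a poll-until-target pattern: check the nodes' state, exit early on success, and only fail after a generous deadline. The sketch below shows that pattern under the assumption that nodes expose finalized_slot as in the assertions above; it is an illustration, not the actual assert_all_finalized_to implementation.

```python
import asyncio
import time


async def wait_for_finalization(nodes, target_slot: int, timeout: float, poll: float = 2.0) -> None:
    """Poll until every node has finalized at least target_slot, or fail after timeout seconds."""
    deadline = time.monotonic() + timeout
    finalized: list[int] = []
    while time.monotonic() < deadline:
        finalized = [node.finalized_slot for node in nodes]
        if all(slot >= target_slot for slot in finalized):
            return  # converged early; no fixed-duration wait needed
        await asyncio.sleep(poll)
    raise AssertionError(f"Nodes did not finalize slot {target_slot} within {timeout}s: {finalized}")
```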