diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3828494f..1065a575 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -80,7 +80,7 @@ jobs: interop-tests: name: Interop tests - Multi-node consensus - runs-on: ubuntu-latest + runs-on: macos-15 timeout-minutes: 10 steps: - name: Checkout leanSpec diff --git a/packages/testing/src/consensus_testing/test_fixtures/fork_choice.py b/packages/testing/src/consensus_testing/test_fixtures/fork_choice.py index 0a0138b6..f48b74c1 100644 --- a/packages/testing/src/consensus_testing/test_fixtures/fork_choice.py +++ b/packages/testing/src/consensus_testing/test_fixtures/fork_choice.py @@ -235,7 +235,9 @@ def make_fixture(self) -> Self: # Time advancement may trigger slot boundaries. # At slot boundaries, pending attestations may become active. # Always act as aggregator to ensure gossip signatures are aggregated - store = store.on_tick(Uint64(step.time), has_proposal=False, is_aggregator=True) + store, _ = store.on_tick( + Uint64(step.time), has_proposal=False, is_aggregator=True + ) elif isinstance(step, BlockStep): # Build a complete signed block from the lightweight spec. @@ -264,7 +266,7 @@ def make_fixture(self) -> Self: # Always act as aggregator to ensure gossip signatures are aggregated slot_duration_seconds = block.slot * SECONDS_PER_SLOT block_time = store.config.genesis_time + slot_duration_seconds - store = store.on_tick(block_time, has_proposal=True, is_aggregator=True) + store, _ = store.on_tick(block_time, has_proposal=True, is_aggregator=True) # Process the block through Store. # This validates, applies state transition, and updates head. @@ -408,7 +410,7 @@ def _build_block_from_spec( # First, aggregate any gossip signatures into payloads # This ensures that signatures from previous blocks (like proposer attestations) # are available for extraction - aggregation_store = working_store.aggregate_committee_signatures() + aggregation_store, _ = working_store.aggregate_committee_signatures() # Now combine aggregated payloads from both sources aggregated_payloads = ( diff --git a/src/lean_spec/subspecs/chain/service.py b/src/lean_spec/subspecs/chain/service.py index 17a86e20..16ab4d3f 100644 --- a/src/lean_spec/subspecs/chain/service.py +++ b/src/lean_spec/subspecs/chain/service.py @@ -125,9 +125,10 @@ async def run(self) -> None: # # This minimal service does not produce blocks. # Block production requires validator keys. - new_store = self.sync_service.store.on_tick( + new_store, new_aggregated_attestations = self.sync_service.store.on_tick( time=current_time, has_proposal=False, + is_aggregator=self.sync_service.is_aggregator, ) # Update sync service's store reference. @@ -137,6 +138,11 @@ async def run(self) -> None: # the updated time. self.sync_service.store = new_store + # Publish any new aggregated attestations produced this tick + if new_aggregated_attestations: + for agg in new_aggregated_attestations: + await self.sync_service.publish_aggregated_attestation(agg) + logger.info( "Tick: slot=%d interval=%d time=%d head=%s finalized=slot%d", self.clock.current_slot(), @@ -162,11 +168,18 @@ async def _initial_tick(self) -> Interval | None: # Only tick if we're past genesis. 
if current_time >= self.clock.genesis_time: - new_store = self.sync_service.store.on_tick( + new_store, new_aggregated_attestations = self.sync_service.store.on_tick( time=current_time, has_proposal=False, + is_aggregator=self.sync_service.is_aggregator, ) self.sync_service.store = new_store + + # Publish any aggregated attestations produced during catch-up. + if new_aggregated_attestations: + for agg in new_aggregated_attestations: + await self.sync_service.publish_aggregated_attestation(agg) + return self.clock.total_intervals() return None diff --git a/src/lean_spec/subspecs/forkchoice/store.py b/src/lean_spec/subspecs/forkchoice/store.py index 7faefd9c..ba036278 100644 --- a/src/lean_spec/subspecs/forkchoice/store.py +++ b/src/lean_spec/subspecs/forkchoice/store.py @@ -494,7 +494,6 @@ def on_gossip_aggregated_attestation( new_attestation_data_by_root = dict(self.attestation_data_by_root) new_attestation_data_by_root[data_root] = data - store = self for vid in validator_ids: # Update Proof Map # @@ -503,7 +502,7 @@ def on_gossip_aggregated_attestation( new_aggregated_payloads.setdefault(key, []).append(proof) # Return store with updated aggregated payloads and attestation data - return store.model_copy( + return self.model_copy( update={ "latest_new_aggregated_payloads": new_aggregated_payloads, "attestation_data_by_root": new_attestation_data_by_root, @@ -862,28 +861,7 @@ def update_head(self) -> "Store": ) def accept_new_attestations(self) -> "Store": - """ - Process pending aggregated payloads and update forkchoice head. - - Moves aggregated payloads from latest_new_aggregated_payloads to - latest_known_aggregated_payloads, making them eligible to contribute to - fork choice weights. This migration happens at specific interval ticks. - - The Interval Tick System - ------------------------- - Aggregated payloads progress through intervals: - - Interval 0: Block proposal - - Interval 1: Validators cast attestations (enter "new") - - Interval 2: Aggregators create proofs & broadcast - - Interval 3: Safe target update - - Interval 4: Process accumulated attestations - - This staged progression ensures proper timing and prevents premature - influence on fork choice decisions. - - Returns: - New Store with migrated aggregated payloads and updated head. - """ + """Process pending aggregated payloads and update forkchoice head.""" # Merge new aggregated payloads into known aggregated payloads merged_aggregated_payloads = dict(self.latest_known_aggregated_payloads) for sig_key, proofs in self.latest_new_aggregated_payloads.items(): @@ -943,7 +921,7 @@ def update_safe_target(self) -> "Store": return self.model_copy(update={"safe_target": safe_target}) - def aggregate_committee_signatures(self) -> "Store": + def aggregate_committee_signatures(self) -> tuple["Store", list[SignedAggregatedAttestation]]: """ Aggregate committee signatures for attestations in committee_signatures. @@ -951,7 +929,7 @@ def aggregate_committee_signatures(self) -> "Store": Attestations are reconstructed from gossip_signatures using attestation_data_by_root. Returns: - New Store with updated latest_new_aggregated_payloads. + Tuple of (new Store with updated payloads, list of new SignedAggregatedAttestation). 
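+
+        Callers are responsible for broadcasting the returned aggregates
+        on the committee aggregation gossip topic.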
""" new_aggregated_payloads = dict(self.latest_new_aggregated_payloads) @@ -976,13 +954,14 @@ def aggregate_committee_signatures(self) -> "Store": committee_signatures, ) - # iterate to broadcast aggregated attestations + # Create list for broadcasting + new_aggregates: list[SignedAggregatedAttestation] = [] for aggregated_attestation, aggregated_signature in aggregated_results: - _ = SignedAggregatedAttestation( + agg = SignedAggregatedAttestation( data=aggregated_attestation.data, proof=aggregated_signature, ) - # Note: here we should broadcast the aggregated signature to committee_aggregators topic + new_aggregates.append(agg) # Compute new aggregated payloads new_gossip_sigs = dict(self.gossip_signatures) @@ -1004,9 +983,11 @@ def aggregate_committee_signatures(self) -> "Store": "latest_new_aggregated_payloads": new_aggregated_payloads, "gossip_signatures": new_gossip_sigs, } - ) + ), new_aggregates - def tick_interval(self, has_proposal: bool, is_aggregator: bool = False) -> "Store": + def tick_interval( + self, has_proposal: bool, is_aggregator: bool = False + ) -> tuple["Store", list[SignedAggregatedAttestation]]: """ Advance store time by one interval and perform interval-specific actions. @@ -1048,11 +1029,12 @@ def tick_interval(self, has_proposal: bool, is_aggregator: bool = False) -> "Sto is_aggregator: Whether the node is an aggregator. Returns: - New Store with advanced time and interval-specific updates applied. + Tuple of (new store with advanced time, list of new signed aggregated attestation). """ # Advance time by one interval store = self.model_copy(update={"time": self.time + Uint64(1)}) current_interval = store.time % INTERVALS_PER_SLOT + new_aggregates: list[SignedAggregatedAttestation] = [] if current_interval == Uint64(0): # Start of slot - process attestations if proposal exists @@ -1061,7 +1043,7 @@ def tick_interval(self, has_proposal: bool, is_aggregator: bool = False) -> "Sto elif current_interval == Uint64(2): # Aggregation interval - aggregators create proofs if is_aggregator: - store = store.aggregate_committee_signatures() + store, new_aggregates = store.aggregate_committee_signatures() elif current_interval == Uint64(3): # Fast confirm - update safe target based on received proofs store = store.update_safe_target() @@ -1069,9 +1051,11 @@ def tick_interval(self, has_proposal: bool, is_aggregator: bool = False) -> "Sto # End of slot - accept accumulated attestations store = store.accept_new_attestations() - return store + return store, new_aggregates - def on_tick(self, time: Uint64, has_proposal: bool, is_aggregator: bool = False) -> "Store": + def on_tick( + self, time: Uint64, has_proposal: bool, is_aggregator: bool = False + ) -> tuple["Store", list[SignedAggregatedAttestation]]: """ Advance forkchoice store time to given timestamp. @@ -1085,7 +1069,8 @@ def on_tick(self, time: Uint64, has_proposal: bool, is_aggregator: bool = False) is_aggregator: Whether the node is an aggregator. Returns: - New Store with time advanced and all interval actions performed. + Tuple of (new store with time advanced, + list of all produced signed aggregated attestation). 
""" # Calculate target time in intervals time_delta_ms = (time - self.config.genesis_time) * Uint64(1000) @@ -1093,14 +1078,16 @@ def on_tick(self, time: Uint64, has_proposal: bool, is_aggregator: bool = False) # Tick forward one interval at a time store = self + all_new_aggregates: list[SignedAggregatedAttestation] = [] while store.time < tick_interval_time: # Check if proposal should be signaled for next interval should_signal_proposal = has_proposal and (store.time + Uint64(1)) == tick_interval_time # Advance by one interval with appropriate signaling - store = store.tick_interval(should_signal_proposal, is_aggregator) + store, new_aggregates = store.tick_interval(should_signal_proposal, is_aggregator) + all_new_aggregates.extend(new_aggregates) - return store + return store, all_new_aggregates def get_proposal_head(self, slot: Slot) -> tuple["Store", Bytes32]: """ @@ -1128,7 +1115,7 @@ def get_proposal_head(self, slot: Slot) -> tuple["Store", Bytes32]: slot_time = self.config.genesis_time + slot_duration_seconds # Advance time to current slot (ticking intervals) - store = self.on_tick(slot_time, True) + store, _ = self.on_tick(slot_time, True) # Process any pending attestations before proposal store = store.accept_new_attestations() @@ -1174,8 +1161,11 @@ def get_attestation_target(self) -> Checkpoint: # # This ensures the target doesn't advance too far ahead of safe target, # providing a balance between liveness and safety. + # + # We allow the target to be up to 1 slot ahead of the safe target + # to ensure the chain can actually start advancing from genesis. for _ in range(JUSTIFICATION_LOOKBACK_SLOTS): - if self.blocks[target_block_root].slot > self.blocks[self.safe_target].slot: + if self.blocks[target_block_root].slot > self.blocks[self.safe_target].slot + Slot(1): target_block_root = self.blocks[target_block_root].parent_root else: break @@ -1192,7 +1182,7 @@ def get_attestation_target(self) -> Checkpoint: # Create checkpoint from selected target block target_block = self.blocks[target_block_root] - return Checkpoint(root=hash_tree_root(target_block), slot=target_block.slot) + return Checkpoint(root=target_block_root, slot=target_block.slot) def produce_attestation_data(self, slot: Slot) -> AttestationData: """ @@ -1299,7 +1289,7 @@ def produce_block_with_signatures( # # The builder iteratively collects valid attestations. # It returns the final block, post-state, and signature proofs. 
- final_block, final_post_state, _, signatures = head_state.build_block( + final_block, final_post_state, collected_attestations, signatures = head_state.build_block( slot=slot, proposer_index=validator_index, parent_root=head_root, diff --git a/src/lean_spec/subspecs/networking/client/event_source.py b/src/lean_spec/subspecs/networking/client/event_source.py index 3a6e300f..593f11f2 100644 --- a/src/lean_spec/subspecs/networking/client/event_source.py +++ b/src/lean_spec/subspecs/networking/client/event_source.py @@ -106,7 +106,7 @@ from lean_spec.snappy import SnappyDecompressionError, frame_decompress from lean_spec.subspecs.containers import SignedBlockWithAttestation -from lean_spec.subspecs.containers.attestation import SignedAttestation +from lean_spec.subspecs.containers.attestation import SignedAggregatedAttestation, SignedAttestation from lean_spec.subspecs.networking.config import ( GOSSIPSUB_DEFAULT_PROTOCOL_ID, GOSSIPSUB_PROTOCOL_ID_V12, @@ -130,6 +130,7 @@ ) from lean_spec.subspecs.networking.reqresp.message import Status from lean_spec.subspecs.networking.service.events import ( + GossipAggregatedAttestationEvent, GossipAttestationEvent, GossipBlockEvent, NetworkEvent, @@ -325,7 +326,7 @@ def decode_message( self, topic_str: str, compressed_data: bytes, - ) -> SignedBlockWithAttestation | SignedAttestation | None: + ) -> SignedBlockWithAttestation | SignedAttestation | SignedAggregatedAttestation | None: """ Decode a gossip message from topic and compressed data. @@ -393,6 +394,8 @@ def decode_message( return SignedBlockWithAttestation.decode_bytes(ssz_bytes) case TopicKind.ATTESTATION_SUBNET: return SignedAttestation.decode_bytes(ssz_bytes) + case TopicKind.AGGREGATED_ATTESTATION: + return SignedAggregatedAttestation.decode_bytes(ssz_bytes) except SSZSerializationError as e: raise GossipMessageError(f"SSZ decode failed: {e}") from e @@ -816,6 +819,9 @@ async def _handle_gossipsub_message(self, event: GossipsubMessageEvent) -> None: case TopicKind.ATTESTATION_SUBNET: if isinstance(message, SignedAttestation): await self._emit_gossip_attestation(message, event.peer_id) + case TopicKind.AGGREGATED_ATTESTATION: + if isinstance(message, SignedAggregatedAttestation): + await self._emit_gossip_aggregated_attestation(message, event.peer_id) logger.debug("Processed gossipsub message %s from %s", topic.kind.value, event.peer_id) @@ -1168,6 +1174,25 @@ async def _emit_gossip_attestation( GossipAttestationEvent(attestation=attestation, peer_id=peer_id, topic=topic) ) + async def _emit_gossip_aggregated_attestation( + self, + signed_attestation: SignedAggregatedAttestation, + peer_id: PeerId, + ) -> None: + """ + Emit a gossip aggregated attestation event. + + Args: + signed_attestation: Aggregated attestation received from gossip. + peer_id: Peer that sent it. + """ + topic = GossipTopic(kind=TopicKind.AGGREGATED_ATTESTATION, fork_digest=self._fork_digest) + await self._events.put( + GossipAggregatedAttestationEvent( + signed_attestation=signed_attestation, peer_id=peer_id, topic=topic + ) + ) + async def _accept_streams(self, peer_id: PeerId, conn: QuicConnection) -> None: """ Accept incoming streams from a connection. @@ -1464,6 +1489,15 @@ async def _handle_gossip_stream(self, peer_id: PeerId, stream: Stream) -> None: # Type mismatch indicates a bug in decode_message. 
logger.warning("Attestation topic but got %s", type(message).__name__) + case TopicKind.AGGREGATED_ATTESTATION: + if isinstance(message, SignedAggregatedAttestation): + await self._emit_gossip_aggregated_attestation(message, peer_id) + else: + logger.warning( + "Aggregated attestation topic but got %s", + type(message).__name__, + ) + logger.debug("Received gossip %s from %s", topic.kind.value, peer_id) except GossipMessageError as e: diff --git a/src/lean_spec/subspecs/networking/service/events.py b/src/lean_spec/subspecs/networking/service/events.py index 4a18eeb6..9095cfca 100644 --- a/src/lean_spec/subspecs/networking/service/events.py +++ b/src/lean_spec/subspecs/networking/service/events.py @@ -24,7 +24,7 @@ from dataclasses import dataclass from lean_spec.subspecs.containers import SignedBlockWithAttestation -from lean_spec.subspecs.containers.attestation import SignedAttestation +from lean_spec.subspecs.containers.attestation import SignedAggregatedAttestation, SignedAttestation from lean_spec.subspecs.networking.gossipsub.topic import GossipTopic from lean_spec.subspecs.networking.reqresp.message import Status from lean_spec.subspecs.networking.transport import PeerId @@ -67,6 +67,25 @@ class GossipAttestationEvent: """Topic the attestation was received on (includes fork digest).""" +@dataclass(frozen=True, slots=True) +class GossipAggregatedAttestationEvent: + """ + Aggregated attestation received via gossip subscription. + + Fired when a signed aggregated attestation arrives from the gossipsub network. + Aggregates contain multiple validator votes combined into a single proof. + """ + + signed_attestation: SignedAggregatedAttestation + """The signed aggregated attestation.""" + + peer_id: PeerId + """Peer that propagated this aggregated attestation to us.""" + + topic: GossipTopic + """Topic the aggregated attestation was received on.""" + + @dataclass(frozen=True, slots=True) class PeerStatusEvent: """ @@ -113,6 +132,7 @@ class PeerDisconnectedEvent: NetworkEvent = ( GossipBlockEvent | GossipAttestationEvent + | GossipAggregatedAttestationEvent | PeerStatusEvent | PeerConnectedEvent | PeerDisconnectedEvent diff --git a/src/lean_spec/subspecs/networking/service/service.py b/src/lean_spec/subspecs/networking/service/service.py index 57f8fb1a..80eb8f7a 100644 --- a/src/lean_spec/subspecs/networking/service/service.py +++ b/src/lean_spec/subspecs/networking/service/service.py @@ -28,13 +28,14 @@ from lean_spec.snappy import frame_compress from lean_spec.subspecs.containers import SignedBlockWithAttestation -from lean_spec.subspecs.containers.attestation import SignedAttestation +from lean_spec.subspecs.containers.attestation import SignedAggregatedAttestation, SignedAttestation from lean_spec.subspecs.networking.client.event_source import LiveNetworkEventSource from lean_spec.subspecs.networking.gossipsub.topic import GossipTopic from lean_spec.subspecs.networking.peer.info import PeerInfo from lean_spec.subspecs.networking.types import ConnectionState from .events import ( + GossipAggregatedAttestationEvent, GossipAttestationEvent, GossipBlockEvent, NetworkEvent, @@ -146,14 +147,22 @@ async def _handle_event(self, event: NetworkEvent) -> None: ) await self.sync_service.on_gossip_block(block, peer_id) - case GossipAttestationEvent(attestation=attestation, peer_id=peer_id): + case GossipAttestationEvent(attestation=attestation, peer_id=peer_id, topic=topic): # # SyncService will validate signature and update forkchoice. 
await self.sync_service.on_gossip_attestation( attestation=attestation, + subnet_id=topic.subnet_id or 0, peer_id=peer_id, ) + case GossipAggregatedAttestationEvent(signed_attestation=att, peer_id=peer_id): + # Route aggregated attestations to sync service. + # + # Aggregates contain multiple validator votes and are used + # to advance justification and finalization. + await self.sync_service.on_gossip_aggregated_attestation(att, peer_id) + case PeerStatusEvent(peer_id=peer_id, status=status): # Route peer status updates to sync service. # @@ -234,3 +243,19 @@ async def publish_attestation(self, attestation: SignedAttestation, subnet_id: i await self.event_source.publish(str(topic), compressed) logger.debug("Published attestation for slot %s", attestation.message.slot) + + async def publish_aggregated_attestation( + self, signed_attestation: SignedAggregatedAttestation + ) -> None: + """ + Publish an aggregated attestation to the aggregation gossip topic. + + Args: + signed_attestation: Aggregated attestation to publish. + """ + topic = GossipTopic.committee_aggregation(self.fork_digest) + ssz_bytes = signed_attestation.encode_bytes() + compressed = frame_compress(ssz_bytes) + + await self.event_source.publish(str(topic), compressed) + logger.debug("Published aggregated attestation for slot %s", signed_attestation.data.slot) diff --git a/src/lean_spec/subspecs/node/node.py b/src/lean_spec/subspecs/node/node.py index 55bc09c7..ceb979ab 100644 --- a/src/lean_spec/subspecs/node/node.py +++ b/src/lean_spec/subspecs/node/node.py @@ -21,7 +21,7 @@ from lean_spec.subspecs.chain import SlotClock from lean_spec.subspecs.chain.config import ATTESTATION_COMMITTEE_COUNT, INTERVALS_PER_SLOT from lean_spec.subspecs.chain.service import ChainService -from lean_spec.subspecs.containers import Block, BlockBody, State +from lean_spec.subspecs.containers import Block, BlockBody, SignedBlockWithAttestation, State from lean_spec.subspecs.containers.attestation import SignedAttestation from lean_spec.subspecs.containers.block.types import AggregatedAttestations from lean_spec.subspecs.containers.slot import Slot @@ -238,6 +238,12 @@ def from_genesis(cls, config: NodeConfig) -> Node: is_aggregator=config.is_aggregator, ) + # Wire up aggregated attestation publishing. + # + # SyncService delegates aggregate publishing to NetworkService + # via a callback, avoiding a circular dependency. 
+ sync_service._publish_agg_fn = network_service.publish_aggregated_attestation + # Create API server if configured api_server: ApiServer | None = None if config.api_config is not None: @@ -260,12 +266,19 @@ def from_genesis(cls, config: NodeConfig) -> Node: async def publish_attestation_wrapper(attestation: SignedAttestation) -> None: subnet_id = attestation.validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT) await network_service.publish_attestation(attestation, subnet_id) + # Also route locally so we can aggregate our own attestation + await sync_service.on_gossip_attestation(attestation, subnet_id, peer_id=None) + + async def publish_block_wrapper(block: SignedBlockWithAttestation) -> None: + await network_service.publish_block(block) + # Also route locally so we update our own store + await sync_service.on_gossip_block(block, peer_id=None) validator_service = ValidatorService( sync_service=sync_service, clock=clock, registry=config.validator_registry, - on_block=network_service.publish_block, + on_block=publish_block_wrapper, on_attestation=publish_attestation_wrapper, ) diff --git a/src/lean_spec/subspecs/sync/block_cache.py b/src/lean_spec/subspecs/sync/block_cache.py index 2b8de0b0..fc803df7 100644 --- a/src/lean_spec/subspecs/sync/block_cache.py +++ b/src/lean_spec/subspecs/sync/block_cache.py @@ -103,7 +103,7 @@ class PendingBlock: slot order to ensure parents are processed before children. """ - received_from: PeerId + received_from: PeerId | None """ Peer that sent this block. @@ -112,6 +112,7 @@ class PendingBlock: - If invalid, they get penalized. This creates incentives for good behavior. + None for self-produced blocks. """ received_at: float = field(default_factory=time) @@ -165,7 +166,7 @@ def __contains__(self, root: Bytes32) -> bool: def add( self, block: SignedBlockWithAttestation, - peer: PeerId, + peer: PeerId | None, backfill_depth: int = 0, ) -> PendingBlock: """ diff --git a/src/lean_spec/subspecs/sync/head_sync.py b/src/lean_spec/subspecs/sync/head_sync.py index b14d0592..96610c9e 100644 --- a/src/lean_spec/subspecs/sync/head_sync.py +++ b/src/lean_spec/subspecs/sync/head_sync.py @@ -143,7 +143,7 @@ class HeadSync: async def on_gossip_block( self, block: SignedBlockWithAttestation, - peer_id: PeerId, + peer_id: PeerId | None, store: Store, ) -> tuple[HeadSyncResult, Store]: """ @@ -217,7 +217,7 @@ async def on_gossip_block( async def _process_block_with_descendants( self, block: SignedBlockWithAttestation, - peer_id: PeerId, + peer_id: PeerId | None, store: Store, ) -> tuple[HeadSyncResult, Store]: """ @@ -283,7 +283,7 @@ async def _process_cached_descendants( self, parent_root: Bytes32, store: Store, - peer_id: PeerId, + peer_id: PeerId | None, ) -> int: """ Process any cached blocks that descend from the given parent. 
@@ -351,7 +351,7 @@ async def _process_cached_descendants( async def _cache_and_backfill( self, block: SignedBlockWithAttestation, - peer_id: PeerId, + peer_id: PeerId | None, store: Store, ) -> tuple[HeadSyncResult, Store]: """ diff --git a/src/lean_spec/subspecs/sync/service.py b/src/lean_spec/subspecs/sync/service.py index 299791f6..3b74d42f 100644 --- a/src/lean_spec/subspecs/sync/service.py +++ b/src/lean_spec/subspecs/sync/service.py @@ -38,14 +38,15 @@ import asyncio import logging -from collections.abc import Callable +from collections.abc import Callable, Coroutine from dataclasses import dataclass, field -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from lean_spec.subspecs import metrics from lean_spec.subspecs.chain.clock import SlotClock from lean_spec.subspecs.containers import ( Block, + SignedAggregatedAttestation, SignedAttestation, SignedBlockWithAttestation, ) @@ -67,6 +68,8 @@ BlockProcessor = Callable[[Store, SignedBlockWithAttestation], Store] +PublishAggFn = Callable[[SignedAggregatedAttestation], Coroutine[Any, Any, None]] + def default_block_processor( store: Store, @@ -76,6 +79,10 @@ def default_block_processor( return store.on_block(block) +async def _noop_publish_agg(signed_attestation: SignedAggregatedAttestation) -> None: + """No-op default for aggregated attestation publishing.""" + + @dataclass(slots=True) class SyncProgress: """ @@ -156,6 +163,9 @@ class SyncService: process_block: BlockProcessor = field(default=default_block_processor) """Block processor function. Defaults to Store.on_block().""" + _publish_agg_fn: PublishAggFn = field(default=_noop_publish_agg) + """Callback for publishing aggregated attestations to the network.""" + _state: SyncState = field(default=SyncState.IDLE) """Current sync state.""" @@ -346,7 +356,7 @@ async def on_peer_status(self, peer_id: PeerId, status: Status) -> None: async def on_gossip_block( self, block: SignedBlockWithAttestation, - peer_id: PeerId, + peer_id: PeerId | None, ) -> None: """ Handle block received via gossip. @@ -402,7 +412,8 @@ async def on_gossip_block( async def on_gossip_attestation( self, attestation: SignedAttestation, - peer_id: PeerId, # noqa: ARG002 + subnet_id: int, + peer_id: PeerId | None = None, ) -> None: """ Handle attestation received via gossip. @@ -416,7 +427,8 @@ async def on_gossip_attestation( Args: attestation: The signed attestation received. - peer_id: The peer that propagated the attestation (unused for now). + subnet_id: Subnet ID the attestation was received on. + peer_id: The peer that propagated the attestation (optional). """ # Guard: Only process gossip in states that accept it. # @@ -452,6 +464,45 @@ async def on_gossip_attestation( # These are expected during normal operation and don't indicate bugs. pass + async def on_gossip_aggregated_attestation( + self, + signed_attestation: SignedAggregatedAttestation, + peer_id: PeerId, # noqa: ARG002 + ) -> None: + """ + Handle aggregated attestation received via gossip. + + Aggregated attestations are collections of individual votes for the same + target, signed by an aggregator. They provide efficient propagation of + consensus weight. + + Args: + signed_attestation: The signed aggregated attestation received. + peer_id: The peer that propagated the aggregate (unused for now). + """ + if not self._state.accepts_gossip: + return + + try: + self.store = self.store.on_gossip_aggregated_attestation(signed_attestation) + except (AssertionError, KeyError): + # Aggregation validation failed. 
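+            # These are expected during normal operation and don't indicate bugs.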
+ pass + + async def publish_aggregated_attestation( + self, + signed_attestation: SignedAggregatedAttestation, + ) -> None: + """ + Publish an aggregated attestation to the network. + + Called by the chain service when this node acts as an aggregator. + + Args: + signed_attestation: The aggregate to publish. + """ + await self._publish_agg_fn(signed_attestation) + async def start_sync(self) -> None: """ Start or resume synchronization. diff --git a/src/lean_spec/subspecs/validator/service.py b/src/lean_spec/subspecs/validator/service.py index 217ed724..e01f3c01 100644 --- a/src/lean_spec/subspecs/validator/service.py +++ b/src/lean_spec/subspecs/validator/service.py @@ -323,7 +323,10 @@ async def _produce_attestations(self, slot: Slot) -> None: Args: slot: Current slot number. """ + # Ensure we are attesting to the latest known head + self.sync_service.store = self.sync_service.store.update_head() store = self.sync_service.store + head_state = store.states.get(store.head) if head_state is None: return diff --git a/tests/consensus/devnet/fc/test_attestation_target_selection.py b/tests/consensus/devnet/fc/test_attestation_target_selection.py index 8044a7fa..c4f6c994 100644 --- a/tests/consensus/devnet/fc/test_attestation_target_selection.py +++ b/tests/consensus/devnet/fc/test_attestation_target_selection.py @@ -17,24 +17,21 @@ def test_attestation_target_at_genesis_initially( fork_choice_test: ForkChoiceTestFiller, ) -> None: """ - Attestation target starts at genesis before safe target updates. + Attestation target starts near genesis before safe target updates. Scenario -------- Process two blocks at slots 1 and 2. Expected: - - After slot 1: target = slot 0 (genesis/finalized) - - After slot 2: target = slot 0 (genesis/finalized) - - Target root automatically validated against block at slot 0 + - After slot 1: target = slot 1 (1 slot ahead of safe target allowed) + - After slot 2: target = slot 1 (walkback stops at safe_target + 1) Why This Matters ---------------- - Initially, the safe target is at genesis (slot 0), so the attestation - target walks back from head to genesis. - - This conservative behavior ensures validators don't attest too far ahead - before there's sufficient attestation weight to advance the safe target. + Initially, the safe target is at genesis (slot 0). The attestation target + is allowed up to 1 slot ahead of safe target to ensure the chain can + start advancing from genesis. 
""" fork_choice_test( steps=[ @@ -42,14 +39,14 @@ def test_attestation_target_at_genesis_initially( block=BlockSpec(slot=Slot(1)), checks=StoreChecks( head_slot=Slot(1), - attestation_target_slot=Slot(0), + attestation_target_slot=Slot(1), ), ), BlockStep( block=BlockSpec(slot=Slot(2)), checks=StoreChecks( head_slot=Slot(2), - attestation_target_slot=Slot(0), # Still genesis + attestation_target_slot=Slot(1), ), ), ], @@ -87,35 +84,35 @@ def test_attestation_target_advances_with_attestations( block=BlockSpec(slot=Slot(1)), checks=StoreChecks( head_slot=Slot(1), - attestation_target_slot=Slot(0), # Still at genesis + attestation_target_slot=Slot(1), # 1 slot ahead of safe target allowed ), ), BlockStep( block=BlockSpec(slot=Slot(2)), checks=StoreChecks( head_slot=Slot(2), - attestation_target_slot=Slot(0), # Still at genesis + attestation_target_slot=Slot(1), # Walks back to safe_target + 1 ), ), BlockStep( block=BlockSpec(slot=Slot(3)), checks=StoreChecks( head_slot=Slot(3), - attestation_target_slot=Slot(0), # Still at genesis + attestation_target_slot=Slot(1), # Walks back to safe_target + 1 ), ), BlockStep( block=BlockSpec(slot=Slot(4)), checks=StoreChecks( head_slot=Slot(4), - attestation_target_slot=Slot(1), # Advances to slot 1 + attestation_target_slot=Slot(1), # 3-step walkback from 4 → 1 ), ), BlockStep( block=BlockSpec(slot=Slot(5)), checks=StoreChecks( head_slot=Slot(5), - attestation_target_slot=Slot(2), # Continues advancing + attestation_target_slot=Slot(2), # 3-step walkback from 5 → 2 ), ), ], @@ -150,21 +147,21 @@ def test_attestation_target_with_slot_gaps( block=BlockSpec(slot=Slot(1)), checks=StoreChecks( head_slot=Slot(1), - attestation_target_slot=Slot(0), + attestation_target_slot=Slot(1), ), ), BlockStep( block=BlockSpec(slot=Slot(3)), checks=StoreChecks( head_slot=Slot(3), - attestation_target_slot=Slot(0), + attestation_target_slot=Slot(1), ), ), BlockStep( block=BlockSpec(slot=Slot(5)), checks=StoreChecks( head_slot=Slot(5), - attestation_target_slot=Slot(0), + attestation_target_slot=Slot(1), # Walks back 5→3→1, stops at safe_target+1 ), ), ], @@ -201,56 +198,56 @@ def test_attestation_target_with_extended_chain( block=BlockSpec(slot=Slot(1)), checks=StoreChecks( head_slot=Slot(1), - attestation_target_slot=Slot(0), # Genesis + attestation_target_slot=Slot(1), ), ), BlockStep( block=BlockSpec(slot=Slot(2)), checks=StoreChecks( head_slot=Slot(2), - attestation_target_slot=Slot(0), # Still genesis + attestation_target_slot=Slot(1), ), ), BlockStep( block=BlockSpec(slot=Slot(3)), checks=StoreChecks( head_slot=Slot(3), - attestation_target_slot=Slot(0), # Still genesis + attestation_target_slot=Slot(1), ), ), BlockStep( block=BlockSpec(slot=Slot(4)), checks=StoreChecks( head_slot=Slot(4), - attestation_target_slot=Slot(1), # Advances to slot 1 + attestation_target_slot=Slot(1), ), ), BlockStep( block=BlockSpec(slot=Slot(5)), checks=StoreChecks( head_slot=Slot(5), - attestation_target_slot=Slot(2), # Stable at 2 + attestation_target_slot=Slot(2), ), ), BlockStep( block=BlockSpec(slot=Slot(6)), checks=StoreChecks( head_slot=Slot(6), - attestation_target_slot=Slot(3), # Continues to advance + attestation_target_slot=Slot(3), ), ), BlockStep( block=BlockSpec(slot=Slot(7)), checks=StoreChecks( head_slot=Slot(7), - attestation_target_slot=Slot(4), # Continues advancing + attestation_target_slot=Slot(4), ), ), BlockStep( block=BlockSpec(slot=Slot(8)), checks=StoreChecks( head_slot=Slot(8), - attestation_target_slot=Slot(5), # Continues advancing + 
attestation_target_slot=Slot(5), ), ), ], @@ -296,39 +293,39 @@ def test_attestation_target_justifiable_constraint( head_slot=Slot(i), attestation_target_slot=Slot( # Mapping of current slot -> expected target slot - # delta = current_slot - JUSTIFICATION_LOOKBACK_SLOTS - finalized_slot - # delta = current_slot - 3 - 0 + # With +1 allowance: walkback stops at safe_target + 1 + # delta = target_slot - finalized_slot { - 1: 0, # 3-slot walkback reaches safe target at slot 0 - 2: 0, # 3-slot walkback reaches safe target at slot 0 - 3: 0, # 3-slot walkback reaches safe target at slot 0 - 4: 1, # delta = 4 - 3 - 0 = 1, Rule 1: delta 1 ≤ 5 - 5: 2, # delta = 5 - 3 - 0 = 2, Rule 1: delta 2 ≤ 5 - 6: 3, # delta = 6 - 3 - 0 = 3, Rule 1: delta 3 ≤ 5 - 7: 4, # delta = 7 - 3 - 0 = 4, Rule 1: delta 4 ≤ 5 - 8: 5, # delta = 8 - 3 - 0 = 5, Rule 1: delta 5 ≤ 5 - 9: 6, # delta = 6 - 0 = 6, Rule 3: pronic number (2*3) - 10: 6, # delta = 10 - 3 - 0 = 7 - 11: 6, # delta = 11 - 3 - 0 = 8 - 12: 9, # delta = 9 - 0 = 9, Rule 2: perfect square (3^2) - 13: 9, # delta = 13 - 3 - 0 = 10 - 14: 9, # delta = 14 - 3 - 0 = 11 - 15: 12, # delta = 15 - 3 - 0 = 12, Rule 3: pronic number (3*4) - 16: 12, # delta = 16 - 3 - 0 = 13 - 17: 12, # delta = 17 - 3 - 0 = 14 - 18: 12, # delta = 18 - 3 - 0 = 15 - 19: 16, # delta = 19 - 3 - 0 = 16, Rule 2: perfect square (4^2) - 20: 16, # delta = 20 - 3 - 0 = 17 - 21: 16, # delta = 21 - 3 - 0 = 18 - 22: 16, # delta = 22 - 3 - 0 = 19 - 23: 20, # delta = 23 - 3 - 0 = 20, Rule 3: pronic number (4*5) - 24: 20, # delta = 24 - 3 - 0 = 21 - 25: 20, # delta = 25 - 3 - 0 = 22 - 26: 20, # delta = 26 - 3 - 0 = 23 - 27: 20, # delta = 27 - 3 - 0 = 24 - 28: 25, # delta = 28 - 3 - 0 = 25, Rule 2: perfect square (5^2) - 29: 25, # delta = 29 - 3 - 0 = 26 - 30: 25, # delta = 30 - 3 - 0 = 27 + 1: 1, # At safe_target + 1, no walkback needed + 2: 1, # Walks back to safe_target + 1 + 3: 1, # Walks back to safe_target + 1 + 4: 1, # 3-step walkback from 4 → 1 + 5: 2, # 3-step walkback from 5 → 2, delta 2 ≤ 5 + 6: 3, # 3-step walkback from 6 → 3, delta 3 ≤ 5 + 7: 4, # 3-step walkback from 7 → 4, delta 4 ≤ 5 + 8: 5, # 3-step walkback from 8 → 5, delta 5 ≤ 5 + 9: 6, # delta = 6, pronic number (2*3) + 10: 6, # delta = 7, not justifiable → walks to 6 + 11: 6, # delta = 8, not justifiable → walks to 6 + 12: 9, # delta = 9, perfect square (3^2) + 13: 9, # delta = 10, not justifiable → walks to 9 + 14: 9, # delta = 11, not justifiable → walks to 9 + 15: 12, # delta = 12, pronic number (3*4) + 16: 12, # delta = 13, not justifiable → walks to 12 + 17: 12, # delta = 14, not justifiable → walks to 12 + 18: 12, # delta = 15, not justifiable → walks to 12 + 19: 16, # delta = 16, perfect square (4^2) + 20: 16, # delta = 17, not justifiable → walks to 16 + 21: 16, # delta = 18, not justifiable → walks to 16 + 22: 16, # delta = 19, not justifiable → walks to 16 + 23: 20, # delta = 20, pronic number (4*5) + 24: 20, # delta = 21, not justifiable → walks to 20 + 25: 20, # delta = 22, not justifiable → walks to 20 + 26: 20, # delta = 23, not justifiable → walks to 20 + 27: 20, # delta = 24, not justifiable → walks to 20 + 28: 25, # delta = 25, perfect square (5^2) + 29: 25, # delta = 26, not justifiable → walks to 25 + 30: 25, # delta = 27, not justifiable → walks to 25 }[i] ), ), diff --git a/tests/interop/helpers/node_runner.py b/tests/interop/helpers/node_runner.py index 71bc288a..3b92ff68 100644 --- a/tests/interop/helpers/node_runner.py +++ b/tests/interop/helpers/node_runner.py @@ -12,6 +12,7 @@ from dataclasses import dataclass, 
field
 from typing import cast
 
+from lean_spec.subspecs.chain.config import ATTESTATION_COMMITTEE_COUNT
 from lean_spec.subspecs.containers import Checkpoint, Validator
 from lean_spec.subspecs.containers.state import Validators
 from lean_spec.subspecs.containers.validator import ValidatorIndex
@@ -234,6 +235,7 @@ async def start_node(
         self,
         node_index: int,
         validator_indices: list[int] | None = None,
+        is_aggregator: bool = False,
         bootnodes: list[str] | None = None,
         *,
         start_services: bool = True,
@@ -244,6 +246,7 @@
         Args:
             node_index: Index for this node (for logging/identification).
             validator_indices: Which validators this node controls.
+            is_aggregator: Whether this node acts as an aggregator.
             bootnodes: Addresses to connect to on startup.
             start_services: If True, start the node's services immediately.
                 If False, call test_node.start() manually after mesh is stable.
@@ -282,6 +285,7 @@
             api_config=None,  # Disable API server for interop tests (not needed for P2P testing)
             validator_registry=validator_registry,
             fork_digest=self.fork_digest,
+            is_aggregator=is_aggregator,
         )
 
         node = Node.from_genesis(config)
@@ -350,9 +354,19 @@
         await event_source.start_gossipsub()
 
         block_topic = f"/leanconsensus/{self.fork_digest}/block/ssz_snappy"
-        attestation_topic = f"/leanconsensus/{self.fork_digest}/attestation/ssz_snappy"
+        aggregation_topic = f"/leanconsensus/{self.fork_digest}/aggregation/ssz_snappy"
         event_source.subscribe_gossip_topic(block_topic)
-        event_source.subscribe_gossip_topic(attestation_topic)
+        event_source.subscribe_gossip_topic(aggregation_topic)
+
+        # Determine subnets for our validators and subscribe.
+        #
+        # Validators only subscribe to the subnets they are assigned to.
+        # This matches the Ethereum gossip specification.
+        if validator_indices:
+            for idx in validator_indices:
+                subnet_id = idx % int(ATTESTATION_COMMITTEE_COUNT)
+                topic = f"/leanconsensus/{self.fork_digest}/attestation_{subnet_id}/ssz_snappy"
+                event_source.subscribe_gossip_topic(topic)
 
         # Optionally start the node's services.
         #
@@ -408,9 +424,20 @@ async def start_all(
         # This allows the gossipsub mesh to form before validators start
         # producing blocks and attestations. Otherwise, early blocks/attestations
         # would be "Published message to 0 peers" because the mesh is empty.
+        aggregator_indices = set(range(int(ATTESTATION_COMMITTEE_COUNT)))
         for i in range(num_nodes):
             validator_indices = validators_per_node[i] if i < len(validators_per_node) else []
-            await self.start_node(i, validator_indices, start_services=False)
+
+            # A node is an aggregator if it controls any of the first
+            # ATTESTATION_COMMITTEE_COUNT validators.
+            is_node_aggregator = any(vid in aggregator_indices for vid in validator_indices)
+
+            await self.start_node(
+                i,
+                validator_indices,
+                is_aggregator=is_node_aggregator,
+                start_services=False,
+            )
 
         # Stagger node startup like Ream does.
# diff --git a/tests/interop/test_late_joiner.py b/tests/interop/test_late_joiner.py index 343a1f63..53c7c919 100644 --- a/tests/interop/test_late_joiner.py +++ b/tests/interop/test_late_joiner.py @@ -24,7 +24,6 @@ pytestmark = pytest.mark.interop -@pytest.mark.skip(reason="Interop test not passing - needs update (#359)") @pytest.mark.timeout(240) @pytest.mark.num_validators(3) async def test_late_joiner_sync(node_cluster: NodeCluster) -> None: @@ -36,7 +35,7 @@ async def test_late_joiner_sync(node_cluster: NodeCluster) -> None: """ validators_per_node = [[0], [1], [2]] - await node_cluster.start_node(0, validators_per_node[0]) + await node_cluster.start_node(0, validators_per_node[0], is_aggregator=True) await node_cluster.start_node(1, validators_per_node[1]) node0 = node_cluster.nodes[0] diff --git a/tests/interop/test_multi_node.py b/tests/interop/test_multi_node.py index d2edcfe5..19362504 100644 --- a/tests/interop/test_multi_node.py +++ b/tests/interop/test_multi_node.py @@ -44,7 +44,6 @@ pytestmark = pytest.mark.interop -@pytest.mark.skip(reason="Interop test not passing - needs update (#359)") @pytest.mark.timeout(120) @pytest.mark.num_validators(3) async def test_mesh_finalization(node_cluster: NodeCluster) -> None: @@ -146,8 +145,10 @@ async def test_mesh_finalization(node_cluster: NodeCluster) -> None: # # - High new_atts, low known_atts = processing bottleneck # - Low counts everywhere = gossip not propagating - new_atts = [len(node._store.latest_new_attestations) for node in node_cluster.nodes] - known_atts = [len(node._store.latest_known_attestations) for node in node_cluster.nodes] + new_atts = [len(node._store.latest_new_aggregated_payloads) for node in node_cluster.nodes] + known_atts = [ + len(node._store.latest_known_aggregated_payloads) for node in node_cluster.nodes + ] logger.info( "Progress: head=%s justified=%s finalized=%s new_atts=%s known_atts=%s", @@ -200,7 +201,6 @@ async def test_mesh_finalization(node_cluster: NodeCluster) -> None: ) -@pytest.mark.skip(reason="Interop test not passing - needs update (#359)") @pytest.mark.timeout(120) @pytest.mark.num_validators(3) async def test_mesh_2_2_2_finalization(node_cluster: NodeCluster) -> None: diff --git a/tests/lean_spec/helpers/builders.py b/tests/lean_spec/helpers/builders.py index ae557bfc..594aae54 100644 --- a/tests/lean_spec/helpers/builders.py +++ b/tests/lean_spec/helpers/builders.py @@ -520,7 +520,7 @@ def make_signed_block_from_store( slot_duration = block.slot * SECONDS_PER_SLOT block_time = store.config.genesis_time + slot_duration - advanced_store = store.on_tick(block_time, has_proposal=True) + advanced_store, _ = store.on_tick(block_time, has_proposal=True) return advanced_store, signed_block diff --git a/tests/lean_spec/subspecs/chain/test_service.py b/tests/lean_spec/subspecs/chain/test_service.py index 8e2e9ac4..1406bf26 100644 --- a/tests/lean_spec/subspecs/chain/test_service.py +++ b/tests/lean_spec/subspecs/chain/test_service.py @@ -28,7 +28,9 @@ class MockStore: head: Bytes32 = field(default_factory=lambda: ZERO_HASH) latest_finalized: MockCheckpoint = field(default_factory=MockCheckpoint) - def on_tick(self, time: Uint64, has_proposal: bool) -> MockStore: + def on_tick( + self, time: Uint64, has_proposal: bool, is_aggregator: bool = False + ) -> tuple[MockStore, list]: """Record the tick call and return a new store.""" new_store = MockStore( time=time, @@ -37,7 +39,7 @@ def on_tick(self, time: Uint64, has_proposal: bool) -> MockStore: latest_finalized=self.latest_finalized, ) 
new_store.tick_calls.append((time, has_proposal)) - return new_store + return new_store, [] @dataclass @@ -45,6 +47,7 @@ class MockSyncService: """Mock sync service for testing ChainService.""" store: MockStore = field(default_factory=MockStore) + is_aggregator: bool = False class TestChainServiceLifecycle: diff --git a/tests/lean_spec/subspecs/forkchoice/test_attestation_target.py b/tests/lean_spec/subspecs/forkchoice/test_attestation_target.py index 2cbc306f..c85721c8 100644 --- a/tests/lean_spec/subspecs/forkchoice/test_attestation_target.py +++ b/tests/lean_spec/subspecs/forkchoice/test_attestation_target.py @@ -163,7 +163,7 @@ def test_safe_target_requires_supermajority( store = store.on_gossip_attestation(signed_attestation, is_aggregator=True) # Aggregate the signatures - store = store.aggregate_committee_signatures() + store, _ = store.aggregate_committee_signatures() # Update safe target (uses latest_new_aggregated_payloads) store = store.update_safe_target() @@ -207,7 +207,7 @@ def test_safe_target_advances_with_supermajority( store = store.on_gossip_attestation(signed_attestation, is_aggregator=True) # Aggregate the signatures - store = store.aggregate_committee_signatures() + store, _ = store.aggregate_committee_signatures() # Update safe target store = store.update_safe_target() @@ -247,7 +247,7 @@ def test_update_safe_target_uses_new_attestations( store = store.on_gossip_attestation(signed_attestation, is_aggregator=True) # Aggregate into new payloads - store = store.aggregate_committee_signatures() + store, _ = store.aggregate_committee_signatures() # Update safe target should use new aggregated payloads store = store.update_safe_target() @@ -301,7 +301,7 @@ def test_justification_with_supermajority_attestations( store = store.on_gossip_attestation(signed_attestation, is_aggregator=True) # Aggregate signatures before producing the next block - store = store.aggregate_committee_signatures() + store, _ = store.aggregate_committee_signatures() # Produce block 2 which includes these attestations store, block_2, signatures = store.produce_block_with_signatures(slot_2, proposer_2) @@ -382,7 +382,7 @@ def test_justification_tracking_with_multiple_targets( ) store = store.on_gossip_attestation(signed_attestation, is_aggregator=True) - store = store.aggregate_committee_signatures() + store, _ = store.aggregate_committee_signatures() store = store.update_safe_target() # Neither target should be justified with only half validators @@ -527,7 +527,7 @@ def test_full_attestation_cycle( store = store.on_gossip_attestation(signed_attestation, is_aggregator=True) # Phase 3: Aggregate signatures into payloads - store = store.aggregate_committee_signatures() + store, _ = store.aggregate_committee_signatures() # Phase 4: Update safe target store = store.update_safe_target() @@ -589,7 +589,7 @@ def test_attestation_target_after_on_block( # Process block via on_block on a fresh consumer store consumer_store = observer_store block_time = consumer_store.config.genesis_time + block.slot * Uint64(SECONDS_PER_SLOT) - consumer_store = consumer_store.on_tick(block_time, has_proposal=True) + consumer_store, _ = consumer_store.on_tick(block_time, has_proposal=True) consumer_store = consumer_store.on_block(signed_block) # Get attestation target after on_block diff --git a/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py b/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py index f02f73d4..982aa905 100644 --- a/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py +++ 
b/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py @@ -509,7 +509,7 @@ def test_aggregates_gossip_signatures_into_proof(self, key_manager: XmssKeyManag ) # Perform aggregation - updated_store = store.aggregate_committee_signatures() + updated_store, _ = store.aggregate_committee_signatures() # Verify proofs were created and stored data_root = attestation_data.data_root_bytes() @@ -537,7 +537,7 @@ def test_aggregated_proof_is_valid(self, key_manager: XmssKeyManager) -> None: attesting_validators=attesting_validators, ) - updated_store = store.aggregate_committee_signatures() + updated_store, _ = store.aggregate_committee_signatures() data_root = attestation_data.data_root_bytes() sig_key = SignatureKey(ValidatorIndex(1), data_root) @@ -567,7 +567,7 @@ def test_empty_gossip_signatures_produces_no_proofs(self, key_manager: XmssKeyMa attesting_validators=[], # No attesters ) - updated_store = store.aggregate_committee_signatures() + updated_store, _ = store.aggregate_committee_signatures() # Verify no proofs were created assert len(updated_store.latest_new_aggregated_payloads) == 0 @@ -619,7 +619,7 @@ def test_multiple_attestation_data_grouped_separately( } ) - updated_store = store.aggregate_committee_signatures() + updated_store, _ = store.aggregate_committee_signatures() # Verify both validators have separate proofs sig_key_1 = SignatureKey(ValidatorIndex(1), data_root_1) @@ -661,7 +661,7 @@ def test_interval_2_triggers_aggregation_for_aggregator( store = store.model_copy(update={"time": Uint64(1)}) # Tick to interval 2 as aggregator - updated_store = store.tick_interval(has_proposal=False, is_aggregator=True) + updated_store, _ = store.tick_interval(has_proposal=False, is_aggregator=True) # Verify aggregation was performed data_root = attestation_data.data_root_bytes() @@ -692,7 +692,7 @@ def test_interval_2_skips_aggregation_for_non_aggregator( store = store.model_copy(update={"time": Uint64(1)}) # Tick to interval 2 as NON-aggregator - updated_store = store.tick_interval(has_proposal=False, is_aggregator=False) + updated_store, _ = store.tick_interval(has_proposal=False, is_aggregator=False) # Verify aggregation was NOT performed data_root = attestation_data.data_root_bytes() @@ -731,7 +731,7 @@ def test_other_intervals_do_not_trigger_aggregation(self, key_manager: XmssKeyMa pre_tick_time = (target_interval - 1) % int(INTERVALS_PER_SLOT) test_store = store.model_copy(update={"time": Uint64(pre_tick_time)}) - updated_store = test_store.tick_interval(has_proposal=False, is_aggregator=True) + updated_store, _ = test_store.tick_interval(has_proposal=False, is_aggregator=True) assert sig_key not in updated_store.latest_new_aggregated_payloads, ( f"Aggregation should NOT occur at interval {target_interval}" @@ -754,7 +754,7 @@ def test_interval_0_accepts_attestations_with_proposal( store = store.model_copy(update={"time": Uint64(4)}) # Tick to interval 0 with proposal - updated_store = store.tick_interval(has_proposal=True, is_aggregator=True) + updated_store, _ = store.tick_interval(has_proposal=True, is_aggregator=True) # Verify time advanced assert updated_store.time == Uint64(5) @@ -812,7 +812,7 @@ def test_gossip_to_aggregation_to_storage(self, key_manager: XmssKeyManager) -> # Step 2: Advance to interval 2 (aggregation interval) store = store.model_copy(update={"time": Uint64(1)}) - store = store.tick_interval(has_proposal=False, is_aggregator=True) + store, _ = store.tick_interval(has_proposal=False, is_aggregator=True) # Step 3: Verify aggregated proofs were created for 
vid in attesting_validators: diff --git a/tests/lean_spec/subspecs/forkchoice/test_time_management.py b/tests/lean_spec/subspecs/forkchoice/test_time_management.py index 36df3b51..c80eafb3 100644 --- a/tests/lean_spec/subspecs/forkchoice/test_time_management.py +++ b/tests/lean_spec/subspecs/forkchoice/test_time_management.py @@ -59,7 +59,7 @@ def test_on_tick_basic(self, sample_store: Store) -> None: initial_time = sample_store.time target_time = sample_store.config.genesis_time + Uint64(200) # Much later time - sample_store = sample_store.on_tick(target_time, has_proposal=True) + sample_store, _ = sample_store.on_tick(target_time, has_proposal=True) # Time should advance assert sample_store.time > initial_time @@ -69,7 +69,7 @@ def test_on_tick_no_proposal(self, sample_store: Store) -> None: initial_time = sample_store.time target_time = sample_store.config.genesis_time + Uint64(100) - sample_store = sample_store.on_tick(target_time, has_proposal=False) + sample_store, _ = sample_store.on_tick(target_time, has_proposal=False) # Time should still advance assert sample_store.time >= initial_time @@ -80,7 +80,7 @@ def test_on_tick_already_current(self, sample_store: Store) -> None: current_target = sample_store.config.genesis_time + initial_time # Try to advance to current time (should be no-op) - sample_store = sample_store.on_tick(current_target, has_proposal=True) + sample_store, _ = sample_store.on_tick(current_target, has_proposal=True) # Should not change significantly (time can only increase) # Tolerance increased for 5-interval per slot system @@ -91,7 +91,7 @@ def test_on_tick_small_increment(self, sample_store: Store) -> None: initial_time = sample_store.time target_time = sample_store.config.genesis_time + initial_time + Uint64(1) - sample_store = sample_store.on_tick(target_time, has_proposal=False) + sample_store, _ = sample_store.on_tick(target_time, has_proposal=False) # Should advance by small amount assert sample_store.time >= initial_time @@ -105,7 +105,7 @@ def test_tick_interval_basic(self, sample_store: Store) -> None: initial_time = sample_store.time # Tick one interval forward - sample_store = sample_store.tick_interval(has_proposal=False) + sample_store, _ = sample_store.tick_interval(has_proposal=False) # Time should advance by one interval assert sample_store.time == initial_time + Uint64(1) @@ -114,7 +114,7 @@ def test_tick_interval_with_proposal(self, sample_store: Store) -> None: """Test interval ticking with proposal.""" initial_time = sample_store.time - sample_store = sample_store.tick_interval(has_proposal=True) + sample_store, _ = sample_store.tick_interval(has_proposal=True) # Time should advance assert sample_store.time == initial_time + Uint64(1) @@ -125,7 +125,7 @@ def test_tick_interval_sequence(self, sample_store: Store) -> None: # Tick multiple intervals for i in range(5): - sample_store = sample_store.tick_interval(has_proposal=(i % 2 == 0)) + sample_store, _ = sample_store.tick_interval(has_proposal=(i % 2 == 0)) # Should have advanced by 5 intervals assert sample_store.time == initial_time + Uint64(5) @@ -139,7 +139,7 @@ def test_tick_interval_actions_by_phase(self, sample_store: Store) -> None: # Tick through a complete slot cycle for interval in range(INTERVALS_PER_SLOT): has_proposal = interval == 0 # Proposal only in first interval - sample_store = sample_store.tick_interval(has_proposal=has_proposal) + sample_store, _ = sample_store.tick_interval(has_proposal=has_proposal) current_interval = sample_store.time % INTERVALS_PER_SLOT 
expected_interval = Uint64((interval + 1)) % INTERVALS_PER_SLOT
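
A minimal usage sketch of the new tuple-returning Store.on_tick API (illustrative only, not part of this diff; `store`, `current_time`, and `publish` are hypothetical stand-ins for the real wiring shown in chain/service.py above):

    # Tick the store forward; aggregation now returns any new aggregates
    # instead of discarding them inside the Store.
    store, new_aggregates = store.on_tick(
        time=current_time,
        has_proposal=False,
        is_aggregator=True,
    )
    # Interval 2 is the only phase that produces aggregates; the caller
    # must broadcast each SignedAggregatedAttestation explicitly.
    for agg in new_aggregates:
        await publish(agg)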