2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -80,7 +80,7 @@ jobs:

interop-tests:
name: Interop tests - Multi-node consensus
runs-on: ubuntu-latest
runs-on: macos-15
timeout-minutes: 10
steps:
- name: Checkout leanSpec
@@ -235,7 +235,9 @@ def make_fixture(self) -> Self:
# Time advancement may trigger slot boundaries.
# At slot boundaries, pending attestations may become active.
# Always act as aggregator to ensure gossip signatures are aggregated
store = store.on_tick(Uint64(step.time), has_proposal=False, is_aggregator=True)
store, _ = store.on_tick(
Uint64(step.time), has_proposal=False, is_aggregator=True
)

elif isinstance(step, BlockStep):
# Build a complete signed block from the lightweight spec.
@@ -264,7 +266,7 @@ def make_fixture(self) -> Self:
# Always act as aggregator to ensure gossip signatures are aggregated
slot_duration_seconds = block.slot * SECONDS_PER_SLOT
block_time = store.config.genesis_time + slot_duration_seconds
store = store.on_tick(block_time, has_proposal=True, is_aggregator=True)
store, _ = store.on_tick(block_time, has_proposal=True, is_aggregator=True)

# Process the block through Store.
# This validates, applies state transition, and updates head.
@@ -408,7 +410,7 @@ def _build_block_from_spec(
# First, aggregate any gossip signatures into payloads
# This ensures that signatures from previous blocks (like proposer attestations)
# are available for extraction
aggregation_store = working_store.aggregate_committee_signatures()
aggregation_store, _ = working_store.aggregate_committee_signatures()

# Now combine aggregated payloads from both sources
aggregated_payloads = (
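The fixture changes above all follow from the new return type: on_tick and aggregate_committee_signatures now return a tuple rather than a bare Store. A minimal sketch of the updated calling convention, assuming an already-constructed forkchoice store (the helper name is hypothetical):

# Hypothetical helper illustrating the tuple-unpacking call sites above.
# `store` is a forkchoice Store and `time` a Uint64 timestamp; fixtures
# that only advance time can discard the returned aggregates.
def advance_fixture_time(store, time):
    store, _ = store.on_tick(time, has_proposal=False, is_aggregator=True)
    return store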
17 changes: 15 additions & 2 deletions src/lean_spec/subspecs/chain/service.py
@@ -125,9 +125,10 @@ async def run(self) -> None:
#
# This minimal service does not produce blocks.
# Block production requires validator keys.
new_store = self.sync_service.store.on_tick(
new_store, new_aggregated_attestations = self.sync_service.store.on_tick(
time=current_time,
has_proposal=False,
is_aggregator=self.sync_service.is_aggregator,
)

# Update sync service's store reference.
@@ -137,6 +138,11 @@
# the updated time.
self.sync_service.store = new_store

# Publish any new aggregated attestations produced this tick.
if new_aggregated_attestations:
for agg in new_aggregated_attestations:
await self.sync_service.publish_aggregated_attestation(agg)

logger.info(
"Tick: slot=%d interval=%d time=%d head=%s finalized=slot%d",
self.clock.current_slot(),
@@ -162,11 +168,18 @@ async def _initial_tick(self) -> Interval | None:

# Only tick if we're past genesis.
if current_time >= self.clock.genesis_time:
new_store = self.sync_service.store.on_tick(
new_store, new_aggregated_attestations = self.sync_service.store.on_tick(
time=current_time,
has_proposal=False,
is_aggregator=self.sync_service.is_aggregator,
)
self.sync_service.store = new_store

# Publish any aggregated attestations produced during catch-up.
if new_aggregated_attestations:
for agg in new_aggregated_attestations:
await self.sync_service.publish_aggregated_attestation(agg)

return self.clock.total_intervals()

return None
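Both tick paths in this service now follow the same shape: advance the store, swap it in, then publish whatever the tick produced. A condensed sketch of that shared pattern, using only names visible in this diff (error handling omitted):

# Sketch of the tick-then-publish flow shared by run() and _initial_tick().
async def tick_and_publish(sync_service, current_time):
    new_store, new_aggregates = sync_service.store.on_tick(
        time=current_time,
        has_proposal=False,
        is_aggregator=sync_service.is_aggregator,
    )
    # Swap in the advanced store first so handlers observe the updated time.
    sync_service.store = new_store
    for agg in new_aggregates:
        await sync_service.publish_aggregated_attestation(agg)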
74 changes: 32 additions & 42 deletions src/lean_spec/subspecs/forkchoice/store.py
@@ -494,7 +494,6 @@ def on_gossip_aggregated_attestation(
new_attestation_data_by_root = dict(self.attestation_data_by_root)
new_attestation_data_by_root[data_root] = data

store = self
for vid in validator_ids:
# Update Proof Map
#
@@ -503,7 +502,7 @@
new_aggregated_payloads.setdefault(key, []).append(proof)

# Return store with updated aggregated payloads and attestation data
return store.model_copy(
return self.model_copy(
update={
"latest_new_aggregated_payloads": new_aggregated_payloads,
"attestation_data_by_root": new_attestation_data_by_root,
@@ -862,28 +861,7 @@ def update_head(self) -> "Store":
)

def accept_new_attestations(self) -> "Store":
"""
Process pending aggregated payloads and update forkchoice head.

Moves aggregated payloads from latest_new_aggregated_payloads to
latest_known_aggregated_payloads, making them eligible to contribute to
fork choice weights. This migration happens at specific interval ticks.

The Interval Tick System
-------------------------
Aggregated payloads progress through intervals:
- Interval 0: Block proposal
- Interval 1: Validators cast attestations (enter "new")
- Interval 2: Aggregators create proofs & broadcast
- Interval 3: Safe target update
- Interval 4: Process accumulated attestations

This staged progression ensures proper timing and prevents premature
influence on fork choice decisions.

Returns:
New Store with migrated aggregated payloads and updated head.
"""
"""Process pending aggregated payloads and update forkchoice head."""
# Merge new aggregated payloads into known aggregated payloads
merged_aggregated_payloads = dict(self.latest_known_aggregated_payloads)
for sig_key, proofs in self.latest_new_aggregated_payloads.items():
@@ -943,15 +921,15 @@ def update_safe_target(self) -> "Store":

return self.model_copy(update={"safe_target": safe_target})

def aggregate_committee_signatures(self) -> "Store":
def aggregate_committee_signatures(self) -> tuple["Store", list[SignedAggregatedAttestation]]:
"""
Aggregate committee signatures for attestations in committee_signatures.

This method aggregates signatures from the gossip_signatures map.
Attestations are reconstructed from gossip_signatures using attestation_data_by_root.

Returns:
New Store with updated latest_new_aggregated_payloads.
Tuple of (new Store with updated payloads, list of new SignedAggregatedAttestation objects).
"""
new_aggregated_payloads = dict(self.latest_new_aggregated_payloads)

@@ -976,13 +954,14 @@
committee_signatures,
)

# iterate to broadcast aggregated attestations
# Create list for broadcasting
new_aggregates: list[SignedAggregatedAttestation] = []
for aggregated_attestation, aggregated_signature in aggregated_results:
_ = SignedAggregatedAttestation(
agg = SignedAggregatedAttestation(
data=aggregated_attestation.data,
proof=aggregated_signature,
)
# Note: here we should broadcast the aggregated signature to committee_aggregators topic
new_aggregates.append(agg)

# Compute new aggregated payloads
new_gossip_sigs = dict(self.gossip_signatures)
@@ -1004,9 +983,11 @@
"latest_new_aggregated_payloads": new_aggregated_payloads,
"gossip_signatures": new_gossip_sigs,
}
)
), new_aggregates

def tick_interval(self, has_proposal: bool, is_aggregator: bool = False) -> "Store":
def tick_interval(
self, has_proposal: bool, is_aggregator: bool = False
) -> tuple["Store", list[SignedAggregatedAttestation]]:
"""
Advance store time by one interval and perform interval-specific actions.

@@ -1048,11 +1029,12 @@ def tick_interval(self, has_proposal: bool, is_aggregator: bool = False) -> "Store":
is_aggregator: Whether the node is an aggregator.

Returns:
New Store with advanced time and interval-specific updates applied.
Tuple of (new Store with advanced time, list of new signed aggregated attestations).
"""
# Advance time by one interval
store = self.model_copy(update={"time": self.time + Uint64(1)})
current_interval = store.time % INTERVALS_PER_SLOT
new_aggregates: list[SignedAggregatedAttestation] = []

if current_interval == Uint64(0):
# Start of slot - process attestations if proposal exists
@@ -1061,17 +1043,19 @@
elif current_interval == Uint64(2):
# Aggregation interval - aggregators create proofs
if is_aggregator:
store = store.aggregate_committee_signatures()
store, new_aggregates = store.aggregate_committee_signatures()
elif current_interval == Uint64(3):
# Fast confirm - update safe target based on received proofs
store = store.update_safe_target()
elif current_interval == Uint64(4):
# End of slot - accept accumulated attestations
store = store.accept_new_attestations()

return store
return store, new_aggregates

def on_tick(self, time: Uint64, has_proposal: bool, is_aggregator: bool = False) -> "Store":
def on_tick(
self, time: Uint64, has_proposal: bool, is_aggregator: bool = False
) -> tuple["Store", list[SignedAggregatedAttestation]]:
"""
Advance forkchoice store time to given timestamp.

@@ -1085,22 +1069,25 @@
is_aggregator: Whether the node is an aggregator.

Returns:
New Store with time advanced and all interval actions performed.
Tuple of (new Store with time advanced,
list of all signed aggregated attestations produced).
"""
# Calculate target time in intervals
time_delta_ms = (time - self.config.genesis_time) * Uint64(1000)
tick_interval_time = time_delta_ms // MILLISECONDS_PER_INTERVAL

# Tick forward one interval at a time
store = self
all_new_aggregates: list[SignedAggregatedAttestation] = []
while store.time < tick_interval_time:
# Check if proposal should be signaled for next interval
should_signal_proposal = has_proposal and (store.time + Uint64(1)) == tick_interval_time

# Advance by one interval with appropriate signaling
store = store.tick_interval(should_signal_proposal, is_aggregator)
store, new_aggregates = store.tick_interval(should_signal_proposal, is_aggregator)
all_new_aggregates.extend(new_aggregates)

return store
return store, all_new_aggregates

def get_proposal_head(self, slot: Slot) -> tuple["Store", Bytes32]:
"""
Expand Down Expand Up @@ -1128,7 +1115,7 @@ def get_proposal_head(self, slot: Slot) -> tuple["Store", Bytes32]:
slot_time = self.config.genesis_time + slot_duration_seconds

# Advance time to current slot (ticking intervals)
store = self.on_tick(slot_time, True)
store, _ = self.on_tick(slot_time, True)

# Process any pending attestations before proposal
store = store.accept_new_attestations()
@@ -1174,8 +1161,11 @@ def get_attestation_target(self) -> Checkpoint:
#
# This ensures the target doesn't advance too far ahead of safe target,
# providing a balance between liveness and safety.
#
# We allow the target to be up to 1 slot ahead of the safe target
# to ensure the chain can actually start advancing from genesis.
for _ in range(JUSTIFICATION_LOOKBACK_SLOTS):
if self.blocks[target_block_root].slot > self.blocks[self.safe_target].slot:
if self.blocks[target_block_root].slot > self.blocks[self.safe_target].slot + Slot(1):
target_block_root = self.blocks[target_block_root].parent_root
else:
break
@@ -1192,7 +1182,7 @@ def get_attestation_target(self) -> Checkpoint:
# Create checkpoint from selected target block
target_block = self.blocks[target_block_root]

return Checkpoint(root=hash_tree_root(target_block), slot=target_block.slot)
return Checkpoint(root=target_block_root, slot=target_block.slot)

def produce_attestation_data(self, slot: Slot) -> AttestationData:
"""
Expand Down Expand Up @@ -1299,7 +1289,7 @@ def produce_block_with_signatures(
#
# The builder iteratively collects valid attestations.
# It returns the final block, post-state, and signature proofs.
final_block, final_post_state, _, signatures = head_state.build_block(
final_block, final_post_state, collected_attestations, signatures = head_state.build_block(
slot=slot,
proposer_index=validator_index,
parent_root=head_root,
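With aggregates threaded through tick_interval and on_tick, only interval 2 can produce SignedAggregatedAttestation objects, and on_tick accumulates everything produced while catching up across multiple intervals. A small sketch of the resulting caller contract, assuming a constructed store (the helper is hypothetical):

# Interval actions, per tick_interval above:
#   0: process pending attestations when a proposal is signaled
#   2: aggregators fold gossip signatures into aggregated payloads
#   3: update the safe target
#   4: accept accumulated attestations
def catch_up(store, target_time, is_aggregator):
    store, aggregates = store.on_tick(
        target_time, has_proposal=False, is_aggregator=is_aggregator
    )
    # Non-aggregators never produce aggregates, whatever intervals were crossed.
    assert is_aggregator or not aggregates
    return store, aggregates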
38 changes: 36 additions & 2 deletions src/lean_spec/subspecs/networking/client/event_source.py
@@ -106,7 +106,7 @@

from lean_spec.snappy import SnappyDecompressionError, frame_decompress
from lean_spec.subspecs.containers import SignedBlockWithAttestation
from lean_spec.subspecs.containers.attestation import SignedAttestation
from lean_spec.subspecs.containers.attestation import SignedAggregatedAttestation, SignedAttestation
from lean_spec.subspecs.networking.config import (
GOSSIPSUB_DEFAULT_PROTOCOL_ID,
GOSSIPSUB_PROTOCOL_ID_V12,
@@ -130,6 +131,7 @@
)
from lean_spec.subspecs.networking.reqresp.message import Status
from lean_spec.subspecs.networking.service.events import (
GossipAggregatedAttestationEvent,
GossipAttestationEvent,
GossipBlockEvent,
NetworkEvent,
@@ -325,7 +326,7 @@ def decode_message(
self,
topic_str: str,
compressed_data: bytes,
) -> SignedBlockWithAttestation | SignedAttestation | None:
) -> SignedBlockWithAttestation | SignedAttestation | SignedAggregatedAttestation | None:
"""
Decode a gossip message from topic and compressed data.

@@ -393,6 +394,8 @@ def decode_message(
return SignedBlockWithAttestation.decode_bytes(ssz_bytes)
case TopicKind.ATTESTATION_SUBNET:
return SignedAttestation.decode_bytes(ssz_bytes)
case TopicKind.AGGREGATED_ATTESTATION:
return SignedAggregatedAttestation.decode_bytes(ssz_bytes)
except SSZSerializationError as e:
raise GossipMessageError(f"SSZ decode failed: {e}") from e

@@ -816,6 +819,9 @@ async def _handle_gossipsub_message(self, event: GossipsubMessageEvent) -> None:
case TopicKind.ATTESTATION_SUBNET:
if isinstance(message, SignedAttestation):
await self._emit_gossip_attestation(message, event.peer_id)
case TopicKind.AGGREGATED_ATTESTATION:
if isinstance(message, SignedAggregatedAttestation):
await self._emit_gossip_aggregated_attestation(message, event.peer_id)

logger.debug("Processed gossipsub message %s from %s", topic.kind.value, event.peer_id)

Expand Down Expand Up @@ -1168,6 +1174,25 @@ async def _emit_gossip_attestation(
GossipAttestationEvent(attestation=attestation, peer_id=peer_id, topic=topic)
)

async def _emit_gossip_aggregated_attestation(
self,
signed_attestation: SignedAggregatedAttestation,
peer_id: PeerId,
) -> None:
"""
Emit a gossip aggregated attestation event.

Args:
signed_attestation: Aggregated attestation received from gossip.
peer_id: Peer that sent it.
"""
topic = GossipTopic(kind=TopicKind.AGGREGATED_ATTESTATION, fork_digest=self._fork_digest)
await self._events.put(
GossipAggregatedAttestationEvent(
signed_attestation=signed_attestation, peer_id=peer_id, topic=topic
)
)

async def _accept_streams(self, peer_id: PeerId, conn: QuicConnection) -> None:
"""
Accept incoming streams from a connection.
@@ -1464,6 +1489,15 @@ async def _handle_gossip_stream(self, peer_id: PeerId, stream: Stream) -> None:
# Type mismatch indicates a bug in decode_message.
logger.warning("Attestation topic but got %s", type(message).__name__)

case TopicKind.AGGREGATED_ATTESTATION:
if isinstance(message, SignedAggregatedAttestation):
await self._emit_gossip_aggregated_attestation(message, peer_id)
else:
logger.warning(
"Aggregated attestation topic but got %s",
type(message).__name__,
)

logger.debug("Received gossip %s from %s", topic.kind.value, peer_id)

except GossipMessageError as e:
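Since the new topic reuses the existing snappy-then-SSZ pipeline, decoding an aggregated attestation off the wire composes two calls already imported at the top of this file. A minimal sketch, assuming frame_decompress takes and returns raw bytes as its use in decode_message suggests:

from lean_spec.snappy import frame_decompress
from lean_spec.subspecs.containers.attestation import SignedAggregatedAttestation

def decode_aggregated_gossip(compressed: bytes) -> SignedAggregatedAttestation:
    # Snappy frame decompression first, then SSZ decoding, mirroring the
    # AGGREGATED_ATTESTATION branch added to decode_message above.
    ssz_bytes = frame_decompress(compressed)
    return SignedAggregatedAttestation.decode_bytes(ssz_bytes)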