Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,9 @@ def make_fixture(self) -> Self:
# Time advancement may trigger slot boundaries.
# At slot boundaries, pending attestations may become active.
# Always act as aggregator to ensure gossip signatures are aggregated
store = store.on_tick(Uint64(step.time), has_proposal=False, is_aggregator=True)
store, _ = store.on_tick(
Uint64(step.time), has_proposal=False, is_aggregator=True
)

elif isinstance(step, BlockStep):
# Build a complete signed block from the lightweight spec.
Expand Down Expand Up @@ -264,7 +266,7 @@ def make_fixture(self) -> Self:
# Always act as aggregator to ensure gossip signatures are aggregated
slot_duration_seconds = block.slot * SECONDS_PER_SLOT
block_time = store.config.genesis_time + slot_duration_seconds
store = store.on_tick(block_time, has_proposal=True, is_aggregator=True)
store, _ = store.on_tick(block_time, has_proposal=True, is_aggregator=True)

# Process the block through Store.
# This validates, applies state transition, and updates head.
Expand Down Expand Up @@ -408,7 +410,7 @@ def _build_block_from_spec(
# First, aggregate any gossip signatures into payloads
# This ensures that signatures from previous blocks (like proposer attestations)
# are available for extraction
aggregation_store = working_store.aggregate_committee_signatures()
aggregation_store, _ = working_store.aggregate_committee_signatures()

# Now combine aggregated payloads from both sources
aggregated_payloads = (
Expand Down
17 changes: 15 additions & 2 deletions src/lean_spec/subspecs/chain/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,10 @@ async def run(self) -> None:
#
# This minimal service does not produce blocks.
# Block production requires validator keys.
new_store = self.sync_service.store.on_tick(
new_store, new_aggregated_attestations = self.sync_service.store.on_tick(
time=current_time,
has_proposal=False,
is_aggregator=self.sync_service.is_aggregator,
)

# Update sync service's store reference.
Expand All @@ -137,6 +138,11 @@ async def run(self) -> None:
# the updated time.
self.sync_service.store = new_store

# Publish any new aggregated attestations produced this tick
if new_aggregated_attestations:
for agg in new_aggregated_attestations:
await self.sync_service.publish_aggregated_attestation(agg)

logger.info(
"Tick: slot=%d interval=%d time=%d head=%s finalized=slot%d",
self.clock.current_slot(),
Expand All @@ -162,11 +168,18 @@ async def _initial_tick(self) -> Interval | None:

# Only tick if we're past genesis.
if current_time >= self.clock.genesis_time:
new_store = self.sync_service.store.on_tick(
new_store, new_aggregated_attestations = self.sync_service.store.on_tick(
time=current_time,
has_proposal=False,
is_aggregator=self.sync_service.is_aggregator,
)
self.sync_service.store = new_store

# Publish any aggregated attestations produced during catch-up.
if new_aggregated_attestations:
for agg in new_aggregated_attestations:
await self.sync_service.publish_aggregated_attestation(agg)

return self.clock.total_intervals()

return None
Expand Down
34 changes: 34 additions & 0 deletions src/lean_spec/subspecs/containers/state/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import logging
from typing import AbstractSet, Collection, Iterable

from lean_spec.subspecs.ssz.hash import hash_tree_root
Expand Down Expand Up @@ -33,6 +34,8 @@
Validators,
)

logger = logging.getLogger(__name__)


class State(Container):
"""The main consensus state object."""
Expand Down Expand Up @@ -449,6 +452,13 @@ def process_attestations(
#
# The rules below filter out invalid or irrelevant votes.
for attestation in attestations:
logger.debug(
"Processing attestation: target=slot%d source=slot%d participants=%s",
attestation.data.target.slot,
attestation.data.source.slot,
attestation.aggregation_bits.to_validator_indices(),
)

source = attestation.data.source
target = attestation.data.target

Expand All @@ -457,6 +467,7 @@ def process_attestations(
# A vote may only originate from a point in history that is already justified.
# A source that lacks existing justification cannot be used to anchor a new vote.
if not justified_slots.is_slot_justified(finalized_slot, source.slot):
logger.debug("Skipping attestation: source slot %d not justified", source.slot)
continue

# Ignore votes for targets that have already reached consensus.
Expand All @@ -468,6 +479,7 @@ def process_attestations(

# Ignore votes that reference zero-hash slots.
if source.root == ZERO_HASH or target.root == ZERO_HASH:
logger.debug("Skipping attestation: zero root in source/target")
continue

# Ensure the vote refers to blocks that actually exist on our chain.
Expand All @@ -491,13 +503,23 @@ def process_attestations(
)

if not source_matches or not target_matches:
logger.debug(
"Skipping attestation: root mismatch (source_match=%s target_match=%s)",
source_matches,
target_matches,
)
continue

# Ensure time flows forward.
#
# A target must always lie strictly after its source slot.
# Otherwise the vote makes no chronological sense.
if target.slot <= source.slot:
logger.debug(
"Skipping attestation: target slot %d <= source slot %d",
target.slot,
source.slot,
)
continue

# Ensure the target falls on a slot that can be justified after the finalized one.
Expand All @@ -514,6 +536,11 @@ def process_attestations(
# Any target outside this pattern is not eligible for justification,
# so votes for it are simply ignored.
if not target.slot.is_justifiable_after(self.latest_finalized.slot):
logger.debug(
"Skipping attestation: target slot %d not justifiable after finalized slot %d",
target.slot,
self.latest_finalized.slot,
)
continue

# Record the vote.
Expand Down Expand Up @@ -542,6 +569,12 @@ def process_attestations(
count = sum(bool(justified) for justified in justifications[target.root])

if 3 * count >= (2 * len(self.validators)):
logger.info(
"Supermajority reached for target slot %d: %d votes (threshold: %d)",
target.slot,
count,
(2 * len(self.validators) + 2) // 3,
)
# The block becomes justified
#
# The chain now considers this block part of its safe head.
Expand Down Expand Up @@ -573,6 +606,7 @@ def process_attestations(
old_finalized_slot = finalized_slot
latest_finalized = source
finalized_slot = latest_finalized.slot
logger.info("Finalization advanced to slot %d", finalized_slot)

# Rebase/prune justification tracking across the new finalized boundary.
#
Expand Down
77 changes: 35 additions & 42 deletions src/lean_spec/subspecs/forkchoice/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
__all__ = ["Store"]

import copy
import logging
from collections import defaultdict

from lean_spec.subspecs.chain.config import (
Expand Down Expand Up @@ -45,6 +46,8 @@
)
from lean_spec.types.container import Container

logger = logging.getLogger(__name__)


class Store(Container):
"""
Expand Down Expand Up @@ -488,7 +491,6 @@ def on_gossip_aggregated_attestation(
new_attestation_data_by_root = dict(self.attestation_data_by_root)
new_attestation_data_by_root[data_root] = data

store = self
for vid in validator_ids:
# Update Proof Map
#
Expand All @@ -497,7 +499,7 @@ def on_gossip_aggregated_attestation(
new_aggregated_payloads.setdefault(key, []).append(proof)

# Return store with updated aggregated payloads and attestation data
return store.model_copy(
return self.model_copy(
update={
"latest_new_aggregated_payloads": new_aggregated_payloads,
"attestation_data_by_root": new_attestation_data_by_root,
Expand Down Expand Up @@ -856,28 +858,7 @@ def update_head(self) -> "Store":
)

def accept_new_attestations(self) -> "Store":
"""
Process pending aggregated payloads and update forkchoice head.

Moves aggregated payloads from latest_new_aggregated_payloads to
latest_known_aggregated_payloads, making them eligible to contribute to
fork choice weights. This migration happens at specific interval ticks.

The Interval Tick System
-------------------------
Aggregated payloads progress through intervals:
- Interval 0: Block proposal
- Interval 1: Validators cast attestations (enter "new")
- Interval 2: Aggregators create proofs & broadcast
- Interval 3: Safe target update
- Interval 4: Process accumulated attestations

This staged progression ensures proper timing and prevents premature
influence on fork choice decisions.

Returns:
New Store with migrated aggregated payloads and updated head.
"""
"""Process pending aggregated payloads and update forkchoice head."""
# Merge new aggregated payloads into known aggregated payloads
merged_aggregated_payloads = dict(self.latest_known_aggregated_payloads)
for sig_key, proofs in self.latest_new_aggregated_payloads.items():
Expand Down Expand Up @@ -937,15 +918,15 @@ def update_safe_target(self) -> "Store":

return self.model_copy(update={"safe_target": safe_target})

def aggregate_committee_signatures(self) -> "Store":
def aggregate_committee_signatures(self) -> tuple["Store", list[SignedAggregatedAttestation]]:
"""
Aggregate committee signatures for attestations in committee_signatures.

This method aggregates signatures from the gossip_signatures map.
Attestations are reconstructed from gossip_signatures using attestation_data_by_root.

Returns:
New Store with updated latest_new_aggregated_payloads.
Tuple of (new Store with updated payloads, list of new SignedAggregatedAttestation).
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is better to avoid referencing concrete variable names in the documentation, because these names can change over time, rendering the documentation obsolete.

Suggested change
Tuple of (new Store with updated payloads, list of new SignedAggregatedAttestation).
Tuple of (new store with updated payloads, list of new signed aggregated attestation).

"""
new_aggregated_payloads = dict(self.latest_new_aggregated_payloads)

Expand All @@ -970,13 +951,14 @@ def aggregate_committee_signatures(self) -> "Store":
committee_signatures,
)

# iterate to broadcast aggregated attestations
# Create list for broadcasting
new_aggregates: list[SignedAggregatedAttestation] = []
for aggregated_attestation, aggregated_signature in aggregated_results:
_ = SignedAggregatedAttestation(
agg = SignedAggregatedAttestation(
data=aggregated_attestation.data,
proof=aggregated_signature,
)
# Note: here we should broadcast the aggregated signature to committee_aggregators topic
new_aggregates.append(agg)

# Compute new aggregated payloads
new_gossip_sigs = dict(self.gossip_signatures)
Expand All @@ -998,9 +980,11 @@ def aggregate_committee_signatures(self) -> "Store":
"latest_new_aggregated_payloads": new_aggregated_payloads,
"gossip_signatures": new_gossip_sigs,
}
)
), new_aggregates

def tick_interval(self, has_proposal: bool, is_aggregator: bool = False) -> "Store":
def tick_interval(
self, has_proposal: bool, is_aggregator: bool = False
) -> tuple["Store", list[SignedAggregatedAttestation]]:
"""
Advance store time by one interval and perform interval-specific actions.

Expand Down Expand Up @@ -1042,11 +1026,12 @@ def tick_interval(self, has_proposal: bool, is_aggregator: bool = False) -> "Sto
is_aggregator: Whether the node is an aggregator.

Returns:
New Store with advanced time and interval-specific updates applied.
Tuple of (new Store with advanced time, list of new SignedAggregatedAttestation).
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Tuple of (new Store with advanced time, list of new SignedAggregatedAttestation).
Tuple of (new store with advanced time, list of new signed aggregated attestation).

"""
# Advance time by one interval
store = self.model_copy(update={"time": self.time + Uint64(1)})
current_interval = store.time % INTERVALS_PER_SLOT
new_aggregates: list[SignedAggregatedAttestation] = []

if current_interval == Uint64(0):
# Start of slot - process attestations if proposal exists
Expand All @@ -1055,17 +1040,19 @@ def tick_interval(self, has_proposal: bool, is_aggregator: bool = False) -> "Sto
elif current_interval == Uint64(2):
# Aggregation interval - aggregators create proofs
if is_aggregator:
store = store.aggregate_committee_signatures()
store, new_aggregates = store.aggregate_committee_signatures()
elif current_interval == Uint64(3):
# Fast confirm - update safe target based on received proofs
store = store.update_safe_target()
elif current_interval == Uint64(4):
# End of slot - accept accumulated attestations
store = store.accept_new_attestations()

return store
return store, new_aggregates

def on_tick(self, time: Uint64, has_proposal: bool, is_aggregator: bool = False) -> "Store":
def on_tick(
self, time: Uint64, has_proposal: bool, is_aggregator: bool = False
) -> tuple["Store", list[SignedAggregatedAttestation]]:
"""
Advance forkchoice store time to given timestamp.

Expand All @@ -1079,22 +1066,25 @@ def on_tick(self, time: Uint64, has_proposal: bool, is_aggregator: bool = False)
is_aggregator: Whether the node is an aggregator.

Returns:
New Store with time advanced and all interval actions performed.
Tuple of (new Store with time advanced,
list of all produced SignedAggregatedAttestation).
Comment on lines +1069 to +1070
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Tuple of (new Store with time advanced,
list of all produced SignedAggregatedAttestation).
Tuple of (new store with time advanced,
list of all produced signed aggregated attestation).

"""
# Calculate target time in intervals
time_delta_ms = (time - self.config.genesis_time) * Uint64(1000)
tick_interval_time = time_delta_ms // MILLISECONDS_PER_INTERVAL

# Tick forward one interval at a time
store = self
all_new_aggregates: list[SignedAggregatedAttestation] = []
while store.time < tick_interval_time:
# Check if proposal should be signaled for next interval
should_signal_proposal = has_proposal and (store.time + Uint64(1)) == tick_interval_time

# Advance by one interval with appropriate signaling
store = store.tick_interval(should_signal_proposal, is_aggregator)
store, new_aggregates = store.tick_interval(should_signal_proposal, is_aggregator)
all_new_aggregates.extend(new_aggregates)

return store
return store, all_new_aggregates

def get_proposal_head(self, slot: Slot) -> tuple["Store", Bytes32]:
"""
Expand Down Expand Up @@ -1122,7 +1112,7 @@ def get_proposal_head(self, slot: Slot) -> tuple["Store", Bytes32]:
slot_time = self.config.genesis_time + slot_duration_seconds

# Advance time to current slot (ticking intervals)
store = self.on_tick(slot_time, True)
store, _ = self.on_tick(slot_time, True)

# Process any pending attestations before proposal
store = store.accept_new_attestations()
Expand Down Expand Up @@ -1168,8 +1158,11 @@ def get_attestation_target(self) -> Checkpoint:
#
# This ensures the target doesn't advance too far ahead of safe target,
# providing a balance between liveness and safety.
#
# MODIFIED: We allow the target to be up to 1 slot ahead of safe_target
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# MODIFIED: We allow the target to be up to 1 slot ahead of safe_target
# We allow the target to be up to 1 slot ahead of the safe target

# to ensure the chain can actually start advancing from genesis.
for _ in range(JUSTIFICATION_LOOKBACK_SLOTS):
if self.blocks[target_block_root].slot > self.blocks[self.safe_target].slot:
if self.blocks[target_block_root].slot > self.blocks[self.safe_target].slot + Slot(1):
target_block_root = self.blocks[target_block_root].parent_root
else:
break
Expand All @@ -1186,7 +1179,7 @@ def get_attestation_target(self) -> Checkpoint:
# Create checkpoint from selected target block
target_block = self.blocks[target_block_root]

return Checkpoint(root=hash_tree_root(target_block), slot=target_block.slot)
return Checkpoint(root=target_block_root, slot=target_block.slot)

def produce_attestation_data(self, slot: Slot) -> AttestationData:
"""
Expand Down Expand Up @@ -1293,7 +1286,7 @@ def produce_block_with_signatures(
#
# The builder iteratively collects valid attestations.
# It returns the final block, post-state, and signature proofs.
final_block, final_post_state, _, signatures = head_state.build_block(
final_block, final_post_state, collected_attestations, signatures = head_state.build_block(
slot=slot,
proposer_index=validator_index,
parent_root=head_root,
Expand Down
Loading
Loading