Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 57 additions & 91 deletions validator_client/validator_services/src/attestation_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,20 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
if let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() {
sleep(duration_to_next_slot + slot_duration / 3).await;

if let Err(e) = self.spawn_attestation_tasks(slot_duration) {
crit!(error = e, "Failed to spawn attestation tasks")
} else {
trace!("Spawned attestation tasks");
}
let attestation_service = self.clone();
self.executor.spawn(
async move {
if let Err(e) = attestation_service
.perform_attestation_tasks(slot_duration)
.await
{
crit!(error = e, "Failed to perform attestation tasks")
} else {
trace!("Completed attestation tasks");
}
},
"attestation_tasks",
);
} else {
error!("Failed to read slot clock");
// If we can't read the slot clock, just wait another slot.
Expand All @@ -180,10 +189,8 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
Ok(())
}

/// Spawn only one new task for attestation post-Electra
/// For each required aggregates, spawn a new task that downloads, signs and uploads the
/// aggregates to the beacon node.
fn spawn_attestation_tasks(&self, slot_duration: Duration) -> Result<(), String> {
/// Performs attestation duties for the current slot.
async fn perform_attestation_tasks(&self, slot_duration: Duration) -> Result<(), String> {
let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?;
let duration_to_next_slot = self
.slot_clock
Expand All @@ -199,49 +206,32 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
return Ok(());
}

let attestation_service = self.clone();

let attestation_data_handle = self
.inner
.executor
.spawn_handle(
async move {
let attestation_data = attestation_service
.beacon_nodes
.first_success(|beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS_HTTP_GET],
);
beacon_node
.get_validator_attestation_data(slot, 0)
.await
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
.map(|result| result.data)
})
.await
.map_err(|e| e.to_string())?;
let attestation_data = self
.beacon_nodes
.first_success(|beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS_HTTP_GET],
);
beacon_node
.get_validator_attestation_data(slot, 0)
.await
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
.map(|result| result.data)
})
.await
.map_err(|e| e.to_string())?;

attestation_service
.sign_and_publish_attestations(
slot,
&attestation_duties,
attestation_data.clone(),
)
.await
.map_err(|e| {
crit!(
error = format!("{:?}", e),
slot = slot.as_u64(),
"Error during attestation routine"
);
e
})?;
Ok::<AttestationData, String>(attestation_data)
},
"unaggregated attestation production",
)
.ok_or("Failed to spawn attestation data task")?;
self.sign_and_publish_attestations(slot, &attestation_duties, attestation_data.clone())
.await
.map_err(|e| {
crit!(
error = format!("{:?}", e),
slot = slot.as_u64(),
"Error during attestation routine"
);
e
})?;

// If a validator needs to publish an aggregate attestation, they must do so at 2/3
// through the slot. This delay triggers at this time
Expand All @@ -250,6 +240,11 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
.checked_sub(slot_duration / 3)
.unwrap_or_else(|| Duration::from_secs(0));

// Schedule pruning of the slashing protection database once all unaggregated
// attestations have (hopefully) been signed, i.e. at the same time as aggregate
// production.
self.spawn_slashing_protection_pruning_task(slot, aggregate_production_instant);

let aggregate_duties_by_committee_index: HashMap<CommitteeIndex, Vec<DutyAndProof>> = self
.duties_service
.attesters(slot)
Expand All @@ -261,50 +256,21 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
map
});

// Spawn a task that awaits the attestation data handle and then spawns aggregate tasks
let attestation_service_clone = self.clone();
let executor = self.inner.executor.clone();
self.inner.executor.spawn(
async move {
// Log an error if the handle fails and return, skipping aggregates
let attestation_data = match attestation_data_handle.await {
Ok(Some(Ok(data))) => data,
Ok(Some(Err(err))) => {
error!(?err, "Attestation production failed");
return;
}
Ok(None) | Err(_) => {
info!("Aborting attestation production due to shutdown");
return;
}
};

// For each committee index for this slot:
// Create and publish `SignedAggregateAndProof` for all aggregating validators.
aggregate_duties_by_committee_index.into_iter().for_each(
|(committee_index, validator_duties)| {
let attestation_service = attestation_service_clone.clone();
let attestation_data = attestation_data.clone();
executor.spawn_ignoring_error(
attestation_service.handle_aggregates(
slot,
committee_index,
validator_duties,
aggregate_production_instant,
attestation_data,
),
"aggregate publish",
);
},
// For each committee index for this slot:
// Create and publish `SignedAggregateAndProof` for all aggregating validators.
let aggregate_futures = aggregate_duties_by_committee_index.into_iter().map(
|(committee_index, validator_duties)| {
self.clone().handle_aggregates(
slot,
committee_index,
validator_duties,
aggregate_production_instant,
attestation_data.clone(),
)
},
"attestation and aggregate publish",
);

// Schedule pruning of the slashing protection database once all unaggregated
// attestations have (hopefully) been signed, i.e. at the same time as aggregate
// production.
self.spawn_slashing_protection_pruning_task(slot, aggregate_production_instant);
join_all(aggregate_futures).await;

Ok(())
}
Expand Down