diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index 587d4668b8a..5086c2b2ebb 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -162,11 +162,20 @@ impl AttestationService AttestationService Result<(), String> { + /// Performs attestation duties for the current slot. + async fn perform_attestation_tasks(&self, slot_duration: Duration) -> Result<(), String> { let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; let duration_to_next_slot = self .slot_clock @@ -199,49 +206,32 @@ impl AttestationService(attestation_data) - }, - "unaggregated attestation production", - ) - .ok_or("Failed to spawn attestation data task")?; + self.sign_and_publish_attestations(slot, &attestation_duties, attestation_data.clone()) + .await + .map_err(|e| { + crit!( + error = format!("{:?}", e), + slot = slot.as_u64(), + "Error during attestation routine" + ); + e + })?; // If a validator needs to publish an aggregate attestation, they must do so at 2/3 // through the slot. This delay triggers at this time @@ -250,6 +240,11 @@ impl AttestationService> = self .duties_service .attesters(slot) @@ -261,50 +256,21 @@ impl AttestationService data, - Ok(Some(Err(err))) => { - error!(?err, "Attestation production failed"); - return; - } - Ok(None) | Err(_) => { - info!("Aborting attestation production due to shutdown"); - return; - } - }; - - // For each committee index for this slot: - // Create and publish `SignedAggregateAndProof` for all aggregating validators. - aggregate_duties_by_committee_index.into_iter().for_each( - |(committee_index, validator_duties)| { - let attestation_service = attestation_service_clone.clone(); - let attestation_data = attestation_data.clone(); - executor.spawn_ignoring_error( - attestation_service.handle_aggregates( - slot, - committee_index, - validator_duties, - aggregate_production_instant, - attestation_data, - ), - "aggregate publish", - ); - }, + // For each committee index for this slot: + // Create and publish `SignedAggregateAndProof` for all aggregating validators. + let aggregate_futures = aggregate_duties_by_committee_index.into_iter().map( + |(committee_index, validator_duties)| { + self.clone().handle_aggregates( + slot, + committee_index, + validator_duties, + aggregate_production_instant, + attestation_data.clone(), ) }, - "attestation and aggregate publish", ); - // Schedule pruning of the slashing protection database once all unaggregated - // attestations have (hopefully) been signed, i.e. at the same time as aggregate - // production. - self.spawn_slashing_protection_pruning_task(slot, aggregate_production_instant); + join_all(aggregate_futures).await; Ok(()) }