From 6cb2d532672c31032cb69a851ce2cf76bb977a1e Mon Sep 17 00:00:00 2001 From: "J.C. Jones" Date: Fri, 30 Jan 2026 14:03:46 -0700 Subject: [PATCH 1/5] DAP-16 Helper Job State Updates Now that DAP-16 allows for job reacquisition / idempotency on the HTTP layer, a job in AwaitingRequest has to be continuable directly without error. E.g., the database needs to include it in `acquire_incomplete_aggregation_jobs`. While I was at it, I wrote some more documentation on the state transitions in the `AggregationJobState` database model. These changes unblock further job state updates for the leader, but do not go so far as to remove the no-longer-relevant AwaitingRequest state, which will be done separately in #4305. Resolves #4322 --- .../src/aggregator/aggregation_job_driver.rs | 42 +++++++++--- .../src/aggregator/aggregation_job_writer.rs | 5 ++ aggregator_core/src/datastore.rs | 7 +- aggregator_core/src/datastore/models.rs | 28 +++++++- aggregator_core/src/datastore/tests.rs | 68 +++++++++++-------- 5 files changed, 111 insertions(+), 39 deletions(-) diff --git a/aggregator/src/aggregator/aggregation_job_driver.rs b/aggregator/src/aggregator/aggregation_job_driver.rs index 05311212e..cc279251b 100644 --- a/aggregator/src/aggregator/aggregation_job_driver.rs +++ b/aggregator/src/aggregator/aggregation_job_driver.rs @@ -61,8 +61,8 @@ use crate::{ aggregation_job_continue::{AggregateContinueMetrics, compute_helper_aggregate_continue}, aggregation_job_init::{AggregateInitMetrics, compute_helper_aggregate_init}, aggregation_job_writer::{ - AggregationJobWriter, AggregationJobWriterMetrics, UpdateWrite, - WritableReportAggregation, + AggregationJobWriter, AggregationJobWriterMetrics, ReportAggregationUpdate, + UpdateWrite, WritableReportAggregation, }, batch_mode::CollectableBatchMode, error::handle_ping_pong_error, @@ -1512,8 +1512,7 @@ where // Compute the next aggregation step. let task = Arc::new(task); - let aggregation_job = - Arc::new(aggregation_job.with_state(AggregationJobState::AwaitingRequest)); + let aggregation_job_for_compute = Arc::new(aggregation_job); let report_aggregations = Arc::new( compute_helper_aggregate_init( datastore.clock(), @@ -1525,12 +1524,26 @@ where self.past_report_clock_skew_histogram.clone(), ), Arc::clone(&task), - Arc::clone(&aggregation_job), + Arc::clone(&aggregation_job_for_compute), report_aggregations, ) .await?, ); + // Determine the next Helper job state based on whether all reports are terminal. + let all_terminal = report_aggregations.iter().all(|ra| ra.is_terminal()); + let new_state = if all_terminal { + AggregationJobState::Finished + } else { + AggregationJobState::AwaitingRequest + }; + let aggregation_job = Arc::new( + aggregation_job_for_compute + .as_ref() + .clone() + .with_state(new_state), + ); + // Write results back to datastore. let metrics = AggregationJobWriterMetrics { report_aggregation_success_counter: self.aggregation_success_counter.clone(), @@ -1613,8 +1626,7 @@ where // Compute the next aggregation step. let task = Arc::new(task); - let aggregation_job = - Arc::new(aggregation_job.with_state(AggregationJobState::AwaitingRequest)); + let aggregation_job_for_compute = Arc::new(aggregation_job); let report_aggregations = Arc::new( compute_helper_aggregate_continue( Arc::clone(&vdaf), @@ -1623,12 +1635,26 @@ where &task_aggregation_counters, ), Arc::clone(&task), - Arc::clone(&aggregation_job), + Arc::clone(&aggregation_job_for_compute), report_aggregations, ) .await, ); + // Determine the next Helper job state based on whether all reports are terminal. + let all_terminal = report_aggregations.iter().all(|ra| ra.is_terminal()); + let new_state = if all_terminal { + AggregationJobState::Finished + } else { + AggregationJobState::AwaitingRequest + }; + let aggregation_job = Arc::new( + aggregation_job_for_compute + .as_ref() + .clone() + .with_state(new_state), + ); + // Write results back to datastore. let metrics = AggregationJobWriterMetrics { report_aggregation_success_counter: self.aggregation_success_counter.clone(), diff --git a/aggregator/src/aggregator/aggregation_job_writer.rs b/aggregator/src/aggregator/aggregation_job_writer.rs index 7944f22c3..ff293026b 100644 --- a/aggregator/src/aggregator/aggregation_job_writer.rs +++ b/aggregator/src/aggregator/aggregation_job_writer.rs @@ -810,6 +810,11 @@ where /// Update aggregation job states if all their report aggregations have reached a terminal /// state. + /// + /// This function is called during write operations to automatically transition jobs to + /// Finished when all constituent report aggregations are terminal (Finished or Failed). + /// This serves as a safety net to ensure jobs reach their terminal state even if they + /// weren't transitioned earlier in processing. fn update_aggregation_job_state_from_report_aggregations(&mut self) { // Update in-memory state of aggregation jobs: any aggregation jobs whose report // aggregations are all in a terminal state should be considered Finished (unless the diff --git a/aggregator_core/src/datastore.rs b/aggregator_core/src/datastore.rs index a6b6119ab..551904a2b 100644 --- a/aggregator_core/src/datastore.rs +++ b/aggregator_core/src/datastore.rs @@ -1994,7 +1994,12 @@ WHERE aggregation_jobs.task_id = $1 WITH incomplete_jobs AS ( SELECT aggregation_jobs.id FROM aggregation_jobs JOIN tasks ON tasks.id = aggregation_jobs.task_id - WHERE aggregation_jobs.state = 'ACTIVE' + -- DAP (§4.6.3 [dap-16]) describes the continuation phase where the Leader sends + -- AggregationJobContinueReq messages to advance preparation. Helper jobs in + -- AWAITING_REQUEST state represent work that has processed one round but needs + -- additional Leader requests to complete. These jobs must be acquirable so the + -- Helper can respond to incoming continuation requests. + WHERE aggregation_jobs.state IN ('ACTIVE', 'AWAITING_REQUEST') AND aggregation_jobs.lease_expiry <= $2 AND UPPER(aggregation_jobs.client_timestamp_interval) >= COALESCE($2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, diff --git a/aggregator_core/src/datastore/models.rs b/aggregator_core/src/datastore/models.rs index 103a4c9c3..768f5b568 100644 --- a/aggregator_core/src/datastore/models.rs +++ b/aggregator_core/src/datastore/models.rs @@ -498,19 +498,43 @@ impl> } } -/// AggregationJobState represents the state of an aggregation job. It corresponds to the -/// AGGREGATION_JOB_STATE enum in the schema. +/// AggregationJobState represents the internal operational state of an aggregation job. This +/// corresponds to the AGGREGATION_JOB_STATE enum in the schema. +/// +/// These are implementation-specific states used for Janus's internal state management. +/// DAP §4.6 [dap-16] defines aggregation job completion in terms of individual report +/// preparation states (Continued, FinishedWithOutbound, Finished, Rejected), not job-level +/// states. This enum provides operational states for managing the lifecycle of aggregation +/// jobs within Janus. #[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, ToSql, FromSql)] #[postgres(name = "aggregation_job_state")] pub enum AggregationJobState { + /// Job is being actively processed. This is the initial state for both Leader and Helper + /// aggregation jobs. Corresponds to the initialization phase in DAP (§4.6.2 [dap-16]). #[postgres(name = "ACTIVE")] Active, + /// Helper has completed processing one request and is waiting for the Leader's next + /// request. Represents the interval between aggregation rounds in DAP's continuation + /// phase (§4.6.3 [dap-16]). The Helper has sent an AggregationJobResp but some reports + /// remain in the Continued state awaiting the next AggregationJobContinueReq. + /// + /// Note: This state will be removed as part of the DAP-16 state model redesign (#4305). #[postgres(name = "AWAITING_REQUEST")] AwaitingRequest, + /// All report aggregations have reached a terminal state (Finished or Failed), completing + /// the aggregation job lifecycle (§4.6 [dap-16]). Output shares are committed to batch + /// buckets for collection. This is a terminal state indicating normal completion. #[postgres(name = "FINISHED")] Finished, + /// Job processing encountered an unrecoverable error and was explicitly abandoned. This is + /// an implementation-specific terminal state used for operational issues beyond DAP's + /// protocol-level error handling (e.g., excessive lease contention, resource exhaustion). + /// Unlike Finished, this indicates the job did not complete the DAP aggregation lifecycle + /// normally. #[postgres(name = "ABANDONED")] Abandoned, + /// Job has been marked for deletion and should not be processed further. This is a terminal + /// state used during cleanup. #[postgres(name = "DELETED")] Deleted, } diff --git a/aggregator_core/src/datastore/tests.rs b/aggregator_core/src/datastore/tests.rs index 170bf0acd..86b095d91 100644 --- a/aggregator_core/src/datastore/tests.rs +++ b/aggregator_core/src/datastore/tests.rs @@ -1973,6 +1973,11 @@ async fn aggregation_job_acquire_release(ephemeral_datastore: EphemeralDatastore .zip(rng().sample_iter(StandardUniform)) .take(AGGREGATION_JOB_COUNT) .collect(); + + // Add an awaiting_request aggregation job which can still be acquired + let awaiting_request_aggregation_job_id: AggregationJobId = random(); + task_and_aggregation_job_ids.push((*helper_task.id(), awaiting_request_aggregation_job_id)); + task_and_aggregation_job_ids.sort(); let leader_aggregation_job_ids: Vec<_> = task_and_aggregation_job_ids .iter() @@ -1992,31 +1997,36 @@ async fn aggregation_job_acquire_release(ephemeral_datastore: EphemeralDatastore tx.put_aggregator_task(&leader_task).await.unwrap(); tx.put_aggregator_task(&helper_task).await.unwrap(); - try_join_all(task_and_aggregation_job_ids.into_iter().map( - |(task_id, aggregation_job_id)| async move { - tx.put_aggregation_job(&AggregationJob::< - VERIFY_KEY_LENGTH_PRIO3, - TimeInterval, - Prio3Count, - >::new( - task_id, - aggregation_job_id, - (), - (), - Interval::minimal( - START_TIME - .add_duration(&lease_duration) - .unwrap() - .add_duration(&lease_duration) - .unwrap(), - ) - .unwrap(), - AggregationJobState::Active, - AggregationJobStep::from(0), - )) - .await - }, - )) + try_join_all( + task_and_aggregation_job_ids + .into_iter() + // Remove the awaiting_request job from the list since it needs special + // handling (it's already in AwaitingRequest). + .filter(|(_, id)| *id != awaiting_request_aggregation_job_id) + .map(|(task_id, aggregation_job_id)| async move { + tx.put_aggregation_job(&AggregationJob::< + VERIFY_KEY_LENGTH_PRIO3, + TimeInterval, + Prio3Count, + >::new( + task_id, + aggregation_job_id, + (), + (), + Interval::minimal( + START_TIME + .add_duration(&lease_duration) + .unwrap() + .add_duration(&lease_duration) + .unwrap(), + ) + .unwrap(), + AggregationJobState::Active, + AggregationJobStep::from(0), + )) + .await + }), + ) .await .unwrap(); @@ -2062,14 +2072,14 @@ async fn aggregation_job_acquire_release(ephemeral_datastore: EphemeralDatastore .unwrap(); // Write an aggregation job that is awaiting a request from the Leader. We - // don't want to retrieve this one, either. + // expect to retrieve this one. tx.put_aggregation_job(&AggregationJob::< VERIFY_KEY_LENGTH_PRIO3, TimeInterval, Prio3Count, >::new( *helper_task.id(), - random(), + awaiting_request_aggregation_job_id, (), (), Interval::minimal(Time::from_time_precision_units(0)).unwrap(), @@ -2418,10 +2428,12 @@ async fn aggregation_job_acquire_release(ephemeral_datastore: EphemeralDatastore .collect(); let mut got_aggregation_jobs: Vec<_> = ds .run_unnamed_tx(|tx| { + let task_count = task_and_aggregation_job_ids.len(); Box::pin(async move { // This time, we just acquire all jobs in a single go for simplicity -- we've // already tested the maximum acquire count functionality above. - tx.acquire_incomplete_aggregation_jobs(&lease_duration_std, AGGREGATION_JOB_COUNT) + // Note: task_and_aggregation_job_ids includes the awaiting_request job. + tx.acquire_incomplete_aggregation_jobs(&lease_duration_std, task_count) .await }) }) From ed0763e4327b6f3de2d466ef748a5e93689f8abe Mon Sep 17 00:00:00 2001 From: "J.C. Jones" Date: Thu, 5 Feb 2026 14:47:23 -0700 Subject: [PATCH 2/5] Keep the driver only acquiring ACTIVE jobs @divergentdave has corrected my interpretation of the state machine. --- aggregator_core/src/datastore.rs | 7 +-- aggregator_core/src/datastore/models.rs | 2 - aggregator_core/src/datastore/tests.rs | 68 ++++++++++--------------- 3 files changed, 29 insertions(+), 48 deletions(-) diff --git a/aggregator_core/src/datastore.rs b/aggregator_core/src/datastore.rs index 551904a2b..a6b6119ab 100644 --- a/aggregator_core/src/datastore.rs +++ b/aggregator_core/src/datastore.rs @@ -1994,12 +1994,7 @@ WHERE aggregation_jobs.task_id = $1 WITH incomplete_jobs AS ( SELECT aggregation_jobs.id FROM aggregation_jobs JOIN tasks ON tasks.id = aggregation_jobs.task_id - -- DAP (§4.6.3 [dap-16]) describes the continuation phase where the Leader sends - -- AggregationJobContinueReq messages to advance preparation. Helper jobs in - -- AWAITING_REQUEST state represent work that has processed one round but needs - -- additional Leader requests to complete. These jobs must be acquirable so the - -- Helper can respond to incoming continuation requests. - WHERE aggregation_jobs.state IN ('ACTIVE', 'AWAITING_REQUEST') + WHERE aggregation_jobs.state = 'ACTIVE' AND aggregation_jobs.lease_expiry <= $2 AND UPPER(aggregation_jobs.client_timestamp_interval) >= COALESCE($2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, diff --git a/aggregator_core/src/datastore/models.rs b/aggregator_core/src/datastore/models.rs index 768f5b568..24e7312f4 100644 --- a/aggregator_core/src/datastore/models.rs +++ b/aggregator_core/src/datastore/models.rs @@ -517,8 +517,6 @@ pub enum AggregationJobState { /// request. Represents the interval between aggregation rounds in DAP's continuation /// phase (§4.6.3 [dap-16]). The Helper has sent an AggregationJobResp but some reports /// remain in the Continued state awaiting the next AggregationJobContinueReq. - /// - /// Note: This state will be removed as part of the DAP-16 state model redesign (#4305). #[postgres(name = "AWAITING_REQUEST")] AwaitingRequest, /// All report aggregations have reached a terminal state (Finished or Failed), completing diff --git a/aggregator_core/src/datastore/tests.rs b/aggregator_core/src/datastore/tests.rs index 86b095d91..170bf0acd 100644 --- a/aggregator_core/src/datastore/tests.rs +++ b/aggregator_core/src/datastore/tests.rs @@ -1973,11 +1973,6 @@ async fn aggregation_job_acquire_release(ephemeral_datastore: EphemeralDatastore .zip(rng().sample_iter(StandardUniform)) .take(AGGREGATION_JOB_COUNT) .collect(); - - // Add an awaiting_request aggregation job which can still be acquired - let awaiting_request_aggregation_job_id: AggregationJobId = random(); - task_and_aggregation_job_ids.push((*helper_task.id(), awaiting_request_aggregation_job_id)); - task_and_aggregation_job_ids.sort(); let leader_aggregation_job_ids: Vec<_> = task_and_aggregation_job_ids .iter() @@ -1997,36 +1992,31 @@ async fn aggregation_job_acquire_release(ephemeral_datastore: EphemeralDatastore tx.put_aggregator_task(&leader_task).await.unwrap(); tx.put_aggregator_task(&helper_task).await.unwrap(); - try_join_all( - task_and_aggregation_job_ids - .into_iter() - // Remove the awaiting_request job from the list since it needs special - // handling (it's already in AwaitingRequest). - .filter(|(_, id)| *id != awaiting_request_aggregation_job_id) - .map(|(task_id, aggregation_job_id)| async move { - tx.put_aggregation_job(&AggregationJob::< - VERIFY_KEY_LENGTH_PRIO3, - TimeInterval, - Prio3Count, - >::new( - task_id, - aggregation_job_id, - (), - (), - Interval::minimal( - START_TIME - .add_duration(&lease_duration) - .unwrap() - .add_duration(&lease_duration) - .unwrap(), - ) - .unwrap(), - AggregationJobState::Active, - AggregationJobStep::from(0), - )) - .await - }), - ) + try_join_all(task_and_aggregation_job_ids.into_iter().map( + |(task_id, aggregation_job_id)| async move { + tx.put_aggregation_job(&AggregationJob::< + VERIFY_KEY_LENGTH_PRIO3, + TimeInterval, + Prio3Count, + >::new( + task_id, + aggregation_job_id, + (), + (), + Interval::minimal( + START_TIME + .add_duration(&lease_duration) + .unwrap() + .add_duration(&lease_duration) + .unwrap(), + ) + .unwrap(), + AggregationJobState::Active, + AggregationJobStep::from(0), + )) + .await + }, + )) .await .unwrap(); @@ -2072,14 +2062,14 @@ async fn aggregation_job_acquire_release(ephemeral_datastore: EphemeralDatastore .unwrap(); // Write an aggregation job that is awaiting a request from the Leader. We - // expect to retrieve this one. + // don't want to retrieve this one, either. tx.put_aggregation_job(&AggregationJob::< VERIFY_KEY_LENGTH_PRIO3, TimeInterval, Prio3Count, >::new( *helper_task.id(), - awaiting_request_aggregation_job_id, + random(), (), (), Interval::minimal(Time::from_time_precision_units(0)).unwrap(), @@ -2428,12 +2418,10 @@ async fn aggregation_job_acquire_release(ephemeral_datastore: EphemeralDatastore .collect(); let mut got_aggregation_jobs: Vec<_> = ds .run_unnamed_tx(|tx| { - let task_count = task_and_aggregation_job_ids.len(); Box::pin(async move { // This time, we just acquire all jobs in a single go for simplicity -- we've // already tested the maximum acquire count functionality above. - // Note: task_and_aggregation_job_ids includes the awaiting_request job. - tx.acquire_incomplete_aggregation_jobs(&lease_duration_std, task_count) + tx.acquire_incomplete_aggregation_jobs(&lease_duration_std, AGGREGATION_JOB_COUNT) .await }) }) From 6d0277d38812682897994e29490b7b7930edd44f Mon Sep 17 00:00:00 2001 From: "J.C. Jones" Date: Tue, 10 Feb 2026 12:25:44 -0700 Subject: [PATCH 3/5] Address @divergentdave's correction about the Active state --- aggregator_core/src/datastore/models.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aggregator_core/src/datastore/models.rs b/aggregator_core/src/datastore/models.rs index 24e7312f4..3bb3996fc 100644 --- a/aggregator_core/src/datastore/models.rs +++ b/aggregator_core/src/datastore/models.rs @@ -509,8 +509,8 @@ impl> #[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, ToSql, FromSql)] #[postgres(name = "aggregation_job_state")] pub enum AggregationJobState { - /// Job is being actively processed. This is the initial state for both Leader and Helper - /// aggregation jobs. Corresponds to the initialization phase in DAP (§4.6.2 [dap-16]). + /// Job is ready for the aggregation job driver to pick up. Corresponds to the + /// initialization phase in DAP (§4.6.2 [dap-16]). #[postgres(name = "ACTIVE")] Active, /// Helper has completed processing one request and is waiting for the Leader's next From bb524167defb645eab9ea4989086b994c4d76634 Mon Sep 17 00:00:00 2001 From: "J.C. Jones" Date: Thu, 12 Feb 2026 09:04:15 -0700 Subject: [PATCH 4/5] Trim down to only doc updates to AggregationJobState --- .../src/aggregator/aggregation_job_driver.rs | 42 ++++--------------- .../src/aggregator/aggregation_job_writer.rs | 5 --- 2 files changed, 8 insertions(+), 39 deletions(-) diff --git a/aggregator/src/aggregator/aggregation_job_driver.rs b/aggregator/src/aggregator/aggregation_job_driver.rs index cc279251b..05311212e 100644 --- a/aggregator/src/aggregator/aggregation_job_driver.rs +++ b/aggregator/src/aggregator/aggregation_job_driver.rs @@ -61,8 +61,8 @@ use crate::{ aggregation_job_continue::{AggregateContinueMetrics, compute_helper_aggregate_continue}, aggregation_job_init::{AggregateInitMetrics, compute_helper_aggregate_init}, aggregation_job_writer::{ - AggregationJobWriter, AggregationJobWriterMetrics, ReportAggregationUpdate, - UpdateWrite, WritableReportAggregation, + AggregationJobWriter, AggregationJobWriterMetrics, UpdateWrite, + WritableReportAggregation, }, batch_mode::CollectableBatchMode, error::handle_ping_pong_error, @@ -1512,7 +1512,8 @@ where // Compute the next aggregation step. let task = Arc::new(task); - let aggregation_job_for_compute = Arc::new(aggregation_job); + let aggregation_job = + Arc::new(aggregation_job.with_state(AggregationJobState::AwaitingRequest)); let report_aggregations = Arc::new( compute_helper_aggregate_init( datastore.clock(), @@ -1524,26 +1525,12 @@ where self.past_report_clock_skew_histogram.clone(), ), Arc::clone(&task), - Arc::clone(&aggregation_job_for_compute), + Arc::clone(&aggregation_job), report_aggregations, ) .await?, ); - // Determine the next Helper job state based on whether all reports are terminal. - let all_terminal = report_aggregations.iter().all(|ra| ra.is_terminal()); - let new_state = if all_terminal { - AggregationJobState::Finished - } else { - AggregationJobState::AwaitingRequest - }; - let aggregation_job = Arc::new( - aggregation_job_for_compute - .as_ref() - .clone() - .with_state(new_state), - ); - // Write results back to datastore. let metrics = AggregationJobWriterMetrics { report_aggregation_success_counter: self.aggregation_success_counter.clone(), @@ -1626,7 +1613,8 @@ where // Compute the next aggregation step. let task = Arc::new(task); - let aggregation_job_for_compute = Arc::new(aggregation_job); + let aggregation_job = + Arc::new(aggregation_job.with_state(AggregationJobState::AwaitingRequest)); let report_aggregations = Arc::new( compute_helper_aggregate_continue( Arc::clone(&vdaf), @@ -1635,26 +1623,12 @@ where &task_aggregation_counters, ), Arc::clone(&task), - Arc::clone(&aggregation_job_for_compute), + Arc::clone(&aggregation_job), report_aggregations, ) .await, ); - // Determine the next Helper job state based on whether all reports are terminal. - let all_terminal = report_aggregations.iter().all(|ra| ra.is_terminal()); - let new_state = if all_terminal { - AggregationJobState::Finished - } else { - AggregationJobState::AwaitingRequest - }; - let aggregation_job = Arc::new( - aggregation_job_for_compute - .as_ref() - .clone() - .with_state(new_state), - ); - // Write results back to datastore. let metrics = AggregationJobWriterMetrics { report_aggregation_success_counter: self.aggregation_success_counter.clone(), diff --git a/aggregator/src/aggregator/aggregation_job_writer.rs b/aggregator/src/aggregator/aggregation_job_writer.rs index ff293026b..7944f22c3 100644 --- a/aggregator/src/aggregator/aggregation_job_writer.rs +++ b/aggregator/src/aggregator/aggregation_job_writer.rs @@ -810,11 +810,6 @@ where /// Update aggregation job states if all their report aggregations have reached a terminal /// state. - /// - /// This function is called during write operations to automatically transition jobs to - /// Finished when all constituent report aggregations are terminal (Finished or Failed). - /// This serves as a safety net to ensure jobs reach their terminal state even if they - /// weren't transitioned earlier in processing. fn update_aggregation_job_state_from_report_aggregations(&mut self) { // Update in-memory state of aggregation jobs: any aggregation jobs whose report // aggregations are all in a terminal state should be considered Finished (unless the From 37cb664c6aac952e8ab1847191cdfd4f75891ed9 Mon Sep 17 00:00:00 2001 From: "J.C. Jones" Date: Thu, 12 Feb 2026 13:40:18 -0700 Subject: [PATCH 5/5] Wording updates from @tgeoghegan --- aggregator_core/src/datastore/models.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/aggregator_core/src/datastore/models.rs b/aggregator_core/src/datastore/models.rs index 3bb3996fc..6ce835f1c 100644 --- a/aggregator_core/src/datastore/models.rs +++ b/aggregator_core/src/datastore/models.rs @@ -509,7 +509,7 @@ impl> #[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, ToSql, FromSql)] #[postgres(name = "aggregation_job_state")] pub enum AggregationJobState { - /// Job is ready for the aggregation job driver to pick up. Corresponds to the + /// Job is ready for the aggregation job driver to drive. Corresponds to the /// initialization phase in DAP (§4.6.2 [dap-16]). #[postgres(name = "ACTIVE")] Active, @@ -520,8 +520,8 @@ pub enum AggregationJobState { #[postgres(name = "AWAITING_REQUEST")] AwaitingRequest, /// All report aggregations have reached a terminal state (Finished or Failed), completing - /// the aggregation job lifecycle (§4.6 [dap-16]). Output shares are committed to batch - /// buckets for collection. This is a terminal state indicating normal completion. + /// the aggregation job lifecycle (§4.6 [dap-16]). Output shares have been committed to + /// batch buckets for collection. This is a terminal state indicating normal completion. #[postgres(name = "FINISHED")] Finished, /// Job processing encountered an unrecoverable error and was explicitly abandoned. This is @@ -531,8 +531,9 @@ pub enum AggregationJobState { /// normally. #[postgres(name = "ABANDONED")] Abandoned, - /// Job has been marked for deletion and should not be processed further. This is a terminal - /// state used during cleanup. + /// Job has been marked for deletion, either by garbage collection or by using the HTTP + /// DELETE endpoint, and should not be processed further. This is a terminal state used + /// during cleanup. #[postgres(name = "DELETED")] Deleted, }