From 5d00c5494dabd86e0ffc3a29f4ed4bceb80505b7 Mon Sep 17 00:00:00 2001 From: Jussi Maki Date: Wed, 21 Jan 2026 09:35:26 +0100 Subject: [PATCH 1/5] go.mod: Bump to Go v1.25 Let's raise the minimum required Go version for the next minor StateDB release. This will allow using 'synctest' in the tests here. Signed-off-by: Jussi Maki --- .github/workflows/main.yml | 2 +- .github/workflows/pr.yml | 2 +- go.mod | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 9f96165..8405d87 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -5,7 +5,7 @@ on: - main env: - GO_VERSION: 1.23 + GO_VERSION: 1.25.6 jobs: test: diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 3e64835..10f0305 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -5,7 +5,7 @@ on: - main env: - GO_VERSION: 1.23 + GO_VERSION: 1.25.6 jobs: test: diff --git a/go.mod b/go.mod index 2d26e7f..1c6f851 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/cilium/statedb -go 1.24 +go 1.25 require ( github.com/cilium/hive v0.0.0-20250731144630-28e7a35ed227 From 60adca361b2c06cf8b2d4aac51cde703e43a2bc6 Mon Sep 17 00:00:00 2001 From: Jussi Maki Date: Wed, 21 Jan 2026 09:36:19 +0100 Subject: [PATCH 2/5] statedb: Add ChangeIterator.Close method If we use 'synctest' then we can't close channels outside the synctest bubble. The Changes() method creates a `*changeIterator` and registers a finalizer for it that unregisters the delete tracker from the table. As the delete trackers are stored in a `part.Map` there's a watch channel that gets closed and this triggers a panic if this happens in a synctest. To avoid this issue add a 'Close()' method to the 'ChangeIterator' interface to allow optionally closing the iterator and avoiding the finalizer. 
Signed-off-by: Jussi Maki --- derive.go | 2 ++ iterator.go | 6 ++++++ observable.go | 1 + reconciler/reconciler.go | 1 + types.go | 6 ++++++ 5 files changed, 16 insertions(+) diff --git a/derive.go b/derive.go index 9ff2b99..398dd1b 100644 --- a/derive.go +++ b/derive.go @@ -74,6 +74,8 @@ func (d derive[In, Out]) loop(ctx context.Context, _ cell.Health) error { if err != nil { return err } + defer iter.Close() + for { wtxn := d.DB.WriteTxn(out) changes, watch := iter.Next(wtxn) diff --git a/iterator.go b/iterator.go index db7ff8c..e25861e 100644 --- a/iterator.go +++ b/iterator.go @@ -6,6 +6,7 @@ package statedb import ( "fmt" "iter" + "runtime" "slices" "github.com/cilium/statedb/index" @@ -251,6 +252,11 @@ func (it *changeIterator[Obj]) nextAny(txn ReadTxn) (iter.Seq2[Change[any], Revi }, watch } +func (it *changeIterator[Obj]) Close() { + runtime.SetFinalizer(it, nil) + it.close() +} + func (it *changeIterator[Obj]) close() { it.iter = nil if it.dt != nil { diff --git a/observable.go b/observable.go index 67ea704..c856d57 100644 --- a/observable.go +++ b/observable.go @@ -32,6 +32,7 @@ func (to *observable[Obj]) Observe(ctx context.Context, next func(Change[Obj]), complete(err) return } + defer iter.Close() defer complete(nil) for { diff --git a/reconciler/reconciler.go b/reconciler/reconciler.go index 181365f..b1ed7c6 100644 --- a/reconciler/reconciler.go +++ b/reconciler/reconciler.go @@ -43,6 +43,7 @@ func (r *reconciler[Obj]) reconcileLoop(ctx context.Context, health cell.Health) if err != nil { return fmt.Errorf("watching for changes failed: %w", err) } + defer changeIterator.Close() tableWatchChan := closedWatchChannel diff --git a/types.go b/types.go index 15e3ac6..e9fd365 100644 --- a/types.go +++ b/types.go @@ -110,6 +110,12 @@ type ChangeIterator[Obj any] interface { // // Next will panic if called with a WriteTxn that has locked the target table. Next(ReadTxn) (iter.Seq2[Change[Obj], Revision], <-chan struct{}) + + // Close the change iterator. 
Once all change iterators for a given table are closed + // deleted objects for that table are no longer set aside for the change iterators. + // + // Calling this method is optional as each iterator has a finalizer that closes it. + Close() } // RWTable provides methods for modifying the table under a write transaction From 0da40e8101c607db9a529a7a268ac8737da1fef7 Mon Sep 17 00:00:00 2001 From: Jussi Maki Date: Mon, 19 Jan 2026 15:25:27 +0100 Subject: [PATCH 3/5] reconciler: Add WaitUntilReconciled method Add `Reconciler.WaitUntilReconciled` method to allow waiting for the reconciler to catch up processing to a given revision. Example usage: ``` wtxn := db.WriteTxn(table) table.Insert(wtxn, &Obj{ID: 1, Status: reconciler.StatusPending()}) table.Insert(wtxn, &Obj{ID: 2, Status: reconciler.StatusPending()}) table.Insert(wtxn, &Obj{ID: 3, Status: reconciler.StatusPending()}) revToWaitFor := table.Revision(wtxn) wtxn.Commit() // Block until reconciler has caught up to [revToWaitFor] or [ctx] // is cancelled. 
myReconciler.WaitUntilReconciled(ctx, revToWaitFor) ``` Signed-off-by: Jussi Maki --- reconciler/builder.go | 1 + reconciler/incremental.go | 24 +++-- reconciler/progress.go | 53 ++++++++++ reconciler/progress_test.go | 205 ++++++++++++++++++++++++++++++++++++ reconciler/reconciler.go | 10 +- reconciler/types.go | 6 ++ 6 files changed, 291 insertions(+), 8 deletions(-) create mode 100644 reconciler/progress.go create mode 100644 reconciler/progress_test.go diff --git a/reconciler/builder.go b/reconciler/builder.go index 44525ff..2bb114b 100644 --- a/reconciler/builder.go +++ b/reconciler/builder.go @@ -78,6 +78,7 @@ func Register[Obj comparable]( retries: newRetries(cfg.RetryBackoffMinDuration, cfg.RetryBackoffMaxDuration, objectToKey), externalPruneTrigger: make(chan struct{}, 1), primaryIndexer: idx, + progress: newProgressTracker(), } params.JobGroup.Add(job.OneShot("reconcile", r.reconcileLoop)) diff --git a/reconciler/incremental.go b/reconciler/incremental.go index 462ea53..e609a8f 100644 --- a/reconciler/incremental.go +++ b/reconciler/incremental.go @@ -45,13 +45,13 @@ type opResult struct { id uint64 // the "pending" identifier } -func (incr *incremental[Obj]) run(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) []error { +func (incr *incremental[Obj]) run(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) (errs []error, lastRev statedb.Revision, changed bool) { // Reconcile new and changed objects using either Operations // or BatchOperations. if incr.config.BatchOperations != nil { - incr.batch(ctx, txn, changes) + lastRev, changed = incr.batch(ctx, txn, changes) } else { - incr.single(ctx, txn, changes) + lastRev, changed = incr.single(ctx, txn, changes) } // Process objects that need to be retried that were not cleared. 
@@ -63,19 +63,22 @@ func (incr *incremental[Obj]) run(ctx context.Context, txn statedb.ReadTxn, chan // Since all failures are retried, we can return the errors from the retry // queue which includes both errors occurred in this round and the old // errors. - errs := incr.retries.errors() + errs = incr.retries.errors() incr.metrics.ReconciliationErrors(incr.moduleID, newErrors, len(errs)) // Prepare for next round incr.numReconciled = 0 clear(incr.results) - return errs + return errs, lastRev, changed } -func (incr *incremental[Obj]) single(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) { +func (incr *incremental[Obj]) single(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) (lastRev statedb.Revision, changed bool) { // Iterate in revision order through new and changed objects. for change, rev := range changes { + lastRev = rev + changed = true + obj := change.Object status := incr.config.GetObjectStatus(obj) @@ -95,14 +98,19 @@ func (incr *incremental[Obj]) single(ctx context.Context, txn statedb.ReadTxn, c break } } + + return } -func (incr *incremental[Obj]) batch(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) { +func (incr *incremental[Obj]) batch(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) (lastRev statedb.Revision, changed bool) { ops := incr.config.BatchOperations updateBatch := []BatchEntry[Obj]{} deleteBatch := []BatchEntry[Obj]{} for change, rev := range changes { + lastRev = rev + changed = true + obj := change.Object status := incr.config.GetObjectStatus(obj) @@ -167,6 +175,8 @@ func (incr *incremental[Obj]) batch(ctx context.Context, txn statedb.ReadTxn, ch incr.results[entry.Object] = opResult{rev: entry.Revision, id: status.ID, err: entry.Result, original: entry.original} } } + + return } func (incr *incremental[Obj]) processRetries(ctx 
context.Context, txn statedb.ReadTxn) { diff --git a/reconciler/progress.go b/reconciler/progress.go new file mode 100644 index 0000000..e8da74a --- /dev/null +++ b/reconciler/progress.go @@ -0,0 +1,53 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Authors of Cilium + +package reconciler + +import ( + "context" + "sync" + + "github.com/cilium/statedb" +) + +// progressTracker tracks the highest revision observed as reconciled and +// allows callers to wait until a target revision is reached. +type progressTracker struct { + mu sync.Mutex + revision statedb.Revision + watch chan struct{} +} + +func newProgressTracker() *progressTracker { + return &progressTracker{ + watch: make(chan struct{}), + } +} + +func (p *progressTracker) update(rev statedb.Revision) { + p.mu.Lock() + if rev > p.revision { + p.revision = rev + close(p.watch) + p.watch = make(chan struct{}) + } + p.mu.Unlock() +} + +func (p *progressTracker) wait(ctx context.Context, rev statedb.Revision) (statedb.Revision, error) { + for { + p.mu.Lock() + current := p.revision + watch := p.watch + p.mu.Unlock() + + if current >= rev { + return current, nil + } + select { + case <-ctx.Done(): + return current, ctx.Err() + case <-watch: + } + } +} diff --git a/reconciler/progress_test.go b/reconciler/progress_test.go new file mode 100644 index 0000000..21e4afc --- /dev/null +++ b/reconciler/progress_test.go @@ -0,0 +1,205 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Authors of Cilium + +package reconciler_test + +import ( + "context" + "fmt" + "iter" + "log/slog" + "sync" + "testing" + "testing/synctest" + "time" + + "github.com/cilium/hive" + "github.com/cilium/hive/cell" + "github.com/cilium/hive/hivetest" + "github.com/cilium/hive/job" + "github.com/cilium/statedb" + "github.com/cilium/statedb/index" + "github.com/cilium/statedb/reconciler" + "github.com/stretchr/testify/require" +) + +type waitObject struct { + ID uint64 + Status reconciler.Status +} + +// TableHeader implements 
statedb.TableWritable. +func (w waitObject) TableHeader() []string { + return []string{"ID", "Status"} +} + +// TableRow implements statedb.TableWritable. +func (w waitObject) TableRow() []string { + return []string{ + fmt.Sprintf("%d", w.ID), + w.Status.String(), + } +} + +var _ statedb.TableWritable = waitObject{} + +func (w *waitObject) Clone() *waitObject { + w2 := *w + return &w2 +} + +func (w *waitObject) GetStatus() reconciler.Status { + return w.Status +} + +func (w *waitObject) SetStatus(status reconciler.Status) *waitObject { + w.Status = status + return w +} + +var waitObjectIDIndex = statedb.Index[*waitObject, uint64]{ + Name: "id", + FromObject: func(obj *waitObject) index.KeySet { + return index.NewKeySet(index.Uint64(obj.ID)) + }, + FromKey: index.Uint64, + Unique: true, +} + +type waitOps struct { + started chan struct{} + unblock chan struct{} + startedOnce sync.Once +} + +func newWaitOps() *waitOps { + return &waitOps{ + started: make(chan struct{}), + unblock: make(chan struct{}), + } +} + +// Delete implements reconciler.Operations. +func (*waitOps) Delete(context.Context, statedb.ReadTxn, statedb.Revision, *waitObject) error { + return nil +} + +// Prune implements reconciler.Operations. +func (*waitOps) Prune(context.Context, statedb.ReadTxn, iter.Seq2[*waitObject, statedb.Revision]) error { + return nil +} + +// Update implements reconciler.Operations. 
+func (w *waitOps) Update(ctx context.Context, txn statedb.ReadTxn, rev statedb.Revision, obj *waitObject) error { + w.startedOnce.Do(func() { + close(w.started) + }) + select { + case <-w.unblock: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +var _ reconciler.Operations[*waitObject] = &waitOps{} + +func TestWaitUntilReconciled(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var ( + table statedb.RWTable[*waitObject] + db *statedb.DB + r reconciler.Reconciler[*waitObject] + ) + ops := newWaitOps() + + hive := hive.New( + statedb.Cell, + job.Cell, + cell.Provide( + cell.NewSimpleHealth, + reconciler.NewExpVarMetrics, + func(r job.Registry, h cell.Health, lc cell.Lifecycle) job.Group { + return r.NewGroup(h, lc) + }, + ), + cell.Invoke(func(db_ *statedb.DB) (err error) { + db = db_ + table, err = statedb.NewTable(db, "wait-objects", waitObjectIDIndex) + return err + }), + cell.Module("test", "test", + cell.Invoke(func(params reconciler.Params) error { + var err error + r, err = reconciler.Register( + params, + table, + (*waitObject).Clone, + (*waitObject).SetStatus, + (*waitObject).GetStatus, + ops, + nil, + reconciler.WithoutPruning(), + ) + return err + }), + ), + ) + + log := hivetest.Logger(t, hivetest.LogLevel(slog.LevelError)) + require.NoError(t, hive.Start(log, context.TODO()), "Start") + defer func() { + require.NoError(t, hive.Stop(log, context.TODO()), "Stop") + }() + + waitCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + // Won't block if we query with 0 revision. + _, err := r.WaitUntilReconciled(waitCtx, 0) + require.NoError(t, err) + + // Insert an object and wait for it to be reconciled. 
+ wtxn := db.WriteTxn(table) + table.Insert(wtxn, &waitObject{ + ID: 1, + Status: reconciler.StatusPending(), + }) + revision := table.Revision(wtxn) + wtxn.Commit() + + type waitResult struct { + rev statedb.Revision + err error + } + done := make(chan waitResult, 1) + go func() { + rev, err := r.WaitUntilReconciled(waitCtx, revision) + done <- waitResult{rev: rev, err: err} + }() + + synctest.Wait() + select { + case <-ops.started: + default: + t.Fatal("expected update to start") + } + + select { + case result := <-done: + t.Fatalf("WaitUntilReconciled returned early: %v", result.err) + default: + } + + close(ops.unblock) + synctest.Wait() + + select { + case result := <-done: + require.NoError(t, result.err) + require.Equal(t, revision, result.rev) + default: + t.Fatal("expected WaitUntilReconciled to complete") + } + }) +} diff --git a/reconciler/reconciler.go b/reconciler/reconciler.go index b1ed7c6..dcae3a1 100644 --- a/reconciler/reconciler.go +++ b/reconciler/reconciler.go @@ -19,6 +19,7 @@ type reconciler[Obj comparable] struct { retries *retries externalPruneTrigger chan struct{} primaryIndexer statedb.Indexer[Obj] + progress *progressTracker } func (r *reconciler[Obj]) Prune() { @@ -28,6 +29,10 @@ func (r *reconciler[Obj]) Prune() { } } +func (r *reconciler[Obj]) WaitUntilReconciled(ctx context.Context, untilRevision statedb.Revision) (statedb.Revision, error) { + return r.progress.wait(ctx, untilRevision) +} + func (r *reconciler[Obj]) reconcileLoop(ctx context.Context, health cell.Health) error { var pruneTickerChan <-chan time.Time if r.config.PruneInterval > 0 { @@ -101,7 +106,10 @@ func (r *reconciler[Obj]) reconcileLoop(ctx context.Context, health cell.Health) // Perform incremental reconciliation and retries of previously failed // objects. 
- errs := incremental.run(ctx, txn, changes) + errs, lastRevision, changed := incremental.run(ctx, txn, changes) + if changed { + r.progress.update(lastRevision) + } if tableInitialized && (prune || externalPrune) { if err := r.prune(ctx, txn); err != nil { diff --git a/reconciler/types.go b/reconciler/types.go index 55aede2..fee0c9a 100644 --- a/reconciler/types.go +++ b/reconciler/types.go @@ -34,6 +34,12 @@ type Reconciler[Obj any] interface { // that something has gone wrong in the reconciliation target and full // reconciliation is needed to recover. Prune() + + // WaitUntilReconciled blocks until the reconciler has processed all + // table changes up to untilRevision. Returns the latest processed + // revision and ctx.Err() if the context is cancelled. + // Note: errors from Update/Delete are treated as reconciled. + WaitUntilReconciled(ctx context.Context, untilRevision statedb.Revision) (statedb.Revision, error) } // Params are the reconciler dependencies that are independent of the From 2989332a28fcda7aa1ab9421b1dd22c621343a3b Mon Sep 17 00:00:00 2001 From: Jussi Maki Date: Mon, 19 Jan 2026 15:31:25 +0100 Subject: [PATCH 4/5] reconciler: Remove NewStatusIndex and WaitForReconciliation These were never used by Cilium and they are a very inefficient way of waiting for objects to be reconciled. Just drop these as we now have [Reconciler.WaitUntilReconciled]. Signed-off-by: Jussi Maki --- reconciler/example/types.go | 9 ++----- reconciler/index.go | 48 ------------------------------------- 2 files changed, 2 insertions(+), 55 deletions(-) delete mode 100644 reconciler/index.go diff --git a/reconciler/example/types.go b/reconciler/example/types.go index b4de5db..ce03836 100644 --- a/reconciler/example/types.go +++ b/reconciler/example/types.go @@ -71,17 +71,12 @@ var MemoNameIndex = statedb.Index[*Memo, string]{ Unique: true, } -// MemoStatusIndex indexes memos by their reconciliation status. 
-// This is mainly used by the reconciler to implement WaitForReconciliation. -var MemoStatusIndex = reconciler.NewStatusIndex((*Memo).GetStatus) - // NewMemoTable creates and registers the memos table. -func NewMemoTable(db *statedb.DB) (statedb.RWTable[*Memo], statedb.Index[*Memo, reconciler.StatusKind], error) { +func NewMemoTable(db *statedb.DB) (statedb.RWTable[*Memo], error) { tbl, err := statedb.NewTable( db, "memos", MemoNameIndex, - MemoStatusIndex, ) - return tbl, MemoStatusIndex, err + return tbl, err } diff --git a/reconciler/index.go b/reconciler/index.go deleted file mode 100644 index 9d4b5bf..0000000 --- a/reconciler/index.go +++ /dev/null @@ -1,48 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// Copyright Authors of Cilium - -package reconciler - -import ( - "context" - - "github.com/cilium/statedb" - "github.com/cilium/statedb/index" -) - -// NewStatusIndex creates a status index for a table of reconcilable objects. -// This is optional and should be only used when there is a need to often check that all -// objects are fully reconciled that outweighs the cost of maintaining a status index. -func NewStatusIndex[Obj any](getObjectStatus func(Obj) Status) statedb.Index[Obj, StatusKind] { - return statedb.Index[Obj, StatusKind]{ - Name: "status", - FromObject: func(obj Obj) index.KeySet { - return index.NewKeySet(getObjectStatus(obj).Kind.Key()) - }, - FromKey: StatusKind.Key, - Unique: false, - } -} - -// WaitForReconciliation blocks until all objects have been reconciled or the context -// has cancelled. -func WaitForReconciliation[Obj any](ctx context.Context, db *statedb.DB, table statedb.Table[Obj], statusIndex statedb.Index[Obj, StatusKind]) error { - for { - txn := db.ReadTxn() - - // See if there are any pending or error'd objects. 
- _, _, watchPending, okPending := table.GetWatch(txn, statusIndex.Query(StatusKindPending)) - _, _, watchError, okError := table.GetWatch(txn, statusIndex.Query(StatusKindError)) - if !okPending && !okError { - return nil - } - - // Wait for updates before checking again. - select { - case <-ctx.Done(): - return ctx.Err() - case <-watchPending: - case <-watchError: - } - } -} From 0886cb6f1e24a87df0b9a2f561eb489baa60d8fe Mon Sep 17 00:00:00 2001 From: Jussi Maki Date: Wed, 21 Jan 2026 09:55:05 +0100 Subject: [PATCH 5/5] reconciler: Add 'retriesPending' to WaitUntilReconciled Extend [Reconciler.WaitUntilReconciled] to also indicate whether retries are pending for any objects with a revision below or equal to [untilRevision]. The committing of results is split into two: one after normal incremental processing of pending objects and one after processing retries. This way the entries that failed to reconcile are pushed to the retry queue and we can check the low watermark to produce 'retriesPending'. 
Signed-off-by: Jussi Maki --- iterator.go | 6 -- reconciler/incremental.go | 35 ++++---- reconciler/progress.go | 24 ++++-- reconciler/progress_test.go | 94 +++++++++++++++----- reconciler/reconciler.go | 8 +- reconciler/retries.go | 165 +++++++++++++++++++++++++++--------- reconciler/retries_test.go | 18 ++-- reconciler/types.go | 13 ++- table.go | 12 +-- 9 files changed, 261 insertions(+), 114 deletions(-) diff --git a/iterator.go b/iterator.go index e25861e..d908fdf 100644 --- a/iterator.go +++ b/iterator.go @@ -6,7 +6,6 @@ package statedb import ( "fmt" "iter" - "runtime" "slices" "github.com/cilium/statedb/index" @@ -253,11 +252,6 @@ func (it *changeIterator[Obj]) nextAny(txn ReadTxn) (iter.Seq2[Change[any], Revi } func (it *changeIterator[Obj]) Close() { - runtime.SetFinalizer(it, nil) - it.close() -} - -func (it *changeIterator[Obj]) close() { it.iter = nil if it.dt != nil { it.dt.close() diff --git a/reconciler/incremental.go b/reconciler/incremental.go index e609a8f..3336f11 100644 --- a/reconciler/incremental.go +++ b/reconciler/incremental.go @@ -45,20 +45,24 @@ type opResult struct { id uint64 // the "pending" identifier } -func (incr *incremental[Obj]) run(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) (errs []error, lastRev statedb.Revision, changed bool) { +func (incr *incremental[Obj]) run(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) (errs []error, lastRev statedb.Revision, retryLowWatermark statedb.Revision) { // Reconcile new and changed objects using either Operations // or BatchOperations. if incr.config.BatchOperations != nil { - lastRev, changed = incr.batch(ctx, txn, changes) + lastRev = incr.batch(ctx, txn, changes) } else { - lastRev, changed = incr.single(ctx, txn, changes) + lastRev = incr.single(ctx, txn, changes) } + // Commit status updates for new and changed objects. 
+ newErrors := incr.commitStatus() + clear(incr.results) + // Process objects that need to be retried that were not cleared. - incr.processRetries(ctx, txn) + retryLowWatermark = incr.processRetries(ctx, txn) - // Finally commit the status updates. - newErrors := incr.commitStatus() + // Finally commit the status updates from retries. + newErrors += incr.commitStatus() // Since all failures are retried, we can return the errors from the retry // queue which includes both errors occurred in this round and the old @@ -66,18 +70,17 @@ func (incr *incremental[Obj]) run(ctx context.Context, txn statedb.ReadTxn, chan errs = incr.retries.errors() incr.metrics.ReconciliationErrors(incr.moduleID, newErrors, len(errs)) - // Prepare for next round + // Prepare for next round. incr.numReconciled = 0 clear(incr.results) - return errs, lastRev, changed + return errs, lastRev, retryLowWatermark } -func (incr *incremental[Obj]) single(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) (lastRev statedb.Revision, changed bool) { +func (incr *incremental[Obj]) single(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) (lastRev statedb.Revision) { // Iterate in revision order through new and changed objects. 
for change, rev := range changes { lastRev = rev - changed = true obj := change.Object @@ -102,14 +105,13 @@ func (incr *incremental[Obj]) single(ctx context.Context, txn statedb.ReadTxn, c return } -func (incr *incremental[Obj]) batch(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) (lastRev statedb.Revision, changed bool) { +func (incr *incremental[Obj]) batch(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) (lastRev statedb.Revision) { ops := incr.config.BatchOperations updateBatch := []BatchEntry[Obj]{} deleteBatch := []BatchEntry[Obj]{} for change, rev := range changes { lastRev = rev - changed = true obj := change.Object @@ -152,7 +154,7 @@ func (incr *incremental[Obj]) batch(ctx context.Context, txn statedb.ReadTxn, ch for _, entry := range deleteBatch { if entry.Result != nil { // Delete failed, queue a retry for it. - incr.retries.Add(entry.original, entry.Revision, true, entry.Result) + incr.retries.Add(entry.original, entry.Revision, entry.Revision, true, entry.Result) } } } @@ -179,7 +181,7 @@ func (incr *incremental[Obj]) batch(ctx context.Context, txn statedb.ReadTxn, ch return } -func (incr *incremental[Obj]) processRetries(ctx context.Context, txn statedb.ReadTxn) { +func (incr *incremental[Obj]) processRetries(ctx context.Context, txn statedb.ReadTxn) statedb.Revision { now := time.Now() for incr.numReconciled < incr.config.IncrementalRoundSize { item, ok := incr.retries.Top() @@ -190,6 +192,7 @@ func (incr *incremental[Obj]) processRetries(ctx context.Context, txn statedb.Re incr.processSingle(ctx, txn, item.object.(Obj), item.rev, item.delete) incr.numReconciled++ } + return incr.retries.LowWatermark() } func (incr *incremental[Obj]) processSingle(ctx context.Context, txn statedb.ReadTxn, obj Obj, rev statedb.Revision, delete bool) { @@ -204,7 +207,7 @@ func (incr *incremental[Obj]) processSingle(ctx context.Context, txn statedb.Rea err = 
incr.config.Operations.Delete(ctx, txn, rev, obj) if err != nil { // Deletion failed. Retry again later. - incr.retries.Add(obj, rev, true, err) + incr.retries.Add(obj, rev, rev, true, err) } } else { // Clone the object so it can be mutated by Update() @@ -267,7 +270,7 @@ func (incr *incremental[Obj]) commitStatus() (numErrors int) { // Reconciliation of the object had failed and the status was updated // successfully (object had not changed). Queue the retry for the object. newRevision := incr.table.Revision(wtxn) - incr.retries.Add(result.original.(Obj), newRevision, false, result.err) + incr.retries.Add(result.original.(Obj), newRevision, result.rev, false, result.err) } } return diff --git a/reconciler/progress.go b/reconciler/progress.go index e8da74a..44eb5d6 100644 --- a/reconciler/progress.go +++ b/reconciler/progress.go @@ -13,9 +13,10 @@ import ( // progressTracker tracks the highest revision observed as reconciled and // allows callers to wait until a target revision is reached. 
type progressTracker struct { - mu sync.Mutex - revision statedb.Revision - watch chan struct{} + mu sync.Mutex + revision statedb.Revision + retryLowWatermark statedb.Revision + watch chan struct{} } func newProgressTracker() *progressTracker { @@ -24,29 +25,38 @@ func newProgressTracker() *progressTracker { } } -func (p *progressTracker) update(rev statedb.Revision) { +func (p *progressTracker) update(rev statedb.Revision, retryLowWatermark statedb.Revision) { p.mu.Lock() + updated := false if rev > p.revision { p.revision = rev + updated = true + } + if retryLowWatermark != p.retryLowWatermark { + p.retryLowWatermark = retryLowWatermark + updated = true + } + if updated { close(p.watch) p.watch = make(chan struct{}) } p.mu.Unlock() } -func (p *progressTracker) wait(ctx context.Context, rev statedb.Revision) (statedb.Revision, error) { +func (p *progressTracker) wait(ctx context.Context, rev statedb.Revision) (statedb.Revision, statedb.Revision, error) { for { p.mu.Lock() current := p.revision + retryLowWatermark := p.retryLowWatermark watch := p.watch p.mu.Unlock() if current >= rev { - return current, nil + return current, retryLowWatermark, nil } select { case <-ctx.Done(): - return current, ctx.Err() + return current, retryLowWatermark, ctx.Err() case <-watch: } } diff --git a/reconciler/progress_test.go b/reconciler/progress_test.go index 21e4afc..18fedf6 100644 --- a/reconciler/progress_test.go +++ b/reconciler/progress_test.go @@ -5,10 +5,12 @@ package reconciler_test import ( "context" + "errors" "fmt" "iter" "log/slog" "sync" + "sync/atomic" "testing" "testing/synctest" "time" @@ -25,24 +27,25 @@ import ( type waitObject struct { ID uint64 + Fail *atomic.Bool Status reconciler.Status } // TableHeader implements statedb.TableWritable. -func (w waitObject) TableHeader() []string { - return []string{"ID", "Status"} +func (w *waitObject) TableHeader() []string { + return []string{"ID", "Fail", "Status"} } // TableRow implements statedb.TableWritable. 
-func (w waitObject) TableRow() []string { +func (w *waitObject) TableRow() []string { + fail := w.Fail.Load() return []string{ fmt.Sprintf("%d", w.ID), + fmt.Sprintf("%t", fail), w.Status.String(), } } -var _ statedb.TableWritable = waitObject{} - func (w *waitObject) Clone() *waitObject { w2 := *w return &w2 @@ -69,14 +72,19 @@ var waitObjectIDIndex = statedb.Index[*waitObject, uint64]{ type waitOps struct { started chan struct{} unblock chan struct{} - startedOnce sync.Once + markStarted func() } func newWaitOps() *waitOps { - return &waitOps{ + w := &waitOps{ started: make(chan struct{}), unblock: make(chan struct{}), } + + w.markStarted = sync.OnceFunc(func() { + close(w.started) + }) + return w } // Delete implements reconciler.Operations. @@ -91,9 +99,10 @@ func (*waitOps) Prune(context.Context, statedb.ReadTxn, iter.Seq2[*waitObject, s // Update implements reconciler.Operations. func (w *waitOps) Update(ctx context.Context, txn statedb.ReadTxn, rev statedb.Revision, obj *waitObject) error { - w.startedOnce.Do(func() { - close(w.started) - }) + w.markStarted() + if obj.Fail.Load() { + return errors.New("fail") + } select { case <-w.unblock: return nil @@ -140,6 +149,7 @@ func TestWaitUntilReconciled(t *testing.T) { ops, nil, reconciler.WithoutPruning(), + reconciler.WithRetry(10*time.Millisecond, 10*time.Millisecond), ) return err }), @@ -156,32 +166,42 @@ func TestWaitUntilReconciled(t *testing.T) { defer cancel() // Won't block if we query with 0 revision. - _, err := r.WaitUntilReconciled(waitCtx, 0) + _, retryRevision, err := r.WaitUntilReconciled(waitCtx, 0) require.NoError(t, err) + require.Zero(t, retryRevision) // Insert an object and wait for it to be reconciled. 
wtxn := db.WriteTxn(table) table.Insert(wtxn, &waitObject{ ID: 1, + Fail: new(atomic.Bool), Status: reconciler.StatusPending(), }) revision := table.Revision(wtxn) wtxn.Commit() type waitResult struct { - rev statedb.Revision - err error + rev statedb.Revision + retryRevision statedb.Revision + err error } done := make(chan waitResult, 1) go func() { - rev, err := r.WaitUntilReconciled(waitCtx, revision) - done <- waitResult{rev: rev, err: err} + rev, retryRevision, err := r.WaitUntilReconciled(waitCtx, revision) + done <- waitResult{rev: rev, err: err, retryRevision: retryRevision} }() - synctest.Wait() - select { - case <-ops.started: - default: + started := false + for !started { + // Advance the fake time + time.Sleep(50 * time.Millisecond) + select { + case <-ops.started: + started = true + default: + } + } + if !started { t.Fatal("expected update to start") } @@ -192,14 +212,48 @@ func TestWaitUntilReconciled(t *testing.T) { } close(ops.unblock) + synctest.Wait() select { case result := <-done: require.NoError(t, result.err) - require.Equal(t, revision, result.rev) + require.Zero(t, result.retryRevision) default: t.Fatal("expected WaitUntilReconciled to complete") } + + wtxn = db.WriteTxn(table) + obj := &waitObject{ + ID: 2, + Fail: new(atomic.Bool), + Status: reconciler.StatusPending(), + } + obj.Fail.Store(true) + table.Insert(wtxn, obj) + + retryRevision = table.Revision(wtxn) + wtxn.Commit() + + synctest.Wait() + + origRetryRevision := retryRevision + rev, returnedRetryRevision, err := r.WaitUntilReconciled(waitCtx, origRetryRevision) + require.NoError(t, err) + require.Equal(t, origRetryRevision, rev) + require.Equal(t, origRetryRevision, returnedRetryRevision) + + obj.Fail.Store(false) + // Advance the fake time enough that a retry has happened. 
+ time.Sleep(time.Second) + + obj, newRev, ok := table.Get(db.ReadTxn(), waitObjectIDIndex.Query(2)) + require.True(t, ok && obj.Status.Kind == reconciler.StatusKindDone) + + rev, returnedRetryRevision, err = r.WaitUntilReconciled(waitCtx, origRetryRevision) + require.NoError(t, err) + require.Equal(t, newRev, rev) + require.Zero(t, returnedRetryRevision) + }) } diff --git a/reconciler/reconciler.go b/reconciler/reconciler.go index dcae3a1..834ab72 100644 --- a/reconciler/reconciler.go +++ b/reconciler/reconciler.go @@ -29,7 +29,7 @@ func (r *reconciler[Obj]) Prune() { } } -func (r *reconciler[Obj]) WaitUntilReconciled(ctx context.Context, untilRevision statedb.Revision) (statedb.Revision, error) { +func (r *reconciler[Obj]) WaitUntilReconciled(ctx context.Context, untilRevision statedb.Revision) (statedb.Revision, statedb.Revision, error) { return r.progress.wait(ctx, untilRevision) } @@ -106,10 +106,8 @@ func (r *reconciler[Obj]) reconcileLoop(ctx context.Context, health cell.Health) // Perform incremental reconciliation and retries of previously failed // objects. 
- errs, lastRevision, changed := incremental.run(ctx, txn, changes) - if changed { - r.progress.update(lastRevision) - } + errs, lastRevision, retryLowWatermark := incremental.run(ctx, txn, changes) + r.progress.update(lastRevision, retryLowWatermark) if tableInitialized && (prune || externalPrune) { if err := r.prune(ctx, txn); err != nil { diff --git a/reconciler/retries.go b/reconciler/retries.go index cb3fd50..6eb5015 100644 --- a/reconciler/retries.go +++ b/reconciler/retries.go @@ -26,12 +26,29 @@ func (e *exponentialBackoff) Duration(attempt int) time.Duration { } func newRetries(minDuration, maxDuration time.Duration, objectToKey func(any) index.Key) *retries { + queue := newRetryPrioQueue( + func(items []*retryItem, i, j int) bool { + return items[i].retryAt.Before(items[j].retryAt) + }, + func(item *retryItem, idx int) { + item.index = idx + }, + ) + revQueue := newRetryPrioQueue( + func(items []*retryItem, i, j int) bool { + return items[i].origRev < items[j].origRev + }, + func(item *retryItem, idx int) { + item.revIndex = idx + }, + ) return &retries{ backoff: exponentialBackoff{ min: minDuration, max: maxDuration, }, - queue: nil, + queue: queue, + revQueue: revQueue, items: make(map[string]*retryItem), objectToKey: objectToKey, waitTimer: nil, @@ -43,8 +60,15 @@ func newRetries(minDuration, maxDuration time.Duration, objectToKey func(any) in // a priority queue ordered by retry time. Methods of 'retries' // are not safe to access concurrently. type retries struct { - backoff exponentialBackoff - queue retryPrioQueue + backoff exponentialBackoff + + // queue stores items to be retried by their 'retryAt' time + queue *retryPrioQueue + + // revQueue stores items by their original revision. Used to compute the + // low watermark revision in order to implement [WaitUntilReconciled]. 
+ revQueue *retryPrioQueue + items map[string]*retryItem objectToKey func(any) index.Key waitTimer *time.Timer @@ -60,11 +84,13 @@ func (rq *retries) errors() []error { } type retryItem struct { - object any // the object that is being retried. 'any' to avoid specializing this internal code. - rev statedb.Revision - delete bool + object any // the object that is being retried. 'any' to avoid specializing this internal code. + rev statedb.Revision + origRev statedb.Revision + delete bool - index int // item's index in the priority queue + index int // item's index in the retry time priority queue + revIndex int // item's index in the revision priority queue retryAt time.Time // time at which to retry numRetries int // number of retries attempted (for calculating backoff) lastError error @@ -80,14 +106,13 @@ func (rq *retries) Top() (*retryItem, bool) { if rq.queue.Len() == 0 { return nil, false } - return rq.queue[0], true + return rq.queue.Peek(), true } func (rq *retries) Pop() { // Pop the object from the queue, but leave it into the map until // the object is cleared or re-added. - rq.queue[0].index = -1 - heap.Pop(&rq.queue) + rq.queue.PopItem() rq.resetTimer() } @@ -101,48 +126,60 @@ func (rq *retries) resetTimer() { if rq.queue.Len() == 0 { rq.waitTimer = nil } else { - d := time.Until(rq.queue[0].retryAt) + d := time.Until(rq.queue.Peek().retryAt) rq.waitTimer = time.AfterFunc(d, func() { close(waitChan) }) } } else if rq.queue.Len() > 0 { - d := time.Until(rq.queue[0].retryAt) + d := time.Until(rq.queue.Peek().retryAt) // Did not fire yet so we can just reset the timer. 
rq.waitTimer.Reset(d) } } -func (rq *retries) Add(obj any, rev statedb.Revision, delete bool, lastError error) { +func (rq *retries) Add(obj any, rev statedb.Revision, origRev statedb.Revision, delete bool, lastError error) { var ( item *retryItem ok bool ) key := rq.objectToKey(obj) - if item, ok = rq.items[string(key)]; !ok { + keyStr := string(key) + if item, ok = rq.items[keyStr]; !ok { item = &retryItem{ numRetries: 0, index: -1, + revIndex: -1, } - rq.items[string(key)] = item + rq.items[keyStr] = item } item.object = obj item.rev = rev + item.origRev = origRev item.delete = delete item.numRetries += 1 item.lastError = lastError duration := rq.backoff.Duration(item.numRetries) item.retryAt = time.Now().Add(duration) + // Add the item into the revision key'd priority queue + if item.revIndex >= 0 { + rq.revQueue.Fix(item.revIndex) + } else { + rq.revQueue.PushItem(item) + } + + // Add the item into the retryAt key'd priority queue if item.index >= 0 { // The item was already in the queue, fix up its position. - heap.Fix(&rq.queue, item.index) + rq.queue.Fix(item.index) } else { - heap.Push(&rq.queue, item) + rq.queue.PushItem(item) } // Item is at the head of the queue, reset the timer. if item.index == 0 { rq.resetTimer() } + } func (rq *retries) Clear(obj any) { @@ -150,49 +187,93 @@ func (rq *retries) Clear(obj any) { if item, ok := rq.items[string(key)]; ok { // Remove the object from the queue if it is still there. index := item.index // hold onto the index as heap.Remove messes with it - if item.index >= 0 && item.index < len(rq.queue) && - key.Equal(rq.objectToKey(rq.queue[item.index].object)) { - heap.Remove(&rq.queue, item.index) + if item.index >= 0 && item.index < len(rq.queue.items) && + key.Equal(rq.objectToKey(rq.queue.items[item.index].object)) { + rq.queue.Remove(item.index) // Reset the timer in case we removed the top item. 
if index == 0 { rq.resetTimer() } } + if item.revIndex >= 0 && item.revIndex < len(rq.revQueue.items) && + key.Equal(rq.objectToKey(rq.revQueue.items[item.revIndex].object)) { + rq.revQueue.Remove(item.revIndex) + } // Completely forget the object and its retry count. delete(rq.items, string(key)) } } -// retryPrioQueue is a slice-backed priority heap with the next -// expiring 'retryItem' at top. Implementation is adapted from the -// 'container/heap' PriorityQueue example. -type retryPrioQueue []*retryItem +func (rq *retries) LowWatermark() statedb.Revision { + for rq.revQueue.Len() > 0 { + top := rq.revQueue.Peek() + key := rq.objectToKey(top.object) + if item, ok := rq.items[string(key)]; ok && item == top { + return top.origRev + } + rq.revQueue.PopItem() + } + return 0 +} -func (pq retryPrioQueue) Len() int { return len(pq) } +type retryPrioQueue struct { + items []*retryItem + less func(items []*retryItem, i, j int) bool + setIndex func(*retryItem, int) +} -func (pq retryPrioQueue) Less(i, j int) bool { - return pq[i].retryAt.Before(pq[j].retryAt) +func newRetryPrioQueue( + less func(items []*retryItem, i, j int) bool, + setIndex func(*retryItem, int), +) *retryPrioQueue { + return &retryPrioQueue{ + less: less, + setIndex: setIndex, + } } -func (pq retryPrioQueue) Swap(i, j int) { - pq[i], pq[j] = pq[j], pq[i] - pq[i].index = i - pq[j].index = j +func (hq *retryPrioQueue) Len() int { return len(hq.items) } + +func (hq *retryPrioQueue) Less(i, j int) bool { return hq.less(hq.items, i, j) } + +func (hq *retryPrioQueue) Swap(i, j int) { + hq.items[i], hq.items[j] = hq.items[j], hq.items[i] + hq.setIndex(hq.items[i], i) + hq.setIndex(hq.items[j], j) } -func (pq *retryPrioQueue) Push(x any) { - retryObj := x.(*retryItem) - retryObj.index = len(*pq) - *pq = append(*pq, retryObj) +func (hq *retryPrioQueue) Push(x any) { + item := x.(*retryItem) + hq.setIndex(item, len(hq.items)) + hq.items = append(hq.items, item) } -func (pq *retryPrioQueue) Pop() any { - old := 
*pq - n := len(old) - item := old[n-1] - old[n-1] = nil // avoid memory leak - item.index = -1 // for safety - *pq = old[0 : n-1] +func (hq *retryPrioQueue) Pop() any { + n := len(hq.items) + item := hq.items[n-1] + hq.items[n-1] = nil // avoid memory leak + hq.setIndex(item, -1) + hq.items = hq.items[:n-1] return item } + +func (hq *retryPrioQueue) Peek() *retryItem { + return hq.items[0] +} + +func (hq *retryPrioQueue) PushItem(item *retryItem) { + heap.Push(hq, item) +} + +func (hq *retryPrioQueue) PopItem() *retryItem { + return heap.Pop(hq).(*retryItem) +} + +func (hq *retryPrioQueue) Fix(index int) { + heap.Fix(hq, index) +} + +func (hq *retryPrioQueue) Remove(index int) { + heap.Remove(hq, index) +} diff --git a/reconciler/retries_test.go b/reconciler/retries_test.go index bdb136e..3a21d3b 100644 --- a/reconciler/retries_test.go +++ b/reconciler/retries_test.go @@ -25,9 +25,9 @@ func TestRetries(t *testing.T) { // Add objects to be retried in order. We assume here that 'time.Time' has // enough granularity for these to be added with rising retryAt times. err := errors.New("some error") - rq.Add(obj1, 1, false, err) - rq.Add(obj2, 2, false, err) - rq.Add(obj3, 3, false, err) + rq.Add(obj1, 1, 1, false, err) + rq.Add(obj2, 2, 2, false, err) + rq.Add(obj3, 3, 3, false, err) errs := rq.errors() assert.Len(t, errs, 3) @@ -35,7 +35,7 @@ func TestRetries(t *testing.T) { // Adding an item a second time will increment the number of retries and // recalculate when it should be retried. - rq.Add(obj3, 3, false, err) + rq.Add(obj3, 3, 3, false, err) <-rq.Wait() item1, ok := rq.Top() @@ -75,8 +75,8 @@ func TestRetries(t *testing.T) { // Retry 'obj3' and since it was added back without clearing it'll be retried // later. Add obj1 and check that 'obj3' has later retry time. 
- rq.Add(obj3, 4, false, err) - rq.Add(obj1, 5, false, err) + rq.Add(obj3, 4, 4, false, err) + rq.Add(obj1, 5, 5, false, err) <-rq.Wait() item4, ok := rq.Top() @@ -107,9 +107,9 @@ func TestRetries(t *testing.T) { assert.False(t, ok) // Test that object can be cleared from the queue without popping it. - rq.Add(obj1, 6, false, err) - rq.Add(obj2, 7, false, err) - rq.Add(obj3, 8, false, err) + rq.Add(obj1, 6, 6, false, err) + rq.Add(obj2, 7, 7, false, err) + rq.Add(obj3, 8, 8, false, err) rq.Clear(obj1) // Remove obj1, testing that it'll fix the queue correctly. rq.Pop() // Pop and remove obj2 and clear it to test that Clear doesn't mess with queue diff --git a/reconciler/types.go b/reconciler/types.go index fee0c9a..1f20624 100644 --- a/reconciler/types.go +++ b/reconciler/types.go @@ -36,10 +36,15 @@ type Reconciler[Obj any] interface { Prune() // WaitUntilReconciled blocks until the reconciler has processed all - // table changes up to untilRevision. Returns the latest processed - // revision and ctx.Err() if the context is cancelled. - // Note: errors from Update/Delete are treated as reconciled. - WaitUntilReconciled(ctx context.Context, untilRevision statedb.Revision) (statedb.Revision, error) + // table changes up to untilRevision. Returns the revision to which + // objects have been attempted to be reconciled at least once, the lowest + // revision of an object that failed to reconcile and ctx.Err() if the context + // is cancelled. + // + // If you want to wait until objects have been successfully reconciled up to + // a specific revision then this method should be called repeatedly until + // both [revision] and [retryLowWatermark] are past the desired [untilRevision]. 
+ WaitUntilReconciled(ctx context.Context, untilRevision statedb.Revision) (revision statedb.Revision, retryLowWatermark statedb.Revision, err error) } // Params are the reconciler dependencies that are independent of the diff --git a/table.go b/table.go index 2e01cf6..3ce8a5d 100644 --- a/table.go +++ b/table.go @@ -544,11 +544,6 @@ func (t *genTable[Obj]) Changes(txn WriteTxn) (ChangeIterator[Obj], error) { table: t, watch: closedWatchChannel, } - // Set a finalizer to unregister the delete tracker when the iterator - // is dropped. - runtime.SetFinalizer(iter, func(iter *changeIterator[Obj]) { - iter.close() - }) itxn := txn.unwrap() name := fmt.Sprintf("changes-%p", iter) @@ -564,6 +559,13 @@ func (t *genTable[Obj]) Changes(txn WriteTxn) (ChangeIterator[Obj], error) { return nil, err } + // Add a cleanup to unregister the delete tracker. + runtime.AddCleanup( + iter, + func(dt *deleteTracker[Obj]) { dt.close() }, + iter.dt, + ) + // Prime it. iter.refresh(txn)