diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 9f96165..8405d87 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -5,7 +5,7 @@ on: - main env: - GO_VERSION: 1.23 + GO_VERSION: 1.25.6 jobs: test: diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 3e64835..10f0305 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -5,7 +5,7 @@ on: - main env: - GO_VERSION: 1.23 + GO_VERSION: 1.25.6 jobs: test: diff --git a/derive.go b/derive.go index 9ff2b99..398dd1b 100644 --- a/derive.go +++ b/derive.go @@ -74,6 +74,8 @@ func (d derive[In, Out]) loop(ctx context.Context, _ cell.Health) error { if err != nil { return err } + defer iter.Close() + for { wtxn := d.DB.WriteTxn(out) changes, watch := iter.Next(wtxn) diff --git a/go.mod b/go.mod index 2d26e7f..1c6f851 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/cilium/statedb -go 1.24 +go 1.25 require ( github.com/cilium/hive v0.0.0-20250731144630-28e7a35ed227 diff --git a/iterator.go b/iterator.go index db7ff8c..d908fdf 100644 --- a/iterator.go +++ b/iterator.go @@ -251,7 +251,7 @@ func (it *changeIterator[Obj]) nextAny(txn ReadTxn) (iter.Seq2[Change[any], Revi }, watch } -func (it *changeIterator[Obj]) close() { +func (it *changeIterator[Obj]) Close() { it.iter = nil if it.dt != nil { it.dt.close() diff --git a/observable.go b/observable.go index 67ea704..c856d57 100644 --- a/observable.go +++ b/observable.go @@ -32,6 +32,7 @@ func (to *observable[Obj]) Observe(ctx context.Context, next func(Change[Obj]), complete(err) return } + defer iter.Close() defer complete(nil) for { diff --git a/reconciler/builder.go b/reconciler/builder.go index 44525ff..2bb114b 100644 --- a/reconciler/builder.go +++ b/reconciler/builder.go @@ -78,6 +78,7 @@ func Register[Obj comparable]( retries: newRetries(cfg.RetryBackoffMinDuration, cfg.RetryBackoffMaxDuration, objectToKey), externalPruneTrigger: make(chan struct{}, 1), primaryIndexer: idx, + progress: newProgressTracker(), } params.JobGroup.Add(job.OneShot("reconcile", r.reconcileLoop)) diff --git a/reconciler/example/types.go b/reconciler/example/types.go index b4de5db..ce03836 100644 --- a/reconciler/example/types.go +++ b/reconciler/example/types.go @@ -71,17 +71,12 @@ var MemoNameIndex = statedb.Index[*Memo, string]{ Unique: true, } -// MemoStatusIndex indexes memos by their reconciliation status. -// This is mainly used by the reconciler to implement WaitForReconciliation. -var MemoStatusIndex = reconciler.NewStatusIndex((*Memo).GetStatus) - // NewMemoTable creates and registers the memos table. -func NewMemoTable(db *statedb.DB) (statedb.RWTable[*Memo], statedb.Index[*Memo, reconciler.StatusKind], error) { +func NewMemoTable(db *statedb.DB) (statedb.RWTable[*Memo], error) { tbl, err := statedb.NewTable( db, "memos", MemoNameIndex, - MemoStatusIndex, ) - return tbl, MemoStatusIndex, err + return tbl, err } diff --git a/reconciler/incremental.go b/reconciler/incremental.go index 462ea53..3336f11 100644 --- a/reconciler/incremental.go +++ b/reconciler/incremental.go @@ -45,37 +45,43 @@ type opResult struct { id uint64 // the "pending" identifier } -func (incr *incremental[Obj]) run(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) []error { +func (incr *incremental[Obj]) run(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) (errs []error, lastRev statedb.Revision, retryLowWatermark statedb.Revision) { // Reconcile new and changed objects using either Operations // or BatchOperations. if incr.config.BatchOperations != nil { - incr.batch(ctx, txn, changes) + lastRev = incr.batch(ctx, txn, changes) } else { - incr.single(ctx, txn, changes) + lastRev = incr.single(ctx, txn, changes) } + // Commit status updates for new and changed objects. + newErrors := incr.commitStatus() + clear(incr.results) + // Process objects that need to be retried that were not cleared. - incr.processRetries(ctx, txn) + retryLowWatermark = incr.processRetries(ctx, txn) - // Finally commit the status updates. - newErrors := incr.commitStatus() + // Finally commit the status updates from retries. + newErrors += incr.commitStatus() // Since all failures are retried, we can return the errors from the retry // queue which includes both errors occurred in this round and the old // errors. - errs := incr.retries.errors() + errs = incr.retries.errors() incr.metrics.ReconciliationErrors(incr.moduleID, newErrors, len(errs)) - // Prepare for next round + // Prepare for next round. incr.numReconciled = 0 clear(incr.results) - return errs + return errs, lastRev, retryLowWatermark } -func (incr *incremental[Obj]) single(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) { +func (incr *incremental[Obj]) single(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) (lastRev statedb.Revision) { // Iterate in revision order through new and changed objects. for change, rev := range changes { + lastRev = rev + obj := change.Object status := incr.config.GetObjectStatus(obj) @@ -95,14 +101,18 @@ func (incr *incremental[Obj]) single(ctx context.Context, txn statedb.ReadTxn, c break } } + + return } -func (incr *incremental[Obj]) batch(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) { +func (incr *incremental[Obj]) batch(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) (lastRev statedb.Revision) { ops := incr.config.BatchOperations updateBatch := []BatchEntry[Obj]{} deleteBatch := []BatchEntry[Obj]{} for change, rev := range changes { + lastRev = rev + obj := change.Object status := incr.config.GetObjectStatus(obj) @@ -144,7 +154,7 @@ func (incr *incremental[Obj]) batch(ctx context.Context, txn statedb.ReadTxn, ch for _, entry := range deleteBatch { if entry.Result != nil { // Delete failed, queue a retry for it. - incr.retries.Add(entry.original, entry.Revision, true, entry.Result) + incr.retries.Add(entry.original, entry.Revision, entry.Revision, true, entry.Result) } } } @@ -167,9 +177,11 @@ func (incr *incremental[Obj]) batch(ctx context.Context, txn statedb.ReadTxn, ch incr.results[entry.Object] = opResult{rev: entry.Revision, id: status.ID, err: entry.Result, original: entry.original} } } + + return } -func (incr *incremental[Obj]) processRetries(ctx context.Context, txn statedb.ReadTxn) { +func (incr *incremental[Obj]) processRetries(ctx context.Context, txn statedb.ReadTxn) statedb.Revision { now := time.Now() for incr.numReconciled < incr.config.IncrementalRoundSize { item, ok := incr.retries.Top() @@ -180,6 +192,7 @@ func (incr *incremental[Obj]) processRetries(ctx context.Context, txn statedb.Re incr.processSingle(ctx, txn, item.object.(Obj), item.rev, item.delete) incr.numReconciled++ } + return incr.retries.LowWatermark() } func (incr *incremental[Obj]) processSingle(ctx context.Context, txn statedb.ReadTxn, obj Obj, rev statedb.Revision, delete bool) { @@ -194,7 +207,7 @@ func (incr *incremental[Obj]) processSingle(ctx context.Context, txn statedb.Rea err = incr.config.Operations.Delete(ctx, txn, rev, obj) if err != nil { // Deletion failed. Retry again later. - incr.retries.Add(obj, rev, true, err) + incr.retries.Add(obj, rev, rev, true, err) } } else { // Clone the object so it can be mutated by Update() @@ -257,7 +270,7 @@ func (incr *incremental[Obj]) commitStatus() (numErrors int) { // Reconciliation of the object had failed and the status was updated // successfully (object had not changed). Queue the retry for the object. newRevision := incr.table.Revision(wtxn) - incr.retries.Add(result.original.(Obj), newRevision, false, result.err) + incr.retries.Add(result.original.(Obj), newRevision, result.rev, false, result.err) } } return diff --git a/reconciler/index.go b/reconciler/index.go deleted file mode 100644 index 9d4b5bf..0000000 --- a/reconciler/index.go +++ /dev/null @@ -1,48 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// Copyright Authors of Cilium - -package reconciler - -import ( - "context" - - "github.com/cilium/statedb" - "github.com/cilium/statedb/index" -) - -// NewStatusIndex creates a status index for a table of reconcilable objects. -// This is optional and should be only used when there is a need to often check that all -// objects are fully reconciled that outweighs the cost of maintaining a status index. -func NewStatusIndex[Obj any](getObjectStatus func(Obj) Status) statedb.Index[Obj, StatusKind] { - return statedb.Index[Obj, StatusKind]{ - Name: "status", - FromObject: func(obj Obj) index.KeySet { - return index.NewKeySet(getObjectStatus(obj).Kind.Key()) - }, - FromKey: StatusKind.Key, - Unique: false, - } -} - -// WaitForReconciliation blocks until all objects have been reconciled or the context -// has cancelled. -func WaitForReconciliation[Obj any](ctx context.Context, db *statedb.DB, table statedb.Table[Obj], statusIndex statedb.Index[Obj, StatusKind]) error { - for { - txn := db.ReadTxn() - - // See if there are any pending or error'd objects. - _, _, watchPending, okPending := table.GetWatch(txn, statusIndex.Query(StatusKindPending)) - _, _, watchError, okError := table.GetWatch(txn, statusIndex.Query(StatusKindError)) - if !okPending && !okError { - return nil - } - - // Wait for updates before checking again. - select { - case <-ctx.Done(): - return ctx.Err() - case <-watchPending: - case <-watchError: - } - } -} diff --git a/reconciler/progress.go b/reconciler/progress.go new file mode 100644 index 0000000..44eb5d6 --- /dev/null +++ b/reconciler/progress.go @@ -0,0 +1,63 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Authors of Cilium + +package reconciler + +import ( + "context" + "sync" + + "github.com/cilium/statedb" +) + +// progressTracker tracks the highest revision observed as reconciled and +// allows callers to wait until a target revision is reached. +type progressTracker struct { + mu sync.Mutex + revision statedb.Revision + retryLowWatermark statedb.Revision + watch chan struct{} +} + +func newProgressTracker() *progressTracker { + return &progressTracker{ + watch: make(chan struct{}), + } +} + +func (p *progressTracker) update(rev statedb.Revision, retryLowWatermark statedb.Revision) { + p.mu.Lock() + updated := false + if rev > p.revision { + p.revision = rev + updated = true + } + if retryLowWatermark != p.retryLowWatermark { + p.retryLowWatermark = retryLowWatermark + updated = true + } + if updated { + close(p.watch) + p.watch = make(chan struct{}) + } + p.mu.Unlock() +} + +func (p *progressTracker) wait(ctx context.Context, rev statedb.Revision) (statedb.Revision, statedb.Revision, error) { + for { + p.mu.Lock() + current := p.revision + retryLowWatermark := p.retryLowWatermark + watch := p.watch + p.mu.Unlock() + + if current >= rev { + return current, retryLowWatermark, nil + } + select { + case <-ctx.Done(): + return current, retryLowWatermark, ctx.Err() + case <-watch: + } + } +} diff --git a/reconciler/progress_test.go b/reconciler/progress_test.go new file mode 100644 index 0000000..18fedf6 --- /dev/null +++ b/reconciler/progress_test.go @@ -0,0 +1,259 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Authors of Cilium + +package reconciler_test + +import ( + "context" + "errors" + "fmt" + "iter" + "log/slog" + "sync" + "sync/atomic" + "testing" + "testing/synctest" + "time" + + "github.com/cilium/hive" + "github.com/cilium/hive/cell" + "github.com/cilium/hive/hivetest" + "github.com/cilium/hive/job" + "github.com/cilium/statedb" + "github.com/cilium/statedb/index" + "github.com/cilium/statedb/reconciler" + "github.com/stretchr/testify/require" +) + +type waitObject struct { + ID uint64 + Fail *atomic.Bool + Status reconciler.Status +} + +// TableHeader implements statedb.TableWritable. +func (w *waitObject) TableHeader() []string { + return []string{"ID", "Fail", "Status"} +} + +// TableRow implements statedb.TableWritable. +func (w *waitObject) TableRow() []string { + fail := w.Fail.Load() + return []string{ + fmt.Sprintf("%d", w.ID), + fmt.Sprintf("%t", fail), + w.Status.String(), + } +} + +func (w *waitObject) Clone() *waitObject { + w2 := *w + return &w2 +} + +func (w *waitObject) GetStatus() reconciler.Status { + return w.Status +} + +func (w *waitObject) SetStatus(status reconciler.Status) *waitObject { + w.Status = status + return w +} + +var waitObjectIDIndex = statedb.Index[*waitObject, uint64]{ + Name: "id", + FromObject: func(obj *waitObject) index.KeySet { + return index.NewKeySet(index.Uint64(obj.ID)) + }, + FromKey: index.Uint64, + Unique: true, +} + +type waitOps struct { + started chan struct{} + unblock chan struct{} + markStarted func() +} + +func newWaitOps() *waitOps { + w := &waitOps{ + started: make(chan struct{}), + unblock: make(chan struct{}), + } + + w.markStarted = sync.OnceFunc(func() { + close(w.started) + }) + return w +} + +// Delete implements reconciler.Operations. +func (*waitOps) Delete(context.Context, statedb.ReadTxn, statedb.Revision, *waitObject) error { + return nil +} + +// Prune implements reconciler.Operations. +func (*waitOps) Prune(context.Context, statedb.ReadTxn, iter.Seq2[*waitObject, statedb.Revision]) error { + return nil +} + +// Update implements reconciler.Operations. +func (w *waitOps) Update(ctx context.Context, txn statedb.ReadTxn, rev statedb.Revision, obj *waitObject) error { + w.markStarted() + if obj.Fail.Load() { + return errors.New("fail") + } + select { + case <-w.unblock: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +var _ reconciler.Operations[*waitObject] = &waitOps{} + +func TestWaitUntilReconciled(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var ( + table statedb.RWTable[*waitObject] + db *statedb.DB + r reconciler.Reconciler[*waitObject] + ) + ops := newWaitOps() + + hive := hive.New( + statedb.Cell, + job.Cell, + cell.Provide( + cell.NewSimpleHealth, + reconciler.NewExpVarMetrics, + func(r job.Registry, h cell.Health, lc cell.Lifecycle) job.Group { + return r.NewGroup(h, lc) + }, + ), + cell.Invoke(func(db_ *statedb.DB) (err error) { + db = db_ + table, err = statedb.NewTable(db, "wait-objects", waitObjectIDIndex) + return err + }), + cell.Module("test", "test", + cell.Invoke(func(params reconciler.Params) error { + var err error + r, err = reconciler.Register( + params, + table, + (*waitObject).Clone, + (*waitObject).SetStatus, + (*waitObject).GetStatus, + ops, + nil, + reconciler.WithoutPruning(), + reconciler.WithRetry(10*time.Millisecond, 10*time.Millisecond), + ) + return err + }), + ), + ) + + log := hivetest.Logger(t, hivetest.LogLevel(slog.LevelError)) + require.NoError(t, hive.Start(log, context.TODO()), "Start") + defer func() { + require.NoError(t, hive.Stop(log, context.TODO()), "Stop") + }() + + waitCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + // Won't block if we query with 0 revision. + _, retryRevision, err := r.WaitUntilReconciled(waitCtx, 0) + require.NoError(t, err) + require.Zero(t, retryRevision) + + // Insert an object and wait for it to be reconciled. + wtxn := db.WriteTxn(table) + table.Insert(wtxn, &waitObject{ + ID: 1, + Fail: new(atomic.Bool), + Status: reconciler.StatusPending(), + }) + revision := table.Revision(wtxn) + wtxn.Commit() + + type waitResult struct { + rev statedb.Revision + retryRevision statedb.Revision + err error + } + done := make(chan waitResult, 1) + go func() { + rev, retryRevision, err := r.WaitUntilReconciled(waitCtx, revision) + done <- waitResult{rev: rev, err: err, retryRevision: retryRevision} + }() + + started := false + for !started { + // Advance the fake time + time.Sleep(50 * time.Millisecond) + select { + case <-ops.started: + started = true + default: + } + } + if !started { + t.Fatal("expected update to start") + } + + select { + case result := <-done: + t.Fatalf("WaitUntilReconciled returned early: %v", result.err) + default: + } + + close(ops.unblock) + + synctest.Wait() + + select { + case result := <-done: + require.NoError(t, result.err) + require.Zero(t, result.retryRevision) + default: + t.Fatal("expected WaitUntilReconciled to complete") + } + + wtxn = db.WriteTxn(table) + obj := &waitObject{ + ID: 2, + Fail: new(atomic.Bool), + Status: reconciler.StatusPending(), + } + obj.Fail.Store(true) + table.Insert(wtxn, obj) + + retryRevision = table.Revision(wtxn) + wtxn.Commit() + + synctest.Wait() + + origRetryRevision := retryRevision + rev, returnedRetryRevision, err := r.WaitUntilReconciled(waitCtx, origRetryRevision) + require.NoError(t, err) + require.Equal(t, origRetryRevision, rev) + require.Equal(t, origRetryRevision, returnedRetryRevision) + + obj.Fail.Store(false) + // Advance the fake time enough that a retry has happened. + time.Sleep(time.Second) + + obj, newRev, ok := table.Get(db.ReadTxn(), waitObjectIDIndex.Query(2)) + require.True(t, ok && obj.Status.Kind == reconciler.StatusKindDone) + + rev, returnedRetryRevision, err = r.WaitUntilReconciled(waitCtx, origRetryRevision) + require.NoError(t, err) + require.Equal(t, newRev, rev) + require.Zero(t, returnedRetryRevision) + + }) +} diff --git a/reconciler/reconciler.go b/reconciler/reconciler.go index 181365f..834ab72 100644 --- a/reconciler/reconciler.go +++ b/reconciler/reconciler.go @@ -19,6 +19,7 @@ type reconciler[Obj comparable] struct { retries *retries externalPruneTrigger chan struct{} primaryIndexer statedb.Indexer[Obj] + progress *progressTracker } func (r *reconciler[Obj]) Prune() { @@ -28,6 +29,10 @@ func (r *reconciler[Obj]) Prune() { } } +func (r *reconciler[Obj]) WaitUntilReconciled(ctx context.Context, untilRevision statedb.Revision) (statedb.Revision, statedb.Revision, error) { + return r.progress.wait(ctx, untilRevision) +} + func (r *reconciler[Obj]) reconcileLoop(ctx context.Context, health cell.Health) error { var pruneTickerChan <-chan time.Time if r.config.PruneInterval > 0 { @@ -43,6 +48,7 @@ func (r *reconciler[Obj]) reconcileLoop(ctx context.Context, health cell.Health) if err != nil { return fmt.Errorf("watching for changes failed: %w", err) } + defer changeIterator.Close() tableWatchChan := closedWatchChannel @@ -100,7 +106,8 @@ func (r *reconciler[Obj]) reconcileLoop(ctx context.Context, health cell.Health) // Perform incremental reconciliation and retries of previously failed // objects. - errs := incremental.run(ctx, txn, changes) + errs, lastRevision, retryLowWatermark := incremental.run(ctx, txn, changes) + r.progress.update(lastRevision, retryLowWatermark) if tableInitialized && (prune || externalPrune) { if err := r.prune(ctx, txn); err != nil { diff --git a/reconciler/retries.go b/reconciler/retries.go index cb3fd50..6eb5015 100644 --- a/reconciler/retries.go +++ b/reconciler/retries.go @@ -26,12 +26,29 @@ func (e *exponentialBackoff) Duration(attempt int) time.Duration { } func newRetries(minDuration, maxDuration time.Duration, objectToKey func(any) index.Key) *retries { + queue := newRetryPrioQueue( + func(items []*retryItem, i, j int) bool { + return items[i].retryAt.Before(items[j].retryAt) + }, + func(item *retryItem, idx int) { + item.index = idx + }, + ) + revQueue := newRetryPrioQueue( + func(items []*retryItem, i, j int) bool { + return items[i].origRev < items[j].origRev + }, + func(item *retryItem, idx int) { + item.revIndex = idx + }, + ) return &retries{ backoff: exponentialBackoff{ min: minDuration, max: maxDuration, }, - queue: nil, + queue: queue, + revQueue: revQueue, items: make(map[string]*retryItem), objectToKey: objectToKey, waitTimer: nil, @@ -43,8 +60,15 @@ func newRetries(minDuration, maxDuration time.Duration, objectToKey func(any) in // a priority queue ordered by retry time. Methods of 'retries' // are not safe to access concurrently. type retries struct { - backoff exponentialBackoff - queue retryPrioQueue + backoff exponentialBackoff + + // queue stores items to be retried by their 'retryAt' time + queue *retryPrioQueue + + // revQueue stores items by their original revision. Used to compute the + // low watermark revision in order to implement [WaitUntilReconciled]. + revQueue *retryPrioQueue + items map[string]*retryItem objectToKey func(any) index.Key waitTimer *time.Timer @@ -60,11 +84,13 @@ func (rq *retries) errors() []error { } type retryItem struct { - object any // the object that is being retried. 'any' to avoid specializing this internal code. - rev statedb.Revision - delete bool + object any // the object that is being retried. 'any' to avoid specializing this internal code. + rev statedb.Revision + origRev statedb.Revision + delete bool - index int // item's index in the priority queue + index int // item's index in the retry time priority queue + revIndex int // item's index in the revision priority queue retryAt time.Time // time at which to retry numRetries int // number of retries attempted (for calculating backoff) lastError error @@ -80,14 +106,13 @@ func (rq *retries) Top() (*retryItem, bool) { if rq.queue.Len() == 0 { return nil, false } - return rq.queue[0], true + return rq.queue.Peek(), true } func (rq *retries) Pop() { // Pop the object from the queue, but leave it into the map until // the object is cleared or re-added. - rq.queue[0].index = -1 - heap.Pop(&rq.queue) + rq.queue.PopItem() rq.resetTimer() } @@ -101,48 +126,60 @@ func (rq *retries) resetTimer() { if rq.queue.Len() == 0 { rq.waitTimer = nil } else { - d := time.Until(rq.queue[0].retryAt) + d := time.Until(rq.queue.Peek().retryAt) rq.waitTimer = time.AfterFunc(d, func() { close(waitChan) }) } } else if rq.queue.Len() > 0 { - d := time.Until(rq.queue[0].retryAt) + d := time.Until(rq.queue.Peek().retryAt) // Did not fire yet so we can just reset the timer. rq.waitTimer.Reset(d) } } -func (rq *retries) Add(obj any, rev statedb.Revision, delete bool, lastError error) { +func (rq *retries) Add(obj any, rev statedb.Revision, origRev statedb.Revision, delete bool, lastError error) { var ( item *retryItem ok bool ) key := rq.objectToKey(obj) - if item, ok = rq.items[string(key)]; !ok { + keyStr := string(key) + if item, ok = rq.items[keyStr]; !ok { item = &retryItem{ numRetries: 0, index: -1, + revIndex: -1, } - rq.items[string(key)] = item + rq.items[keyStr] = item } item.object = obj item.rev = rev + item.origRev = origRev item.delete = delete item.numRetries += 1 item.lastError = lastError duration := rq.backoff.Duration(item.numRetries) item.retryAt = time.Now().Add(duration) + // Add the item into the revision key'd priority queue + if item.revIndex >= 0 { + rq.revQueue.Fix(item.revIndex) + } else { + rq.revQueue.PushItem(item) + } + + // Add the item into the retryAt key'd priority queue if item.index >= 0 { // The item was already in the queue, fix up its position. - heap.Fix(&rq.queue, item.index) + rq.queue.Fix(item.index) } else { - heap.Push(&rq.queue, item) + rq.queue.PushItem(item) } // Item is at the head of the queue, reset the timer. if item.index == 0 { rq.resetTimer() } + } func (rq *retries) Clear(obj any) { @@ -150,49 +187,93 @@ func (rq *retries) Clear(obj any) { if item, ok := rq.items[string(key)]; ok { // Remove the object from the queue if it is still there. index := item.index // hold onto the index as heap.Remove messes with it - if item.index >= 0 && item.index < len(rq.queue) && - key.Equal(rq.objectToKey(rq.queue[item.index].object)) { - heap.Remove(&rq.queue, item.index) + if item.index >= 0 && item.index < len(rq.queue.items) && + key.Equal(rq.objectToKey(rq.queue.items[item.index].object)) { + rq.queue.Remove(item.index) // Reset the timer in case we removed the top item. if index == 0 { rq.resetTimer() } } + if item.revIndex >= 0 && item.revIndex < len(rq.revQueue.items) && + key.Equal(rq.objectToKey(rq.revQueue.items[item.revIndex].object)) { + rq.revQueue.Remove(item.revIndex) + } // Completely forget the object and its retry count. delete(rq.items, string(key)) } } -// retryPrioQueue is a slice-backed priority heap with the next -// expiring 'retryItem' at top. Implementation is adapted from the -// 'container/heap' PriorityQueue example. -type retryPrioQueue []*retryItem +func (rq *retries) LowWatermark() statedb.Revision { + for rq.revQueue.Len() > 0 { + top := rq.revQueue.Peek() + key := rq.objectToKey(top.object) + if item, ok := rq.items[string(key)]; ok && item == top { + return top.origRev + } + rq.revQueue.PopItem() + } + return 0 +} -func (pq retryPrioQueue) Len() int { return len(pq) } +type retryPrioQueue struct { + items []*retryItem + less func(items []*retryItem, i, j int) bool + setIndex func(*retryItem, int) +} -func (pq retryPrioQueue) Less(i, j int) bool { - return pq[i].retryAt.Before(pq[j].retryAt) +func newRetryPrioQueue( + less func(items []*retryItem, i, j int) bool, + setIndex func(*retryItem, int), +) *retryPrioQueue { + return &retryPrioQueue{ + less: less, + setIndex: setIndex, + } } -func (pq retryPrioQueue) Swap(i, j int) { - pq[i], pq[j] = pq[j], pq[i] - pq[i].index = i - pq[j].index = j +func (hq *retryPrioQueue) Len() int { return len(hq.items) } + +func (hq *retryPrioQueue) Less(i, j int) bool { return hq.less(hq.items, i, j) } + +func (hq *retryPrioQueue) Swap(i, j int) { + hq.items[i], hq.items[j] = hq.items[j], hq.items[i] + hq.setIndex(hq.items[i], i) + hq.setIndex(hq.items[j], j) } -func (pq *retryPrioQueue) Push(x any) { - retryObj := x.(*retryItem) - retryObj.index = len(*pq) - *pq = append(*pq, retryObj) +func (hq *retryPrioQueue) Push(x any) { + item := x.(*retryItem) + hq.setIndex(item, len(hq.items)) + hq.items = append(hq.items, item) } -func (pq *retryPrioQueue) Pop() any { - old := *pq - n := len(old) - item := old[n-1] - old[n-1] = nil // avoid memory leak - item.index = -1 // for safety - *pq = old[0 : n-1] +func (hq *retryPrioQueue) Pop() any { + n := len(hq.items) + item := hq.items[n-1] + hq.items[n-1] = nil // avoid memory leak + hq.setIndex(item, -1) + hq.items = hq.items[:n-1] return item } + +func (hq *retryPrioQueue) Peek() *retryItem { + return hq.items[0] +} + +func (hq *retryPrioQueue) PushItem(item *retryItem) { + heap.Push(hq, item) +} + +func (hq *retryPrioQueue) PopItem() *retryItem { + return heap.Pop(hq).(*retryItem) +} + +func (hq *retryPrioQueue) Fix(index int) { + heap.Fix(hq, index) +} + +func (hq *retryPrioQueue) Remove(index int) { + heap.Remove(hq, index) +} diff --git a/reconciler/retries_test.go b/reconciler/retries_test.go index bdb136e..3a21d3b 100644 --- a/reconciler/retries_test.go +++ b/reconciler/retries_test.go @@ -25,9 +25,9 @@ func TestRetries(t *testing.T) { // Add objects to be retried in order. We assume here that 'time.Time' has // enough granularity for these to be added with rising retryAt times. err := errors.New("some error") - rq.Add(obj1, 1, false, err) - rq.Add(obj2, 2, false, err) - rq.Add(obj3, 3, false, err) + rq.Add(obj1, 1, 1, false, err) + rq.Add(obj2, 2, 2, false, err) + rq.Add(obj3, 3, 3, false, err) errs := rq.errors() assert.Len(t, errs, 3) @@ -35,7 +35,7 @@ func TestRetries(t *testing.T) { // Adding an item a second time will increment the number of retries and // recalculate when it should be retried. - rq.Add(obj3, 3, false, err) + rq.Add(obj3, 3, 3, false, err) <-rq.Wait() item1, ok := rq.Top() @@ -75,8 +75,8 @@ func TestRetries(t *testing.T) { // Retry 'obj3' and since it was added back without clearing it'll be retried // later. Add obj1 and check that 'obj3' has later retry time. - rq.Add(obj3, 4, false, err) - rq.Add(obj1, 5, false, err) + rq.Add(obj3, 4, 4, false, err) + rq.Add(obj1, 5, 5, false, err) <-rq.Wait() item4, ok := rq.Top() @@ -107,9 +107,9 @@ func TestRetries(t *testing.T) { assert.False(t, ok) // Test that object can be cleared from the queue without popping it. - rq.Add(obj1, 6, false, err) - rq.Add(obj2, 7, false, err) - rq.Add(obj3, 8, false, err) + rq.Add(obj1, 6, 6, false, err) + rq.Add(obj2, 7, 7, false, err) + rq.Add(obj3, 8, 8, false, err) rq.Clear(obj1) // Remove obj1, testing that it'll fix the queue correctly. rq.Pop() // Pop and remove obj2 and clear it to test that Clear doesn't mess with queue diff --git a/reconciler/types.go b/reconciler/types.go index 55aede2..1f20624 100644 --- a/reconciler/types.go +++ b/reconciler/types.go @@ -34,6 +34,17 @@ type Reconciler[Obj any] interface { // that something has gone wrong in the reconciliation target and full // reconciliation is needed to recover. Prune() + + // WaitUntilReconciled blocks until the reconciler has processed all + // table changes up to untilRevision. Returns the revision to which + // objects have been attempted to be reconciled at least once, the lowest + // revision of an object that failed to reconcile and ctx.Err() if the context + // is cancelled. + // + // If you want to wait until objects have been successfully reconciled up to + // a specific revision then this method should be called repeatedly until + // both [revision] and [retryLowWatermark] are past the desired [untilRevision]. + WaitUntilReconciled(ctx context.Context, untilRevision statedb.Revision) (revision statedb.Revision, retryLowWatermark statedb.Revision, err error) } // Params are the reconciler dependencies that are independent of the diff --git a/table.go b/table.go index 2e01cf6..3ce8a5d 100644 --- a/table.go +++ b/table.go @@ -544,11 +544,6 @@ func (t *genTable[Obj]) Changes(txn WriteTxn) (ChangeIterator[Obj], error) { table: t, watch: closedWatchChannel, } - // Set a finalizer to unregister the delete tracker when the iterator - // is dropped. - runtime.SetFinalizer(iter, func(iter *changeIterator[Obj]) { - iter.close() - }) itxn := txn.unwrap() name := fmt.Sprintf("changes-%p", iter) @@ -564,6 +559,13 @@ func (t *genTable[Obj]) Changes(txn WriteTxn) (ChangeIterator[Obj], error) { return nil, err } + // Add a cleanup to unregister the delete tracker. + runtime.AddCleanup( + iter, + func(dt *deleteTracker[Obj]) { dt.close() }, + iter.dt, + ) + // Prime it. iter.refresh(txn) diff --git a/types.go b/types.go index 15e3ac6..e9fd365 100644 --- a/types.go +++ b/types.go @@ -110,6 +110,12 @@ type ChangeIterator[Obj any] interface { // // Next will panic if called with a WriteTxn that has locked the target table. Next(ReadTxn) (iter.Seq2[Change[Obj], Revision], <-chan struct{}) + + // Close the change iterator. Once all change iterators for a given table are closed + // deleted objects for that table are no longer set aside for the change iterators. + // + // Calling this method is optional as each iterator has a finalizer that closes it. + Close() } // RWTable provides methods for modifying the table under a write transaction