Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on:
- main

env:
GO_VERSION: 1.23
GO_VERSION: 1.25.6

jobs:
test:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on:
- main

env:
GO_VERSION: 1.23
GO_VERSION: 1.25.6

jobs:
test:
Expand Down
2 changes: 2 additions & 0 deletions derive.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ func (d derive[In, Out]) loop(ctx context.Context, _ cell.Health) error {
if err != nil {
return err
}
defer iter.Close()

for {
wtxn := d.DB.WriteTxn(out)
changes, watch := iter.Next(wtxn)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/cilium/statedb

go 1.24
go 1.25

require (
github.com/cilium/hive v0.0.0-20250731144630-28e7a35ed227
Expand Down
2 changes: 1 addition & 1 deletion iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (it *changeIterator[Obj]) nextAny(txn ReadTxn) (iter.Seq2[Change[any], Revi
}, watch
}

func (it *changeIterator[Obj]) close() {
func (it *changeIterator[Obj]) Close() {
it.iter = nil
if it.dt != nil {
it.dt.close()
Expand Down
1 change: 1 addition & 0 deletions observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func (to *observable[Obj]) Observe(ctx context.Context, next func(Change[Obj]),
complete(err)
return
}
defer iter.Close()
defer complete(nil)

for {
Expand Down
1 change: 1 addition & 0 deletions reconciler/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func Register[Obj comparable](
retries: newRetries(cfg.RetryBackoffMinDuration, cfg.RetryBackoffMaxDuration, objectToKey),
externalPruneTrigger: make(chan struct{}, 1),
primaryIndexer: idx,
progress: newProgressTracker(),
}

params.JobGroup.Add(job.OneShot("reconcile", r.reconcileLoop))
Expand Down
9 changes: 2 additions & 7 deletions reconciler/example/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,12 @@ var MemoNameIndex = statedb.Index[*Memo, string]{
Unique: true,
}

// MemoStatusIndex indexes memos by their reconciliation status.
// This is mainly used by the reconciler to implement WaitForReconciliation.
var MemoStatusIndex = reconciler.NewStatusIndex((*Memo).GetStatus)

// NewMemoTable creates and registers the memos table.
func NewMemoTable(db *statedb.DB) (statedb.RWTable[*Memo], statedb.Index[*Memo, reconciler.StatusKind], error) {
func NewMemoTable(db *statedb.DB) (statedb.RWTable[*Memo], error) {
tbl, err := statedb.NewTable(
db,
"memos",
MemoNameIndex,
MemoStatusIndex,
)
return tbl, MemoStatusIndex, err
return tbl, err
}
43 changes: 28 additions & 15 deletions reconciler/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,37 +45,43 @@ type opResult struct {
id uint64 // the "pending" identifier
}

func (incr *incremental[Obj]) run(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) []error {
func (incr *incremental[Obj]) run(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) (errs []error, lastRev statedb.Revision, retryLowWatermark statedb.Revision) {
// Reconcile new and changed objects using either Operations
// or BatchOperations.
if incr.config.BatchOperations != nil {
incr.batch(ctx, txn, changes)
lastRev = incr.batch(ctx, txn, changes)
} else {
incr.single(ctx, txn, changes)
lastRev = incr.single(ctx, txn, changes)
}

// Commit status updates for new and changed objects.
newErrors := incr.commitStatus()
clear(incr.results)

// Process objects that need to be retried that were not cleared.
incr.processRetries(ctx, txn)
retryLowWatermark = incr.processRetries(ctx, txn)

// Finally commit the status updates.
newErrors := incr.commitStatus()
// Finally commit the status updates from retries.
newErrors += incr.commitStatus()

// Since all failures are retried, we can return the errors from the retry
// queue which includes both errors occurred in this round and the old
// errors.
errs := incr.retries.errors()
errs = incr.retries.errors()
incr.metrics.ReconciliationErrors(incr.moduleID, newErrors, len(errs))

// Prepare for next round
// Prepare for next round.
incr.numReconciled = 0
clear(incr.results)

return errs
return errs, lastRev, retryLowWatermark
}

func (incr *incremental[Obj]) single(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) {
func (incr *incremental[Obj]) single(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) (lastRev statedb.Revision) {
// Iterate in revision order through new and changed objects.
for change, rev := range changes {
lastRev = rev

obj := change.Object

status := incr.config.GetObjectStatus(obj)
Expand All @@ -95,14 +101,18 @@ func (incr *incremental[Obj]) single(ctx context.Context, txn statedb.ReadTxn, c
break
}
}

return
}

func (incr *incremental[Obj]) batch(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) {
func (incr *incremental[Obj]) batch(ctx context.Context, txn statedb.ReadTxn, changes iter.Seq2[statedb.Change[Obj], statedb.Revision]) (lastRev statedb.Revision) {
ops := incr.config.BatchOperations
updateBatch := []BatchEntry[Obj]{}
deleteBatch := []BatchEntry[Obj]{}

for change, rev := range changes {
lastRev = rev

obj := change.Object

status := incr.config.GetObjectStatus(obj)
Expand Down Expand Up @@ -144,7 +154,7 @@ func (incr *incremental[Obj]) batch(ctx context.Context, txn statedb.ReadTxn, ch
for _, entry := range deleteBatch {
if entry.Result != nil {
// Delete failed, queue a retry for it.
incr.retries.Add(entry.original, entry.Revision, true, entry.Result)
incr.retries.Add(entry.original, entry.Revision, entry.Revision, true, entry.Result)
}
}
}
Expand All @@ -167,9 +177,11 @@ func (incr *incremental[Obj]) batch(ctx context.Context, txn statedb.ReadTxn, ch
incr.results[entry.Object] = opResult{rev: entry.Revision, id: status.ID, err: entry.Result, original: entry.original}
}
}

return
}

func (incr *incremental[Obj]) processRetries(ctx context.Context, txn statedb.ReadTxn) {
func (incr *incremental[Obj]) processRetries(ctx context.Context, txn statedb.ReadTxn) statedb.Revision {
now := time.Now()
for incr.numReconciled < incr.config.IncrementalRoundSize {
item, ok := incr.retries.Top()
Expand All @@ -180,6 +192,7 @@ func (incr *incremental[Obj]) processRetries(ctx context.Context, txn statedb.Re
incr.processSingle(ctx, txn, item.object.(Obj), item.rev, item.delete)
incr.numReconciled++
}
return incr.retries.LowWatermark()
}

func (incr *incremental[Obj]) processSingle(ctx context.Context, txn statedb.ReadTxn, obj Obj, rev statedb.Revision, delete bool) {
Expand All @@ -194,7 +207,7 @@ func (incr *incremental[Obj]) processSingle(ctx context.Context, txn statedb.Rea
err = incr.config.Operations.Delete(ctx, txn, rev, obj)
if err != nil {
// Deletion failed. Retry again later.
incr.retries.Add(obj, rev, true, err)
incr.retries.Add(obj, rev, rev, true, err)
}
} else {
// Clone the object so it can be mutated by Update()
Expand Down Expand Up @@ -257,7 +270,7 @@ func (incr *incremental[Obj]) commitStatus() (numErrors int) {
// Reconciliation of the object had failed and the status was updated
// successfully (object had not changed). Queue the retry for the object.
newRevision := incr.table.Revision(wtxn)
incr.retries.Add(result.original.(Obj), newRevision, false, result.err)
incr.retries.Add(result.original.(Obj), newRevision, result.rev, false, result.err)
}
}
return
Expand Down
48 changes: 0 additions & 48 deletions reconciler/index.go

This file was deleted.

63 changes: 63 additions & 0 deletions reconciler/progress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package reconciler

import (
"context"
"sync"

"github.com/cilium/statedb"
)

// progressTracker tracks the highest revision observed as reconciled and
// allows callers to wait until a target revision is reached.
type progressTracker struct {
mu sync.Mutex
revision statedb.Revision
retryLowWatermark statedb.Revision
watch chan struct{}
}

func newProgressTracker() *progressTracker {
return &progressTracker{
watch: make(chan struct{}),
}
}

func (p *progressTracker) update(rev statedb.Revision, retryLowWatermark statedb.Revision) {
p.mu.Lock()
updated := false
if rev > p.revision {
p.revision = rev
updated = true
}
if retryLowWatermark != p.retryLowWatermark {
p.retryLowWatermark = retryLowWatermark
updated = true
}
if updated {
close(p.watch)
p.watch = make(chan struct{})
}
p.mu.Unlock()
}

func (p *progressTracker) wait(ctx context.Context, rev statedb.Revision) (statedb.Revision, statedb.Revision, error) {
for {
p.mu.Lock()
current := p.revision
retryLowWatermark := p.retryLowWatermark
watch := p.watch
p.mu.Unlock()

if current >= rev {
return current, retryLowWatermark, nil
}
select {
case <-ctx.Done():
return current, retryLowWatermark, ctx.Err()
case <-watch:
}
}
}
Loading