diff --git a/.gitignore b/.gitignore index 3ce0214..d72f0e9 100644 --- a/.gitignore +++ b/.gitignore @@ -17,4 +17,5 @@ coverage.txt # vscode settings -.vscode/settings.json \ No newline at end of file +.vscode/settings.json +.aider* diff --git a/dedupequeue.go b/dedupequeue.go new file mode 100644 index 0000000..6ca621d --- /dev/null +++ b/dedupequeue.go @@ -0,0 +1,169 @@ +package inflight + +import ( + "container/list" + "sync" +) + +// DedupeQueue is a thread-safe duplicate operation suppression queue, that combines +// duplicate operations (queue entries) into sets that will be dequeued together. +// +// For example, if you enqueue an item with a key that already exists, then that +// item will be appended to that key's set of items. Otherwise the item is +// inserted into the head of the list as a new item. +// +// On Dequeue a SET is returned of all items that share a key in the queue. +// It blocks on dequeue if the queue is empty, but returns an error if the +// queue is full during enqueue. +type DedupeQueue struct { + mu sync.Mutex + cond sync.Cond + depth int + width int + q *list.List + entries map[ID]*OpSet + backup map[ID]*OpSet + closed bool +} + +// NewDedupeQueue creates a new DedupeQueue. +func NewDedupeQueue(depth, width int) *DedupeQueue { + q := DedupeQueue{ + depth: depth, + width: width, + q: list.New(), + entries: map[ID]*OpSet{}, + backup: map[ID]*OpSet{}, + } + q.cond.L = &q.mu + return &q +} + +// Close marks the queue as closed and wakes all goroutines blocked in Dequeue. +// After Close, Enqueue returns ErrQueueClosed and Dequeue returns false without +// invoking its callback. +func (q *DedupeQueue) Close() { + q.mu.Lock() + q.closed = true + q.mu.Unlock() + q.cond.Broadcast() // alert all dequeue calls that they should wake up and return. +} + +// Len returns the number of unique IDs in the queue, that is the depth of the queue. 
+func (q *DedupeQueue) Len() int { + q.mu.Lock() + defer q.mu.Unlock() + return q.q.Len() +} + +// Enqueue adds the op to the queue. If the ID already exists then the Op +// is added to the existing OpSet for this ID, otherwise it's inserted as a new +// OpSet. +// +// Enqueue doesn't block if the queue is full, instead it returns an ErrQueueSaturated +// error. +func (q *DedupeQueue) Enqueue(id ID, op *Op) error { + q.mu.Lock() + defer q.mu.Unlock() + + if q.closed { + return ErrQueueClosed + } + + if set, ok := q.backup[id]; ok { + if len(set.Ops()) >= q.width { + return ErrQueueSaturatedWidth + } + + set.append(op) + return nil + } + + set, ok := q.entries[id] + if !ok { + // This is a new item, so we need to insert it into the queue. + if q.q.Len() >= q.depth { + return ErrQueueSaturatedDepth + } + q.newEntry(id, op) + + // Signal one waiting go routine to wake up and Dequeue + // I believe we only need to signal if we enqueue a new item. + // Consider the following possible states the queue could be in : + // 1. if no one is currently waiting in Dequeue, the signal isn't + // needed and all items will be dequeued on the next call to + // Dequeue. + // 2. One or Many go-routines are waiting in Dequeue because it's + // empty, and calling Signal will wake up one. Which will dequeue + // the item and return. + // 3. At most One go-routine is in the act of Dequeueing existing items + // from the queue (i.e. only one can have the lock and be in the "if OK" + // condition within the for loop in Dequeue). In which case the signal + // is ignored and after returning we return to condition (1) above. + // Note signaled waiting go-routines will not be able to acquire + // the condition lock until this method call returns, finishing + // its append of the new operation. 
+ q.cond.Signal() + return nil + } + if len(set.Ops()) >= q.width { + return ErrQueueSaturatedWidth + } + + set.append(op) + return nil +} + +// Dequeue removes the oldest OpSet from the queue and returns it. +// Dequeue will block if the Queue is empty. An Enqueue will wake the +// go routine up and it will continue on. +// +// If the OpQueue is closed, then Dequeue will return false +// for the second parameter. +func (q *DedupeQueue) Dequeue(callback func(*OpSet)) bool { + q.mu.Lock() + + for { + if id, set, ok := q.dequeue(); ok { + q.mu.Unlock() + callback(set) + + q.mu.Lock() + defer q.mu.Unlock() + if q.backup[id].Len() > 0 { + q.entries[id] = q.backup[id] + q.q.PushBack(id) + } + delete(q.backup, id) + return true + } + if q.closed { + q.mu.Unlock() + return false + } + q.cond.Wait() + } +} + +func (q *DedupeQueue) newEntry(id ID, op *Op) { + set := newOpSet(op) + q.entries[id] = set + q.q.PushBack(id) +} + +func (q *DedupeQueue) dequeue() (ID, *OpSet, bool) { + elem := q.q.Front() + if elem == nil { + return 0, nil, false + } + idt := q.q.Remove(elem) + id := idt.(ID) + + set, ok := q.entries[id] + if !ok { + panic("invariant broken: we dequeued a value that isn't in the map") + } + delete(q.entries, id) + q.backup[id] = &OpSet{} + return id, set, true +} diff --git a/dedupequeue_test.go b/dedupequeue_test.go new file mode 100644 index 0000000..41e41dd --- /dev/null +++ b/dedupequeue_test.go @@ -0,0 +1,389 @@ +package inflight + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/lytics/inflight/testutils" +) + +/* +To insure consistency I suggest running the test for a while with the following, +and if after 5 mins it never fails then we know the testcases are consistent. 
+ while go test -v --race ; do echo `date` ; done +*/ + +func TestDedupeQueue(t *testing.T) { + t.Parallel() + completed1 := 0 + completed2 := 0 + cg1 := NewCallGroup(func(finalState map[ID]*Response) { + completed1++ + }) + + cg2 := NewCallGroup(func(finalState map[ID]*Response) { + completed2++ + }) + + now := time.Now() + op1_1 := cg1.Add(1, &tsMsg{123, now}) + op1_2 := cg1.Add(2, &tsMsg{111, now}) + op2_1 := cg2.Add(1, &tsMsg{123, now}) + op2_2 := cg2.Add(2, &tsMsg{111, now}) + + opq := NewDedupeQueue(10, 10) + defer opq.Close() + + { + err := opq.Enqueue(op1_1.Key, op1_1) + assert.Equal(t, nil, err) + err = opq.Enqueue(op2_1.Key, op2_1) + assert.Equal(t, nil, err) + err = opq.Enqueue(op1_2.Key, op1_2) + assert.Equal(t, nil, err) + err = opq.Enqueue(op2_2.Key, op2_2) + assert.Equal(t, nil, err) + assert.Equal(t, 2, opq.Len()) // only 2 keys + } + + opq.Dequeue(func(set1 *OpSet) { + assert.Equal(t, 2, len(set1.Ops())) + for _, op := range set1.Ops() { + op.Finish(nil, nil) + } + }) + assert.Equal(t, 0, completed1) + assert.Equal(t, 0, completed2) + opq.Dequeue(func(set2 *OpSet) { + assert.Equal(t, 2, len(set2.Ops())) + set2.FinishAll(nil, nil) + }) + + assert.Equal(t, 1, completed1) + assert.Equal(t, 1, completed2) +} + +func TestDedupeQueueClose(t *testing.T) { + t.Parallel() + completed1 := 0 + cg1 := NewCallGroup(func(finalState map[ID]*Response) { + completed1++ + }) + + opq := NewDedupeQueue(10, 10) + now := time.Now() + + for i := 0; i < 9; i++ { + op := cg1.Add(uint64(i), &tsMsg{uint64(i), now}) + err := opq.Enqueue(op.Key, op) + assert.Equal(t, nil, err) + } + + timer := time.AfterFunc(5*time.Second, func() { + t.Fatalf("testcase timed out after 5 secs.") + }) + for i := 0; i < 9; i++ { + opq.Dequeue(func(set1 *OpSet) { + assert.Equal(t, 1, len(set1.Ops()), " at loop:%v set1_len:%v", i, len(set1.Ops())) + }) + } + timer.Stop() + + st := time.Now() + time.AfterFunc(10*time.Millisecond, func() { + opq.Close() // calling close should release the call to 
opq.Dequeue() + }) + open := opq.Dequeue(func(set1 *OpSet) { + panic("should not be called") + }) + assert.Equal(t, false, open) + rt := time.Since(st) + assert.True(t, rt >= 10*time.Millisecond, "we shouldn't have returned until Close was called: returned after:%v", rt) + +} + +func TestDedupeQueueFullDepth(t *testing.T) { + t.Parallel() + completed1 := 0 + cg1 := NewCallGroup(func(finalState map[ID]*Response) { + completed1++ + }) + + opq := NewDedupeQueue(10, 10) + defer opq.Close() + + succuess := 0 + depthErrors := 0 + widthErrors := 0 + now := time.Now() + + for i := 0; i < 100; i++ { + op := cg1.Add(uint64(i), &tsMsg{uint64(i), now}) + err := opq.Enqueue(op.Key, op) + switch err { + case nil: + succuess++ + case ErrQueueSaturatedDepth: + depthErrors++ + case ErrQueueSaturatedWidth: + widthErrors++ + default: + t.Fatalf("unexpected error: %v", err) + } + } + for i := 0; i < 10; i++ { + op := cg1.Add(uint64(i), &tsMsg{uint64(i), now}) + err := opq.Enqueue(op.Key, op) + switch err { + case nil: + succuess++ + case ErrQueueSaturatedDepth: + depthErrors++ + case ErrQueueSaturatedWidth: + widthErrors++ + default: + t.Fatalf("unexpected error: %v", err) + } + } + assert.Equalf(t, succuess, 20, "expected 20, got:%v", succuess) + assert.Equalf(t, depthErrors, 90, "expected 90, got:%v", depthErrors) + assert.Equalf(t, widthErrors, 0, "expected 0, got:%v", widthErrors) + + timer := time.AfterFunc(5*time.Second, func() { + t.Fatalf("testcase timed out after 5 secs.") + }) + for i := 0; i < 10; i++ { + open := opq.Dequeue( + func(set1 *OpSet) { + assert.Equal(t, 2, len(set1.Ops()), " at loop:%v set1_len:%v", i, len(set1.Ops())) + }, + ) + assert.True(t, open) + } + timer.Stop() +} + +// TestDedupeQueueFullWidth exactly like the test above, except we enqueue the SAME ID each time, +// so that we get ErrQueueSaturatedWidth errors instead of ErrQueueSaturatedDepth errors. 
+func TestDedupeQueueFullWidth(t *testing.T) { + t.Parallel() + completed1 := 0 + cg1 := NewCallGroup(func(finalState map[ID]*Response) { + completed1++ + }) + + opq := NewDedupeQueue(10, 10) + defer opq.Close() + + succuess := 0 + depthErrors := 0 + widthErrors := 0 + now := time.Now() + + for i := 0; i < 100; i++ { + op := cg1.Add(0, &tsMsg{uint64(i), now}) + err := opq.Enqueue(op.Key, op) + switch err { + case nil: + succuess++ + case ErrQueueSaturatedDepth: + depthErrors++ + case ErrQueueSaturatedWidth: + widthErrors++ + default: + t.Fatalf("unexpected error: %v", err) + } + } + for i := 1; i < 10; i++ { + op := cg1.Add(uint64(i), &tsMsg{uint64(i), now}) + err := opq.Enqueue(op.Key, op) + switch err { + case nil: + succuess++ + case ErrQueueSaturatedDepth: + depthErrors++ + case ErrQueueSaturatedWidth: + widthErrors++ + default: + t.Fatalf("unexpected error: %v", err) + } + } + assert.Equalf(t, succuess, 19, "expected 10, got:%v", succuess) + assert.Equalf(t, depthErrors, 0, "expected 0, got:%v", depthErrors) + assert.Equalf(t, widthErrors, 90, "expected 90, got:%v", widthErrors) + + timer := time.AfterFunc(5*time.Second, func() { + t.Fatalf("testcase timed out after 5 secs.") + }) + + open := opq.Dequeue( + func(set1 *OpSet) { + assert.Equal(t, 10, len(set1.Ops()), " at loop:%v set1_len:%v", 0, len(set1.Ops())) // max width is 10, so we should get 10 in the first batch + }) + assert.True(t, open) + for i := 1; i < 10; i++ { + open := opq.Dequeue( + func(set1 *OpSet) { + assert.Equal(t, 1, len(set1.Ops()), " at loop:%v set1_len:%v", i, len(set1.Ops())) + }) + assert.True(t, open) + } + + timer.Stop() +} + +func TestDedupeQueueForRaceDetection(t *testing.T) { + t.Parallel() + completed1 := 0 + cg1 := NewCallGroup(func(finalState map[ID]*Response) { + completed1++ + }) + + enqueueCnt := testutils.AtomicInt{} + dequeueCnt := testutils.AtomicInt{} + mergeCnt := testutils.AtomicInt{} + depthErrorCnt := testutils.AtomicInt{} + widthErrorCnt := testutils.AtomicInt{} + 
+ opq := NewDedupeQueue(300, 500) + defer opq.Close() + + startingLine1 := sync.WaitGroup{} + startingLine2 := sync.WaitGroup{} + // block all go routines until the loop has finished spinning them up. + startingLine1.Add(1) + startingLine2.Add(1) + + finishLine, finish := context.WithCancel(context.Background()) + dequeFinishLine, deqFinish := context.WithCancel(context.Background()) + const concurrency = 2 + now := time.Now() + + for w := 0; w < concurrency; w++ { + go func(w int) { + startingLine1.Wait() + for i := 0; i < 1000000; i++ { + select { + case <-finishLine.Done(): + t.Logf("worker %v exiting at %v", w, i) + return + default: + } + op := cg1.Add(uint64(i), &tsMsg{uint64(i), now}) + err := opq.Enqueue(op.Key, op) + switch err { + case nil: + enqueueCnt.Incr() + case ErrQueueSaturatedDepth: + depthErrorCnt.Incr() + case ErrQueueSaturatedWidth: + widthErrorCnt.Incr() + default: + t.Errorf("unexpected error: %v", err) + } + } + }(w) + } + + for w := 0; w < concurrency; w++ { + go func() { + startingLine2.Wait() + for { + select { + case <-dequeFinishLine.Done(): + return + default: + } + open := opq.Dequeue(func(set1 *OpSet) { + dequeueCnt.IncrBy(len(set1.Ops())) + if len(set1.Ops()) > 1 { + mergeCnt.Incr() + } + }) + select { + case <-dequeFinishLine.Done(): + return + default: + } + assert.True(t, open) + } + }() + } + startingLine1.Done() //release all the waiting workers. + startingLine2.Done() //release all the waiting workers. + + const runtime = 2 + timeout := time.AfterFunc((runtime+10)*time.Second, func() { + t.Fatalf("testcase timed out after 5 secs.") + }) + defer timeout.Stop() + + //let the testcase run for N seconds + time.AfterFunc(runtime*time.Second, func() { + finish() + }) + <-finishLine.Done() + // Sleep to give the dequeue workers plenty of time to drain the queue before exiting. 
+ time.Sleep(500 * time.Millisecond) + deqFinish() + + enq := enqueueCnt.Get() + deq := dequeueCnt.Get() + if enq != deq { + t.Fatalf("enqueueCnt and dequeueCnt should match: enq:%v deq:%v", enq, deq) + } + // NOTE: I get the following performance on my laptop: + // dedupequeue_test.go:275: enqueue errors: 137075 mergedMsgs:2553 enqueueCnt:231437 dequeueCnt:231437 rate:115718 msgs/sec + // Over 100k msg a sec is more than fast enough... + t.Logf("Run Stats [note errors are expected for this test]") + t.Logf(" enqueue errors:[depth-errs:%v width-errs:%v]", depthErrorCnt.Get(), widthErrorCnt.Get()) + t.Logf(" mergedMsgs:%v enqueueCnt:%v dequeueCnt:%v rate:%v msgs/sec", mergeCnt.Get(), enq, deq, enq/runtime) +} + +func TestDedupeQueueCloseConcurrent(t *testing.T) { + t.Parallel() + + cg1 := NewCallGroup(func(finalState map[ID]*Response) {}) + cg2 := NewCallGroup(func(finalState map[ID]*Response) {}) + + now := time.Now() + + op1 := cg1.Add(1, &tsMsg{123, now}) + op2 := cg2.Add(2, &tsMsg{321, now}) + + oq := NewDedupeQueue(300, 500) + + var ops uint64 + var closes uint64 + const workers int = 12 + for i := 0; i < workers; i++ { + go func() { + for oq.Dequeue(func(set *OpSet) { atomic.AddUint64(&ops, 1) }) { + } + atomic.AddUint64(&closes, 1) + }() + } + + time.Sleep(100 * time.Millisecond) + assert.Equal(t, uint64(0), atomic.LoadUint64(&ops)) // nothing should have been dequeued yet + assert.Equal(t, uint64(0), atomic.LoadUint64(&closes)) + + err := oq.Enqueue(op1.Key, op1) + assert.Equal(t, nil, err) + err = oq.Enqueue(op2.Key, op2) + assert.Equal(t, nil, err) + + time.Sleep(100 * time.Millisecond) + assert.Equal(t, uint64(2), atomic.LoadUint64(&ops)) // 2 unique keys are enqueued + assert.Equal(t, uint64(0), atomic.LoadUint64(&closes)) + + oq.Close() + time.Sleep(100 * time.Millisecond) + assert.Equal(t, uint64(2), atomic.LoadUint64(&ops)) // we still only had 2 unique keys seen + assert.Equal(t, uint64(workers), atomic.LoadUint64(&closes)) +} diff --git a/opqueue.go 
b/opqueue.go index 1d6dd66..7ac4177 100644 --- a/opqueue.go +++ b/opqueue.go @@ -133,11 +133,6 @@ func (q *OpQueue) Dequeue() (*OpSet, bool) { if q.closed { return nil, false } - - // release the lock and wait until signaled. On awake we'll acquire the lock. - // After wait acquires the lock we have to recheck the wait condition, - // because it's possible that someone else - // drained the queue while, we were reacquiring the lock. q.cond.Wait() } } diff --git a/opset.go b/opset.go index 4a6d957..cec56c3 100644 --- a/opset.go +++ b/opset.go @@ -16,6 +16,10 @@ func (os *OpSet) append(op *Op) { os.set = append(os.set, op) } +func (os *OpSet) Len() int { + return len(os.set) +} + // Ops get the list of ops in this set. func (os *OpSet) Ops() []*Op { return os.set