From df098b83708f41df726aeb6f106cb59ee823a696 Mon Sep 17 00:00:00 2001 From: Adam Drescher Date: Sun, 14 Dec 2025 13:23:44 -0800 Subject: [PATCH 1/2] Add benchmark suite for ring implementation. Compares against buffered channels and a slice protected by a sync.Mutex, two common primitives for queue behavior. The comparison includes SPSC, MPSC, SPMC, and MPMC. It varies batch sizes and payload sizes. It also measures plain enqueue/dequeue and batched enqueue / dequeue. As tested on my M3 Pro Macbook, the ring buffer implementation is substantially better under any form of contention. The only place where it does not shine is SPSC, which is no surprise because the lock-free implementation is designed for concurrent access. --- ring/BUILD.bazel | 23 +- ring/channel_bench_test.go | 302 +++++++++++++++++++++++++++ ring/common_bench_test.go | 8 + ring/mutex_slice_bench_test.go | 371 +++++++++++++++++++++++++++++++++ ring/ring_bench_test.go | 305 +++++++++++++++++++++++++++ ring/ring_test.go | 213 ------------------- 6 files changed, 1007 insertions(+), 215 deletions(-) create mode 100644 ring/channel_bench_test.go create mode 100644 ring/common_bench_test.go create mode 100644 ring/mutex_slice_bench_test.go create mode 100644 ring/ring_bench_test.go diff --git a/ring/BUILD.bazel b/ring/BUILD.bazel index 2911635..3e39010 100644 --- a/ring/BUILD.bazel +++ b/ring/BUILD.bazel @@ -3,12 +3,16 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "ring", srcs = ["ring.go"], + importpath = "github.com/freshdresch/go-data-structures/ring", visibility = ["//visibility:public"], ) +# Internal tests (white-box) go_test( - name = "ring_test", - srcs = ["ring_test.go"], + name = "ring_internal_test", + srcs = [ + "ring_test.go", + ], embed = [":ring"], deps = [ "@com_github_stretchr_testify//assert:assert", @@ -16,3 +20,18 @@ go_test( ], visibility = ["//:__subpackages__"], ) + +# External benchmarks (black-box) +go_test( + name = "ring_bench", + srcs = [ + "channel_bench_test.go", + "common_bench_test.go", + "mutex_slice_bench_test.go", + "ring_bench_test.go", + ], + deps = [ + ":ring", + ], + visibility = ["//:__subpackages__"], +) diff --git a/ring/channel_bench_test.go b/ring/channel_bench_test.go new file mode 100644 index 0000000..f455548 --- /dev/null +++ b/ring/channel_bench_test.go @@ -0,0 +1,302 @@ +package ring_test + +import ( + "fmt" + "sync" + "testing" +) + +type ChanWrapper[T any] struct { + ch chan T +} + +func NewChanWrapper[T any](capacity int) *ChanWrapper[T] { + return &ChanWrapper[T]{ch: make(chan T, capacity)} +} + +func (c *ChanWrapper[T]) EnqueueBurst(batch []T) { + for _, v := range batch { + c.ch <- v + } +} + +func (c *ChanWrapper[T]) DequeueBurst(n int) []T { + res := make([]T, 0, n) + for i := 0; i < n; i++ { + select { + case v := <-c.ch: + res = append(res, v) + default: + return res + } + } + return res +} + +func BenchmarkChan_SPSC_Seq(b *testing.B) { + for _, size := range payloadSizes { + b.Run(fmt.Sprintf("%dB", size), func(b *testing.B) { + ch := make(chan []byte, ringSize) + payload := make([]byte, size) + + b.SetBytes(int64(size)) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + ch <- payload + <-ch + } + }) + } +} + +func BenchmarkChan_MPSC_Parallel(b *testing.B) { + const producers = 4 + for _, size := range payloadSizes { + b.Run(fmt.Sprintf("%dB", size), func(b *testing.B) { + ch := make(chan []byte, 1024) + payload := make([]byte, size) + + b.SetBytes(int64(size)) + b.ResetTimer() + + var wg sync.WaitGroup + wg.Add(producers) + + for i := 0; i < producers; i++ { + go func() { + defer wg.Done() + for j := 0; j < b.N; j++ { + ch <- payload + } + }() + } + + for i := 0; i < b.N*producers; i++ { + <-ch + } + wg.Wait() + }) + } +} + +func BenchmarkChan_SPMC_Parallel(b *testing.B) { + const consumers = 4 + for _, size := range payloadSizes { + b.Run(fmt.Sprintf("%dB", size), func(b *testing.B) { + ch := make(chan []byte, 1024) + payload := make([]byte, size) + + b.SetBytes(int64(size)) + b.ResetTimer() + + var wg sync.WaitGroup + wg.Add(consumers) + + for i := 0; i < consumers; i++ { + go func() { + defer wg.Done() + for j := 0; j < b.N; j++ { + <-ch + } + }() + } + + for i := 0; i < b.N*consumers; i++ { + ch <- payload + } + wg.Wait() + }) + } +} + +func BenchmarkChan_MPMC_Parallel(b *testing.B) { + const ( + producers = 4 + consumers = 4 + ) + for _, size := range payloadSizes { + b.Run(fmt.Sprintf("%dB", size), func(b *testing.B) { + ch := make(chan []byte, 1024) + payload := make([]byte, size) + + b.SetBytes(int64(size)) + b.ResetTimer() + + var prodWG, consWG sync.WaitGroup + prodWG.Add(producers) + consWG.Add(consumers) + + for i := 0; i < producers; i++ { + go func() { + defer prodWG.Done() + for j := 0; j < b.N; j++ { + ch <- payload + } + }() + } + + for i := 0; i < consumers; i++ { + go func() { + defer consWG.Done() + for j := 0; j < b.N; j++ { + <-ch + } + }() + } + + prodWG.Wait() + consWG.Wait() + }) + } +} + +func BenchmarkChan_SPSC_Seq_Batch(b *testing.B) { + for _, batchSize := range batchSizes { + for _, payloadSize := range payloadSizes { + b.Run( + fmt.Sprintf("batch%d_%dB", batchSize, payloadSize), + func(b *testing.B) { + ch := make(chan [][]byte, ringSize) + payload := make([][]byte, batchSize) + for i := range payload { + payload[i] = make([]byte, payloadSize) + } + + b.SetBytes(int64(payloadSize)) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + ch <- payload + <-ch + } + }, + ) + } + } +} + +func BenchmarkChan_MPSC_Parallel_Batch(b *testing.B) { + const producers = 4 + for _, batchSize := range batchSizes { + for _, payloadSize := range payloadSizes { + b.Run( + fmt.Sprintf("batch%d_%dB", batchSize, payloadSize), + func(b *testing.B) { + ch := make(chan [][]byte, ringSize) + payload := make([][]byte, batchSize) + for i := range payload { + payload[i] = make([]byte, payloadSize) + } + + b.SetBytes(int64(payloadSize)) + b.ResetTimer() + + var wg sync.WaitGroup + wg.Add(producers) + + for i := 0; i < producers; i++ { + go func() { + defer wg.Done() + for j := 0; j < b.N; j++ { + ch <- payload + } + }() + } + + for i := 0; i < b.N*producers; i++ { + <-ch + } + wg.Wait() + }, + ) + } + } +} + +func BenchmarkChan_SPMC_Parallel_Batch(b *testing.B) { + const consumers = 4 + for _, batchSize := range batchSizes { + for _, payloadSize := range payloadSizes { + b.Run( + fmt.Sprintf("batch%d_%dB", batchSize, payloadSize), + func(b *testing.B) { + ch := make(chan [][]byte, ringSize) + payload := make([][]byte, batchSize) + for i := range payload { + payload[i] = make([]byte, payloadSize) + } + + b.SetBytes(int64(payloadSize)) + b.ResetTimer() + + var wg sync.WaitGroup + wg.Add(consumers) + + for i := 0; i < consumers; i++ { + go func() { + defer wg.Done() + for j := 0; j < b.N; j++ { + <-ch + } + }() + } + + for i := 0; i < b.N*consumers; i++ { + ch <- payload + } + wg.Wait() + }, + ) + } + } +} + +func BenchmarkChan_MPMC_Parallel_Batch(b *testing.B) { + const ( + producers = 4 + consumers = 4 + ) + for _, batchSize := range batchSizes { + for _, payloadSize := range payloadSizes { + b.Run( + fmt.Sprintf("batch%d_%dB", batchSize, payloadSize), + func(b *testing.B) { + ch := make(chan [][]byte, ringSize) + payload := make([][]byte, batchSize) + for i := range payload { + payload[i] = make([]byte, payloadSize) + } + + b.SetBytes(int64(payloadSize)) + b.ResetTimer() + + var prodWG, consWG sync.WaitGroup + prodWG.Add(producers) + consWG.Add(consumers) + + for i := 0; i < producers; i++ { + go func() { + defer prodWG.Done() + for j := 0; j < b.N; j++ { + ch <- payload + } + }() + } + + for i := 0; i < consumers; i++ { + go func() { + defer consWG.Done() + for j := 0; j < b.N; j++ { + <-ch + } + }() + } + + prodWG.Wait() + consWG.Wait() + }, + ) + } + } +} diff --git a/ring/common_bench_test.go b/ring/common_bench_test.go new file mode 100644 index 0000000..c7fac04 --- /dev/null +++ b/ring/common_bench_test.go @@ -0,0 +1,8 @@ +package ring_test + +const ringSize = 1024 + +var ( + batchSizes = []int{1, 8, 16, 32, 64} + payloadSizes = []int{8, 32, 64, 256, 512} // bytes +) diff --git a/ring/mutex_slice_bench_test.go b/ring/mutex_slice_bench_test.go new file mode 100644 index 0000000..0e09ce3 --- /dev/null +++ b/ring/mutex_slice_bench_test.go @@ -0,0 +1,371 @@ +package ring_test + +import ( + "fmt" + "runtime" + "sync" + "testing" +) + +// A simple bounded FIFO implemented as a slice protected by a mutex. +// This represents the most common "roll-your-own" queue alternative. +type mutexSliceQueue[T any] struct { + mu sync.Mutex + buf []T + head int + tail int + mask int + used int +} + +func newMutexSliceQueue[T any](size int) *mutexSliceQueue[T] { + // size must be power of two for mask + return &mutexSliceQueue[T]{ + buf: make([]T, size), + mask: size - 1, + } +} + +func (q *mutexSliceQueue[T]) Enqueue(v T) { + for { + q.mu.Lock() + if q.used < len(q.buf)-1 { + q.buf[q.tail&q.mask] = v + q.tail++ + q.used++ + q.mu.Unlock() + return + } + q.mu.Unlock() + runtime.Gosched() + } +} + +func (q *mutexSliceQueue[T]) Dequeue() T { + var zero T + for { + q.mu.Lock() + if q.used > 0 { + v := q.buf[q.head&q.mask] + q.buf[q.head&q.mask] = zero + q.head++ + q.used-- + q.mu.Unlock() + return v + } + q.mu.Unlock() + runtime.Gosched() + } +} + +func (q *mutexSliceQueue[T]) EnqueueBatch(items []T) { + n := len(items) + for { + q.mu.Lock() + if q.used+n <= int(q.mask)+1 { + for i := 0; i < n; i++ { + baseIdx := q.tail & q.mask + q.buf[baseIdx] = items[i] + q.tail++ + } + q.used += n + q.mu.Unlock() + return + } + q.mu.Unlock() + runtime.Gosched() + } +} + +func (q *mutexSliceQueue[T]) DequeueBatch(n int) []T { + var zero T + items := make([]T, 0, n) + + for { + q.mu.Lock() + if q.used > 0 { + if n > q.used { + n = q.used + } + for i := 0; i < n; i++ { + baseIdx := q.head & q.mask + items = append(items, q.buf[baseIdx]) + q.buf[baseIdx] = zero + q.head++ + } + q.used -= n + q.mu.Unlock() + return items + } + q.mu.Unlock() + runtime.Gosched() + } +} + +func BenchmarkMutexSlice_SPSC_Seq(b *testing.B) { + for _, size := range payloadSizes { + b.Run(fmt.Sprintf("%dB", size), func(b *testing.B) { + q := newMutexSliceQueue[[]byte](ringSize) + payload := make([]byte, size) + + b.SetBytes(int64(size)) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + q.Enqueue(payload) + _ = q.Dequeue() + } + }) + } +} + +func BenchmarkMutexSlice_MPSC_Parallel(b *testing.B) { + const producers = 4 + for _, size := range payloadSizes { + b.Run(fmt.Sprintf("%dB", size), func(b *testing.B) { + q := newMutexSliceQueue[[]byte](ringSize) + payload := make([]byte, size) + + b.SetBytes(int64(size)) + b.ResetTimer() + + var wg sync.WaitGroup + wg.Add(producers) + + for i := 0; i < producers; i++ { + go func() { + defer wg.Done() + for j := 0; j < b.N; j++ { + q.Enqueue(payload) + } + }() + } + + for i := 0; i < b.N*producers; i++ { + _ = q.Dequeue() + } + wg.Wait() + }) + } +} + +func BenchmarkMutexSlice_SPMC_Parallel(b *testing.B) { + const consumers = 4 + for _, size := range payloadSizes { + b.Run(fmt.Sprintf("%dB", size), func(b *testing.B) { + q := newMutexSliceQueue[[]byte](ringSize) + payload := make([]byte, size) + + b.SetBytes(int64(size)) + b.ResetTimer() + + var wg sync.WaitGroup + wg.Add(consumers) + + for i := 0; i < consumers; i++ { + go func() { + defer wg.Done() + for j := 0; j < b.N; j++ { + _ = q.Dequeue() + } + }() + } + + for i := 0; i < b.N*consumers; i++ { + q.Enqueue(payload) + } + wg.Wait() + }) + } +} + +func BenchmarkMutexSlice_MPMC_Parallel(b *testing.B) { + const ( + producers = 4 + consumers = 4 + ) + for _, size := range payloadSizes { + b.Run(fmt.Sprintf("%dB", size), func(b *testing.B) { + q := newMutexSliceQueue[[]byte](ringSize) + payload := make([]byte, size) + + b.SetBytes(int64(size)) + b.ResetTimer() + + var prodWG, consWG sync.WaitGroup + prodWG.Add(producers) + consWG.Add(consumers) + + for i := 0; i < producers; i++ { + go func() { + defer prodWG.Done() + for j := 0; j < b.N; j++ { + q.Enqueue(payload) + } + }() + } + + for i := 0; i < consumers; i++ { + go func() { + defer consWG.Done() + for j := 0; j < b.N; j++ { + _ = q.Dequeue() + } + }() + } + + prodWG.Wait() + consWG.Wait() + }) + } +} + +func BenchmarkMutexSlice_SPSC_Seq_Batch(b *testing.B) { + for _, batchSize := range batchSizes { + for _, payloadSize := range payloadSizes { + b.Run( + fmt.Sprintf("batch%d_%dB", batchSize, payloadSize), + func(b *testing.B) { + q := newMutexSliceQueue[[]byte](ringSize) + payload := make([][]byte, batchSize) + for i := range payload { + payload[i] = make([]byte, payloadSize) + } + + b.SetBytes(int64(payloadSize)) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + q.EnqueueBatch(payload) + _ = q.DequeueBatch(batchSize) + } + }, + ) + } + } +} + +func BenchmarkMutexSlice_MPSC_Parallel_Batch(b *testing.B) { + const producers = 4 + for _, batchSize := range batchSizes { + for _, payloadSize := range payloadSizes { + b.Run( + fmt.Sprintf("batch%d_%dB", batchSize, payloadSize), + func(b *testing.B) { + q := newMutexSliceQueue[[]byte](ringSize) + payload := make([][]byte, batchSize) + for i := range payload { + payload[i] = make([]byte, payloadSize) + } + + b.SetBytes(int64(payloadSize)) + b.ResetTimer() + + var wg sync.WaitGroup + wg.Add(producers) + + for i := 0; i < producers; i++ { + go func() { + defer wg.Done() + for j := 0; j < b.N; j++ { + q.EnqueueBatch(payload) + } + }() + } + + for i := 0; i < b.N*producers; i++ { + _ = q.DequeueBatch(batchSize) + } + wg.Wait() + }, + ) + } + } +} + +func BenchmarkMutexSlice_SPMC_Parallel_Batch(b *testing.B) { + const consumers = 4 + for _, batchSize := range batchSizes { + for _, payloadSize := range payloadSizes { + b.Run( + fmt.Sprintf("batch%d_%dB", batchSize, payloadSize), + func(b *testing.B) { + q := newMutexSliceQueue[[]byte](ringSize) + payload := make([][]byte, batchSize) + for i := range payload { + payload[i] = make([]byte, payloadSize) + } + + b.SetBytes(int64(payloadSize)) + b.ResetTimer() + + var wg sync.WaitGroup + wg.Add(consumers) + + for i := 0; i < consumers; i++ { + go func() { + defer wg.Done() + for j := 0; j < b.N; j++ { + _ = q.DequeueBatch(batchSize) + } + }() + } + + for i := 0; i < b.N*consumers; i++ { + q.EnqueueBatch(payload) + } + wg.Wait() + }, + ) + } + } +} + +func BenchmarkMutexSlice_MPMC_Parallel_Batch(b *testing.B) { + const ( + producers = 4 + consumers = 4 + ) + for _, batchSize := range batchSizes { + for _, payloadSize := range payloadSizes { + b.Run( + fmt.Sprintf("batch%d_%dB", batchSize, payloadSize), + func(b *testing.B) { + q := newMutexSliceQueue[[]byte](ringSize) + payload := make([][]byte, batchSize) + for i := range payload { + payload[i] = make([]byte, payloadSize) + } + + b.SetBytes(int64(payloadSize)) + b.ResetTimer() + + var prodWG, consWG sync.WaitGroup + prodWG.Add(producers) + consWG.Add(consumers) + + for i := 0; i < producers; i++ { + go func() { + defer prodWG.Done() + for j := 0; j < b.N; j++ { + q.EnqueueBatch(payload) + } + }() + } + + for i := 0; i < consumers; i++ { + go func() { + defer consWG.Done() + for j := 0; j < b.N; j++ { + _ = q.DequeueBatch(batchSize) + } + }() + } + + prodWG.Wait() + consWG.Wait() + }, + ) + } + } +} diff --git a/ring/ring_bench_test.go b/ring/ring_bench_test.go new file mode 100644 index 0000000..b962007 --- /dev/null +++ b/ring/ring_bench_test.go @@ -0,0 +1,305 @@ +package ring_test + +import ( + "fmt" + "sync" + "testing" + + "github.com/freshdresch/go-data-structures/ring" +) + +func BenchmarkRing_SPSC_Seq(b *testing.B) { + for _, sz := range payloadSizes { + b.Run(fmt.Sprintf("%dB", sz), func(b *testing.B) { + rb, _ := ring.NewRing[[]byte](ringSize) + p := make([]byte, sz) + + b.SetBytes(int64(sz)) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + _ = rb.Enqueue(p) + _, _ = rb.Dequeue() + } + }) + } +} + +func BenchmarkRing_SPMC_Parallel(b *testing.B) { + const consumers = 4 + for _, sz := range payloadSizes { + b.Run(fmt.Sprintf("%dB", sz), func(b *testing.B) { + rb, _ := ring.NewRing[[]byte]( + ringSize, + ring.WithMultiConsDequeue[[]byte](), + ) + p := make([]byte, sz) + + b.SetBytes(int64(sz)) + b.ResetTimer() + + var wg sync.WaitGroup + wg.Add(consumers) + + for i := 0; i < consumers; i++ { + go func() { + defer wg.Done() + for j := 0; j < b.N; j++ { + _, _ = rb.Dequeue() + } + }() + } + + for i := 0; i < b.N*consumers; i++ { + _ = rb.Enqueue(p) + } + wg.Wait() + }) + } +} + +func BenchmarkRing_MPSC_Parallel(b *testing.B) { + const producers = 4 + for _, sz := range payloadSizes { + b.Run(fmt.Sprintf("%dB", sz), func(b *testing.B) { + rb, _ := ring.NewRing[[]byte]( + ringSize, + ring.WithMultiProdEnqueue[[]byte](), + ) + p := make([]byte, sz) + + b.SetBytes(int64(sz)) + b.ResetTimer() + + var wg sync.WaitGroup + wg.Add(producers) + + for i := 0; i < producers; i++ { + go func() { + defer wg.Done() + for j := 0; j < b.N; j++ { + _ = rb.Enqueue(p) + } + }() + } + + for i := 0; i < b.N*producers; i++ { + _, _ = rb.Dequeue() + } + wg.Wait() + }) + } +} + +func BenchmarkRing_MPMC_Parallel(b *testing.B) { + const ( + producers = 4 + consumers = 4 + ) + for _, sz := range payloadSizes { + b.Run(fmt.Sprintf("%dB", sz), func(b *testing.B) { + rb, _ := ring.NewRing[[]byte]( + ringSize, + ring.WithMultiProdEnqueue[[]byte](), + ring.WithMultiConsDequeue[[]byte](), + ) + + p := make([]byte, sz) + + b.SetBytes(int64(sz)) + b.ResetTimer() + + var ( + prodWG sync.WaitGroup + consWG sync.WaitGroup + ) + prodWG.Add(producers) + consWG.Add(consumers) + + for i := 0; i < producers; i++ { + go func() { + defer prodWG.Done() + for j := 0; j < b.N; j++ { + _ = rb.Enqueue(p) + } + }() + } + + for i := 0; i < consumers; i++ { + go func() { + defer consWG.Done() + for j := 0; j < b.N; j++ { + _, _ = rb.Dequeue() + } + }() + } + + prodWG.Wait() + consWG.Wait() + }) + } +} + +func BenchmarkRing_SPSC_Seq_Batch(b *testing.B) { + for _, batchSize := range batchSizes { + for _, payloadSize := range payloadSizes { + b.Run( + fmt.Sprintf("batch%d_%dB", batchSize, payloadSize), + func(b *testing.B) { + rb, _ := ring.NewRing[[]byte](ringSize) + payload := make([][]byte, batchSize) + for i := range payload { + payload[i] = make([]byte, payloadSize) + } + + b.SetBytes(int64(payloadSize)) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + _ = rb.EnqueueBurst(payload, uint32(batchSize), nil) + _, _ = rb.DequeueBurst(uint32(batchSize), nil) + } + }, + ) + } + } +} + +func BenchmarkRing_SPMC_Parallel_Batch(b *testing.B) { + const consumers = 4 + for _, batchSize := range batchSizes { + for _, payloadSize := range payloadSizes { + b.Run( + fmt.Sprintf("batch%d_%dB", batchSize, payloadSize), + func(b *testing.B) { + rb, _ := ring.NewRing[[]byte]( + ringSize, + ring.WithMultiConsDequeue[[]byte](), + ) + payload := make([][]byte, batchSize) + for i := range payload { + payload[i] = make([]byte, payloadSize) + } + + b.SetBytes(int64(payloadSize)) + b.ResetTimer() + + var wg sync.WaitGroup + wg.Add(consumers) + + for i := 0; i < consumers; i++ { + go func() { + defer wg.Done() + for j := 0; j < b.N; j++ { + _, _ = rb.DequeueBurst(uint32(batchSize), nil) + } + }() + } + + for i := 0; i < b.N*consumers; i++ { + _ = rb.EnqueueBurst(payload, uint32(batchSize), nil) + } + wg.Wait() + }, + ) + } + } +} + +func BenchmarkRing_MPSC_Parallel_Batch(b *testing.B) { + const producers = 4 + for _, batchSize := range batchSizes { + for _, payloadSize := range payloadSizes { + b.Run( + fmt.Sprintf("batch%d_%dB", batchSize, payloadSize), + func(b *testing.B) { + rb, _ := ring.NewRing[[]byte]( + ringSize, + ring.WithMultiProdEnqueue[[]byte](), + ) + payload := make([][]byte, batchSize) + for i := range payload { + payload[i] = make([]byte, payloadSize) + } + + b.SetBytes(int64(payloadSize)) + b.ResetTimer() + + var wg sync.WaitGroup + wg.Add(producers) + + for i := 0; i < producers; i++ { + go func() { + defer wg.Done() + for j := 0; j < b.N; j++ { + _ = rb.EnqueueBurst(payload, uint32(batchSize), nil) + } + }() + } + + for i := 0; i < b.N*producers; i++ { + _, _ = rb.DequeueBurst(uint32(batchSize), nil) + } + + wg.Wait() + }, + ) + } + } +} + +func BenchmarkRing_MPMC_Parallel_Batch(b *testing.B) { + const ( + producers = 4 + consumers = 4 + ) + for _, batchSize := range batchSizes { + for _, payloadSize := range payloadSizes { + b.Run( + fmt.Sprintf("batch%d_%dB", batchSize, payloadSize), + func(b *testing.B) { + rb, _ := ring.NewRing[[]byte]( + ringSize, + ring.WithMultiProdEnqueue[[]byte](), + ring.WithMultiConsDequeue[[]byte](), + ) + payload := make([][]byte, batchSize) + for i := range payload { + payload[i] = make([]byte, payloadSize) + } + + b.SetBytes(int64(payloadSize)) + b.ResetTimer() + + var ( + prodWG sync.WaitGroup + consWG sync.WaitGroup + ) + prodWG.Add(producers) + consWG.Add(consumers) + + for i := 0; i < producers; i++ { + go func() { + defer prodWG.Done() + for j := 0; j < b.N; j++ { + _ = rb.EnqueueBurst(payload, uint32(batchSize), nil) + } + }() + } + + for i := 0; i < consumers; i++ { + go func() { + defer consWG.Done() + for j := 0; j < b.N; j++ { + _, _ = rb.DequeueBurst(uint32(batchSize), nil) + } + }() + } + + prodWG.Wait() + consWG.Wait() + }, + ) + } + } +} diff --git a/ring/ring_test.go b/ring/ring_test.go index 8d6e79a..a0a133d 100644 --- a/ring/ring_test.go +++ b/ring/ring_test.go @@ -474,219 +474,6 @@ func TestMPMC_OutOfOrderPublication(t *testing.T) { assert.Contains(t, set, 2) } -func BenchmarkSPSCRingBufferSeq(b *testing.B) { - buflen := 1024 - buffer, _ := NewRing[string](uint32(buflen)) - - item := "test" - b.SetBytes(int64(len(item))) - b.ResetTimer() - - for i := 0; i < b.N; i++ { - _ = buffer.Enqueue(item) - _, _ = buffer.Dequeue() - } -} - -func BenchmarkSPSCRingBufferSeqBatch(b *testing.B) { - buflen := 1024 - buffer, _ := NewRing[string](uint32(buflen)) - - item := "test" - batchSize := 10 - b.SetBytes(int64(len(item) * batchSize)) - b.ResetTimer() - - for i := 0; i < b.N; i++ { - for j := 0; j < batchSize; j++ { - _ = buffer.Enqueue(item) - } - for j := 0; j < batchSize; j++ { - _, _ = buffer.Dequeue() - } - } -} - -func BenchmarkSPSCRingBufferContention(b *testing.B) { - buflen := 1024 - buffer, _ := NewRing[string](uint32(buflen)) - - item := "test" - b.SetBytes(int64(len(item) * 2)) - b.ResetTimer() - - var wg sync.WaitGroup - wg.Add(2) - - go func() { - defer wg.Done() - for i := 0; i < b.N; i++ { - _ = buffer.Enqueue(item) - } - }() - go func() { - defer wg.Done() - for i := 0; i < b.N; i++ { - _, _ = buffer.Dequeue() - } - }() - wg.Wait() -} - -func BenchmarkSPMCRingBufferSeq(b *testing.B) { - buflen := 1024 - buffer, _ := NewRing[string]( - uint32(buflen), - WithMultiConsDequeue[string](), - ) - - item := "test" - b.SetBytes(int64(len(item))) - b.ResetTimer() - - for i := 0; i < b.N; i++ { - _ = buffer.Enqueue(item) - _, _ = buffer.Dequeue() - } -} - -func BenchmarkSPMCRingBufferParallel(b *testing.B) { - buflen := 1024 - buffer, _ := NewRing[string]( - uint32(buflen), - WithMultiConsDequeue[string](), - ) - - numConsumers := 4 - item := "test" - b.SetBytes(int64(len(item) * numConsumers)) - b.ResetTimer() - - var wg sync.WaitGroup - wg.Add(numConsumers) - - for i := 0; i < numConsumers; i++ { - go func() { - defer wg.Done() - for j := 0; j < b.N; j++ { - _, _ = buffer.Dequeue() - } - }() - } - - for i := 0; i < b.N*numConsumers; i++ { - _ = buffer.Enqueue(item) - } - wg.Wait() -} - -func BenchmarkMPSCRingBufferSeq(b *testing.B) { - buflen := 1024 - buffer, _ := NewRing[string]( - uint32(buflen), - WithMultiProdEnqueue[string](), - ) - - item := "test" - b.SetBytes(int64(len(item))) - b.ResetTimer() - - for i := 0; i < b.N; i++ { - _ = buffer.Enqueue(item) - _, _ = buffer.Dequeue() - } -} - -func BenchmarkMPSCRingBufferParallel(b *testing.B) { - buflen := 1024 - buffer, _ := NewRing[string]( - uint32(buflen), - WithMultiProdEnqueue[string](), - ) - - numProducers := 4 - item := "test" - b.SetBytes(int64(len(item) * numProducers)) - b.ResetTimer() - - var wg sync.WaitGroup - wg.Add(numProducers) - - for i := 0; i < numProducers; i++ { - go func() { - defer wg.Done() - for j := 0; j < b.N; j++ { - _ = buffer.Enqueue(item) - } - }() - } - - for i := 0; i < b.N*numProducers; i++ { - _, _ = buffer.Dequeue() - } - wg.Wait() -} - -func BenchmarkMPMCRingBufferSeq(b *testing.B) { - buflen := 1024 - buffer, _ := NewRing[string]( - uint32(buflen), - WithMultiProdEnqueue[string](), - WithMultiConsDequeue[string](), - ) - - item := "test" - b.SetBytes(int64(len(item))) - b.ResetTimer() - - for i := 0; i < b.N; i++ { - _ = buffer.Enqueue(item) - _, _ = buffer.Dequeue() - } -} - -func BenchmarkMPMCRingBufferParallel(b *testing.B) { - buflen := 1024 - buffer, _ := NewRing[string]( - uint32(buflen), - WithMultiProdEnqueue[string](), - WithMultiConsDequeue[string](), - ) - - numProducers := 4 - numConsumers := 4 - item := "test" - b.SetBytes(int64(len(item) * numProducers)) - b.ResetTimer() - - var producerWG sync.WaitGroup - producerWG.Add(numProducers) - - for i := 0; i < numProducers; i++ { - go func() { - defer producerWG.Done() - for j := 0; j < b.N; j++ { - _ = buffer.Enqueue(item) - } - }() - } - - var consumerWG sync.WaitGroup - consumerWG.Add(numConsumers) - - for i := 0; i < numConsumers; i++ { - go func() { - defer consumerWG.Done() - for j := 0; j < b.N; j++ { - _, _ = buffer.Dequeue() - } - }() - } - - producerWG.Wait() - consumerWG.Wait() -} - func assertNewRing[T any](t *testing.T, ring *Ring[T], count uint32) { assert.NotNil(t, ring.entries) assert.Equal(t, ring.size, count) From 48e66dd079c5204aeec03cef72e1aa18a018cb0b Mon Sep 17 00:00:00 2001 From: Adam Drescher Date: Sun, 14 Dec 2025 13:27:35 -0800 Subject: [PATCH 2/2] Add missing import path to other bazel build files. --- buffer/BUILD.bazel | 1 + maps/BUILD.bazel | 1 + set/BUILD.bazel | 1 + slices/BUILD.bazel | 1 + 4 files changed, 4 insertions(+) diff --git a/buffer/BUILD.bazel b/buffer/BUILD.bazel index a658f7e..aad580e 100644 --- a/buffer/BUILD.bazel +++ b/buffer/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "bytes_buffer.go", "pool.go", ], + importpath = "github.com/freshdresch/go-data-structures/buffer", visibility = ["//visibility:public"], ) diff --git a/maps/BUILD.bazel b/maps/BUILD.bazel index fa2dc2d..44e5534 100644 --- a/maps/BUILD.bazel +++ b/maps/BUILD.bazel @@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "maps", srcs = ["maps.go"], + importpath = "github.com/freshdresch/go-data-structures/maps", visibility = ["//visibility:public"], ) diff --git a/set/BUILD.bazel b/set/BUILD.bazel index a991d4b..b5bfb56 100644 --- a/set/BUILD.bazel +++ b/set/BUILD.bazel @@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "set", srcs = ["set.go"], + importpath = "github.com/freshdresch/go-data-structures/set", visibility = ["//visibility:public"], ) diff --git a/slices/BUILD.bazel b/slices/BUILD.bazel index fbbfa99..037cc57 100644 --- a/slices/BUILD.bazel +++ b/slices/BUILD.bazel @@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "slices", srcs = ["slices.go"], + importpath = "github.com/freshdresch/go-data-structures/slices", visibility = ["//visibility:public"], )