diff --git a/buffer/BUILD.bazel b/buffer/BUILD.bazel
index a658f7e..aad580e 100644
--- a/buffer/BUILD.bazel
+++ b/buffer/BUILD.bazel
@@ -7,6 +7,7 @@ go_library(
         "bytes_buffer.go",
         "pool.go",
     ],
+    importpath = "github.com/freshdresch/go-data-structures/buffer",
     visibility = ["//visibility:public"],
 )
 
diff --git a/maps/BUILD.bazel b/maps/BUILD.bazel
index fa2dc2d..44e5534 100644
--- a/maps/BUILD.bazel
+++ b/maps/BUILD.bazel
@@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
 go_library(
     name = "maps",
     srcs = ["maps.go"],
+    importpath = "github.com/freshdresch/go-data-structures/maps",
     visibility = ["//visibility:public"],
 )
 
diff --git a/ring/BUILD.bazel b/ring/BUILD.bazel
index 2911635..3e39010 100644
--- a/ring/BUILD.bazel
+++ b/ring/BUILD.bazel
@@ -3,12 +3,16 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
 go_library(
     name = "ring",
     srcs = ["ring.go"],
+    importpath = "github.com/freshdresch/go-data-structures/ring",
     visibility = ["//visibility:public"],
 )
 
+# Internal tests (white-box)
 go_test(
-    name = "ring_test",
-    srcs = ["ring_test.go"],
+    name = "ring_internal_test",
+    srcs = [
+        "ring_test.go",
+    ],
     embed = [":ring"],
     deps = [
         "@com_github_stretchr_testify//assert:assert",
@@ -16,3 +20,18 @@ go_test(
     ],
     visibility = ["//:__subpackages__"],
 )
+
+# External benchmarks (black-box)
+go_test(
+    name = "ring_bench",
+    srcs = [
+        "channel_bench_test.go",
+        "common_bench_test.go",
+        "mutex_slice_bench_test.go",
+        "ring_bench_test.go",
+    ],
+    deps = [
+        ":ring",
+    ],
+    visibility = ["//:__subpackages__"],
+)
diff --git a/ring/channel_bench_test.go b/ring/channel_bench_test.go
new file mode 100644
index 0000000..f455548
--- /dev/null
+++ b/ring/channel_bench_test.go
@@ -0,0 +1,302 @@
+package ring_test
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+)
+
+// ChanWrapper mirrors the ring's burst API on top of a buffered channel.
+// It is not used by the benchmarks below, which drive channels directly;
+// it is kept as a reference implementation of batched channel transfer.
+type ChanWrapper[T any] struct {
+	ch chan T
+}
+
+func NewChanWrapper[T any](capacity int) *ChanWrapper[T] {
+	return &ChanWrapper[T]{ch: make(chan T, capacity)}
+}
+
+// EnqueueBurst sends every element of batch, blocking when the channel is full.
+func (c *ChanWrapper[T]) EnqueueBurst(batch []T) {
+	for _, v := range batch {
+		c.ch <- v
+	}
+}
+
+// DequeueBurst receives up to n elements without blocking; it may return fewer.
+func (c *ChanWrapper[T]) DequeueBurst(n int) []T {
+	res := make([]T, 0, n)
+	for i := 0; i < n; i++ {
+		select {
+		case v := <-c.ch:
+			res = append(res, v)
+		default:
+			return res
+		}
+	}
+	return res
+}
+
+func BenchmarkChan_SPSC_Seq(b *testing.B) {
+	for _, size := range payloadSizes {
+		b.Run(fmt.Sprintf("%dB", size), func(b *testing.B) {
+			ch := make(chan []byte, ringSize)
+			payload := make([]byte, size)
+
+			b.SetBytes(int64(size))
+			b.ResetTimer()
+
+			for i := 0; i < b.N; i++ {
+				ch <- payload
+				<-ch
+			}
+		})
+	}
+}
+
+func BenchmarkChan_MPSC_Parallel(b *testing.B) {
+	const producers = 4
+	for _, size := range payloadSizes {
+		b.Run(fmt.Sprintf("%dB", size), func(b *testing.B) {
+			// use ringSize (not a bare 1024) so every contender gets the
+			// same capacity
+			ch := make(chan []byte, ringSize)
+			payload := make([]byte, size)
+
+			b.SetBytes(int64(size))
+			b.ResetTimer()
+
+			var wg sync.WaitGroup
+			wg.Add(producers)
+
+			for i := 0; i < producers; i++ {
+				go func() {
+					defer wg.Done()
+					for j := 0; j < b.N; j++ {
+						ch <- payload
+					}
+				}()
+			}
+
+			for i := 0; i < b.N*producers; i++ {
+				<-ch
+			}
+			wg.Wait()
+		})
+	}
+}
+
+func BenchmarkChan_SPMC_Parallel(b *testing.B) {
+	const consumers = 4
+	for _, size := range payloadSizes {
+		b.Run(fmt.Sprintf("%dB", size), func(b *testing.B) {
+			ch := make(chan []byte, ringSize)
+			payload := make([]byte, size)
+
+			b.SetBytes(int64(size))
+			b.ResetTimer()
+
+			var wg sync.WaitGroup
+			wg.Add(consumers)
+
+			for i := 0; i < consumers; i++ {
+				go func() {
+					defer wg.Done()
+					for j := 0; j < b.N; j++ {
+						<-ch
+					}
+				}()
+			}
+
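+			// the single producer sends b.N*consumers payloads, exactly
+			// matching the consumers' total receive count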
+			for i := 0; i < b.N*consumers; i++ {
+				ch <- payload
+			}
+			wg.Wait()
+		})
+	}
+}
+
+func BenchmarkChan_MPMC_Parallel(b *testing.B) {
+	const (
+		producers = 4
+		consumers = 4
+	)
+	for _, size := range payloadSizes {
+		b.Run(fmt.Sprintf("%dB", size), func(b *testing.B) {
+			ch := make(chan []byte, ringSize)
+			payload := make([]byte, size)
+
+			b.SetBytes(int64(size))
+			b.ResetTimer()
+
+			var prodWG, consWG sync.WaitGroup
+			prodWG.Add(producers)
+			consWG.Add(consumers)
+
+			for i := 0; i < producers; i++ {
+				go func() {
+					defer prodWG.Done()
+					for j := 0; j < b.N; j++ {
+						ch <- payload
+					}
+				}()
+			}
+
+			for i := 0; i < consumers; i++ {
+				go func() {
+					defer consWG.Done()
+					for j := 0; j < b.N; j++ {
+						<-ch
+					}
+				}()
+			}
+
+			prodWG.Wait()
+			consWG.Wait()
+		})
+	}
+}
+
+func BenchmarkChan_SPSC_Seq_Batch(b *testing.B) {
+	for _, batchSize := range batchSizes {
+		for _, payloadSize := range payloadSizes {
+			b.Run(
+				fmt.Sprintf("batch%d_%dB", batchSize, payloadSize),
+				func(b *testing.B) {
+					// the whole batch travels as a single channel element,
+					// one send per batch
+					ch := make(chan [][]byte, ringSize)
+					payload := make([][]byte, batchSize)
+					for i := range payload {
+						payload[i] = make([]byte, payloadSize)
+					}
+
+					b.SetBytes(int64(payloadSize))
+					b.ResetTimer()
+
+					for i := 0; i < b.N; i++ {
+						ch <- payload
+						<-ch
+					}
+				},
+			)
+		}
+	}
+}
+
+func BenchmarkChan_MPSC_Parallel_Batch(b *testing.B) {
+	const producers = 4
+	for _, batchSize := range batchSizes {
+		for _, payloadSize := range payloadSizes {
+			b.Run(
+				fmt.Sprintf("batch%d_%dB", batchSize, payloadSize),
+				func(b *testing.B) {
+					ch := make(chan [][]byte, ringSize)
+					payload := make([][]byte, batchSize)
+					for i := range payload {
+						payload[i] = make([]byte, payloadSize)
+					}
+
+					b.SetBytes(int64(payloadSize))
+					b.ResetTimer()
+
+					var wg sync.WaitGroup
+					wg.Add(producers)
+
+					for i := 0; i < producers; i++ {
+						go func() {
+							defer wg.Done()
+							for j := 0; j < b.N; j++ {
+								ch <- payload
+							}
+						}()
+					}
+
+					for i := 0; i < b.N*producers; i++ {
+						<-ch
+					}
+					wg.Wait()
+				},
+			)
+		}
+	}
+}
+
+func BenchmarkChan_SPMC_Parallel_Batch(b *testing.B) {
+	const consumers = 4
+	for _, batchSize := range batchSizes {
+		for _, payloadSize := range payloadSizes {
+			b.Run(
+				fmt.Sprintf("batch%d_%dB", batchSize, payloadSize),
+				func(b *testing.B) {
+					ch := make(chan [][]byte, ringSize)
+					payload := make([][]byte, batchSize)
+					for i := range payload {
+						payload[i] = make([]byte, payloadSize)
+					}
+
+					b.SetBytes(int64(payloadSize))
+					b.ResetTimer()
+
+					var wg sync.WaitGroup
+					wg.Add(consumers)
+
+					for i := 0; i < consumers; i++ {
+						go func() {
+							defer wg.Done()
+							for j := 0; j < b.N; j++ {
+								<-ch
+							}
+						}()
+					}
+
+					for i := 0; i < b.N*consumers; i++ {
+						ch <- payload
+					}
+					wg.Wait()
+				},
+			)
+		}
+	}
+}
+
+func BenchmarkChan_MPMC_Parallel_Batch(b *testing.B) {
+	const (
+		producers = 4
+		consumers = 4
+	)
+	for _, batchSize := range batchSizes {
+		for _, payloadSize := range payloadSizes {
+			b.Run(
+				fmt.Sprintf("batch%d_%dB", batchSize, payloadSize),
+				func(b *testing.B) {
+					ch := make(chan [][]byte, ringSize)
+					payload := make([][]byte, batchSize)
+					for i := range payload {
+						payload[i] = make([]byte, payloadSize)
+					}
+
+					b.SetBytes(int64(payloadSize))
+					b.ResetTimer()
+
+					var prodWG, consWG sync.WaitGroup
+					prodWG.Add(producers)
+					consWG.Add(consumers)
+
+					for i := 0; i < producers; i++ {
+						go func() {
+							defer prodWG.Done()
+							for j := 0; j < b.N; j++ {
+								ch <- payload
+							}
+						}()
+					}
+
+					for i := 0; i < consumers; i++ {
+						go func() {
+							defer consWG.Done()
+							for j := 0; j < b.N; j++ {
+								<-ch
+							}
+						}()
+					}
+
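+					// sends and receives are exactly balanced
+					// (producers == consumers), so neither wait can hang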
+					prodWG.Wait()
+					consWG.Wait()
+				},
+			)
+		}
+	}
+}
diff --git a/ring/common_bench_test.go b/ring/common_bench_test.go
new file mode 100644
index 0000000..c7fac04
--- /dev/null
+++ b/ring/common_bench_test.go
@@ -0,0 +1,8 @@
+package ring_test
+
+const ringSize = 1024
+
+var (
+	batchSizes   = []int{1, 8, 16, 32, 64}
+	payloadSizes = []int{8, 32, 64, 256, 512} // bytes
+)
diff --git a/ring/mutex_slice_bench_test.go b/ring/mutex_slice_bench_test.go
new file mode 100644
index 0000000..0e09ce3
--- /dev/null
+++ b/ring/mutex_slice_bench_test.go
@@ -0,0 +1,371 @@
+package ring_test
+
+import (
+	"fmt"
+	"runtime"
+	"sync"
+	"testing"
+)
+
+// A simple bounded FIFO implemented as a slice protected by a mutex.
+// This represents the most common "roll-your-own" queue alternative.
+type mutexSliceQueue[T any] struct {
+	mu   sync.Mutex
+	buf  []T
+	head int
+	tail int
+	mask int
+	used int
+}
+
+func newMutexSliceQueue[T any](size int) *mutexSliceQueue[T] {
+	// size must be a power of two so that index & mask wraps correctly
+	return &mutexSliceQueue[T]{
+		buf:  make([]T, size),
+		mask: size - 1,
+	}
+}
+
+func (q *mutexSliceQueue[T]) Enqueue(v T) {
+	for {
+		q.mu.Lock()
+		// used tracks occupancy directly, so the full capacity is usable
+		// (consistent with EnqueueBatch below)
+		if q.used < len(q.buf) {
+			q.buf[q.tail&q.mask] = v
+			q.tail++
+			q.used++
+			q.mu.Unlock()
+			return
+		}
+		q.mu.Unlock()
+		runtime.Gosched()
+	}
+}
+
+func (q *mutexSliceQueue[T]) Dequeue() T {
+	var zero T
+	for {
+		q.mu.Lock()
+		if q.used > 0 {
+			v := q.buf[q.head&q.mask]
+			q.buf[q.head&q.mask] = zero
+			q.head++
+			q.used--
+			q.mu.Unlock()
+			return v
+		}
+		q.mu.Unlock()
+		runtime.Gosched()
+	}
+}
+
+func (q *mutexSliceQueue[T]) EnqueueBatch(items []T) {
+	n := len(items)
+	for {
+		q.mu.Lock()
+		if q.used+n <= len(q.buf) {
+			for i := 0; i < n; i++ {
+				q.buf[q.tail&q.mask] = items[i]
+				q.tail++
+			}
+			q.used += n
+			q.mu.Unlock()
+			return
+		}
+		q.mu.Unlock()
+		runtime.Gosched()
+	}
+}
+
+// DequeueBatch blocks (spinning) until the queue is non-empty, then returns
+// up to n items; it may return fewer than n.
+func (q *mutexSliceQueue[T]) DequeueBatch(n int) []T {
+	var zero T
+	items := make([]T, 0, n)
+
+	for {
+		q.mu.Lock()
+		if q.used > 0 {
+			if n > q.used {
+				n = q.used
+			}
+			for i := 0; i < n; i++ {
+				items = append(items, q.buf[q.head&q.mask])
+				q.buf[q.head&q.mask] = zero
+				q.head++
+			}
+			q.used -= n
+			q.mu.Unlock()
+			return items
+		}
+		q.mu.Unlock()
+		runtime.Gosched()
+	}
+}
+
+func BenchmarkMutexSlice_SPSC_Seq(b *testing.B) {
+	for _, size := range payloadSizes {
+		b.Run(fmt.Sprintf("%dB", size), func(b *testing.B) {
+			q := newMutexSliceQueue[[]byte](ringSize)
+			payload := make([]byte, size)
+
+			b.SetBytes(int64(size))
+			b.ResetTimer()
+
+			for i := 0; i < b.N; i++ {
+				q.Enqueue(payload)
+				_ = q.Dequeue()
+			}
+		})
+	}
+}
+
+func BenchmarkMutexSlice_MPSC_Parallel(b *testing.B) {
+	const producers = 4
+	for _, size := range payloadSizes {
+		b.Run(fmt.Sprintf("%dB", size), func(b *testing.B) {
+			q := newMutexSliceQueue[[]byte](ringSize)
+			payload := make([]byte, size)
+
+			b.SetBytes(int64(size))
+			b.ResetTimer()
+
+			var wg sync.WaitGroup
+			wg.Add(producers)
+
+			for i := 0; i < producers; i++ {
+				go func() {
+					defer wg.Done()
+					for j := 0; j < b.N; j++ {
+						q.Enqueue(payload)
+					}
+				}()
+			}
+
+			for i := 0; i < b.N*producers; i++ {
+				_ = q.Dequeue()
+			}
+			wg.Wait()
+		})
+	}
+}
+
+func BenchmarkMutexSlice_SPMC_Parallel(b *testing.B) {
+	const consumers = 4
+	for _, size := range payloadSizes {
+		b.Run(fmt.Sprintf("%dB", size), func(b *testing.B) {
+			q := newMutexSliceQueue[[]byte](ringSize)
+			payload := make([]byte, size)
+
+			b.SetBytes(int64(size))
+			b.ResetTimer()
+
+			var wg sync.WaitGroup
+			wg.Add(consumers)
+
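+			// each consumer performs exactly b.N successful dequeues;
+			// Dequeue spins (with Gosched) until an item is available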
+			for i := 0; i < consumers; i++ {
+				go func() {
+					defer wg.Done()
+					for j := 0; j < b.N; j++ {
+						_ = q.Dequeue()
+					}
+				}()
+			}
+
+			for i := 0; i < b.N*consumers; i++ {
+				q.Enqueue(payload)
+			}
+			wg.Wait()
+		})
+	}
+}
+
+func BenchmarkMutexSlice_MPMC_Parallel(b *testing.B) {
+	const (
+		producers = 4
+		consumers = 4
+	)
+	for _, size := range payloadSizes {
+		b.Run(fmt.Sprintf("%dB", size), func(b *testing.B) {
+			q := newMutexSliceQueue[[]byte](ringSize)
+			payload := make([]byte, size)
+
+			b.SetBytes(int64(size))
+			b.ResetTimer()
+
+			var prodWG, consWG sync.WaitGroup
+			prodWG.Add(producers)
+			consWG.Add(consumers)
+
+			for i := 0; i < producers; i++ {
+				go func() {
+					defer prodWG.Done()
+					for j := 0; j < b.N; j++ {
+						q.Enqueue(payload)
+					}
+				}()
+			}
+
+			for i := 0; i < consumers; i++ {
+				go func() {
+					defer consWG.Done()
+					for j := 0; j < b.N; j++ {
+						_ = q.Dequeue()
+					}
+				}()
+			}
+
+			prodWG.Wait()
+			consWG.Wait()
+		})
+	}
+}
+
+func BenchmarkMutexSlice_SPSC_Seq_Batch(b *testing.B) {
+	for _, batchSize := range batchSizes {
+		for _, payloadSize := range payloadSizes {
+			b.Run(
+				fmt.Sprintf("batch%d_%dB", batchSize, payloadSize),
+				func(b *testing.B) {
+					q := newMutexSliceQueue[[]byte](ringSize)
+					payload := make([][]byte, batchSize)
+					for i := range payload {
+						payload[i] = make([]byte, payloadSize)
+					}
+
+					b.SetBytes(int64(payloadSize))
+					b.ResetTimer()
+
+					for i := 0; i < b.N; i++ {
+						q.EnqueueBatch(payload)
+						_ = q.DequeueBatch(batchSize)
+					}
+				},
+			)
+		}
+	}
+}
+
+func BenchmarkMutexSlice_MPSC_Parallel_Batch(b *testing.B) {
+	const producers = 4
+	for _, batchSize := range batchSizes {
+		for _, payloadSize := range payloadSizes {
+			b.Run(
+				fmt.Sprintf("batch%d_%dB", batchSize, payloadSize),
+				func(b *testing.B) {
+					q := newMutexSliceQueue[[]byte](ringSize)
+					payload := make([][]byte, batchSize)
+					for i := range payload {
+						payload[i] = make([]byte, payloadSize)
+					}
+
+					b.SetBytes(int64(payloadSize))
+					b.ResetTimer()
+
+					var wg sync.WaitGroup
+					wg.Add(producers)
+
+					for i := 0; i < producers; i++ {
+						go func() {
+							defer wg.Done()
+							for j := 0; j < b.N; j++ {
+								q.EnqueueBatch(payload)
+							}
+						}()
+					}
+
+					// drain by item count rather than call count, since
+					// DequeueBatch may return partial batches
+					remaining := b.N * producers * batchSize
+					for remaining > 0 {
+						remaining -= len(q.DequeueBatch(batchSize))
+					}
+					wg.Wait()
+				},
+			)
+		}
+	}
+}
+
+func BenchmarkMutexSlice_SPMC_Parallel_Batch(b *testing.B) {
+	const consumers = 4
+	for _, batchSize := range batchSizes {
+		for _, payloadSize := range payloadSizes {
+			b.Run(
+				fmt.Sprintf("batch%d_%dB", batchSize, payloadSize),
+				func(b *testing.B) {
+					q := newMutexSliceQueue[[]byte](ringSize)
+					payload := make([][]byte, batchSize)
+					for i := range payload {
+						payload[i] = make([]byte, payloadSize)
+					}
+
+					b.SetBytes(int64(payloadSize))
+					b.ResetTimer()
+
+					var wg sync.WaitGroup
+					wg.Add(consumers)
+
+					for i := 0; i < consumers; i++ {
+						go func() {
+							defer wg.Done()
+							// consume an exact per-goroutine quota of items:
+							// DequeueBatch may return partial batches, and a
+							// fixed call count could strand the producer on a
+							// full queue
+							remaining := b.N * batchSize
+							for remaining > 0 {
+								n := batchSize
+								if remaining < n {
+									n = remaining
+								}
+								remaining -= len(q.DequeueBatch(n))
+							}
+						}()
+					}
+
+					for i := 0; i < b.N*consumers; i++ {
+						q.EnqueueBatch(payload)
+					}
+					wg.Wait()
+				},
+			)
+		}
+	}
+}
+
+func BenchmarkMutexSlice_MPMC_Parallel_Batch(b *testing.B) {
+	const (
+		producers = 4
+		consumers = 4
+	)
+	for _, batchSize := range batchSizes {
+		for _, payloadSize := range payloadSizes {
+			b.Run(
+				fmt.Sprintf("batch%d_%dB", batchSize, payloadSize),
+				func(b *testing.B) {
+					q := newMutexSliceQueue[[]byte](ringSize)
+					payload := make([][]byte, batchSize)
+					for i := range payload {
+						payload[i] = make([]byte, payloadSize)
+					}
+
+					b.SetBytes(int64(payloadSize))
+					b.ResetTimer()
+
+					var prodWG, consWG sync.WaitGroup
+					prodWG.Add(producers)
+					consWG.Add(consumers)
+
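+					// producers enqueue b.N full batches each; the consumers
+					// below drain by item count, since DequeueBatch may return
+					// partial batches and would otherwise strand the producers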
+					for i := 0; i < producers; i++ {
+						go func() {
+							defer prodWG.Done()
+							for j := 0; j < b.N; j++ {
+								q.EnqueueBatch(payload)
+							}
+						}()
+					}
+
+					for i := 0; i < consumers; i++ {
+						go func() {
+							defer consWG.Done()
+							remaining := b.N * batchSize
+							for remaining > 0 {
+								n := batchSize
+								if remaining < n {
+									n = remaining
+								}
+								remaining -= len(q.DequeueBatch(n))
+							}
+						}()
+					}
+
+					prodWG.Wait()
+					consWG.Wait()
+				},
+			)
+		}
+	}
+}
diff --git a/ring/ring_bench_test.go b/ring/ring_bench_test.go
new file mode 100644
index 0000000..b962007
--- /dev/null
+++ b/ring/ring_bench_test.go
@@ -0,0 +1,305 @@
+package ring_test
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+
+	"github.com/freshdresch/go-data-structures/ring"
+)
+
+func BenchmarkRing_SPSC_Seq(b *testing.B) {
+	for _, sz := range payloadSizes {
+		b.Run(fmt.Sprintf("%dB", sz), func(b *testing.B) {
+			rb, _ := ring.NewRing[[]byte](ringSize)
+			p := make([]byte, sz)
+
+			b.SetBytes(int64(sz))
+			b.ResetTimer()
+
+			for i := 0; i < b.N; i++ {
+				_ = rb.Enqueue(p)
+				_, _ = rb.Dequeue()
+			}
+		})
+	}
+}
+
+func BenchmarkRing_SPMC_Parallel(b *testing.B) {
+	const consumers = 4
+	for _, sz := range payloadSizes {
+		b.Run(fmt.Sprintf("%dB", sz), func(b *testing.B) {
+			rb, _ := ring.NewRing[[]byte](
+				ringSize,
+				ring.WithMultiConsDequeue[[]byte](),
+			)
+			p := make([]byte, sz)
+
+			b.SetBytes(int64(sz))
+			b.ResetTimer()
+
+			var wg sync.WaitGroup
+			wg.Add(consumers)
+
+			for i := 0; i < consumers; i++ {
+				go func() {
+					defer wg.Done()
+					for j := 0; j < b.N; j++ {
+						_, _ = rb.Dequeue()
+					}
+				}()
+			}
+
+			for i := 0; i < b.N*consumers; i++ {
+				_ = rb.Enqueue(p)
+			}
+			wg.Wait()
+		})
+	}
+}
+
+func BenchmarkRing_MPSC_Parallel(b *testing.B) {
+	const producers = 4
+	for _, sz := range payloadSizes {
+		b.Run(fmt.Sprintf("%dB", sz), func(b *testing.B) {
+			rb, _ := ring.NewRing[[]byte](
+				ringSize,
+				ring.WithMultiProdEnqueue[[]byte](),
+			)
+			p := make([]byte, sz)
+
+			b.SetBytes(int64(sz))
+			b.ResetTimer()
+
+			var wg sync.WaitGroup
+			wg.Add(producers)
+
+			for i := 0; i < producers; i++ {
+				go func() {
+					defer wg.Done()
+					for j := 0; j < b.N; j++ {
+						_ = rb.Enqueue(p)
+					}
+				}()
+			}
+
+			for i := 0; i < b.N*producers; i++ {
+				_, _ = rb.Dequeue()
+			}
+			wg.Wait()
+		})
+	}
+}
+
+func BenchmarkRing_MPMC_Parallel(b *testing.B) {
+	const (
+		producers = 4
+		consumers = 4
+	)
+	for _, sz := range payloadSizes {
+		b.Run(fmt.Sprintf("%dB", sz), func(b *testing.B) {
+			rb, _ := ring.NewRing[[]byte](
+				ringSize,
+				ring.WithMultiProdEnqueue[[]byte](),
+				ring.WithMultiConsDequeue[[]byte](),
+			)
+
+			p := make([]byte, sz)
+
+			b.SetBytes(int64(sz))
+			b.ResetTimer()
+
+			var (
+				prodWG sync.WaitGroup
+				consWG sync.WaitGroup
+			)
+			prodWG.Add(producers)
+			consWG.Add(consumers)
+
+			for i := 0; i < producers; i++ {
+				go func() {
+					defer prodWG.Done()
+					for j := 0; j < b.N; j++ {
+						_ = rb.Enqueue(p)
+					}
+				}()
+			}
+
+			for i := 0; i < consumers; i++ {
+				go func() {
+					defer consWG.Done()
+					for j := 0; j < b.N; j++ {
+						_, _ = rb.Dequeue()
+					}
+				}()
+			}
+
+			prodWG.Wait()
+			consWG.Wait()
+		})
+	}
+}
+
+func BenchmarkRing_SPSC_Seq_Batch(b *testing.B) {
+	for _, batchSize := range batchSizes {
+		for _, payloadSize := range payloadSizes {
+			b.Run(
+				fmt.Sprintf("batch%d_%dB", batchSize, payloadSize),
+				func(b *testing.B) {
+					rb, _ := ring.NewRing[[]byte](ringSize)
+					payload := make([][]byte, batchSize)
+					for i := range payload {
+						payload[i] = make([]byte, payloadSize)
+					}
+
+					b.SetBytes(int64(payloadSize))
+					b.ResetTimer()
+
+					for i := 0; i < b.N; i++ {
+						_ = rb.EnqueueBurst(payload, uint32(batchSize), nil)
+						_, _ = rb.DequeueBurst(uint32(batchSize), nil)
+					}
+				},
+			)
+		}
+	}
+}
+
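+// NOTE: the parallel burst benchmarks below assume that EnqueueBurst and
+// DequeueBurst complete full batches on every call. If the ring's burst API
+// can return partial bursts, the fixed call counts here could leave one side
+// spinning; the return values would then need to be inspected, as done in the
+// mutex-slice benchmarks.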
+func BenchmarkRing_SPMC_Parallel_Batch(b *testing.B) {
+	const consumers = 4
+	for _, batchSize := range batchSizes {
+		for _, payloadSize := range payloadSizes {
+			b.Run(
+				fmt.Sprintf("batch%d_%dB", batchSize, payloadSize),
+				func(b *testing.B) {
+					rb, _ := ring.NewRing[[]byte](
+						ringSize,
+						ring.WithMultiConsDequeue[[]byte](),
+					)
+					payload := make([][]byte, batchSize)
+					for i := range payload {
+						payload[i] = make([]byte, payloadSize)
+					}
+
+					b.SetBytes(int64(payloadSize))
+					b.ResetTimer()
+
+					var wg sync.WaitGroup
+					wg.Add(consumers)
+
+					for i := 0; i < consumers; i++ {
+						go func() {
+							defer wg.Done()
+							for j := 0; j < b.N; j++ {
+								_, _ = rb.DequeueBurst(uint32(batchSize), nil)
+							}
+						}()
+					}
+
+					for i := 0; i < b.N*consumers; i++ {
+						_ = rb.EnqueueBurst(payload, uint32(batchSize), nil)
+					}
+					wg.Wait()
+				},
+			)
+		}
+	}
+}
+
+func BenchmarkRing_MPSC_Parallel_Batch(b *testing.B) {
+	const producers = 4
+	for _, batchSize := range batchSizes {
+		for _, payloadSize := range payloadSizes {
+			b.Run(
+				fmt.Sprintf("batch%d_%dB", batchSize, payloadSize),
+				func(b *testing.B) {
+					rb, _ := ring.NewRing[[]byte](
+						ringSize,
+						ring.WithMultiProdEnqueue[[]byte](),
+					)
+					payload := make([][]byte, batchSize)
+					for i := range payload {
+						payload[i] = make([]byte, payloadSize)
+					}
+
+					b.SetBytes(int64(payloadSize))
+					b.ResetTimer()
+
+					var wg sync.WaitGroup
+					wg.Add(producers)
+
+					for i := 0; i < producers; i++ {
+						go func() {
+							defer wg.Done()
+							for j := 0; j < b.N; j++ {
+								_ = rb.EnqueueBurst(payload, uint32(batchSize), nil)
+							}
+						}()
+					}
+
+					for i := 0; i < b.N*producers; i++ {
+						_, _ = rb.DequeueBurst(uint32(batchSize), nil)
+					}
+
+					wg.Wait()
+				},
+			)
+		}
+	}
+}
+
+func BenchmarkRing_MPMC_Parallel_Batch(b *testing.B) {
+	const (
+		producers = 4
+		consumers = 4
+	)
+	for _, batchSize := range batchSizes {
+		for _, payloadSize := range payloadSizes {
+			b.Run(
+				fmt.Sprintf("batch%d_%dB", batchSize, payloadSize),
+				func(b *testing.B) {
+					rb, _ := ring.NewRing[[]byte](
+						ringSize,
+						ring.WithMultiProdEnqueue[[]byte](),
+						ring.WithMultiConsDequeue[[]byte](),
+					)
+					payload := make([][]byte, batchSize)
+					for i := range payload {
+						payload[i] = make([]byte, payloadSize)
+					}
+
+					b.SetBytes(int64(payloadSize))
+					b.ResetTimer()
+
+					var (
+						prodWG sync.WaitGroup
+						consWG sync.WaitGroup
+					)
+					prodWG.Add(producers)
+					consWG.Add(consumers)
+
+					for i := 0; i < producers; i++ {
+						go func() {
+							defer prodWG.Done()
+							for j := 0; j < b.N; j++ {
+								_ = rb.EnqueueBurst(payload, uint32(batchSize), nil)
+							}
+						}()
+					}
+
+					for i := 0; i < consumers; i++ {
+						go func() {
+							defer consWG.Done()
+							for j := 0; j < b.N; j++ {
+								_, _ = rb.DequeueBurst(uint32(batchSize), nil)
+							}
+						}()
+					}
+
+					prodWG.Wait()
+					consWG.Wait()
+				},
+			)
+		}
+	}
+}
diff --git a/ring/ring_test.go b/ring/ring_test.go
index 8d6e79a..a0a133d 100644
--- a/ring/ring_test.go
+++ b/ring/ring_test.go
@@ -474,219 +474,6 @@ func TestMPMC_OutOfOrderPublication(t *testing.T) {
 	assert.Contains(t, set, 2)
 }
 
-func BenchmarkSPSCRingBufferSeq(b *testing.B) {
-	buflen := 1024
-	buffer, _ := NewRing[string](uint32(buflen))
-
-	item := "test"
-	b.SetBytes(int64(len(item)))
-	b.ResetTimer()
-
-	for i := 0; i < b.N; i++ {
-		_ = buffer.Enqueue(item)
-		_, _ = buffer.Dequeue()
-	}
-}
-
-func BenchmarkSPSCRingBufferSeqBatch(b *testing.B) {
-	buflen := 1024
-	buffer, _ := NewRing[string](uint32(buflen))
-
-	item := "test"
-	batchSize := 10
-	b.SetBytes(int64(len(item) * batchSize))
-	b.ResetTimer()
-
-	for i := 0; i < b.N; i++ {
-		for j := 0; j < batchSize; j++ {
-			_ = buffer.Enqueue(item)
-		}
-		for j := 0; j < batchSize; j++ {
-			_, _ = buffer.Dequeue()
-		}
-	}
-}
-
-func BenchmarkSPSCRingBufferContention(b *testing.B) {
-	buflen := 1024
-	buffer, _ := NewRing[string](uint32(buflen))
-
-	item := "test"
-	b.SetBytes(int64(len(item) * 2))
-	b.ResetTimer()
-
-	var wg sync.WaitGroup
-	wg.Add(2)
-
-	go func() {
-		defer wg.Done()
-		for i := 0; i < b.N; i++ {
-			_ = buffer.Enqueue(item)
-		}
-	}()
-	go func() {
-		defer wg.Done()
-		for i := 0; i < b.N; i++ {
-			_, _ = buffer.Dequeue()
-		}
-	}()
-	wg.Wait()
-}
-
-func BenchmarkSPMCRingBufferSeq(b *testing.B) {
-	buflen := 1024
-	buffer, _ := NewRing[string](
-		uint32(buflen),
-		WithMultiConsDequeue[string](),
-	)
-
-	item := "test"
-	b.SetBytes(int64(len(item)))
-	b.ResetTimer()
-
-	for i := 0; i < b.N; i++ {
-		_ = buffer.Enqueue(item)
-		_, _ = buffer.Dequeue()
-	}
-}
-
-func BenchmarkSPMCRingBufferParallel(b *testing.B) {
-	buflen := 1024
-	buffer, _ := NewRing[string](
-		uint32(buflen),
-		WithMultiConsDequeue[string](),
-	)
-
-	numConsumers := 4
-	item := "test"
-	b.SetBytes(int64(len(item) * numConsumers))
-	b.ResetTimer()
-
-	var wg sync.WaitGroup
-	wg.Add(numConsumers)
-
-	for i := 0; i < numConsumers; i++ {
-		go func() {
-			defer wg.Done()
-			for j := 0; j < b.N; j++ {
-				_, _ = buffer.Dequeue()
-			}
-		}()
-	}
-
-	for i := 0; i < b.N*numConsumers; i++ {
-		_ = buffer.Enqueue(item)
-	}
-	wg.Wait()
-}
-
-func BenchmarkMPSCRingBufferSeq(b *testing.B) {
-	buflen := 1024
-	buffer, _ := NewRing[string](
-		uint32(buflen),
-		WithMultiProdEnqueue[string](),
-	)
-
-	item := "test"
-	b.SetBytes(int64(len(item)))
-	b.ResetTimer()
-
-	for i := 0; i < b.N; i++ {
-		_ = buffer.Enqueue(item)
-		_, _ = buffer.Dequeue()
-	}
-}
-
-func BenchmarkMPSCRingBufferParallel(b *testing.B) {
-	buflen := 1024
-	buffer, _ := NewRing[string](
-		uint32(buflen),
-		WithMultiProdEnqueue[string](),
-	)
-
-	numProducers := 4
-	item := "test"
-	b.SetBytes(int64(len(item) * numProducers))
-	b.ResetTimer()
-
-	var wg sync.WaitGroup
-	wg.Add(numProducers)
-
-	for i := 0; i < numProducers; i++ {
-		go func() {
-			defer wg.Done()
-			for j := 0; j < b.N; j++ {
-				_ = buffer.Enqueue(item)
-			}
-		}()
-	}
-
-	for i := 0; i < b.N*numProducers; i++ {
-		_, _ = buffer.Dequeue()
-	}
-	wg.Wait()
-}
-
-func BenchmarkMPMCRingBufferSeq(b *testing.B) {
-	buflen := 1024
-	buffer, _ := NewRing[string](
-		uint32(buflen),
-		WithMultiProdEnqueue[string](),
-		WithMultiConsDequeue[string](),
-	)
-
-	item := "test"
-	b.SetBytes(int64(len(item)))
-	b.ResetTimer()
-
-	for i := 0; i < b.N; i++ {
-		_ = buffer.Enqueue(item)
-		_, _ = buffer.Dequeue()
-	}
-}
-
-func BenchmarkMPMCRingBufferParallel(b *testing.B) {
-	buflen := 1024
-	buffer, _ := NewRing[string](
-		uint32(buflen),
-		WithMultiProdEnqueue[string](),
-		WithMultiConsDequeue[string](),
-	)
-
-	numProducers := 4
-	numConsumers := 4
-	item := "test"
-	b.SetBytes(int64(len(item) * numProducers))
-	b.ResetTimer()
-
-	var producerWG sync.WaitGroup
-	producerWG.Add(numProducers)
-
-	for i := 0; i < numProducers; i++ {
-		go func() {
-			defer producerWG.Done()
-			for j := 0; j < b.N; j++ {
-				_ = buffer.Enqueue(item)
-			}
-		}()
-	}
-
-	var consumerWG sync.WaitGroup
-	consumerWG.Add(numConsumers)
-
-	for i := 0; i < numConsumers; i++ {
-		go func() {
-			defer consumerWG.Done()
-			for j := 0; j < b.N; j++ {
-				_, _ = buffer.Dequeue()
-			}
-		}()
-	}
-
-	producerWG.Wait()
-	consumerWG.Wait()
-}
-
 func assertNewRing[T any](t *testing.T, ring *Ring[T], count uint32) {
 	assert.NotNil(t, ring.entries)
 	assert.Equal(t, ring.size, count)
diff --git a/set/BUILD.bazel b/set/BUILD.bazel
index a991d4b..b5bfb56 100644
--- a/set/BUILD.bazel
+++ b/set/BUILD.bazel
@@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
 go_library(
     name = "set",
     srcs = ["set.go"],
+    importpath = "github.com/freshdresch/go-data-structures/set",
     visibility = ["//visibility:public"],
 )
 
diff --git a/slices/BUILD.bazel b/slices/BUILD.bazel
index fbbfa99..037cc57 100644
--- a/slices/BUILD.bazel
+++ b/slices/BUILD.bazel
@@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
 go_library(
     name = "slices",
     srcs = ["slices.go"],
+    importpath = "github.com/freshdresch/go-data-structures/slices",
     visibility = ["//visibility:public"],
 )