Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions buffer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
"bytes_buffer.go",
"pool.go",
],
importpath = "github.com/freshdresch/go-data-structures/buffer",
visibility = ["//visibility:public"],
)

Expand Down
1 change: 1 addition & 0 deletions maps/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "maps",
srcs = ["maps.go"],
importpath = "github.com/freshdresch/go-data-structures/maps",
visibility = ["//visibility:public"],
)

Expand Down
23 changes: 21 additions & 2 deletions ring/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,35 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "ring",
srcs = ["ring.go"],
importpath = "github.com/freshdresch/go-data-structures/ring",
visibility = ["//visibility:public"],
)

# Internal tests (white-box)
go_test(
name = "ring_test",
srcs = ["ring_test.go"],
name = "ring_internal_test",
srcs = [
"ring_test.go",
],
embed = [":ring"],
deps = [
"@com_github_stretchr_testify//assert:assert",
"@com_github_stretchr_testify//require:require",
],
visibility = ["//:__subpackages__"],
)

# External benchmarks (black-box)
go_test(
name = "ring_bench",
srcs = [
"channel_bench_test.go",
"common_bench_test.go",
"mutex_slice_bench_test.go",
"ring_bench_test.go",
],
deps = [
":ring",
],
visibility = ["//:__subpackages__"],
)
302 changes: 302 additions & 0 deletions ring/channel_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,302 @@
package ring_test

import (
"fmt"
"sync"
"testing"
)

// ChanWrapper adapts a buffered channel to the burst-oriented
// enqueue/dequeue interface used for benchmark comparisons.
type ChanWrapper[T any] struct {
	ch chan T
}

// NewChanWrapper returns a ChanWrapper backed by a buffered channel
// of the given capacity.
func NewChanWrapper[T any](capacity int) *ChanWrapper[T] {
	return &ChanWrapper[T]{ch: make(chan T, capacity)}
}

// EnqueueBurst sends every element of batch in order, blocking
// whenever the channel buffer is full.
func (w *ChanWrapper[T]) EnqueueBurst(batch []T) {
	for i := range batch {
		w.ch <- batch[i]
	}
}

// DequeueBurst receives up to n elements without blocking; it returns
// early with whatever was collected as soon as the channel is empty.
func (w *ChanWrapper[T]) DequeueBurst(n int) []T {
	out := make([]T, 0, n)
	for len(out) < n {
		select {
		case v := <-w.ch:
			out = append(out, v)
		default:
			return out
		}
	}
	return out
}
Comment on lines +9 to +34
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Remove unused ChanWrapper type.

The ChanWrapper type and its methods are defined but never used in any of the benchmarks below. All benchmarks use raw channels directly.

Apply this diff to remove the unused code:

-type ChanWrapper[T any] struct {
-	ch chan T
-}
-
-func NewChanWrapper[T any](capacity int) *ChanWrapper[T] {
-	return &ChanWrapper[T]{ch: make(chan T, capacity)}
-}
-
-func (c *ChanWrapper[T]) EnqueueBurst(batch []T) {
-	for _, v := range batch {
-		c.ch <- v
-	}
-}
-
-func (c *ChanWrapper[T]) DequeueBurst(n int) []T {
-	res := make([]T, 0, n)
-	for i := 0; i < n; i++ {
-		select {
-		case v := <-c.ch:
-			res = append(res, v)
-		default:
-			return res
-		}
-	}
-	return res
-}
-

Alternatively, if you intended to use ChanWrapper for testing, you would need to update the benchmarks to actually use it.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
type ChanWrapper[T any] struct {
ch chan T
}
func NewChanWrapper[T any](capacity int) *ChanWrapper[T] {
return &ChanWrapper[T]{ch: make(chan T, capacity)}
}
func (c *ChanWrapper[T]) EnqueueBurst(batch []T) {
for _, v := range batch {
c.ch <- v
}
}
func (c *ChanWrapper[T]) DequeueBurst(n int) []T {
res := make([]T, 0, n)
for i := 0; i < n; i++ {
select {
case v := <-c.ch:
res = append(res, v)
default:
return res
}
}
return res
}
🤖 Prompt for AI Agents
In ring/channel_bench_test.go around lines 9 to 34, the ChanWrapper type and its
methods (NewChanWrapper, EnqueueBurst, DequeueBurst) are declared but never used
by the benchmarks; remove the entire ChanWrapper type and its associated methods
from that section to eliminate dead code, or if you intended to test the wrapper
instead of raw channels, update the benchmark functions to instantiate
ChanWrapper via NewChanWrapper and replace direct channel operations with
EnqueueBurst and DequeueBurst calls.


// BenchmarkChan_SPSC_Seq measures single-producer/single-consumer
// channel throughput on one goroutine: each iteration sends one
// payload and immediately receives it back.
func BenchmarkChan_SPSC_Seq(b *testing.B) {
	for _, size := range payloadSizes {
		b.Run(fmt.Sprintf("%dB", size), func(b *testing.B) {
			msg := make([]byte, size)
			ch := make(chan []byte, ringSize)

			b.SetBytes(int64(size))
			b.ResetTimer()

			for n := 0; n < b.N; n++ {
				ch <- msg
				<-ch
			}
		})
	}
}

// BenchmarkChan_MPSC_Parallel measures multi-producer/single-consumer
// throughput: 4 producer goroutines each push b.N payloads while the
// benchmark goroutine drains all of them.
func BenchmarkChan_MPSC_Parallel(b *testing.B) {
	const producers = 4
	for _, size := range payloadSizes {
		b.Run(fmt.Sprintf("%dB", size), func(b *testing.B) {
			// Use the shared ringSize capacity (same value as the former
			// hard-coded 1024) for consistency with the other benchmarks.
			ch := make(chan []byte, ringSize)
			payload := make([]byte, size)

			b.SetBytes(int64(size))
			b.ResetTimer()

			var wg sync.WaitGroup
			wg.Add(producers)

			for i := 0; i < producers; i++ {
				go func() {
					defer wg.Done()
					for j := 0; j < b.N; j++ {
						ch <- payload
					}
				}()
			}

			// Drain exactly everything the producers will send so no
			// goroutine blocks forever on a full channel.
			for i := 0; i < b.N*producers; i++ {
				<-ch
			}
			wg.Wait()
		})
	}
}

// BenchmarkChan_SPMC_Parallel measures single-producer/multi-consumer
// throughput: the benchmark goroutine pushes payloads while 4 consumer
// goroutines each drain b.N of them.
func BenchmarkChan_SPMC_Parallel(b *testing.B) {
	const consumers = 4
	for _, size := range payloadSizes {
		b.Run(fmt.Sprintf("%dB", size), func(b *testing.B) {
			// Use the shared ringSize capacity (same value as the former
			// hard-coded 1024) for consistency with the other benchmarks.
			ch := make(chan []byte, ringSize)
			payload := make([]byte, size)

			b.SetBytes(int64(size))
			b.ResetTimer()

			var wg sync.WaitGroup
			wg.Add(consumers)

			for i := 0; i < consumers; i++ {
				go func() {
					defer wg.Done()
					for j := 0; j < b.N; j++ {
						<-ch
					}
				}()
			}

			// Send exactly what the consumers will receive so no
			// goroutine blocks forever on an empty channel.
			for i := 0; i < b.N*consumers; i++ {
				ch <- payload
			}
			wg.Wait()
		})
	}
}

// BenchmarkChan_MPMC_Parallel measures multi-producer/multi-consumer
// throughput: 4 producers and 4 consumers each process b.N payloads.
// Producer and consumer counts are equal, so sends and receives balance.
func BenchmarkChan_MPMC_Parallel(b *testing.B) {
	const (
		producers = 4
		consumers = 4
	)
	for _, size := range payloadSizes {
		b.Run(fmt.Sprintf("%dB", size), func(b *testing.B) {
			// Use the shared ringSize capacity (same value as the former
			// hard-coded 1024) for consistency with the other benchmarks.
			ch := make(chan []byte, ringSize)
			payload := make([]byte, size)

			b.SetBytes(int64(size))
			b.ResetTimer()

			var prodWG, consWG sync.WaitGroup
			prodWG.Add(producers)
			consWG.Add(consumers)

			for i := 0; i < producers; i++ {
				go func() {
					defer prodWG.Done()
					for j := 0; j < b.N; j++ {
						ch <- payload
					}
				}()
			}

			for i := 0; i < consumers; i++ {
				go func() {
					defer consWG.Done()
					for j := 0; j < b.N; j++ {
						<-ch
					}
				}()
			}

			prodWG.Wait()
			consWG.Wait()
		})
	}
}

// BenchmarkChan_SPSC_Seq_Batch measures single-goroutine send/receive
// of whole batches: each iteration transfers one batch of batchSize
// payloads through the channel and back.
func BenchmarkChan_SPSC_Seq_Batch(b *testing.B) {
	for _, batchSize := range batchSizes {
		for _, payloadSize := range payloadSizes {
			b.Run(
				fmt.Sprintf("batch%d_%dB", batchSize, payloadSize),
				func(b *testing.B) {
					ch := make(chan [][]byte, ringSize)
					payload := make([][]byte, batchSize)
					for i := range payload {
						payload[i] = make([]byte, payloadSize)
					}

					// Each iteration moves the entire batch, so account for
					// batchSize*payloadSize bytes per op — reporting only
					// payloadSize would understate MB/s by the batch factor.
					b.SetBytes(int64(batchSize * payloadSize))
					b.ResetTimer()

					for i := 0; i < b.N; i++ {
						ch <- payload
						<-ch
					}
				},
			)
		}
	}
}

// BenchmarkChan_MPSC_Parallel_Batch measures multi-producer/single-
// consumer throughput where each channel operation moves a whole batch
// of batchSize payloads. 4 producers each send b.N batches; the
// benchmark goroutine drains them all.
func BenchmarkChan_MPSC_Parallel_Batch(b *testing.B) {
	const producers = 4
	for _, batchSize := range batchSizes {
		for _, payloadSize := range payloadSizes {
			b.Run(
				fmt.Sprintf("batch%d_%dB", batchSize, payloadSize),
				func(b *testing.B) {
					ch := make(chan [][]byte, ringSize)
					payload := make([][]byte, batchSize)
					for i := range payload {
						payload[i] = make([]byte, payloadSize)
					}

					// Each op moves a full batch: batchSize*payloadSize bytes,
					// not just payloadSize.
					b.SetBytes(int64(batchSize * payloadSize))
					b.ResetTimer()

					var wg sync.WaitGroup
					wg.Add(producers)

					for i := 0; i < producers; i++ {
						go func() {
							defer wg.Done()
							for j := 0; j < b.N; j++ {
								ch <- payload
							}
						}()
					}

					// Drain exactly everything the producers will send.
					for i := 0; i < b.N*producers; i++ {
						<-ch
					}
					wg.Wait()
				},
			)
		}
	}
}

// BenchmarkChan_SPMC_Parallel_Batch measures single-producer/multi-
// consumer throughput where each channel operation moves a whole batch
// of batchSize payloads. The benchmark goroutine sends; 4 consumers
// each receive b.N batches.
func BenchmarkChan_SPMC_Parallel_Batch(b *testing.B) {
	const consumers = 4
	for _, batchSize := range batchSizes {
		for _, payloadSize := range payloadSizes {
			b.Run(
				fmt.Sprintf("batch%d_%dB", batchSize, payloadSize),
				func(b *testing.B) {
					ch := make(chan [][]byte, ringSize)
					payload := make([][]byte, batchSize)
					for i := range payload {
						payload[i] = make([]byte, payloadSize)
					}

					// Each op moves a full batch: batchSize*payloadSize bytes,
					// not just payloadSize.
					b.SetBytes(int64(batchSize * payloadSize))
					b.ResetTimer()

					var wg sync.WaitGroup
					wg.Add(consumers)

					for i := 0; i < consumers; i++ {
						go func() {
							defer wg.Done()
							for j := 0; j < b.N; j++ {
								<-ch
							}
						}()
					}

					// Send exactly what the consumers will receive.
					for i := 0; i < b.N*consumers; i++ {
						ch <- payload
					}
					wg.Wait()
				},
			)
		}
	}
}

// BenchmarkChan_MPMC_Parallel_Batch measures multi-producer/multi-
// consumer throughput where each channel operation moves a whole batch
// of batchSize payloads. 4 producers and 4 consumers each process b.N
// batches; equal counts keep sends and receives balanced.
func BenchmarkChan_MPMC_Parallel_Batch(b *testing.B) {
	const (
		producers = 4
		consumers = 4
	)
	for _, batchSize := range batchSizes {
		for _, payloadSize := range payloadSizes {
			b.Run(
				fmt.Sprintf("batch%d_%dB", batchSize, payloadSize),
				func(b *testing.B) {
					ch := make(chan [][]byte, ringSize)
					payload := make([][]byte, batchSize)
					for i := range payload {
						payload[i] = make([]byte, payloadSize)
					}

					// Each op moves a full batch: batchSize*payloadSize bytes,
					// not just payloadSize.
					b.SetBytes(int64(batchSize * payloadSize))
					b.ResetTimer()

					var prodWG, consWG sync.WaitGroup
					prodWG.Add(producers)
					consWG.Add(consumers)

					for i := 0; i < producers; i++ {
						go func() {
							defer prodWG.Done()
							for j := 0; j < b.N; j++ {
								ch <- payload
							}
						}()
					}

					for i := 0; i < consumers; i++ {
						go func() {
							defer consWG.Done()
							for j := 0; j < b.N; j++ {
								<-ch
							}
						}()
					}

					prodWG.Wait()
					consWG.Wait()
				},
			)
		}
	}
}
8 changes: 8 additions & 0 deletions ring/common_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package ring_test

// ringSize is the channel/ring buffer capacity shared by the benchmarks
// in this package.
const ringSize = 1024

var (
	// batchSizes enumerates the burst sizes exercised by the *_Batch benchmarks.
	batchSizes = []int{1, 8, 16, 32, 64}
	// payloadSizes enumerates the per-message payload sizes.
	payloadSizes = []int{8, 32, 64, 256, 512} // bytes
)
Loading