diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4e17b71..2a94727 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -31,7 +31,7 @@ jobs: - name: golangci-lint uses: golangci/golangci-lint-action@v7 with: - version: v2.6 + version: v2.7 - name: install goveralls run: | diff --git a/.golangci.yml b/.golangci.yml index 042c61b..51e6345 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -28,6 +28,7 @@ linters: - unparam - unused - nestif + - modernize settings: goconst: min-len: 2 diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..fbdf449 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,173 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Build and Test Commands + +```bash +# run all tests with race detection and coverage +go test -timeout=60s -race -covermode=atomic -coverprofile=profile.cov ./... + +# run a single test +go test -run TestName + +# run tests with verbose output +go test -v ./... + +# run benchmarks +go test -bench=. -run=Bench + +# lint (required for CI) +golangci-lint run + +# format code +gofmt -s -w . +goimports -w . +``` + +## Architecture Overview + +Generic worker pool library for Go 1.24+ with batching, work distribution, and metrics. + +### Package Structure + +- **pool.go** - Core `WorkerGroup[T]` with builder-pattern configuration +- **collector.go** - `Collector[V]` for async result gathering +- **metrics/** - Stats tracking (processed, errors, timings, custom counters) +- **middleware/** - Built-in middleware (Retry, Timeout, Recovery, Validator, RateLimiter) +- **examples/** - Usage examples (separate go.mod, excluded from tests) + +### Core Types + +```go +// Worker interface - implement Do() or use WorkerFunc adapter +type Worker[T any] interface { + Do(ctx context.Context, v T) error +} + +// WorkerFunc adapter for functions +type WorkerFunc[T any] func(ctx context.Context, v T) error + +// Middleware wraps worker +type Middleware[T any] func(Worker[T]) Worker[T] +``` + +### Pool Modes + +1. **Stateless** (`New[T](size, worker)`) - Single shared worker for all goroutines +2. **Stateful** (`NewStateful[T](size, maker)`) - Each goroutine gets own worker via maker function + +### Channel Architecture + +WorkerGroup maintains two sets of channels: + +**Shared channels** (used when no `WithChunkFn`): +- `sharedCh chan T` - all workers compete for items +- `sharedBatchCh chan []T` - for batch mode + +**Per-worker channels** (used with `WithChunkFn`): +- `workersCh []chan T` - dedicated channel per worker +- `workerBatchCh []chan []T` - batch channel per worker + +Channel selection in `Go()`: +```go +workerCh := p.sharedCh +batchCh := p.sharedBatchCh +if p.chunkFn != nil { + workerCh = p.workersCh[i] + batchCh = p.workerBatchCh[i] +} +``` + +**Important**: `WithWorkerChanSize()` recreates ALL channels (both shared and per-worker). + +### Work Distribution + +**Direct mode (batchSize=0):** +- Without `WithChunkFn`: Shared channel - workers compete for items +- With `WithChunkFn`: Per-worker channels with FNV-1a hash routing + +**Batching mode (batchSize>0):** +- Without `WithChunkFn`: Random accumulator slot selection via `rand.Intn(poolSize)` +- With `WithChunkFn`: Consistent hashing using FNV-1a hash: +```go +h := fnv.New32a() +h.Write([]byte(p.chunkFn(v))) +id := int(h.Sum32()) % p.poolSize +``` + +### Batching + +When `batchSize > 0`: +1. Items accumulate in `accumulators[id]` (one per worker slot) +2. When accumulator reaches `batchSize`, batch sent to channel +3. `Close()` flushes remaining partial batches + +### Lifecycle + +1. Create pool: `New[T]()` or `NewStateful[T]()` +2. Configure: chain `With*()` methods (must be before `Go()`) +3. Start: `Go(ctx)` - starts worker goroutines via errgroup +4. Submit: `Submit(v)` (single producer) or `Send(v)` (concurrent-safe) +5. Finish: `Close(ctx)` - flushes batches, closes channels, waits for workers + +### Metrics System + +`metrics.Value` tracks per-worker stats without locks (each worker writes to own slot): +- `workerStats []Stats` - per-worker counters (Processed, Errors, Dropped, timings) +- `userData map[string]int` - custom counters (mutex-protected) + +Workers access metrics via context: +```go +m := metrics.Get(ctx) +m.Inc("custom-counter") +``` + +Timer types: `TimerProc`, `TimerWait`, `TimerInit`, `TimerWrap` + +### Middleware Pattern + +Middleware applied in reverse order (first = outermost): +```go +// For stateless: wraps p.worker directly +// For stateful: wraps the maker function +for i := len(middlewares) - 1; i >= 0; i-- { + wrapped = middlewares[i](wrapped) +} +``` + +Built-in middleware in `middleware/`: +- `Retry(attempts, baseDelay)` - exponential backoff with jitter +- `Timeout(duration)` - per-operation timeout +- `Recovery(handler)` - panic recovery +- `Validator(fn)` - input validation +- `RateLimiter(rate, burst)` - token bucket rate limiting (global, not per-worker) + +### Collector + +Simple async-to-sync bridge: +```go +type Collector[V any] struct { + ch chan V + ctx context.Context +} +``` + +- `Submit(v)` - blocks if buffer full +- `Iter()` - returns `iter.Seq2[V, error]` (Go 1.23 range-over-func) +- `All()` - collects all into slice + +### Error Handling + +- Default: first error stops pool (errgroup behavior) +- `WithContinueOnError()`: accumulates errors, returns last error with total count +- Worker completion callbacks only run if no error or `continueOnError` is set +- Pool completion callback runs when last worker finishes (skipped only on `context.Canceled`, still runs on `context.DeadlineExceeded`) + +### Key Implementation Details + +1. `activated` flag prevents double `Go()` calls +2. `activeWorkers` atomic counter tracks live workers for pool completion callback +3. `sendMu` mutex makes `Send()` concurrent-safe (wraps `Submit()`) +4. errgroup manages worker goroutines and error propagation +5. Context from errgroup used for cancellation propagation diff --git a/README.md b/README.md index e195741..2c16031 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # pool [![Build Status](https://github.com/go-pkgz/pool/workflows/build/badge.svg)](https://github.com/go-pkgz/pool/actions) [![Coverage Status](https://coveralls.io/repos/github/go-pkgz/pool/badge.svg?branch=master)](https://coveralls.io/github/go-pkgz/pool?branch=master) [![godoc](https://godoc.org/github.com/go-pkgz/pool?status.svg)](https://godoc.org/github.com/go-pkgz/pool) -`pool` is a Go package that provides a generic, efficient worker pool implementation for parallel task processing. Built for Go 1.23+, it offers a flexible API with features like batching, work distribution strategies, and comprehensive metrics collection. +`pool` is a Go package that provides a generic, efficient worker pool implementation for parallel task processing. Built for Go 1.24+, it offers a flexible API with features like batching, work distribution strategies, and comprehensive metrics collection. ## Features @@ -16,7 +16,7 @@ - Optional completion callbacks - Extensible middleware system for custom functionality - Built-in middlewares for common tasks -- No external dependencies except for the testing framework +- Minimal dependencies (only `golang.org/x/sync` and `golang.org/x/time` at runtime) ## Quick Start @@ -52,8 +52,8 @@ func main() { return nil }) - // create a pool with 5 workers - p := pool.New[string](5, worker).WithContinueOnError(), // don't stop on errors + // create a pool with 5 workers + p := pool.New[string](5, worker).WithContinueOnError() // don't stop on errors // start the pool if err := p.Go(context.Background()); err != nil { @@ -233,13 +233,14 @@ p := pool.New[Task](3, worker).WithChunkFn(func(t Task) string { How distribution works: 1. Without chunk function: - - Items are distributed randomly among workers + - Direct mode: Items go to a shared channel, workers compete for items + - Batching mode: Items are assigned to random accumulator slots - Good for independent tasks 2. With chunk function: - Function returns string key for each item - Items with the same key always go to the same worker - - Uses consistent hashing to map keys to workers + - Uses FNV-1a hash to map keys to workers deterministically When to use custom distribution: - Maintain ordering for related items @@ -425,7 +426,7 @@ The `GetStats()` method returns a comprehensive `Stats` structure with the follo **Counters:** - `Processed` - total number of successfully processed items - `Errors` - total number of items that returned errors -- `Dropped` - total number of items dropped (e.g., due to context cancellation) +- `Dropped` - total number of items dropped (user-incrementable via `m.IncDropped(workerID)` where `m` is the metrics object) **Timing Metrics:** - `ProcessingTime` - cumulative time spent processing items (max across workers) @@ -436,7 +437,7 @@ The `GetStats()` method returns a comprehensive `Stats` structure with the follo **Derived Statistics:** - `RatePerSec` - items processed per second (Processed / TotalTime) -- `AvgLatency` - average processing time per item (ProcessingTime / Processed) +- `AvgLatency` - average wall-clock time per item (max ProcessingTime across workers / Processed) - `ErrorRate` - percentage of items that failed (Errors / Total) - `DroppedRate` - percentage of items dropped (Dropped / Total) - `Utilization` - percentage of time spent processing vs waiting (ProcessingTime / (ProcessingTime + WaitTime)) @@ -523,7 +524,7 @@ p := pool.New[string](5, worker). The completion callback executes when: - All workers have completed processing - Errors occurred but pool continued (`WithContinueOnError()`) -- Does not execute on context cancellation +- Skipped only on `context.Canceled` (still runs on `context.DeadlineExceeded`) Important notes: - Use `Submit` when sending items from a single goroutine @@ -538,21 +539,21 @@ configuration and must be called before `Go()`; calling them after `Go()` is unsupported and may lead to deadlocks or no-ops. ```go -p := pool.New[string](2, worker). // pool with 2 workers - WithBatchSize(10). // process items in batches - WithWorkerChanSize(5). // set worker channel buffer size - WithChunkFn(chunkFn). // control work distribution - WithContinueOnError(). // don't stop on errors - WithCompleteFn(completeFn) // called when worker finishes +p := pool.New[string](2, worker). // pool with 2 workers + WithBatchSize(10). // process items in batches + WithWorkerChanSize(5). // set worker channel buffer size + WithChunkFn(chunkFn). // control work distribution + WithContinueOnError(). // don't stop on errors + WithWorkerCompleteFn(workerComplete) // called when each worker finishes ``` Available options: - `WithBatchSize(size int)` - enables batch processing, accumulating items before sending to workers (default: 10) - `WithWorkerChanSize(size int)` - sets buffer size for worker channels (default: 1) -- `WithChunkFn(fn func(T) string)` - controls work distribution by key (default: none, random distribution) +- `WithChunkFn(fn func(T) string)` - controls work distribution by key (default: none, shared channel) - `WithContinueOnError()` - continues processing on errors (default: false) -- `WithWorkerCompleteFn(fn func(ctx, id, worker))` - called on worker completion (default: none) -- `WithPoolCompleteFn(fn func(ctx))` - called on pool completion, i.e., when all workers have completed (default: none) +- `WithWorkerCompleteFn(fn func(ctx context.Context, id int, worker Worker[T]) error)` - called on worker completion (default: none) +- `WithPoolCompleteFn(fn func(ctx context.Context) error)` - called on pool completion, i.e., when all workers have completed (default: none) ## Collector diff --git a/bench_test.go b/bench_test.go index 9cad26a..3100678 100644 --- a/bench_test.go +++ b/bench_test.go @@ -18,13 +18,13 @@ import ( func benchTask(size int) []int { //nolint:unparam // size is used in the benchmark task := func(n int) int { // simulate some CPU work sum := 0 - for i := 0; i < n; i++ { + for i := range n { sum += i } return sum } res := make([]int, 0, size) - for i := 0; i < size; i++ { + for range size { res = append(res, task(1)) } return res @@ -44,7 +44,7 @@ func TestPoolPerf(t *testing.T) { }() g, _ := errgroup.WithContext(ctx) g.SetLimit(8) - for i := 0; i < 1000000; i++ { + for range 1000000 { g.Go(func() error { benchTask(n) atomic.AddInt32(&count2, 1) @@ -68,7 +68,7 @@ func TestPoolPerf(t *testing.T) { p := New[int](8, worker) require.NoError(t, p.Go(ctx)) go func() { - for i := 0; i < 1000000; i++ { + for i := range 1000000 { p.Submit(i) } assert.NoError(t, p.Close(ctx)) @@ -92,7 +92,7 @@ func TestPoolPerf(t *testing.T) { p := New[int](8, worker).WithWorkerChanSize(100) require.NoError(t, p.Go(ctx)) go func() { - for i := 0; i < 1000000; i++ { + for i := range 1000000 { p.Submit(i) } assert.NoError(t, p.Close(ctx)) @@ -116,7 +116,7 @@ func TestPoolPerf(t *testing.T) { p := New[int](8, worker).WithWorkerChanSize(100).WithBatchSize(100) require.NoError(t, p.Go(ctx)) go func() { - for i := 0; i < 1000000; i++ { + for i := range 1000000 { p.Submit(i) } assert.NoError(t, p.Close(ctx)) @@ -142,7 +142,7 @@ func TestPoolPerf(t *testing.T) { }) require.NoError(t, p.Go(ctx)) go func() { - for i := 0; i < 1000000; i++ { + for i := range 1000000 { p.Submit(i) } assert.NoError(t, p.Close(ctx)) @@ -163,12 +163,12 @@ func BenchmarkPoolCompare(b *testing.B) { b.Run("errgroup", func(b *testing.B) { b.ResetTimer() - for i := 0; i < b.N; i++ { + for range b.N { var count int32 g, _ := errgroup.WithContext(ctx) g.SetLimit(workers) - for j := 0; j < iterations; j++ { + for range iterations { g.Go(func() error { benchTask(n) atomic.AddInt32(&count, 1) @@ -182,7 +182,7 @@ func BenchmarkPoolCompare(b *testing.B) { b.Run("pool default", func(b *testing.B) { b.ResetTimer() - for i := 0; i < b.N; i++ { + for range b.N { var count int32 p := New[int](workers, WorkerFunc[int](func(context.Context, int) error { benchTask(n) @@ -192,7 +192,7 @@ func BenchmarkPoolCompare(b *testing.B) { require.NoError(b, p.Go(ctx)) go func() { - for j := 0; j < iterations; j++ { + for j := range iterations { p.Submit(j) } p.Close(ctx) @@ -204,7 +204,7 @@ func BenchmarkPoolCompare(b *testing.B) { b.Run("pool with chan=100", func(b *testing.B) { b.ResetTimer() - for i := 0; i < b.N; i++ { + for range b.N { var count int32 p := New[int](workers, WorkerFunc[int](func(context.Context, int) error { benchTask(n) @@ -214,7 +214,7 @@ func BenchmarkPoolCompare(b *testing.B) { require.NoError(b, p.Go(ctx)) go func() { - for j := 0; j < iterations; j++ { + for j := range iterations { p.Submit(j) } p.Close(ctx) @@ -226,7 +226,7 @@ func BenchmarkPoolCompare(b *testing.B) { b.Run("pool with batching", func(b *testing.B) { b.ResetTimer() - for i := 0; i < b.N; i++ { + for range b.N { var count int32 p := New[int](workers, WorkerFunc[int](func(context.Context, int) error { benchTask(n) @@ -236,7 +236,7 @@ func BenchmarkPoolCompare(b *testing.B) { require.NoError(b, p.Go(ctx)) go func() { - for j := 0; j < iterations; j++ { + for j := range iterations { p.Submit(j) } p.Close(ctx) @@ -248,7 +248,7 @@ func BenchmarkPoolCompare(b *testing.B) { b.Run("pool with batching and chunking", func(b *testing.B) { b.ResetTimer() - for i := 0; i < b.N; i++ { + for range b.N { var count int32 p := New[int](workers, WorkerFunc[int](func(context.Context, int) error { benchTask(n) @@ -260,7 +260,7 @@ func BenchmarkPoolCompare(b *testing.B) { require.NoError(b, p.Go(ctx)) go func() { - for j := 0; j < iterations; j++ { + for j := range iterations { p.Submit(j) } p.Close(ctx) @@ -303,7 +303,7 @@ func TestPoolWithProfiling(t *testing.T) { done := make(chan struct{}) go func() { - for i := 0; i < iterations; i++ { + for i := range iterations { p.Submit(i) } p.Close(ctx) diff --git a/examples/collector_errors/go.mod b/examples/collector_errors/go.mod index dec2570..a10ac89 100644 --- a/examples/collector_errors/go.mod +++ b/examples/collector_errors/go.mod @@ -1,9 +1,11 @@ module examples/collector_errors -go 1.24 +go 1.24.0 + +toolchain go1.24.6 require github.com/go-pkgz/pool v0.8.0 -require golang.org/x/sync v0.15.0 // indirect +require golang.org/x/sync v0.19.0 // indirect replace github.com/go-pkgz/pool => ../.. diff --git a/examples/collector_errors/go.sum b/examples/collector_errors/go.sum index bca33c8..8f48f08 100644 --- a/examples/collector_errors/go.sum +++ b/examples/collector_errors/go.sum @@ -8,5 +8,6 @@ golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/collectors_chain/go.mod b/examples/collectors_chain/go.mod index aff3b58..c558139 100644 --- a/examples/collectors_chain/go.mod +++ b/examples/collectors_chain/go.mod @@ -1,9 +1,11 @@ module examples/collectors_chain -go 1.24 +go 1.24.0 + +toolchain go1.24.6 require github.com/go-pkgz/pool v0.8.0 -require golang.org/x/sync v0.15.0 // indirect +require golang.org/x/sync v0.19.0 // indirect replace github.com/go-pkgz/pool => ../.. diff --git a/examples/collectors_chain/go.sum b/examples/collectors_chain/go.sum index bdcecbc..c33ea5f 100644 --- a/examples/collectors_chain/go.sum +++ b/examples/collectors_chain/go.sum @@ -10,5 +10,6 @@ golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/direct_chain/go.mod b/examples/direct_chain/go.mod index 4a1ad6a..ae58282 100644 --- a/examples/direct_chain/go.mod +++ b/examples/direct_chain/go.mod @@ -1,9 +1,11 @@ module examples/direct_chain -go 1.24 +go 1.24.0 + +toolchain go1.24.6 require github.com/go-pkgz/pool v0.7.0 -require golang.org/x/sync v0.15.0 // indirect +require golang.org/x/sync v0.19.0 // indirect replace github.com/go-pkgz/pool => ../.. diff --git a/examples/direct_chain/go.sum b/examples/direct_chain/go.sum index bdcecbc..c33ea5f 100644 --- a/examples/direct_chain/go.sum +++ b/examples/direct_chain/go.sum @@ -10,5 +10,6 @@ golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/middleware/go.mod b/examples/middleware/go.mod index 526c704..e20d3fb 100644 --- a/examples/middleware/go.mod +++ b/examples/middleware/go.mod @@ -1,12 +1,14 @@ module examples/middleware -go 1.24 +go 1.24.0 + +toolchain go1.24.6 require github.com/go-pkgz/pool v0.7.0 require ( - golang.org/x/sync v0.15.0 // indirect - golang.org/x/time v0.12.0 // indirect + golang.org/x/sync v0.19.0 // indirect + golang.org/x/time v0.14.0 // indirect ) replace github.com/go-pkgz/pool => ../.. diff --git a/examples/middleware/go.sum b/examples/middleware/go.sum index c569ec8..aecd43f 100644 --- a/examples/middleware/go.sum +++ b/examples/middleware/go.sum @@ -7,8 +7,10 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ= golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0= golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= +golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/parallel_files/go.mod b/examples/parallel_files/go.mod index 7fe5bb6..3893956 100644 --- a/examples/parallel_files/go.mod +++ b/examples/parallel_files/go.mod @@ -1,9 +1,11 @@ module examples/parallel_files -go 1.24 +go 1.24.0 + +toolchain go1.24.6 require github.com/go-pkgz/pool v0.8.0 -require golang.org/x/sync v0.15.0 // indirect +require golang.org/x/sync v0.19.0 // indirect replace github.com/go-pkgz/pool => ../.. diff --git a/examples/parallel_files/go.sum b/examples/parallel_files/go.sum index bca33c8..8f48f08 100644 --- a/examples/parallel_files/go.sum +++ b/examples/parallel_files/go.sum @@ -8,5 +8,6 @@ golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/tokenizer_stateful/go.mod b/examples/tokenizer_stateful/go.mod index 11d8e70..53211a6 100644 --- a/examples/tokenizer_stateful/go.mod +++ b/examples/tokenizer_stateful/go.mod @@ -1,9 +1,11 @@ module examples/tokenizer_stateful -go 1.24 +go 1.24.0 + +toolchain go1.24.6 require github.com/go-pkgz/pool v0.8.0 -require golang.org/x/sync v0.15.0 // indirect +require golang.org/x/sync v0.19.0 // indirect replace github.com/go-pkgz/pool => ../.. diff --git a/examples/tokenizer_stateful/go.sum b/examples/tokenizer_stateful/go.sum index f3027b0..1fd44af 100644 --- a/examples/tokenizer_stateful/go.sum +++ b/examples/tokenizer_stateful/go.sum @@ -10,5 +10,6 @@ golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/tokenizer_stateless/go.mod b/examples/tokenizer_stateless/go.mod index 9ebd327..36597a8 100644 --- a/examples/tokenizer_stateless/go.mod +++ b/examples/tokenizer_stateless/go.mod @@ -1,9 +1,11 @@ module examples/tokenizer_stateless -go 1.24 +go 1.24.0 + +toolchain go1.24.6 require github.com/go-pkgz/pool v0.8.0 -require golang.org/x/sync v0.15.0 // indirect +require golang.org/x/sync v0.19.0 // indirect replace github.com/go-pkgz/pool => ../.. diff --git a/examples/tokenizer_stateless/go.sum b/examples/tokenizer_stateless/go.sum index f3027b0..1fd44af 100644 --- a/examples/tokenizer_stateless/go.sum +++ b/examples/tokenizer_stateless/go.sum @@ -10,5 +10,6 @@ golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples_test.go b/examples_test.go index 78c9f22..68c9eb2 100644 --- a/examples_test.go +++ b/examples_test.go @@ -260,7 +260,7 @@ func Example_fibCalculator() { // calculate fibonacci number var a, b uint64 = 0, 1 - for i := 0; i < n; i++ { + for range n { a, b = b, a+b } @@ -311,7 +311,7 @@ func Example_chainedCalculation() { fibWorker := WorkerFunc[int](func(_ context.Context, n int) error { var a, b uint64 = 0, 1 - for i := 0; i < n; i++ { + for range n { a, b = b, a+b } stage1Collector.Submit(FibResult{n: n, fib: a}) @@ -454,7 +454,7 @@ func Example_middleware() { return func(next Worker[string]) Worker[string] { return WorkerFunc[string](func(ctx context.Context, v string) error { var lastErr error - for i := 0; i < attempts; i++ { + for i := range attempts { var err error if err = next.Do(ctx, v); err == nil { return nil diff --git a/go.mod b/go.mod index 3ab905e..0045912 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,11 @@ module github.com/go-pkgz/pool -go 1.24 +go 1.24.0 require ( - github.com/stretchr/testify v1.10.0 - golang.org/x/sync v0.15.0 - golang.org/x/time v0.12.0 + github.com/stretchr/testify v1.11.1 + golang.org/x/sync v0.19.0 + golang.org/x/time v0.14.0 ) require ( diff --git a/go.sum b/go.sum index f29dcbe..646197b 100644 --- a/go.sum +++ b/go.sum @@ -2,12 +2,12 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= -golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= -golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE= -golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= +golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/metrics/metrics_test.go b/metrics/metrics_test.go index 9442b2f..bd9be11 100644 --- a/metrics/metrics_test.go +++ b/metrics/metrics_test.go @@ -164,10 +164,10 @@ func TestMetrics_Concurrent(t *testing.T) { var wg sync.WaitGroup wg.Add(goroutines) - for i := 0; i < goroutines; i++ { + for range goroutines { go func() { defer wg.Done() - for j := 0; j < iterations; j++ { + for range iterations { m.Inc("counter") val := m.Get("counter") assert.Positive(t, val) @@ -188,12 +188,12 @@ func TestMetrics_Concurrent(t *testing.T) { wg.Add(workers) // each worker operates on its own stats - for wid := 0; wid < workers; wid++ { + for wid := range workers { go func(id int) { defer wg.Done() const iterations = 1000 - for j := 0; j < iterations; j++ { + for range iterations { m.IncProcessed(id) end := m.StartTimer(id, TimerProc) time.Sleep(time.Microsecond) @@ -208,7 +208,7 @@ func TestMetrics_Concurrent(t *testing.T) { assert.Greater(t, stats.ProcessingTime, time.Duration(0)) // verify each worker's stats are accurate - for wid := 0; wid < workers; wid++ { + for wid := range workers { assert.Equal(t, 1000, m.workerStats[wid].Processed) assert.Greater(t, m.workerStats[wid].ProcessingTime, time.Duration(0)) } diff --git a/middleware/middleware.go b/middleware/middleware.go index a42d53c..36dcf8a 100644 --- a/middleware/middleware.go +++ b/middleware/middleware.go @@ -73,7 +73,7 @@ func Timeout[T any](timeout time.Duration) pool.Middleware[T] { // Recovery returns a middleware that recovers from panics and converts them to errors. // If handler is provided, it will be called with the panic value before the error is returned. -func Recovery[T any](handler func(interface{})) pool.Middleware[T] { +func Recovery[T any](handler func(any)) pool.Middleware[T] { return func(next pool.Worker[T]) pool.Worker[T] { return pool.WorkerFunc[T](func(ctx context.Context, v T) (err error) { defer func() { diff --git a/middleware/middleware_test.go b/middleware/middleware_test.go index 52afc93..134c5f4 100644 --- a/middleware/middleware_test.go +++ b/middleware/middleware_test.go @@ -101,7 +101,7 @@ func TestTimeout(t *testing.T) { func TestRecover(t *testing.T) { t.Run("recovers from panic", func(t *testing.T) { - var recovered interface{} + var recovered any worker := pool.WorkerFunc[string](func(_ context.Context, v string) error { if v == "panic" { panic("test panic") @@ -109,7 +109,7 @@ func TestRecover(t *testing.T) { return nil }) - p := pool.New[string](1, worker).Use(Recovery[string](func(p interface{}) { + p := pool.New[string](1, worker).Use(Recovery[string](func(p any) { recovered = p })) require.NoError(t, p.Go(context.Background())) @@ -126,8 +126,8 @@ func TestRecover(t *testing.T) { return nil }) - var recovered interface{} - p := pool.New[string](1, worker).Use(Recovery[string](func(p interface{}) { + var recovered any + p := pool.New[string](1, worker).Use(Recovery[string](func(p any) { recovered = p })) require.NoError(t, p.Go(context.Background())) @@ -214,7 +214,7 @@ func TestRateLimiter(t *testing.T) { require.NoError(t, p.Go(context.Background())) // submit 5 tasks - should all process immediately due to burst - for i := 0; i < 5; i++ { + for range 5 { p.Submit("task") } @@ -240,7 +240,7 @@ func TestRateLimiter(t *testing.T) { require.NoError(t, p.Go(context.Background())) // submit 3 tasks - for i := 0; i < 3; i++ { + for i := range 3 { p.Submit(fmt.Sprintf("task-%d", i)) } @@ -276,7 +276,7 @@ func TestRateLimiter(t *testing.T) { require.NoError(t, p.Go(ctx)) // submit multiple tasks - for i := 0; i < 5; i++ { + for range 5 { p.Submit("task") } @@ -320,7 +320,7 @@ func TestRateLimiter(t *testing.T) { require.NoError(t, p.Go(context.Background())) // submit 4 tasks - for i := 0; i < 4; i++ { + for i := range 4 { p.Submit(fmt.Sprintf("task-%d", i)) } diff --git a/pool.go b/pool.go index d82aa68..02bfd6c 100644 --- a/pool.go +++ b/pool.go @@ -133,10 +133,7 @@ func NewStateful[T any](size int, maker func() Worker[T]) *WorkerGroup[T] { // Larger sizes can help with bursty workloads but increase memory usage. // Default: 1 func (p *WorkerGroup[T]) WithWorkerChanSize(size int) *WorkerGroup[T] { - p.workerChanSize = size - if size < 1 { - p.workerChanSize = 1 - } + p.workerChanSize = max(size, 1) // recreate per-worker channels with new size for i := range p.poolSize { diff --git a/pool_test.go b/pool_test.go index 6ce785c..d522bf0 100644 --- a/pool_test.go +++ b/pool_test.go @@ -57,7 +57,7 @@ func TestPool_ChunkDistribution(t *testing.T) { require.NoError(t, p.Go(context.Background())) // submit same value multiple times, should always go to same worker - for i := 0; i < 10; i++ { + for range 10 { p.Submit("test1") } require.NoError(t, p.Close(context.Background())) @@ -154,7 +154,7 @@ func TestPool_StatefulWorker(t *testing.T) { require.NoError(t, p.Go(context.Background())) // submit more items to increase chance of concurrent processing - for i := 0; i < 100; i++ { + for range 100 { p.Submit("test") } assert.NoError(t, p.Close(context.Background())) @@ -287,7 +287,7 @@ func TestPool_Distribution(t *testing.T) { require.NoError(t, p.Go(context.Background())) const n = 10000 - for i := 0; i < n; i++ { + for i := range n { p.Submit(i) } require.NoError(t, p.Close(context.Background())) @@ -315,7 +315,7 @@ func TestPool_Distribution(t *testing.T) { require.NoError(t, p.Go(context.Background())) const n = 10000 - for i := 0; i < n; i++ { + for i := range n { p.Submit(i) } require.NoError(t, p.Close(context.Background())) @@ -339,7 +339,7 @@ func TestPool_Metrics(t *testing.T) { p := New[int](2, worker) require.NoError(t, p.Go(context.Background())) - for i := 0; i < 10; i++ { + for i := range 10 { p.Submit(i) } require.NoError(t, p.Close(context.Background())) @@ -365,7 +365,7 @@ func TestPool_Metrics(t *testing.T) { p := New[int](2, worker).WithContinueOnError() require.NoError(t, p.Go(context.Background())) - for i := 0; i < 10; i++ { + for i := range 10 { p.Submit(i) } require.Error(t, p.Close(context.Background())) @@ -394,7 +394,7 @@ func TestPool_Metrics(t *testing.T) { p.Submit(2) // wait for both items to be processed - for i := 0; i < 2; i++ { + for range 2 { select { case <-processed: case <-time.After(time.Second): @@ -433,7 +433,7 @@ func TestPool_Metrics(t *testing.T) { // submit enough items to ensure both workers get some n := 100 - for i := 0; i < n; i++ { + for i := range n { p.Submit(i) } require.Error(t, p.Close(context.Background())) @@ -688,12 +688,12 @@ func TestPool_TimingUnderLoad(t *testing.T) { require.NoError(t, p.Go(context.Background())) // submit all tasks - for i := 0; i < tasks; i++ { + for i := range tasks { p.Submit(i) } // wait for all tasks to complete - for i := 0; i < tasks; i++ { + for range tasks { select { case <-done: case <-time.After(2 * time.Second): @@ -910,7 +910,7 @@ func TestMiddleware_Practical(t *testing.T) { return func(next Worker[string]) Worker[string] { return WorkerFunc[string](func(ctx context.Context, v string) error { var lastErr error - for i := 0; i < maxAttempts; i++ { + for range maxAttempts { var err error if err = next.Do(ctx, v); err == nil { return nil @@ -989,7 +989,7 @@ func TestPool_Batch(t *testing.T) { require.NoError(t, p.Go(context.Background())) // submit 8 items - should make 2 full batches and 1 partial - for i := 0; i < 8; i++ { + for i := range 8 { p.Submit(fmt.Sprintf("v%d", i)) } require.NoError(t, p.Close(context.Background())) @@ -1139,7 +1139,7 @@ func TestPool_Batch(t *testing.T) { require.NoError(t, p.Go(context.Background())) // fill batches with items to verify processing of full batches - for i := 0; i < 6; i++ { + for i := range 6 { p.Submit(fmt.Sprintf("item%d", i)) time.Sleep(10 * time.Millisecond) // allow time for processing } @@ -1229,7 +1229,7 @@ func TestPool_Batch(t *testing.T) { require.NoError(t, p.Go(context.Background())) // submit items - for i := 0; i < totalItems; i++ { + for i := range totalItems { p.Submit(i) } require.NoError(t, p.Close(context.Background())) @@ -1242,7 +1242,7 @@ func TestPool_Batch(t *testing.T) { // verify concurrent processing by checking for overlapping time ranges var overlapped bool - for i := 0; i < totalItems/batchSize; i++ { + for i := range totalItems / batchSize { for j := i + 1; j < totalItems/batchSize; j++ { // check if batch i and j overlapped in time if !batchEndTimes[i].Before(batchStartTimes[j]) && !batchEndTimes[j].Before(batchStartTimes[i]) { @@ -1284,7 +1284,7 @@ func TestPool_DirectModeChunking(t *testing.T) { // verify items with same first letter went to the same worker var worker0Items, worker1Items []string - processed.Range(func(key, value interface{}) bool { + processed.Range(func(key, value any) bool { items := value.([]string) if key.(string) == "worker-0" { worker0Items = items @@ -1333,7 +1333,7 @@ func TestPool_PoolCompletion(t *testing.T) { require.NoError(t, p.Go(context.Background())) // submit enough work for all workers - for i := 0; i < 3; i++ { + for i := range 3 { p.Submit(fmt.Sprintf("test%d", i)) } require.NoError(t, p.Close(context.Background())) @@ -1511,7 +1511,7 @@ func TestPool_ChainedBatching(t *testing.T) { // submit items inputCount := 100 - for i := 0; i < inputCount; i++ { + for i := range inputCount { p1.Submit(i) } require.NoError(t, p1.Close(context.Background())) @@ -1542,7 +1542,7 @@ func TestPool_HeavyBatching(t *testing.T) { // submit items that should form complete and partial batches const items = 1000 - for i := 0; i < items; i++ { + for i := range items { p.Submit(i) } require.NoError(t, p.Close(context.Background())) @@ -1584,14 +1584,14 @@ func TestPool_BatchedSend(t *testing.T) { require.NoError(t, p2.Go(context.Background())) const items = 1000 - for i := 0; i < items; i++ { + for i := range items { p1.Submit(i) } require.NoError(t, p1.Close(context.Background())) // count unique processed items var uniqueProcessed int - processed.Range(func(_, _ interface{}) bool { + processed.Range(func(_, _ any) bool { uniqueProcessed++ return true })