2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -31,7 +31,7 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v7
with:
version: v2.6
version: v2.7

- name: install goveralls
run: |
1 change: 1 addition & 0 deletions .golangci.yml
@@ -28,6 +28,7 @@ linters:
- unparam
- unused
- nestif
- modernize
settings:
goconst:
min-len: 2
173 changes: 173 additions & 0 deletions CLAUDE.md
@@ -0,0 +1,173 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Build and Test Commands

```bash
# run all tests with race detection and coverage
go test -timeout=60s -race -covermode=atomic -coverprofile=profile.cov ./...

# run a single test
go test -run TestName

# run tests with verbose output
go test -v ./...

# run benchmarks
go test -bench=. -run=Bench

# lint (required for CI)
golangci-lint run

# format code
gofmt -s -w .
goimports -w .
```

## Architecture Overview

Generic worker pool library for Go 1.24+ with batching, work distribution, and metrics.

### Package Structure

- **pool.go** - Core `WorkerGroup[T]` with builder-pattern configuration
- **collector.go** - `Collector[V]` for async result gathering
- **metrics/** - Stats tracking (processed, errors, timings, custom counters)
- **middleware/** - Built-in middleware (Retry, Timeout, Recovery, Validator, RateLimiter)
- **examples/** - Usage examples (separate go.mod, excluded from tests)

### Core Types

```go
// Worker interface - implement Do() or use WorkerFunc adapter
type Worker[T any] interface {
    Do(ctx context.Context, v T) error
}

// WorkerFunc adapter for functions
type WorkerFunc[T any] func(ctx context.Context, v T) error

// Middleware wraps worker
type Middleware[T any] func(Worker[T]) Worker[T]
```

### Pool Modes

1. **Stateless** (`New[T](size, worker)`) - Single shared worker for all goroutines
2. **Stateful** (`NewStateful[T](size, maker)`) - Each goroutine gets its own worker via a maker function
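
A minimal sketch of the two modes; the `maker` argument for `NewStateful` is assumed to be a plain `func() Worker[T]`, matching the description above:

```go
package main

import (
    "context"
    "fmt"

    "github.com/go-pkgz/pool"
)

func main() {
    // stateless: one worker value shared by every pool goroutine
    shared := pool.WorkerFunc[string](func(ctx context.Context, v string) error {
        fmt.Println("got:", v)
        return nil
    })
    _ = pool.New[string](4, shared)

    // stateful: the maker runs once per goroutine, so each worker keeps private state
    _ = pool.NewStateful[string](4, func() pool.Worker[string] {
        seen := 0 // per-worker state, never shared between goroutines
        return pool.WorkerFunc[string](func(ctx context.Context, v string) error {
            seen++
            return nil
        })
    })
}
```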

### Channel Architecture

WorkerGroup maintains two sets of channels:

**Shared channels** (used when no `WithChunkFn`):
- `sharedCh chan T` - all workers compete for items
- `sharedBatchCh chan []T` - for batch mode

**Per-worker channels** (used with `WithChunkFn`):
- `workersCh []chan T` - dedicated channel per worker
- `workerBatchCh []chan []T` - batch channel per worker

Channel selection in `Go()`:
```go
workerCh := p.sharedCh
batchCh := p.sharedBatchCh
if p.chunkFn != nil {
    workerCh = p.workersCh[i]
    batchCh = p.workerBatchCh[i]
}
```

**Important**: `WithWorkerChanSize()` recreates ALL channels (both shared and per-worker).

### Work Distribution

**Direct mode (batchSize=0):**
- Without `WithChunkFn`: Shared channel - workers compete for items
- With `WithChunkFn`: Per-worker channels with FNV-1a hash routing

**Batching mode (batchSize>0):**
- Without `WithChunkFn`: Random accumulator slot selection via `rand.Intn(poolSize)`
- With `WithChunkFn`: Consistent hashing using FNV-1a hash:
```go
h := fnv.New32a()
h.Write([]byte(p.chunkFn(v)))
id := int(h.Sum32()) % p.poolSize
```

### Batching

When `batchSize > 0`:
1. Items accumulate in `accumulators[id]` (one per worker slot)
2. When an accumulator reaches `batchSize`, the batch is sent to the worker channel
3. `Close()` flushes remaining partial batches
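
A simplified sketch of this accumulate-and-flush flow; it is illustrative only, not the actual code in pool.go:

```go
// illustrative only: append to the slot's accumulator, ship a full batch
func submitBatched[T any](acc [][]T, batchCh []chan []T, id, batchSize int, v T) {
    acc[id] = append(acc[id], v)
    if len(acc[id]) >= batchSize {
        batchCh[id] <- acc[id] // full batch goes to the worker channel
        acc[id] = nil          // start a fresh accumulator for this slot
    }
}

// illustrative only: Close() flushes whatever is left as partial batches
func flushBatches[T any](acc [][]T, batchCh []chan []T) {
    for id, batch := range acc {
        if len(batch) > 0 {
            batchCh[id] <- batch
        }
    }
}
```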

### Lifecycle

1. Create pool: `New[T]()` or `NewStateful[T]()`
2. Configure: chain `With*()` methods (must be before `Go()`)
3. Start: `Go(ctx)` - starts worker goroutines via errgroup
4. Submit: `Submit(v)` (single producer) or `Send(v)` (concurrent-safe)
5. Finish: `Close(ctx)` - flushes batches, closes channels, waits for workers
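
A compact sketch of these five steps, using only the documented calls and minimal error handling:

```go
package main

import (
    "context"
    "fmt"

    "github.com/go-pkgz/pool"
)

func main() {
    worker := pool.WorkerFunc[int](func(ctx context.Context, v int) error {
        fmt.Println("processing", v)
        return nil
    })

    // 1-2: create and configure; all With*() calls must happen before Go()
    p := pool.New[int](3, worker).WithBatchSize(10).WithContinueOnError()

    // 3: start the worker goroutines
    if err := p.Go(context.Background()); err != nil {
        fmt.Println("failed to start pool:", err)
        return
    }

    // 4: submit work from this single producer goroutine
    for i := 0; i < 100; i++ {
        p.Submit(i)
    }

    // 5: flush remaining batches, close channels, wait for workers
    if err := p.Close(context.Background()); err != nil {
        fmt.Println("pool finished with error:", err)
    }
}
```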

### Metrics System

`metrics.Value` tracks per-worker stats without locks (each worker writes to its own slot):
- `workerStats []Stats` - per-worker counters (Processed, Errors, Dropped, timings)
- `userData map[string]int` - custom counters (mutex-protected)

Workers access metrics via context:
```go
m := metrics.Get(ctx)
m.Inc("custom-counter")
```

Timer types: `TimerProc`, `TimerWait`, `TimerInit`, `TimerWrap`

### Middleware Pattern

Middleware is applied in reverse order (the first middleware becomes the outermost wrapper):
```go
// For stateless: wraps p.worker directly
// For stateful: wraps the maker function
for i := len(middlewares) - 1; i >= 0; i-- {
    wrapped = middlewares[i](wrapped)
}
```

Built-in middleware in `middleware/`:
- `Retry(attempts, baseDelay)` - exponential backoff with jitter
- `Timeout(duration)` - per-operation timeout
- `Recovery(handler)` - panic recovery
- `Validator(fn)` - input validation
- `RateLimiter(rate, burst)` - token bucket rate limiting (global, not per-worker)
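
A sketch of a hand-written middleware built only from the types above; how it is registered with the pool (e.g. a `Use(...)`-style call) is not shown and should be confirmed in pool.go:

```go
package mw

import (
    "context"

    "github.com/go-pkgz/pool"
)

// Logging wraps the next worker and logs before and after each item.
func Logging[T any](logf func(format string, args ...any)) pool.Middleware[T] {
    return func(next pool.Worker[T]) pool.Worker[T] {
        return pool.WorkerFunc[T](func(ctx context.Context, v T) error {
            logf("start %v", v)
            err := next.Do(ctx, v)
            logf("done %v err=%v", v, err)
            return err
        })
    }
}
```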

### Collector

Simple async-to-sync bridge:
```go
type Collector[V any] struct {
    ch  chan V
    ctx context.Context
}
```

- `Submit(v)` - blocks if buffer full
- `Iter()` - returns `iter.Seq2[V, error]` (Go 1.23 range-over-func)
- `All()` - collects all into slice
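
A hypothetical usage sketch: the `NewCollector(ctx, size)` constructor and the `Close()` call that lets `Iter()` terminate are assumptions, so check collector.go for the actual API:

```go
package main

import (
    "context"
    "fmt"

    "github.com/go-pkgz/pool"
)

func main() {
    ctx := context.Background()
    col := pool.NewCollector[string](ctx, 16) // assumed constructor: ctx + buffer size

    // producer side, typically called from pool workers
    go func() {
        defer col.Close() // assumed: marks the stream done so Iter() ends
        for i := 0; i < 3; i++ {
            col.Submit(fmt.Sprintf("result-%d", i)) // blocks when the buffer is full
        }
    }()

    // consumer side: range-over-func iteration (Go 1.23+)
    for v, err := range col.Iter() {
        if err != nil {
            fmt.Println("iteration stopped:", err)
            break
        }
        fmt.Println("got", v)
    }
}
```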

### Error Handling

- Default: first error stops pool (errgroup behavior)
- `WithContinueOnError()`: accumulates errors, returns last error with total count
- Worker completion callbacks run only if there was no error or `continueOnError` is set
- Pool completion callback runs when the last worker finishes (skipped only on `context.Canceled`, still runs on `context.DeadlineExceeded`)

### Key Implementation Details

1. `activated` flag prevents double `Go()` calls
2. `activeWorkers` atomic counter tracks live workers for pool completion callback
3. `sendMu` mutex makes `Send()` concurrent-safe (wraps `Submit()`)
4. errgroup manages worker goroutines and error propagation
5. Context from errgroup used for cancellation propagation
37 changes: 19 additions & 18 deletions README.md
@@ -1,6 +1,6 @@
# pool [![Build Status](https://github.com/go-pkgz/pool/workflows/build/badge.svg)](https://github.com/go-pkgz/pool/actions) [![Coverage Status](https://coveralls.io/repos/github/go-pkgz/pool/badge.svg?branch=master)](https://coveralls.io/github/go-pkgz/pool?branch=master) [![godoc](https://godoc.org/github.com/go-pkgz/pool?status.svg)](https://godoc.org/github.com/go-pkgz/pool)

`pool` is a Go package that provides a generic, efficient worker pool implementation for parallel task processing. Built for Go 1.23+, it offers a flexible API with features like batching, work distribution strategies, and comprehensive metrics collection.
`pool` is a Go package that provides a generic, efficient worker pool implementation for parallel task processing. Built for Go 1.24+, it offers a flexible API with features like batching, work distribution strategies, and comprehensive metrics collection.

## Features

@@ -16,7 +16,7 @@
- Optional completion callbacks
- Extensible middleware system for custom functionality
- Built-in middlewares for common tasks
- No external dependencies except for the testing framework
- Minimal dependencies (only `golang.org/x/sync` and `golang.org/x/time` at runtime)

## Quick Start

@@ -52,8 +52,8 @@ func main() {
return nil
})

// create a pool with 5 workers
p := pool.New[string](5, worker).WithContinueOnError(), // don't stop on errors
// create a pool with 5 workers
p := pool.New[string](5, worker).WithContinueOnError() // don't stop on errors

// start the pool
if err := p.Go(context.Background()); err != nil {
@@ -233,13 +233,14 @@ p := pool.New[Task](3, worker).WithChunkFn(func(t Task) string {

How distribution works:
1. Without chunk function:
- Items are distributed randomly among workers
- Direct mode: Items go to a shared channel, workers compete for items
- Batching mode: Items are assigned to random accumulator slots
- Good for independent tasks

2. With chunk function:
- Function returns string key for each item
- Items with the same key always go to the same worker
- Uses consistent hashing to map keys to workers
- Uses FNV-1a hash to map keys to workers deterministically

When to use custom distribution:
- Maintain ordering for related items
@@ -425,7 +426,7 @@ The `GetStats()` method returns a comprehensive `Stats` structure with the follo
**Counters:**
- `Processed` - total number of successfully processed items
- `Errors` - total number of items that returned errors
- `Dropped` - total number of items dropped (e.g., due to context cancellation)
- `Dropped` - total number of items dropped (user-incrementable via `m.IncDropped(workerID)` where `m` is the metrics object)

**Timing Metrics:**
- `ProcessingTime` - cumulative time spent processing items (max across workers)
@@ -436,7 +437,7 @@ The `GetStats()` method returns a comprehensive `Stats` structure with the follo

**Derived Statistics:**
- `RatePerSec` - items processed per second (Processed / TotalTime)
- `AvgLatency` - average processing time per item (ProcessingTime / Processed)
- `AvgLatency` - average wall-clock time per item (max ProcessingTime across workers / Processed)
- `ErrorRate` - percentage of items that failed (Errors / Total)
- `DroppedRate` - percentage of items dropped (Dropped / Total)
- `Utilization` - percentage of time spent processing vs waiting (ProcessingTime / (ProcessingTime + WaitTime))
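
As a sketch, these fields can be read once the pool has finished; the `p.Metrics().GetStats()` accessor shown here is an assumption, so check the godoc for the exact call:

```go
// p is the pool from the earlier examples, already closed
stats := p.Metrics().GetStats() // accessor is an assumption; see the godoc
fmt.Printf("processed=%d errors=%d dropped=%d\n", stats.Processed, stats.Errors, stats.Dropped)
fmt.Printf("rate=%.1f/s avg=%v utilization=%.1f%%\n", stats.RatePerSec, stats.AvgLatency, stats.Utilization)
```
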
@@ -523,7 +524,7 @@ p := pool.New[string](5, worker).
The completion callback executes when:
- All workers have completed processing
- Errors occurred but pool continued (`WithContinueOnError()`)
- Does not execute on context cancellation
- Skipped only on `context.Canceled` (still runs on `context.DeadlineExceeded`)

Important notes:
- Use `Submit` when sending items from a single goroutine
@@ -538,21 +539,21 @@ configuration and must be called before `Go()`; calling them after `Go()` is
unsupported and may lead to deadlocks or no-ops.

```go
p := pool.New[string](2, worker). // pool with 2 workers
WithBatchSize(10). // process items in batches
WithWorkerChanSize(5). // set worker channel buffer size
WithChunkFn(chunkFn). // control work distribution
WithContinueOnError(). // don't stop on errors
WithCompleteFn(completeFn) // called when worker finishes
p := pool.New[string](2, worker). // pool with 2 workers
WithBatchSize(10). // process items in batches
WithWorkerChanSize(5). // set worker channel buffer size
WithChunkFn(chunkFn). // control work distribution
WithContinueOnError(). // don't stop on errors
WithWorkerCompleteFn(workerComplete) // called when each worker finishes
```

Available options:
- `WithBatchSize(size int)` - enables batch processing, accumulating items before sending to workers (default: 10)
- `WithWorkerChanSize(size int)` - sets buffer size for worker channels (default: 1)
- `WithChunkFn(fn func(T) string)` - controls work distribution by key (default: none, random distribution)
- `WithChunkFn(fn func(T) string)` - controls work distribution by key (default: none, shared channel)
- `WithContinueOnError()` - continues processing on errors (default: false)
- `WithWorkerCompleteFn(fn func(ctx, id, worker))` - called on worker completion (default: none)
- `WithPoolCompleteFn(fn func(ctx))` - called on pool completion, i.e., when all workers have completed (default: none)
- `WithWorkerCompleteFn(fn func(ctx context.Context, id int, worker Worker[T]) error)` - called on worker completion (default: none)
- `WithPoolCompleteFn(fn func(ctx context.Context) error)` - called on pool completion, i.e., when all workers have completed (default: none)

## Collector
