Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
go-version-file: go.mod
cache-dependency-path: go.sum
- name: golangci-lint
uses: golangci/golangci-lint-action@v6
uses: golangci/golangci-lint-action@v9
with:
version: latest
args: -c .golangci.yaml --out-format=colored-line-number
args: -c .golangci.yaml
63 changes: 36 additions & 27 deletions .golangci.yaml
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
# This file was adapted from https://github.com/dagger/dagger/blob/main/.golangci.yml
# to get a decent set of defaults.

version: "2"
output:
formats:
text:
path: stdout
print-linter-name: true
print-issued-lines: true
linters:
disable-all: true
default: none
enable:
- bodyclose
- copyloopvar
- dogsled
- dupl
- copyloopvar
- gocritic
- gocyclo
- gofmt
- goimports
- goprintffuncname
- gosec
- gosimple
- govet
- ineffassign
- misspell
Expand All @@ -24,25 +25,33 @@ linters:
- revive
- rowserrcheck
- staticcheck
- stylecheck
- typecheck
- unconvert
- unused
- whitespace

linters-settings:
revive:
rules:
# This rule is annoying. Often you want to name the
# parameters for clarity because it conforms to an
# interface.
- name: unused-parameter
severity: warning
disabled: true

output:
formats:
- format: colored-line-number
print-issued-lines: true
print-linter-name: true
sort-results: true
settings:
revive:
rules:
- name: unused-parameter
severity: warning
disabled: true
exclusions:
generated: lax
presets:
- comments
- common-false-positives
- legacy
- std-error-handling
paths:
- third_party$
- builtin$
- examples$
formatters:
enable:
- gofmt
- goimports
exclusions:
generated: lax
paths:
- third_party$
- builtin$
- examples$
43 changes: 28 additions & 15 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,30 @@ import (
"go.opentelemetry.io/otel"
)

type Consumer interface {
type Consumer[In any] interface {
ConsumeOnce()
Consume()
Work()

setErrorQueue(err job.Queue[error])
setWorkerPoolSize(int)
setSpanName(string)
Input() job.Queue[In]
Error() job.Queue[error]

WithInput(job.Queue[In]) Consumer[In]
WithErrorQueue(job.Queue[error]) Consumer[In]
WithWorkerPoolSize(int) Consumer[In]
WithSpanName(string) Consumer[In]
}

func New[In any](name string, in job.Queue[In], fn func(context.Context, In) error, opts ...Option) (Consumer, job.Queue[error]) {
func New[In any](name string, fn func(context.Context, In) error) Consumer[In] {
c := &consumer[In]{
name: name,
spanName: "consume job",
fn: fn,
in: in,
in: make(job.Queue[In]),
err: make(job.Queue[error]),
workerPoolSize: 1,
}
for _, opt := range opts {
opt(c)
}
return c, c.err
return c
}

type consumer[In any] struct {
Expand Down Expand Up @@ -74,14 +75,26 @@ func (c consumer[In]) Work() {
c.Consume()
}

func (c *consumer[In]) setErrorQueue(err job.Queue[error]) {
c.err = err
func (c *consumer[In]) Input() job.Queue[In] {
return c.in
}
func (c *consumer[In]) Error() job.Queue[error] {
return c.err
}

func (c *consumer[In]) setWorkerPoolSize(n int) {
func (c *consumer[In]) WithInput(in job.Queue[In]) Consumer[In] {
c.in = in
return c
}
func (c *consumer[In]) WithErrorQueue(err job.Queue[error]) Consumer[In] {
c.err = err
return c
}
func (c *consumer[In]) WithWorkerPoolSize(n int) Consumer[In] {
c.workerPoolSize = n
return c
}

func (c *consumer[In]) setSpanName(name string) {
func (c *consumer[In]) WithSpanName(name string) Consumer[In] {
c.spanName = name
return c
}
23 changes: 0 additions & 23 deletions consumer/consumer_opts.go

This file was deleted.

29 changes: 19 additions & 10 deletions producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,25 @@ import (
"go.opentelemetry.io/otel/trace"
)

type Producer interface {
type Producer[Out any] interface {
ProduceOnce()
Produce()
Work()
Output() job.Queue[Out]
Error() job.Queue[error]

setCooldown(time.Duration)
setErrorQueue(job.Queue[error])
WithCooldown(time.Duration) Producer[Out]
WithErrorQueue(job.Queue[error]) Producer[Out]
}

func New[Out any](name string, fn func() ([]Out, error), opts ...Option) (Producer, job.Queue[Out], job.Queue[error]) {
func New[Out any](name string, fn func() ([]Out, error)) Producer[Out] {
p := &producer[Out]{
name: name,
fn: fn,
out: make(chan job.Job[Out]),
err: make(chan job.Job[error]),
}
for _, opt := range opts {
opt(p)
}
return p, p.out, p.err
return p
}

type producer[Out any] struct {
Expand Down Expand Up @@ -79,12 +78,22 @@ func (p producer[Out]) Work() {
p.Produce()
}

func (p *producer[Out]) setCooldown(t time.Duration) {
func (p producer[Out]) Output() job.Queue[Out] {
return p.out
}

func (p producer[Out]) Error() job.Queue[error] {
return p.err
}

func (p *producer[Out]) WithCooldown(t time.Duration) Producer[Out] {
p.cooldown = t
return p
}

func (p *producer[Out]) setErrorQueue(err job.Queue[error]) {
func (p *producer[Out]) WithErrorQueue(err job.Queue[error]) Producer[Out] {
p.err = err
return p
}

type ContextualOutput interface {
Expand Down
21 changes: 0 additions & 21 deletions producer/producer_opts.go

This file was deleted.

22 changes: 10 additions & 12 deletions test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ import (
"time"

"github.com/imperfect-fourth/work"
"github.com/imperfect-fourth/work/consumer"
"github.com/imperfect-fourth/work/producer"
"github.com/imperfect-fourth/work/transformer"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
Expand Down Expand Up @@ -54,23 +51,24 @@ func main() {
}
}()

producer, producedIntChan, _ := work.NewProducer(
producer := work.NewProducer(
"int producer",
producerFn,
producer.WithCooldown(10*time.Second),
)
).WithCooldown(10 * time.Second)
go producer.Work()

transformer, transformedIntChan, _ := work.NewTransformer(
transformer := work.NewTransformer(
"int transformer",
producedIntChan,
transformerFn,
transformer.WithWorkerPoolSize(1),
transformer.WithSpanName("sleeping one and adding one"),
)
).
WithInput(producer.Output()).
WithWorkerPoolSize(1)

go transformer.Work()

c, _ := work.NewConsumer("int consumer", transformedIntChan, consumerFn, consumer.WithSpanName("sleeping one and printing"))
c := work.NewConsumer("int consumer", consumerFn).
WithInput(transformer.Output())

c.Work()
}

Expand Down
47 changes: 32 additions & 15 deletions transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,32 @@ import (
"go.opentelemetry.io/otel"
)

type Transformer interface {
type Transformer[In, Out any] interface {
TransformOnce()
Transform()
Work()

setErrorQueue(job.Queue[error])
setWorkerPoolSize(int)
setSpanName(string)
Input() job.Queue[In]
Output() job.Queue[Out]
Error() job.Queue[error]

WithInput(job.Queue[In]) Transformer[In, Out]
WithErrorQueue(job.Queue[error]) Transformer[In, Out]
WithWorkerPoolSize(int) Transformer[In, Out]
WithSpanName(string) Transformer[In, Out]
}

func New[In any, Out any](name string, in job.Queue[In], fn func(context.Context, In) (Out, error), opts ...Option) (Transformer, job.Queue[Out], job.Queue[error]) {
func New[In any, Out any](name string, fn func(context.Context, In) (Out, error)) Transformer[In, Out] {
t := &transformer[In, Out]{
name: name,
spanName: "transform job",
fn: fn,
in: in,
in: make(job.Queue[In]),
out: make(job.Queue[Out]),
err: make(job.Queue[error]),
workerPoolSize: 1,
}
for _, opt := range opts {
opt(t)
}
return t, t.out, t.err
return t
}

type transformer[In any, Out any] struct {
Expand Down Expand Up @@ -80,14 +82,29 @@ func (t transformer[In, Out]) Work() {
t.Transform()
}

func (t *transformer[In, Out]) setErrorQueue(err job.Queue[error]) {
t.err = err
func (t *transformer[In, Out]) Input() job.Queue[In] {
return t.in
}
func (t *transformer[In, Out]) Output() job.Queue[Out] {
return t.out
}
func (t *transformer[In, Out]) Error() job.Queue[error] {
return t.err
}

func (t *transformer[In, Out]) setWorkerPoolSize(n int) {
func (t *transformer[In, Out]) WithInput(in job.Queue[In]) Transformer[In, Out] {
t.in = in
return t
}
func (t *transformer[In, Out]) WithErrorQueue(err job.Queue[error]) Transformer[In, Out] {
t.err = err
return t
}
func (t *transformer[In, Out]) WithWorkerPoolSize(n int) Transformer[In, Out] {
t.workerPoolSize = n
return t
}

func (t *transformer[In, Out]) setSpanName(name string) {
func (t *transformer[In, Out]) WithSpanName(name string) Transformer[In, Out] {
t.spanName = name
return t
}
Loading