From 357b1c5dab5014798521032e2c8c90fe5ad7d432 Mon Sep 17 00:00:00 2001 From: Divi Date: Sun, 16 Mar 2025 14:27:10 +0530 Subject: [PATCH 1/6] rewrite api --- consumer/consumer.go | 43 +++++++++++++++++++----------- consumer/consumer_opts.go | 23 ---------------- producer/producer.go | 29 +++++++++++++------- producer/producer_opts.go | 21 --------------- test/main.go | 24 ++++++++--------- transformer/transformer.go | 47 ++++++++++++++++++++++----------- transformer/transformer_opts.go | 23 ---------------- work.go | 13 +++++---- 8 files changed, 97 insertions(+), 126 deletions(-) delete mode 100644 consumer/consumer_opts.go delete mode 100644 producer/producer_opts.go delete mode 100644 transformer/transformer_opts.go diff --git a/consumer/consumer.go b/consumer/consumer.go index 4da7564..7d1d48e 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -7,29 +7,30 @@ import ( "go.opentelemetry.io/otel" ) -type Consumer interface { +type Consumer[In any] interface { ConsumeOnce() Consume() Work() - setErrorQueue(err job.Queue[error]) - setWorkerPoolSize(int) - setSpanName(string) + Input() job.Queue[In] + Error() job.Queue[error] + + WithInput(job.Queue[In]) Consumer[In] + WithErrorQueue(job.Queue[error]) Consumer[In] + WithWorkerPoolSize(int) Consumer[In] + WithSpanName(string) Consumer[In] } -func New[In any](name string, in job.Queue[In], fn func(context.Context, In) error, opts ...Option) (Consumer, job.Queue[error]) { +func New[In any](name string, fn func(context.Context, In) error) Consumer[In] { c := &consumer[In]{ name: name, spanName: "consume job", fn: fn, - in: in, + in: make(job.Queue[In]), err: make(job.Queue[error]), workerPoolSize: 1, } - for _, opt := range opts { - opt(c) - } - return c, c.err + return c } type consumer[In any] struct { @@ -74,14 +75,26 @@ func (c consumer[In]) Work() { c.Consume() } -func (c *consumer[In]) setErrorQueue(err job.Queue[error]) { - c.err = err +func (c *consumer[In]) Input() job.Queue[In] { + return c.in +} +func (c *consumer[In]) Error() job.Queue[error] { + return c.err } -func (c *consumer[In]) setWorkerPoolSize(n int) { +func (c *consumer[In]) WithInput(in job.Queue[In]) Consumer[In] { + c.in = in + return c +} +func (c *consumer[In]) WithErrorQueue(err job.Queue[error]) Consumer[In] { + c.err = err + return c +} +func (c *consumer[In]) WithWorkerPoolSize(n int) Consumer[In] { c.workerPoolSize = n + return c } - -func (c *consumer[In]) setSpanName(name string) { +func (c *consumer[In]) WithSpanName(name string) Consumer[In] { c.spanName = name + return c } diff --git a/consumer/consumer_opts.go b/consumer/consumer_opts.go deleted file mode 100644 index ad803dc..0000000 --- a/consumer/consumer_opts.go +++ /dev/null @@ -1,23 +0,0 @@ -package consumer - -import "github.com/imperfect-fourth/work/job" - -type Option func(Consumer) - -func WithErrorQueue(err job.Queue[error]) Option { - return func(c Consumer) { - c.setErrorQueue(err) - } -} - -func WithWorkerPoolSize(n int) Option { - return func(c Consumer) { - c.setWorkerPoolSize(n) - } -} - -func WithSpanName(name string) Option { - return func(c Consumer) { - c.setSpanName(name) - } -} diff --git a/producer/producer.go b/producer/producer.go index b0ea5b7..95ff6fd 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -9,26 +9,25 @@ import ( "go.opentelemetry.io/otel/trace" ) -type Producer interface { +type Producer[Out any] interface { ProduceOnce() Produce() Work() + Output() job.Queue[Out] + Error() job.Queue[error] - setCooldown(time.Duration) - setErrorQueue(job.Queue[error]) + WithCooldown(time.Duration) Producer[Out] + WithErrorQueue(job.Queue[error]) Producer[Out] } -func New[Out any](name string, fn func() ([]Out, error), opts ...Option) (Producer, job.Queue[Out], job.Queue[error]) { +func New[Out any](name string, fn func() ([]Out, error)) Producer[Out] { p := &producer[Out]{ name: name, fn: fn, out: make(chan job.Job[Out]), err: make(chan job.Job[error]), } - for _, opt := range opts { - opt(p) - } - return p, p.out, p.err + return p } type producer[Out any] struct { @@ -79,12 +78,22 @@ func (p producer[Out]) Work() { p.Produce() } -func (p *producer[Out]) setCooldown(t time.Duration) { +func (p producer[Out]) Output() job.Queue[Out] { + return p.out +} + +func (p producer[Out]) Error() job.Queue[error] { + return p.err +} + +func (p *producer[Out]) WithCooldown(t time.Duration) Producer[Out] { p.cooldown = t + return p } -func (p *producer[Out]) setErrorQueue(err job.Queue[error]) { +func (p *producer[Out]) WithErrorQueue(err job.Queue[error]) Producer[Out] { p.err = err + return p } type ContextualOutput interface { diff --git a/producer/producer_opts.go b/producer/producer_opts.go deleted file mode 100644 index 657fb55..0000000 --- a/producer/producer_opts.go +++ /dev/null @@ -1,21 +0,0 @@ -package producer - -import ( - "time" - - "github.com/imperfect-fourth/work/job" -) - -type Option func(Producer) - -func WithCooldown(interval time.Duration) Option { - return func(p Producer) { - p.setCooldown(interval) - } -} - -func WithErrorQueue(err job.Queue[error]) Option { - return func(p Producer) { - p.setErrorQueue(err) - } -} diff --git a/test/main.go b/test/main.go index 1e25e38..d49d97a 100644 --- a/test/main.go +++ b/test/main.go @@ -8,9 +8,6 @@ import ( "time" "github.com/imperfect-fourth/work" - "github.com/imperfect-fourth/work/consumer" - "github.com/imperfect-fourth/work/producer" - "github.com/imperfect-fourth/work/transformer" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" @@ -54,23 +51,26 @@ func main() { } }() - producer, producedIntChan, _ := work.NewProducer( + producer := work.NewProducer( "int producer", producerFn, - producer.WithCooldown(10*time.Second), - ) + ).WithCooldown(10 * time.Second) go producer.Work() - transformer, transformedIntChan, _ := work.NewTransformer( + transformer := work.NewTransformer( "int transformer", - producedIntChan, transformerFn, - transformer.WithWorkerPoolSize(1), - transformer.WithSpanName("sleeping one and adding one"), - ) + ). + WithInput(producer.Output()). + WithWorkerPoolSize(1). + WithSpanName("sleeping one and adding one") + go transformer.Work() - c, _ := work.NewConsumer("int consumer", transformedIntChan, consumerFn, consumer.WithSpanName("sleeping one and printing")) + c := work.NewConsumer("int consumer", consumerFn). + WithInput(transformer.Output()). + WithSpanName("sleeping one and printing") + c.Work() } diff --git a/transformer/transformer.go b/transformer/transformer.go index e34d31c..396dd1f 100644 --- a/transformer/transformer.go +++ b/transformer/transformer.go @@ -7,30 +7,32 @@ import ( "go.opentelemetry.io/otel" ) -type Transformer interface { +type Transformer[In, Out any] interface { TransformOnce() Transform() Work() - setErrorQueue(job.Queue[error]) - setWorkerPoolSize(int) - setSpanName(string) + Input() job.Queue[In] + Output() job.Queue[Out] + Error() job.Queue[error] + + WithInput(job.Queue[In]) Transformer[In, Out] + WithErrorQueue(job.Queue[error]) Transformer[In, Out] + WithWorkerPoolSize(int) Transformer[In, Out] + WithSpanName(string) Transformer[In, Out] } -func New[In any, Out any](name string, in job.Queue[In], fn func(context.Context, In) (Out, error), opts ...Option) (Transformer, job.Queue[Out], job.Queue[error]) { +func New[In any, Out any](name string, fn func(context.Context, In) (Out, error)) Transformer[In, Out] { t := &transformer[In, Out]{ name: name, spanName: "transform job", fn: fn, - in: in, + in: make(job.Queue[In]), out: make(job.Queue[Out]), err: make(job.Queue[error]), workerPoolSize: 1, } - for _, opt := range opts { - opt(t) - } - return t, t.out, t.err + return t } type transformer[In any, Out any] struct { @@ -80,14 +82,29 @@ func (t transformer[In, Out]) Work() { t.Transform() } -func (t *transformer[In, Out]) setErrorQueue(err job.Queue[error]) { - t.err = err +func (t *transformer[In, Out]) Input() job.Queue[In] { + return t.in +} +func (t *transformer[In, Out]) Output() job.Queue[Out] { + return t.out +} +func (t *transformer[In, Out]) Error() job.Queue[error] { + return t.err } -func (t *transformer[In, Out]) setWorkerPoolSize(n int) { +func (t *transformer[In, Out]) WithInput(in job.Queue[In]) Transformer[In, Out] { + t.in = in + return t +} +func (t *transformer[In, Out]) WithErrorQueue(err job.Queue[error]) Transformer[In, Out] { + t.err = err + return t +} +func (t *transformer[In, Out]) WithWorkerPoolSize(n int) Transformer[In, Out] { t.workerPoolSize = n + return t } - -func (t *transformer[In, Out]) setSpanName(name string) { +func (t *transformer[In, Out]) WithSpanName(name string) Transformer[In, Out] { t.spanName = name + return t } diff --git a/transformer/transformer_opts.go b/transformer/transformer_opts.go deleted file mode 100644 index d6b0854..0000000 --- a/transformer/transformer_opts.go +++ /dev/null @@ -1,23 +0,0 @@ -package transformer - -import "github.com/imperfect-fourth/work/job" - -type Option func(Transformer) - -func WithErrorQueue(err job.Queue[error]) Option { - return func(t Transformer) { - t.setErrorQueue(err) - } -} - -func WithWorkerPoolSize(n int) Option { - return func(t Transformer) { - t.setWorkerPoolSize(n) - } -} - -func WithSpanName(name string) Option { - return func(t Transformer) { - t.setSpanName(name) - } -} diff --git a/work.go b/work.go index ecdc5fb..fefa3c9 100644 --- a/work.go +++ b/work.go @@ -4,19 +4,18 @@ import ( "context" "github.com/imperfect-fourth/work/consumer" - "github.com/imperfect-fourth/work/job" "github.com/imperfect-fourth/work/producer" "github.com/imperfect-fourth/work/transformer" ) -func NewProducer[Out any](name string, fn func() ([]Out, error), opts ...producer.Option) (producer.Producer, job.Queue[Out], job.Queue[error]) { - return producer.New(name, fn, opts...) +func NewProducer[Out any](name string, fn func() ([]Out, error)) producer.Producer[Out] { + return producer.New(name, fn) } -func NewTransformer[In any, Out any](name string, in job.Queue[In], fn func(context.Context, In) (Out, error), opts ...transformer.Option) (transformer.Transformer, job.Queue[Out], job.Queue[error]) { - return transformer.New(name, in, fn, opts...) +func NewTransformer[In any, Out any](name string, fn func(context.Context, In) (Out, error)) transformer.Transformer[In, Out] { + return transformer.New(name, fn) } -func NewConsumer[In any](name string, in job.Queue[In], fn func(context.Context, In) error, opts ...consumer.Option) (consumer.Consumer, job.Queue[error]) { - return consumer.New(name, in, fn, opts...) +func NewConsumer[In any](name string, fn func(context.Context, In) error) consumer.Consumer[In] { + return consumer.New(name, fn) } From caac972a1792561831791c43407211781dd5a6cd Mon Sep 17 00:00:00 2001 From: divi Date: Sun, 28 Dec 2025 17:41:09 +0530 Subject: [PATCH 2/6] remove span name api --- consumer/consumer.go | 10 ++-------- producer/producer.go | 3 ++- transformer/transformer.go | 12 +++--------- 3 files changed, 7 insertions(+), 18 deletions(-) diff --git a/consumer/consumer.go b/consumer/consumer.go index 7d1d48e..ba3b6be 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -2,6 +2,7 @@ package consumer import ( "context" + "fmt" "github.com/imperfect-fourth/work/job" "go.opentelemetry.io/otel" @@ -18,13 +19,11 @@ type Consumer[In any] interface { WithInput(job.Queue[In]) Consumer[In] WithErrorQueue(job.Queue[error]) Consumer[In] WithWorkerPoolSize(int) Consumer[In] - WithSpanName(string) Consumer[In] } func New[In any](name string, fn func(context.Context, In) error) Consumer[In] { c := &consumer[In]{ name: name, - spanName: "consume job", fn: fn, in: make(job.Queue[In]), err: make(job.Queue[error]), @@ -35,7 +34,6 @@ func New[In any](name string, fn func(context.Context, In) error) Consumer[In] { type consumer[In any] struct { name string - spanName string fn func(context.Context, In) error in job.Queue[In] @@ -51,7 +49,7 @@ func (c consumer[In]) ConsumeOnce() { ctx, span := otel.Tracer(c.name).Start(j.Context(), c.name) defer span.End() - fnctx, fnspan := otel.Tracer(c.name).Start(ctx, c.spanName) + fnctx, fnspan := otel.Tracer(c.name).Start(ctx, fmt.Sprintf("%s - run function", c.name)) err := c.fn(fnctx, j.Input()) fnspan.End() if err != nil { @@ -94,7 +92,3 @@ func (c *consumer[In]) WithWorkerPoolSize(n int) Consumer[In] { c.workerPoolSize = n return c } -func (c *consumer[In]) WithSpanName(name string) Consumer[In] { - c.spanName = name - return c -} diff --git a/producer/producer.go b/producer/producer.go index 95ff6fd..e90c532 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -2,6 +2,7 @@ package producer import ( "context" + "fmt" "time" "github.com/imperfect-fourth/work/job" @@ -56,7 +57,7 @@ func (p producer[Out]) ProduceOnce() { ctx = co.Context() } rootCtx, j := job.New(ctx, o) - _, jobspan := otel.Tracer(p.name).Start(rootCtx, "start queue wait") + _, jobspan := otel.Tracer(p.name).Start(rootCtx, fmt.Sprintf("%s - output queue wait", p.name)) jobs[i] = j spans[i] = jobspan } diff --git a/transformer/transformer.go b/transformer/transformer.go index 396dd1f..72eaf61 100644 --- a/transformer/transformer.go +++ b/transformer/transformer.go @@ -2,6 +2,7 @@ package transformer import ( "context" + "fmt" "github.com/imperfect-fourth/work/job" "go.opentelemetry.io/otel" @@ -19,13 +20,11 @@ type Transformer[In, Out any] interface { WithInput(job.Queue[In]) Transformer[In, Out] WithErrorQueue(job.Queue[error]) Transformer[In, Out] WithWorkerPoolSize(int) Transformer[In, Out] - WithSpanName(string) Transformer[In, Out] } func New[In any, Out any](name string, fn func(context.Context, In) (Out, error)) Transformer[In, Out] { t := &transformer[In, Out]{ name: name, - spanName: "transform job", fn: fn, in: make(job.Queue[In]), out: make(job.Queue[Out]), @@ -37,7 +36,6 @@ func New[In any, Out any](name string, fn func(context.Context, In) (Out, error) type transformer[In any, Out any] struct { name string - spanName string fn func(context.Context, In) (Out, error) in job.Queue[In] @@ -52,7 +50,7 @@ func (t transformer[In, Out]) TransformOnce() { ctx, span := otel.Tracer(t.name).Start(j.Context(), t.name) defer span.End() - fnctx, fnspan := otel.Tracer(t.name).Start(ctx, t.spanName) + fnctx, fnspan := otel.Tracer(t.name).Start(ctx, fmt.Sprintf("%s - run function", t.name)) o, err := t.fn(fnctx, j.Input()) fnspan.End() if err != nil { @@ -61,7 +59,7 @@ func (t transformer[In, Out]) TransformOnce() { return } - _, jobspan := otel.Tracer(t.name).Start(ctx, "output queue wait") + _, jobspan := otel.Tracer(t.name).Start(ctx, fmt.Sprintf("%s - output queue wait", t.name)) _, newJob := job.New(j.Context(), o) t.out <- newJob jobspan.End() @@ -104,7 +102,3 @@ func (t *transformer[In, Out]) WithWorkerPoolSize(n int) Transformer[In, Out] { t.workerPoolSize = n return t } -func (t *transformer[In, Out]) WithSpanName(name string) Transformer[In, Out] { - t.spanName = name - return t -} From ade57e25b59e5a2ecf42d2ef46bc4c9ce6b9104b Mon Sep 17 00:00:00 2001 From: divi Date: Sun, 28 Dec 2025 17:57:52 +0530 Subject: [PATCH 3/6] lint --- .golangci.yaml | 63 ++++++++++++++++++++++---------------- consumer/consumer.go | 2 +- test/main.go | 6 ++-- transformer/transformer.go | 2 +- 4 files changed, 40 insertions(+), 33 deletions(-) diff --git a/.golangci.yaml b/.golangci.yaml index 21fb7ad..532c78c 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -1,20 +1,21 @@ -# This file was adapted from https://github.com/dagger/dagger/blob/main/.golangci.yml -# to get a decent set of defaults. - +version: "2" +output: + formats: + text: + path: stdout + print-linter-name: true + print-issued-lines: true linters: - disable-all: true + default: none enable: - bodyclose + - copyloopvar - dogsled - dupl - - copyloopvar - gocritic - gocyclo - - gofmt - - goimports - goprintffuncname - gosec - - gosimple - govet - ineffassign - misspell @@ -24,25 +25,33 @@ linters: - revive - rowserrcheck - staticcheck - - stylecheck - - typecheck - unconvert - unused - whitespace - -linters-settings: - revive: - rules: - # This rule is annoying. Often you want to name the - # parameters for clarity because it conforms to an - # interface. - - name: unused-parameter - severity: warning - disabled: true - -output: - formats: - - format: colored-line-number - print-issued-lines: true - print-linter-name: true - sort-results: true + settings: + revive: + rules: + - name: unused-parameter + severity: warning + disabled: true + exclusions: + generated: lax + presets: + - comments + - common-false-positives + - legacy + - std-error-handling + paths: + - third_party$ + - builtin$ + - examples$ +formatters: + enable: + - gofmt + - goimports + exclusions: + generated: lax + paths: + - third_party$ + - builtin$ + - examples$ diff --git a/consumer/consumer.go b/consumer/consumer.go index ba3b6be..df9afbe 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -33,7 +33,7 @@ func New[In any](name string, fn func(context.Context, In) error) Consumer[In] { } type consumer[In any] struct { - name string + name string fn func(context.Context, In) error in job.Queue[In] diff --git a/test/main.go b/test/main.go index d49d97a..605ddf6 100644 --- a/test/main.go +++ b/test/main.go @@ -62,14 +62,12 @@ func main() { transformerFn, ). WithInput(producer.Output()). - WithWorkerPoolSize(1). - WithSpanName("sleeping one and adding one") + WithWorkerPoolSize(1) go transformer.Work() c := work.NewConsumer("int consumer", consumerFn). - WithInput(transformer.Output()). - WithSpanName("sleeping one and printing") + WithInput(transformer.Output()) c.Work() } diff --git a/transformer/transformer.go b/transformer/transformer.go index 72eaf61..ca424b2 100644 --- a/transformer/transformer.go +++ b/transformer/transformer.go @@ -35,7 +35,7 @@ func New[In any, Out any](name string, fn func(context.Context, In) (Out, error) } type transformer[In any, Out any] struct { - name string + name string fn func(context.Context, In) (Out, error) in job.Queue[In] From f6873c3892a23c5934d1b78c9d3adcf3d4222489 Mon Sep 17 00:00:00 2001 From: divi Date: Mon, 29 Dec 2025 10:33:37 +0530 Subject: [PATCH 4/6] update golangci-lint in workflow --- .github/workflows/lint.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index ddd335e..94b382d 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -24,7 +24,7 @@ jobs: go-version-file: go.mod cache-dependency-path: go.sum - name: golangci-lint - uses: golangci/golangci-lint-action@v6 + uses: golangci/golangci-lint-action@v9 with: version: latest args: -c .golangci.yaml --out-format=colored-line-number From 384a3e5ca4ebf7c6049d53e7121ef7ec54dacb47 Mon Sep 17 00:00:00 2001 From: divi Date: Mon, 29 Dec 2025 10:36:35 +0530 Subject: [PATCH 5/6] remove flag --- .github/workflows/lint.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index 94b382d..46fc61d 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -27,4 +27,4 @@ jobs: uses: golangci/golangci-lint-action@v9 with: version: latest - args: -c .golangci.yaml --out-format=colored-line-number + args: -c .golangci.yaml From 1dd645ad0265dba4782df665d44650a116fe7a2f Mon Sep 17 00:00:00 2001 From: divi Date: Mon, 29 Dec 2025 12:32:01 +0530 Subject: [PATCH 6/6] Revert "remove span name api" This reverts commit caac972a1792561831791c43407211781dd5a6cd. --- consumer/consumer.go | 12 +++++++++--- producer/producer.go | 3 +-- transformer/transformer.go | 14 ++++++++++---- 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/consumer/consumer.go b/consumer/consumer.go index df9afbe..7d1d48e 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -2,7 +2,6 @@ package consumer import ( "context" - "fmt" "github.com/imperfect-fourth/work/job" "go.opentelemetry.io/otel" @@ -19,11 +18,13 @@ type Consumer[In any] interface { WithInput(job.Queue[In]) Consumer[In] WithErrorQueue(job.Queue[error]) Consumer[In] WithWorkerPoolSize(int) Consumer[In] + WithSpanName(string) Consumer[In] } func New[In any](name string, fn func(context.Context, In) error) Consumer[In] { c := &consumer[In]{ name: name, + spanName: "consume job", fn: fn, in: make(job.Queue[In]), err: make(job.Queue[error]), @@ -33,7 +34,8 @@ func New[In any](name string, fn func(context.Context, In) error) Consumer[In] { } type consumer[In any] struct { - name string + name string + spanName string fn func(context.Context, In) error in job.Queue[In] @@ -49,7 +51,7 @@ func (c consumer[In]) ConsumeOnce() { ctx, span := otel.Tracer(c.name).Start(j.Context(), c.name) defer span.End() - fnctx, fnspan := otel.Tracer(c.name).Start(ctx, fmt.Sprintf("%s - run function", c.name)) + fnctx, fnspan := otel.Tracer(c.name).Start(ctx, c.spanName) err := c.fn(fnctx, j.Input()) fnspan.End() if err != nil { @@ -92,3 +94,7 @@ func (c *consumer[In]) WithWorkerPoolSize(n int) Consumer[In] { c.workerPoolSize = n return c } +func (c *consumer[In]) WithSpanName(name string) Consumer[In] { + c.spanName = name + return c +} diff --git a/producer/producer.go b/producer/producer.go index e90c532..95ff6fd 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -2,7 +2,6 @@ package producer import ( "context" - "fmt" "time" "github.com/imperfect-fourth/work/job" @@ -57,7 +56,7 @@ func (p producer[Out]) ProduceOnce() { ctx = co.Context() } rootCtx, j := job.New(ctx, o) - _, jobspan := otel.Tracer(p.name).Start(rootCtx, fmt.Sprintf("%s - output queue wait", p.name)) + _, jobspan := otel.Tracer(p.name).Start(rootCtx, "start queue wait") jobs[i] = j spans[i] = jobspan } diff --git a/transformer/transformer.go b/transformer/transformer.go index ca424b2..396dd1f 100644 --- a/transformer/transformer.go +++ b/transformer/transformer.go @@ -2,7 +2,6 @@ package transformer import ( "context" - "fmt" "github.com/imperfect-fourth/work/job" "go.opentelemetry.io/otel" @@ -20,11 +19,13 @@ type Transformer[In, Out any] interface { WithInput(job.Queue[In]) Transformer[In, Out] WithErrorQueue(job.Queue[error]) Transformer[In, Out] WithWorkerPoolSize(int) Transformer[In, Out] + WithSpanName(string) Transformer[In, Out] } func New[In any, Out any](name string, fn func(context.Context, In) (Out, error)) Transformer[In, Out] { t := &transformer[In, Out]{ name: name, + spanName: "transform job", fn: fn, in: make(job.Queue[In]), out: make(job.Queue[Out]), @@ -35,7 +36,8 @@ func New[In any, Out any](name string, fn func(context.Context, In) (Out, error) } type transformer[In any, Out any] struct { - name string + name string + spanName string fn func(context.Context, In) (Out, error) in job.Queue[In] @@ -50,7 +52,7 @@ func (t transformer[In, Out]) TransformOnce() { ctx, span := otel.Tracer(t.name).Start(j.Context(), t.name) defer span.End() - fnctx, fnspan := otel.Tracer(t.name).Start(ctx, fmt.Sprintf("%s - run function", t.name)) + fnctx, fnspan := otel.Tracer(t.name).Start(ctx, t.spanName) o, err := t.fn(fnctx, j.Input()) fnspan.End() if err != nil { @@ -59,7 +61,7 @@ func (t transformer[In, Out]) TransformOnce() { return } - _, jobspan := otel.Tracer(t.name).Start(ctx, fmt.Sprintf("%s - output queue wait", t.name)) + _, jobspan := otel.Tracer(t.name).Start(ctx, "output queue wait") _, newJob := job.New(j.Context(), o) t.out <- newJob jobspan.End() @@ -102,3 +104,7 @@ func (t *transformer[In, Out]) WithWorkerPoolSize(n int) Transformer[In, Out] { t.workerPoolSize = n return t } +func (t *transformer[In, Out]) WithSpanName(name string) Transformer[In, Out] { + t.spanName = name + return t +}