Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package consumer

import (
"context"

"github.com/imperfect-fourth/work/job"
"go.opentelemetry.io/otel"
)
Expand All @@ -15,7 +17,7 @@ type Consumer interface {
setSpanName(string)
}

func New[In any](name string, in job.Queue[In], fn func(In) error, opts ...Option) (Consumer, job.Queue[error]) {
func New[In any](name string, in job.Queue[In], fn func(context.Context, In) error, opts ...Option) (Consumer, job.Queue[error]) {
c := &consumer[In]{
name: name,
spanName: "consume job",
Expand All @@ -34,7 +36,7 @@ type consumer[In any] struct {
name string
spanName string

fn func(In) error
fn func(context.Context, In) error
in job.Queue[In]
err job.Queue[error]

Expand All @@ -48,9 +50,9 @@ func (c consumer[In]) ConsumeOnce() {
ctx, span := otel.Tracer(c.name).Start(j.Context(), c.name)
defer span.End()

_, jobspan := otel.Tracer(c.name).Start(ctx, c.spanName)
err := c.fn(j.Input())
jobspan.End()
fnctx, fnspan := otel.Tracer(c.name).Start(ctx, c.spanName)
err := c.fn(fnctx, j.Input())
fnspan.End()
if err != nil {
_, errJob := job.New(ctx, err)
c.err <- errJob
Expand Down
10 changes: 9 additions & 1 deletion producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ func (p producer[Out]) ProduceOnce() {
jobs := make([]job.Job[Out], len(out))
spans := make([]trace.Span, len(out))
for i, o := range out {
rootCtx, j := job.New(context.Background(), o)
ctx := context.Background()
if co, ok := any(o).(ContextualOutput); ok {
ctx = co.Context()
}
rootCtx, j := job.New(ctx, o)
_, jobspan := otel.Tracer(p.name).Start(rootCtx, "start queue wait")
jobs[i] = j
spans[i] = jobspan
Expand Down Expand Up @@ -82,3 +86,7 @@ func (p *producer[Out]) setCooldown(t time.Duration) {
func (p *producer[Out]) setErrorQueue(err job.Queue[error]) {
p.err = err
}

type ContextualOutput interface {
Context() context.Context
}
1 change: 1 addition & 0 deletions test/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
test
30 changes: 20 additions & 10 deletions test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,33 @@ import (
"github.com/imperfect-fourth/work/producer"
"github.com/imperfect-fourth/work/transformer"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
)

func producerFn() ([]int, error) {
fmt.Println("producing")
return []int{0, 1, 2, 3, 4}, nil
}
func consumerFn(i int) error {
func consumerFn(ctx context.Context, i int) error {
span := trace.SpanFromContext(ctx)
span.SetAttributes(
attribute.Int("i", i),
)
fmt.Println(i)
time.Sleep(1 * time.Second)
return nil
}
func transformerFn(i int) (int, error) {
func transformerFn(ctx context.Context, i int) (int, error) {
span := trace.SpanFromContext(ctx)
span.SetAttributes(
attribute.Int("i", i),
)
time.Sleep(1 * time.Second)
return i + 1, nil
}
Expand Down Expand Up @@ -64,7 +74,7 @@ func main() {
c.Work()
}

func startTracing() (*trace.TracerProvider, error) {
func startTracing() (*sdktrace.TracerProvider, error) {
headers := map[string]string{
"content-type": "application/json",
}
Expand All @@ -82,14 +92,14 @@ func startTracing() (*trace.TracerProvider, error) {
return nil, fmt.Errorf("creating new exporter: %w", err)
}

tracerprovider := trace.NewTracerProvider(
trace.WithBatcher(
tracerprovider := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(
exporter,
trace.WithMaxExportBatchSize(trace.DefaultMaxExportBatchSize),
trace.WithBatchTimeout(trace.DefaultScheduleDelay*time.Millisecond),
trace.WithMaxExportBatchSize(trace.DefaultMaxExportBatchSize),
sdktrace.WithMaxExportBatchSize(sdktrace.DefaultMaxExportBatchSize),
sdktrace.WithBatchTimeout(sdktrace.DefaultScheduleDelay*time.Millisecond),
sdktrace.WithMaxExportBatchSize(sdktrace.DefaultMaxExportBatchSize),
),
trace.WithResource(
sdktrace.WithResource(
resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String("test"),
Expand Down
Binary file removed test/test
Binary file not shown.
10 changes: 6 additions & 4 deletions transformer/transformer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package transformer

import (
"context"

"github.com/imperfect-fourth/work/job"
"go.opentelemetry.io/otel"
)
Expand All @@ -15,7 +17,7 @@ type Transformer interface {
setSpanName(string)
}

func New[In any, Out any](name string, in job.Queue[In], fn func(In) (Out, error), opts ...Option) (Transformer, job.Queue[Out], job.Queue[error]) {
func New[In any, Out any](name string, in job.Queue[In], fn func(context.Context, In) (Out, error), opts ...Option) (Transformer, job.Queue[Out], job.Queue[error]) {
t := &transformer[In, Out]{
name: name,
spanName: "transform job",
Expand All @@ -35,7 +37,7 @@ type transformer[In any, Out any] struct {
name string
spanName string

fn func(In) (Out, error)
fn func(context.Context, In) (Out, error)
in job.Queue[In]
out job.Queue[Out]
err job.Queue[error]
Expand All @@ -48,8 +50,8 @@ func (t transformer[In, Out]) TransformOnce() {
ctx, span := otel.Tracer(t.name).Start(j.Context(), t.name)
defer span.End()

_, fnspan := otel.Tracer(t.name).Start(ctx, t.spanName)
o, err := t.fn(j.Input())
fnctx, fnspan := otel.Tracer(t.name).Start(ctx, t.spanName)
o, err := t.fn(fnctx, j.Input())
fnspan.End()
if err != nil {
_, errJob := job.New(ctx, err)
Expand Down
6 changes: 4 additions & 2 deletions work.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package work

import (
"context"

"github.com/imperfect-fourth/work/consumer"
"github.com/imperfect-fourth/work/job"
"github.com/imperfect-fourth/work/producer"
Expand All @@ -11,10 +13,10 @@ func NewProducer[Out any](name string, fn func() ([]Out, error), opts ...produce
return producer.New(name, fn, opts...)
}

func NewTransformer[In any, Out any](name string, in job.Queue[In], fn func(In) (Out, error), opts ...transformer.Option) (transformer.Transformer, job.Queue[Out], job.Queue[error]) {
func NewTransformer[In any, Out any](name string, in job.Queue[In], fn func(context.Context, In) (Out, error), opts ...transformer.Option) (transformer.Transformer, job.Queue[Out], job.Queue[error]) {
return transformer.New(name, in, fn, opts...)
}

func NewConsumer[In any](name string, in job.Queue[In], fn func(In) error, opts ...consumer.Option) (consumer.Consumer, job.Queue[error]) {
func NewConsumer[In any](name string, in job.Queue[In], fn func(context.Context, In) error, opts ...consumer.Option) (consumer.Consumer, job.Queue[error]) {
return consumer.New(name, in, fn, opts...)
}