diff --git a/consumer/consumer.go b/consumer/consumer.go index c80e06e..4da7564 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -1,6 +1,8 @@ package consumer import ( + "context" + "github.com/imperfect-fourth/work/job" "go.opentelemetry.io/otel" ) @@ -15,7 +17,7 @@ type Consumer interface { setSpanName(string) } -func New[In any](name string, in job.Queue[In], fn func(In) error, opts ...Option) (Consumer, job.Queue[error]) { +func New[In any](name string, in job.Queue[In], fn func(context.Context, In) error, opts ...Option) (Consumer, job.Queue[error]) { c := &consumer[In]{ name: name, spanName: "consume job", @@ -34,7 +36,7 @@ type consumer[In any] struct { name string spanName string - fn func(In) error + fn func(context.Context, In) error in job.Queue[In] err job.Queue[error] @@ -48,9 +50,9 @@ func (c consumer[In]) ConsumeOnce() { ctx, span := otel.Tracer(c.name).Start(j.Context(), c.name) defer span.End() - _, jobspan := otel.Tracer(c.name).Start(ctx, c.spanName) - err := c.fn(j.Input()) - jobspan.End() + fnctx, fnspan := otel.Tracer(c.name).Start(ctx, c.spanName) + err := c.fn(fnctx, j.Input()) + fnspan.End() if err != nil { _, errJob := job.New(ctx, err) c.err <- errJob diff --git a/producer/producer.go b/producer/producer.go index 10af2f2..b0ea5b7 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -52,7 +52,11 @@ func (p producer[Out]) ProduceOnce() { jobs := make([]job.Job[Out], len(out)) spans := make([]trace.Span, len(out)) for i, o := range out { - rootCtx, j := job.New(context.Background(), o) + ctx := context.Background() + if co, ok := any(o).(ContextualOutput); ok { + ctx = co.Context() + } + rootCtx, j := job.New(ctx, o) _, jobspan := otel.Tracer(p.name).Start(rootCtx, "start queue wait") jobs[i] = j spans[i] = jobspan @@ -82,3 +86,7 @@ func (p *producer[Out]) setCooldown(t time.Duration) { func (p *producer[Out]) setErrorQueue(err job.Queue[error]) { p.err = err } + +type ContextualOutput interface { + Context() context.Context +} diff --git a/test/.gitignore b/test/.gitignore new file mode 100644 index 0000000..9daeafb --- /dev/null +++ b/test/.gitignore @@ -0,0 +1 @@ +test diff --git a/test/main.go b/test/main.go index 9b27c8f..1e25e38 100644 --- a/test/main.go +++ b/test/main.go @@ -12,23 +12,33 @@ import ( "github.com/imperfect-fourth/work/producer" "github.com/imperfect-fourth/work/transformer" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" "go.opentelemetry.io/otel/sdk/resource" - "go.opentelemetry.io/otel/sdk/trace" + sdktrace "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + "go.opentelemetry.io/otel/trace" ) func producerFn() ([]int, error) { fmt.Println("producing") return []int{0, 1, 2, 3, 4}, nil } -func consumerFn(i int) error { +func consumerFn(ctx context.Context, i int) error { + span := trace.SpanFromContext(ctx) + span.SetAttributes( + attribute.Int("i", i), + ) fmt.Println(i) time.Sleep(1 * time.Second) return nil } -func transformerFn(i int) (int, error) { +func transformerFn(ctx context.Context, i int) (int, error) { + span := trace.SpanFromContext(ctx) + span.SetAttributes( + attribute.Int("i", i), + ) time.Sleep(1 * time.Second) return i + 1, nil } @@ -64,7 +74,7 @@ func main() { c.Work() } -func startTracing() (*trace.TracerProvider, error) { +func startTracing() (*sdktrace.TracerProvider, error) { headers := map[string]string{ "content-type": "application/json", } @@ -82,14 +92,14 @@ func startTracing() (*trace.TracerProvider, error) { return nil, fmt.Errorf("creating new exporter: %w", err) } - tracerprovider := trace.NewTracerProvider( - trace.WithBatcher( + tracerprovider := sdktrace.NewTracerProvider( + sdktrace.WithBatcher( exporter, - trace.WithMaxExportBatchSize(trace.DefaultMaxExportBatchSize), - trace.WithBatchTimeout(trace.DefaultScheduleDelay*time.Millisecond), - trace.WithMaxExportBatchSize(trace.DefaultMaxExportBatchSize), + sdktrace.WithMaxExportBatchSize(sdktrace.DefaultMaxExportBatchSize), + sdktrace.WithBatchTimeout(sdktrace.DefaultScheduleDelay*time.Millisecond), + sdktrace.WithMaxExportBatchSize(sdktrace.DefaultMaxExportBatchSize), ), - trace.WithResource( + sdktrace.WithResource( resource.NewWithAttributes( semconv.SchemaURL, semconv.ServiceNameKey.String("test"), diff --git a/test/test b/test/test deleted file mode 100755 index f6f818a..0000000 Binary files a/test/test and /dev/null differ diff --git a/transformer/transformer.go b/transformer/transformer.go index fd3a583..e34d31c 100644 --- a/transformer/transformer.go +++ b/transformer/transformer.go @@ -1,6 +1,8 @@ package transformer import ( + "context" + "github.com/imperfect-fourth/work/job" "go.opentelemetry.io/otel" ) @@ -15,7 +17,7 @@ type Transformer interface { setSpanName(string) } -func New[In any, Out any](name string, in job.Queue[In], fn func(In) (Out, error), opts ...Option) (Transformer, job.Queue[Out], job.Queue[error]) { +func New[In any, Out any](name string, in job.Queue[In], fn func(context.Context, In) (Out, error), opts ...Option) (Transformer, job.Queue[Out], job.Queue[error]) { t := &transformer[In, Out]{ name: name, spanName: "transform job", @@ -35,7 +37,7 @@ type transformer[In any, Out any] struct { name string spanName string - fn func(In) (Out, error) + fn func(context.Context, In) (Out, error) in job.Queue[In] out job.Queue[Out] err job.Queue[error] @@ -48,8 +50,8 @@ func (t transformer[In, Out]) TransformOnce() { ctx, span := otel.Tracer(t.name).Start(j.Context(), t.name) defer span.End() - _, fnspan := otel.Tracer(t.name).Start(ctx, t.spanName) - o, err := t.fn(j.Input()) + fnctx, fnspan := otel.Tracer(t.name).Start(ctx, t.spanName) + o, err := t.fn(fnctx, j.Input()) fnspan.End() if err != nil { _, errJob := job.New(ctx, err) diff --git a/work.go b/work.go index 0dddfe6..ecdc5fb 100644 --- a/work.go +++ b/work.go @@ -1,6 +1,8 @@ package work import ( + "context" + "github.com/imperfect-fourth/work/consumer" "github.com/imperfect-fourth/work/job" "github.com/imperfect-fourth/work/producer" @@ -11,10 +13,10 @@ func NewProducer[Out any](name string, fn func() ([]Out, error), opts ...produce return producer.New(name, fn, opts...) } -func NewTransformer[In any, Out any](name string, in job.Queue[In], fn func(In) (Out, error), opts ...transformer.Option) (transformer.Transformer, job.Queue[Out], job.Queue[error]) { +func NewTransformer[In any, Out any](name string, in job.Queue[In], fn func(context.Context, In) (Out, error), opts ...transformer.Option) (transformer.Transformer, job.Queue[Out], job.Queue[error]) { return transformer.New(name, in, fn, opts...) } -func NewConsumer[In any](name string, in job.Queue[In], fn func(In) error, opts ...consumer.Option) (consumer.Consumer, job.Queue[error]) { +func NewConsumer[In any](name string, in job.Queue[In], fn func(context.Context, In) error, opts ...consumer.Option) (consumer.Consumer, job.Queue[error]) { return consumer.New(name, in, fn, opts...) }