From c50e461bdfd111a276ba733a40c291594dbb2f2f Mon Sep 17 00:00:00 2001 From: hendoxc Date: Mon, 5 Jan 2026 14:58:27 -0700 Subject: [PATCH 01/19] Adds chipingress batching client --- pkg/chipingress/batch/client.go | 166 ++++++++++++++++++++++++++++++++ 1 file changed, 166 insertions(+) create mode 100644 pkg/chipingress/batch/client.go diff --git a/pkg/chipingress/batch/client.go b/pkg/chipingress/batch/client.go new file mode 100644 index 000000000..729c3cb17 --- /dev/null +++ b/pkg/chipingress/batch/client.go @@ -0,0 +1,166 @@ +package batch + +import ( + "context" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" +) + +type BatchClient struct { + client chipingress.Client + batchSize int + maxConcurrentSends chan struct{} + batchTimeout time.Duration + compressionType string + messageBuffer chan *chipingress.CloudEventPb + shutdownChan chan struct{} +} + +type Opt func(*BatchClient) + +func NewBatchClient(client chipingress.Client, opts ...Opt) (*BatchClient, error) { + + c := &BatchClient{ + client: client, + batchSize: 1, + maxConcurrentSends: make(chan struct{}, 1), + messageBuffer: make(chan *chipingress.CloudEventPb, 1000), + batchTimeout: 100 * time.Millisecond, + compressionType: "gzip", + shutdownChan: make(chan struct{}), + } + + for _, opt := range opts { + opt(c) + } + + return c, nil +} + +func (b *BatchClient) Start(ctx context.Context) { + go func() { + + batch := make([]*chipingress.CloudEventPb, 0, b.batchSize) + timer := time.NewTimer(b.batchTimeout) + timer.Stop() + + for { + select { + case <-ctx.Done(): + b.flush(batch) + close(b.shutdownChan) + return + case <-b.shutdownChan: + b.flush(batch) + return + case event := <-b.messageBuffer: + + if len(batch) == 0 { + timer.Reset(b.batchTimeout) + } + + batch = append(batch, event) + + if len(batch) >= b.batchSize { + batchToSend := batch + batch = make([]*chipingress.CloudEventPb, 0, b.batchSize) + timer.Stop() + b.sendBatch(ctx, batchToSend) + } + case <-timer.C: + if 
len(batch) > 0 { + batchToSend := batch + batch = make([]*chipingress.CloudEventPb, 0, b.batchSize) + b.sendBatch(ctx, batchToSend) + } + } + } + }() +} + +func (b *BatchClient) Stop() { + close(b.shutdownChan) + // wait for pending sends by getting all semaphore slots + for range cap(b.maxConcurrentSends) { + b.maxConcurrentSends <- struct{}{} + } +} + +// QueueMessage queues a single message to the batch client. +// Returns immediately with no blocking - drops message if channel is full. +// Returns true if message was queued, false if it was dropped. +func (b *BatchClient) QueueMessage(event *chipingress.CloudEventPb) bool { + + if event == nil { + return false + } + + select { + case b.messageBuffer <- event: + return true + default: + return false + } +} + +func (b *BatchClient) sendBatch(ctx context.Context, events []*chipingress.CloudEventPb) { + + if len(events) == 0 { + return + } + + batch := make([]*chipingress.CloudEventPb, 0, len(events)) + for _, event := range events { + batch = append(batch, event) + } + + b.maxConcurrentSends <- struct{}{} + + go func() { + defer func() { <-b.maxConcurrentSends }() + b.client.PublishBatch(ctx, &chipingress.CloudEventBatch{Events: batch}) + }() +} + +func (b *BatchClient) flush(batch []*chipingress.CloudEventPb) { + + if len(batch) == 0 { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + b.sendBatch(ctx, batch) +} + +func WithBatchSize(batchSize int) Opt { + return func(c *BatchClient) { + c.batchSize = batchSize + } +} + +func WithMaxConcurrentSends(maxConcurrentSends int) Opt { + return func(c *BatchClient) { + c.maxConcurrentSends = make(chan struct{}, maxConcurrentSends) + } +} + +func WithBatchTimeout(batchTimeout time.Duration) Opt { + return func(c *BatchClient) { + c.batchTimeout = batchTimeout + } +} + +func WithCompressionType(compressionType string) Opt { + return func(c *BatchClient) { + c.compressionType = compressionType + } +} + +func 
WithMessageBuffer(messageBufferSize int) Opt { + return func(c *BatchClient) { + c.messageBuffer = make(chan *chipingress.CloudEventPb, messageBufferSize) + } +} From 92e5a613b4aa44d379cdc904e74533d66743b792 Mon Sep 17 00:00:00 2001 From: hendoxc Date: Mon, 5 Jan 2026 14:58:41 -0700 Subject: [PATCH 02/19] Adds unit-tests FIxes test --- pkg/chipingress/batch/client_test.go | 433 +++++++++++++++++++++++++++ 1 file changed, 433 insertions(+) create mode 100644 pkg/chipingress/batch/client_test.go diff --git a/pkg/chipingress/batch/client_test.go b/pkg/chipingress/batch/client_test.go new file mode 100644 index 000000000..5b9c871aa --- /dev/null +++ b/pkg/chipingress/batch/client_test.go @@ -0,0 +1,433 @@ +package batch + +import ( + "context" + "strconv" + "testing" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestNewBatchClient(t *testing.T) { + + t.Run("NewBatchClient", func(t *testing.T) { + client, err := NewBatchClient(nil) + require.NoError(t, err) + assert.NotNil(t, client) + }) + + t.Run("WithBatchSize", func(t *testing.T) { + client, err := NewBatchClient(nil, WithBatchSize(100)) + require.NoError(t, err) + assert.Equal(t, 100, client.batchSize) + }) + + t.Run("WithMaxConcurrentSends", func(t *testing.T) { + client, err := NewBatchClient(nil, WithMaxConcurrentSends(10)) + require.NoError(t, err) + assert.Equal(t, 10, cap(client.maxConcurrentSends)) + }) + + t.Run("WithBatchTimeout", func(t *testing.T) { + client, err := NewBatchClient(nil, WithBatchTimeout(100*time.Millisecond)) + require.NoError(t, err) + assert.Equal(t, 100*time.Millisecond, client.batchTimeout) + }) + + t.Run("WithCompressionType", func(t *testing.T) { + client, err := NewBatchClient(nil, WithCompressionType("gzip")) + require.NoError(t, err) + assert.Equal(t, 
"gzip", client.compressionType) + }) + + t.Run("WithMessageBuffer", func(t *testing.T) { + client, err := NewBatchClient(nil, WithMessageBuffer(1000)) + require.NoError(t, err) + assert.Equal(t, 1000, cap(client.messageBuffer)) + }) +} + +func TestQueueMessage(t *testing.T) { + + t.Run("successfully queues a message", func(t *testing.T) { + + client, err := NewBatchClient(nil, WithMessageBuffer(5)) + require.NoError(t, err) + + event := &chipingress.CloudEventPb{ + Id: "test-id-1", + Source: "test-source", + Type: "test.event.type", + } + + client.QueueMessage(event) + + assert.Equal(t, 1, len(client.messageBuffer)) + + received := <-client.messageBuffer + assert.Equal(t, event.Id, received.Id) + assert.Equal(t, event.Source, received.Source) + assert.Equal(t, event.Type, received.Type) + }) + + t.Run("drops message if buffer is full", func(t *testing.T) { + + client, err := NewBatchClient(nil, WithMessageBuffer(1)) + require.NoError(t, err) + require.NotNil(t, client) + + event := &chipingress.CloudEventPb{ + Id: "test-id-1", + Source: "test-source", + Type: "test.event.type", + } + + client.QueueMessage(event) + client.QueueMessage(event) + + assert.Equal(t, 1, len(client.messageBuffer)) + + received := <-client.messageBuffer + assert.Equal(t, event.Id, received.Id) + assert.Equal(t, event.Source, received.Source) + assert.Equal(t, event.Type, received.Type) + }) + + t.Run("handles nil event", func(t *testing.T) { + client, err := NewBatchClient(nil, WithMessageBuffer(5)) + require.NoError(t, err) + + client.QueueMessage(nil) + assert.Equal(t, 0, len(client.messageBuffer)) + }) +} + +func TestSendBatch(t *testing.T) { + + t.Run("successfully sends a batch", func(t *testing.T) { + + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + + mockClient. 
+ On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + // verify the batch contains the expected events + return len(batch.Events) == 3 && + batch.Events[0].Id == "test-id-1" && + batch.Events[1].Id == "test-id-2" && + batch.Events[2].Id == "test-id-3" + }), + ). + Return(&chipingress.PublishResponse{}, nil).Run(func(args mock.Arguments) { close(done) }). + Once() + + client, err := NewBatchClient(mockClient, WithMessageBuffer(5)) + require.NoError(t, err) + + events := []*chipingress.CloudEventPb{ + {Id: "test-id-1", Source: "test-source", Type: "test.event.type"}, + {Id: "test-id-2", Source: "test-source", Type: "test.event.type"}, + {Id: "test-id-3", Source: "test-source", Type: "test.event.type"}, + } + + client.sendBatch(t.Context(), events) + + // wait for the internal goroutine to complete + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for batch to be sent") + } + + mockClient.AssertExpectations(t) + }) + + t.Run("doesn't publish empty batch", func(t *testing.T) { + + mockClient := mocks.NewClient(t) + + client, err := NewBatchClient(mockClient, WithMessageBuffer(5)) + require.NoError(t, err) + + client.sendBatch(t.Context(), []*chipingress.CloudEventPb{}) + + mockClient.AssertNotCalled(t, "PublishBatch", mock.Anything, mock.Anything) + }) + + t.Run("sends multiple batches successfully", func(t *testing.T) { + mockClient := mocks.NewClient(t) + + done := make(chan struct{}) + callCount := 0 + + mockClient. + On("PublishBatch", mock.Anything, mock.Anything). + Return(&chipingress.PublishResponse{}, nil). + Run(func(args mock.Arguments) { + callCount++ + if callCount == 3 { + close(done) + } + }). 
+ Times(3) + + client, err := NewBatchClient(mockClient, WithMessageBuffer(5)) + require.NoError(t, err) + + batch1 := []*chipingress.CloudEventPb{ + {Id: "batch1-id-1", Source: "test-source", Type: "test.event.type"}, + } + batch2 := []*chipingress.CloudEventPb{ + {Id: "batch2-id-1", Source: "test-source", Type: "test.event.type"}, + {Id: "batch2-id-2", Source: "test-source", Type: "test.event.type"}, + } + batch3 := []*chipingress.CloudEventPb{ + {Id: "batch3-id-1", Source: "test-source", Type: "test.event.type"}, + } + + client.sendBatch(context.Background(), batch1) + client.sendBatch(context.Background(), batch2) + client.sendBatch(context.Background(), batch3) + + // wait for the internal goroutines to complete + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("timeout waiting for multiple batches to be sent") + } + + mockClient.AssertExpectations(t) + }) +} + +func TestStart(t *testing.T) { + + t.Run("batch size trigger", func(t *testing.T) { + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + + mockClient. + On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) == 3 && + batch.Events[0].Id == "test-id-1" && + batch.Events[1].Id == "test-id-2" && + batch.Events[2].Id == "test-id-3" + }), + ). + Return(&chipingress.PublishResponse{}, nil). + Run(func(args mock.Arguments) { close(done) }). 
+ Once() + + client, err := NewBatchClient(mockClient, WithBatchSize(3), WithBatchTimeout(5*time.Second)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + client.Start(ctx) + + client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}) + client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}) + client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-3", Source: "test-source", Type: "test.event.type"}) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for batch to be sent") + } + + mockClient.AssertExpectations(t) + }) + + t.Run("timeout trigger", func(t *testing.T) { + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + + mockClient. + On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) == 2 && + batch.Events[0].Id == "test-id-1" && + batch.Events[1].Id == "test-id-2" + }), + ). + Return(&chipingress.PublishResponse{}, nil). + Run(func(args mock.Arguments) { close(done) }). + Once() + + client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchTimeout(50*time.Millisecond)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + client.Start(ctx) + + client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}) + client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for batch to be sent after timeout") + } + + mockClient.AssertExpectations(t) + }) + + t.Run("context cancellation flushes pending batch", func(t *testing.T) { + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + + mockClient. 
+ On("PublishBatch", + mock.MatchedBy(func(ctx context.Context) bool { + return ctx != nil + }), + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) == 2 && + batch.Events[0].Id == "test-id-1" && + batch.Events[1].Id == "test-id-2" + }), + ). + Return(&chipingress.PublishResponse{}, nil). + Run(func(args mock.Arguments) { close(done) }). + Once() + + client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchTimeout(5*time.Second)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + + client.Start(ctx) + + client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}) + client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}) + + time.Sleep(10 * time.Millisecond) + + cancel() + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for flush on context cancellation") + } + + mockClient.AssertExpectations(t) + }) + + t.Run("stop flushes pending batch", func(t *testing.T) { + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + + mockClient. + On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) == 2 && + batch.Events[0].Id == "test-id-1" && + batch.Events[1].Id == "test-id-2" + }), + ). + Return(&chipingress.PublishResponse{}, nil). + Run(func(args mock.Arguments) { close(done) }). 
+ Once() + + client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchTimeout(100*time.Millisecond), WithMessageBuffer(10)) + require.NoError(t, err) + + client.Start(t.Context()) + + queued1 := client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}) + queued2 := client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}) + require.True(t, queued1) + require.True(t, queued2) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for batch timeout to trigger") + } + + client.Stop() + + mockClient.AssertExpectations(t) + }) + + t.Run("no flush when batch is empty", func(t *testing.T) { + + mockClient := mocks.NewClient(t) + + client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchTimeout(5*time.Second)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + client.Start(ctx) + + time.Sleep(10 * time.Millisecond) + + cancel() + + time.Sleep(50 * time.Millisecond) + + mockClient.AssertNotCalled(t, "PublishBatch") + }) + + t.Run("multiple batches via size trigger", func(t *testing.T) { + + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + callCount := 0 + + mockClient. + On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) == 2 + }), + ). + Return(&chipingress.PublishResponse{}, nil). + Run(func(args mock.Arguments) { + callCount++ + if callCount == 3 { + close(done) + } + }). 
+ Times(3) + + client, err := NewBatchClient(mockClient, WithBatchSize(2), WithBatchTimeout(5*time.Second)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + client.Start(ctx) + + for i := 1; i <= 6; i++ { + client.QueueMessage(&chipingress.CloudEventPb{ + Id: "test-id-" + strconv.Itoa(i), + Source: "test-source", + Type: "test.event.type", + }) + } + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for multiple batches to be sent") + } + + mockClient.AssertExpectations(t) + }) +} From 703498f7eece70986f29f3f790f2e3839f2515bf Mon Sep 17 00:00:00 2001 From: hendoxc Date: Mon, 5 Jan 2026 15:11:39 -0700 Subject: [PATCH 03/19] Removes copying of slice --- pkg/chipingress/batch/client.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pkg/chipingress/batch/client.go b/pkg/chipingress/batch/client.go index 729c3cb17..7a22da66d 100644 --- a/pkg/chipingress/batch/client.go +++ b/pkg/chipingress/batch/client.go @@ -110,16 +110,11 @@ func (b *BatchClient) sendBatch(ctx context.Context, events []*chipingress.Cloud return } - batch := make([]*chipingress.CloudEventPb, 0, len(events)) - for _, event := range events { - batch = append(batch, event) - } - b.maxConcurrentSends <- struct{}{} go func() { defer func() { <-b.maxConcurrentSends }() - b.client.PublishBatch(ctx, &chipingress.CloudEventBatch{Events: batch}) + b.client.PublishBatch(ctx, &chipingress.CloudEventBatch{Events: events}) }() } From 7ac8912f13cff80a2d5df67cb497b8f6be588a74 Mon Sep 17 00:00:00 2001 From: hendoxc Date: Mon, 5 Jan 2026 15:19:55 -0700 Subject: [PATCH 04/19] Adds zap logger --- pkg/chipingress/batch/client.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/chipingress/batch/client.go b/pkg/chipingress/batch/client.go index 7a22da66d..5e10dee18 100644 --- a/pkg/chipingress/batch/client.go +++ b/pkg/chipingress/batch/client.go @@ -5,6 +5,7 @@ import ( "time" 
"github.com/smartcontractkit/chainlink-common/pkg/chipingress" + "go.uber.org/zap" ) type BatchClient struct { @@ -15,6 +16,7 @@ type BatchClient struct { compressionType string messageBuffer chan *chipingress.CloudEventPb shutdownChan chan struct{} + log *zap.SugaredLogger } type Opt func(*BatchClient) @@ -114,7 +116,10 @@ func (b *BatchClient) sendBatch(ctx context.Context, events []*chipingress.Cloud go func() { defer func() { <-b.maxConcurrentSends }() - b.client.PublishBatch(ctx, &chipingress.CloudEventBatch{Events: events}) + _, err := b.client.PublishBatch(ctx, &chipingress.CloudEventBatch{Events: events}) + if err != nil { + b.log.Errorw("failed to publish batch", "error", err) + } }() } From 24a80d6d4f58834c98f5a700b2fd394cda69727a Mon Sep 17 00:00:00 2001 From: hendoxc Date: Mon, 5 Jan 2026 15:28:07 -0700 Subject: [PATCH 05/19] Fixes mod file --- pkg/chipingress/client_test.go | 2 +- pkg/chipingress/go.mod | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/chipingress/client_test.go b/pkg/chipingress/client_test.go index 979bf02cb..ba8649f42 100644 --- a/pkg/chipingress/client_test.go +++ b/pkg/chipingress/client_test.go @@ -115,7 +115,7 @@ func TestNewEvent(t *testing.T) { assert.Equal(t, "example-subject", event.Subject()) assert.Equal(t, attributes["time"].(time.Time).UTC(), event.Time()) assert.NotEmpty(t, event.Extensions()["recordedtime"]) - assert.True(t, event.Extensions()["recordedtime"].(ce.Timestamp).Time.After(attributes["time"].(time.Time))) + assert.True(t, event.Extensions()["recordedtime"].(ce.Timestamp).After(attributes["time"].(time.Time))) // Assert the event data was set as expected var resultProto pb.PingResponse diff --git a/pkg/chipingress/go.mod b/pkg/chipingress/go.mod index 5599238cb..716f610e3 100644 --- a/pkg/chipingress/go.mod +++ b/pkg/chipingress/go.mod @@ -12,6 +12,7 @@ require ( go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0 go.opentelemetry.io/otel/metric v1.38.0 
go.opentelemetry.io/otel/trace v1.38.0 + go.uber.org/zap v1.27.0 google.golang.org/grpc v1.75.0 google.golang.org/protobuf v1.36.8 ) @@ -28,7 +29,6 @@ require ( go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/otel v1.38.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect golang.org/x/net v0.43.0 // indirect golang.org/x/sys v0.35.0 // indirect golang.org/x/text v0.28.0 // indirect From 3cff2cc646b134a4173419d021e27346b12e6078 Mon Sep 17 00:00:00 2001 From: hendoxc Date: Wed, 7 Jan 2026 08:34:21 -0700 Subject: [PATCH 06/19] FIxes linting --- pkg/chipingress/batch/client.go | 37 ++++++++++++---------------- pkg/chipingress/batch/client_test.go | 21 +++++----------- pkg/chipingress/go.mod | 1 + 3 files changed, 23 insertions(+), 36 deletions(-) diff --git a/pkg/chipingress/batch/client.go b/pkg/chipingress/batch/client.go index 5e10dee18..77303956c 100644 --- a/pkg/chipingress/batch/client.go +++ b/pkg/chipingress/batch/client.go @@ -4,11 +4,12 @@ import ( "context" "time" - "github.com/smartcontractkit/chainlink-common/pkg/chipingress" "go.uber.org/zap" + + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" ) -type BatchClient struct { +type Client struct { client chipingress.Client batchSize int maxConcurrentSends chan struct{} @@ -19,11 +20,10 @@ type BatchClient struct { log *zap.SugaredLogger } -type Opt func(*BatchClient) +type Opt func(*Client) -func NewBatchClient(client chipingress.Client, opts ...Opt) (*BatchClient, error) { - - c := &BatchClient{ +func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) { + c := &Client{ client: client, batchSize: 1, maxConcurrentSends: make(chan struct{}, 1), @@ -40,9 +40,8 @@ func NewBatchClient(client chipingress.Client, opts ...Opt) (*BatchClient, error return c, nil } -func (b *BatchClient) Start(ctx context.Context) { +func (b *Client) Start(ctx context.Context) { go func() { - batch := make([]*chipingress.CloudEventPb, 0, 
b.batchSize) timer := time.NewTimer(b.batchTimeout) timer.Stop() @@ -57,7 +56,6 @@ func (b *BatchClient) Start(ctx context.Context) { b.flush(batch) return case event := <-b.messageBuffer: - if len(batch) == 0 { timer.Reset(b.batchTimeout) } @@ -81,7 +79,7 @@ func (b *BatchClient) Start(ctx context.Context) { }() } -func (b *BatchClient) Stop() { +func (b *Client) Stop() { close(b.shutdownChan) // wait for pending sends by getting all semaphore slots for range cap(b.maxConcurrentSends) { @@ -92,8 +90,7 @@ func (b *BatchClient) Stop() { // QueueMessage queues a single message to the batch client. // Returns immediately with no blocking - drops message if channel is full. // Returns true if message was queued, false if it was dropped. -func (b *BatchClient) QueueMessage(event *chipingress.CloudEventPb) bool { - +func (b *Client) QueueMessage(event *chipingress.CloudEventPb) bool { if event == nil { return false } @@ -106,8 +103,7 @@ func (b *BatchClient) QueueMessage(event *chipingress.CloudEventPb) bool { } } -func (b *BatchClient) sendBatch(ctx context.Context, events []*chipingress.CloudEventPb) { - +func (b *Client) sendBatch(ctx context.Context, events []*chipingress.CloudEventPb) { if len(events) == 0 { return } @@ -123,8 +119,7 @@ func (b *BatchClient) sendBatch(ctx context.Context, events []*chipingress.Cloud }() } -func (b *BatchClient) flush(batch []*chipingress.CloudEventPb) { - +func (b *Client) flush(batch []*chipingress.CloudEventPb) { if len(batch) == 0 { return } @@ -136,31 +131,31 @@ func (b *BatchClient) flush(batch []*chipingress.CloudEventPb) { } func WithBatchSize(batchSize int) Opt { - return func(c *BatchClient) { + return func(c *Client) { c.batchSize = batchSize } } func WithMaxConcurrentSends(maxConcurrentSends int) Opt { - return func(c *BatchClient) { + return func(c *Client) { c.maxConcurrentSends = make(chan struct{}, maxConcurrentSends) } } func WithBatchTimeout(batchTimeout time.Duration) Opt { - return func(c *BatchClient) { + return 
func(c *Client) { c.batchTimeout = batchTimeout } } func WithCompressionType(compressionType string) Opt { - return func(c *BatchClient) { + return func(c *Client) { c.compressionType = compressionType } } func WithMessageBuffer(messageBufferSize int) Opt { - return func(c *BatchClient) { + return func(c *Client) { c.messageBuffer = make(chan *chipingress.CloudEventPb, messageBufferSize) } } diff --git a/pkg/chipingress/batch/client_test.go b/pkg/chipingress/batch/client_test.go index 5b9c871aa..7ee9ccd60 100644 --- a/pkg/chipingress/batch/client_test.go +++ b/pkg/chipingress/batch/client_test.go @@ -6,15 +6,15 @@ import ( "testing" "time" - "github.com/smartcontractkit/chainlink-common/pkg/chipingress" - "github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks" ) func TestNewBatchClient(t *testing.T) { - t.Run("NewBatchClient", func(t *testing.T) { client, err := NewBatchClient(nil) require.NoError(t, err) @@ -53,9 +53,7 @@ func TestNewBatchClient(t *testing.T) { } func TestQueueMessage(t *testing.T) { - t.Run("successfully queues a message", func(t *testing.T) { - client, err := NewBatchClient(nil, WithMessageBuffer(5)) require.NoError(t, err) @@ -67,7 +65,7 @@ func TestQueueMessage(t *testing.T) { client.QueueMessage(event) - assert.Equal(t, 1, len(client.messageBuffer)) + assert.Len(t, client.messageBuffer, 1) received := <-client.messageBuffer assert.Equal(t, event.Id, received.Id) @@ -76,7 +74,6 @@ func TestQueueMessage(t *testing.T) { }) t.Run("drops message if buffer is full", func(t *testing.T) { - client, err := NewBatchClient(nil, WithMessageBuffer(1)) require.NoError(t, err) require.NotNil(t, client) @@ -90,7 +87,7 @@ func TestQueueMessage(t *testing.T) { client.QueueMessage(event) 
client.QueueMessage(event) - assert.Equal(t, 1, len(client.messageBuffer)) + assert.Len(t, client.messageBuffer, 1) received := <-client.messageBuffer assert.Equal(t, event.Id, received.Id) @@ -103,14 +100,12 @@ func TestQueueMessage(t *testing.T) { require.NoError(t, err) client.QueueMessage(nil) - assert.Equal(t, 0, len(client.messageBuffer)) + assert.Empty(t, client.messageBuffer) }) } func TestSendBatch(t *testing.T) { - t.Run("successfully sends a batch", func(t *testing.T) { - mockClient := mocks.NewClient(t) done := make(chan struct{}) @@ -150,7 +145,6 @@ func TestSendBatch(t *testing.T) { }) t.Run("doesn't publish empty batch", func(t *testing.T) { - mockClient := mocks.NewClient(t) client, err := NewBatchClient(mockClient, WithMessageBuffer(5)) @@ -208,7 +202,6 @@ func TestSendBatch(t *testing.T) { } func TestStart(t *testing.T) { - t.Run("batch size trigger", func(t *testing.T) { mockClient := mocks.NewClient(t) done := make(chan struct{}) @@ -366,7 +359,6 @@ func TestStart(t *testing.T) { }) t.Run("no flush when batch is empty", func(t *testing.T) { - mockClient := mocks.NewClient(t) client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchTimeout(5*time.Second)) @@ -385,7 +377,6 @@ func TestStart(t *testing.T) { }) t.Run("multiple batches via size trigger", func(t *testing.T) { - mockClient := mocks.NewClient(t) done := make(chan struct{}) callCount := 0 diff --git a/pkg/chipingress/go.mod b/pkg/chipingress/go.mod index 716f610e3..4ca89da49 100644 --- a/pkg/chipingress/go.mod +++ b/pkg/chipingress/go.mod @@ -36,4 +36,5 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect ) +// Retracted due to initial development versions retract [v1.0.0, v1.0.1] From f4fadd3747a52e7156909ca5a0e188ed1af44344 Mon Sep 17 00:00:00 2001 From: hendoxc Date: Mon, 12 Jan 2026 09:43:59 -0700 Subject: [PATCH 07/19] Rename timeout vars --- pkg/chipingress/batch/client.go | 20 +++++++++++++------- pkg/chipingress/batch/client_test.go | 2 +- 2 files changed, 14 
insertions(+), 8 deletions(-) diff --git a/pkg/chipingress/batch/client.go b/pkg/chipingress/batch/client.go index 77303956c..d86878d62 100644 --- a/pkg/chipingress/batch/client.go +++ b/pkg/chipingress/batch/client.go @@ -13,7 +13,8 @@ type Client struct { client chipingress.Client batchSize int maxConcurrentSends chan struct{} - batchTimeout time.Duration + batchInterval time.Duration + maxPublishTimeout time.Duration compressionType string messageBuffer chan *chipingress.CloudEventPb shutdownChan chan struct{} @@ -28,7 +29,8 @@ func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) { batchSize: 1, maxConcurrentSends: make(chan struct{}, 1), messageBuffer: make(chan *chipingress.CloudEventPb, 1000), - batchTimeout: 100 * time.Millisecond, + batchInterval: 100 * time.Millisecond, + maxPublishTimeout: 5 * time.Second, compressionType: "gzip", shutdownChan: make(chan struct{}), } @@ -43,7 +45,7 @@ func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) { func (b *Client) Start(ctx context.Context) { go func() { batch := make([]*chipingress.CloudEventPb, 0, b.batchSize) - timer := time.NewTimer(b.batchTimeout) + timer := time.NewTimer(b.batchInterval) timer.Stop() for { @@ -57,7 +59,7 @@ func (b *Client) Start(ctx context.Context) { return case event := <-b.messageBuffer: if len(batch) == 0 { - timer.Reset(b.batchTimeout) + timer.Reset(b.batchInterval) } batch = append(batch, event) @@ -112,7 +114,11 @@ func (b *Client) sendBatch(ctx context.Context, events []*chipingress.CloudEvent go func() { defer func() { <-b.maxConcurrentSends }() - _, err := b.client.PublishBatch(ctx, &chipingress.CloudEventBatch{Events: events}) + + ctxTimeout, cancel := context.WithTimeout(ctx, b.maxPublishTimeout) + defer cancel() + + _, err := b.client.PublishBatch(ctxTimeout, &chipingress.CloudEventBatch{Events: events}) if err != nil { b.log.Errorw("failed to publish batch", "error", err) } @@ -124,7 +130,7 @@ func (b *Client) flush(batch 
[]*chipingress.CloudEventPb) { return } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), b.maxPublishTimeout) defer cancel() b.sendBatch(ctx, batch) @@ -144,7 +150,7 @@ func WithMaxConcurrentSends(maxConcurrentSends int) Opt { func WithBatchTimeout(batchTimeout time.Duration) Opt { return func(c *Client) { - c.batchTimeout = batchTimeout + c.batchInterval = batchTimeout } } diff --git a/pkg/chipingress/batch/client_test.go b/pkg/chipingress/batch/client_test.go index 7ee9ccd60..246051974 100644 --- a/pkg/chipingress/batch/client_test.go +++ b/pkg/chipingress/batch/client_test.go @@ -36,7 +36,7 @@ func TestNewBatchClient(t *testing.T) { t.Run("WithBatchTimeout", func(t *testing.T) { client, err := NewBatchClient(nil, WithBatchTimeout(100*time.Millisecond)) require.NoError(t, err) - assert.Equal(t, 100*time.Millisecond, client.batchTimeout) + assert.Equal(t, 100*time.Millisecond, client.batchInterval) }) t.Run("WithCompressionType", func(t *testing.T) { From 4f10dfa6a8e51dc740a13d581ff56b969a7f273d Mon Sep 17 00:00:00 2001 From: hendoxc Date: Mon, 12 Jan 2026 10:01:00 -0700 Subject: [PATCH 08/19] Adds callback mechanism --- pkg/chipingress/batch/client.go | 65 +++- pkg/chipingress/batch/client_test.go | 449 +++++++++++++++++++++++++-- 2 files changed, 464 insertions(+), 50 deletions(-) diff --git a/pkg/chipingress/batch/client.go b/pkg/chipingress/batch/client.go index d86878d62..4e0cd66cc 100644 --- a/pkg/chipingress/batch/client.go +++ b/pkg/chipingress/batch/client.go @@ -9,14 +9,19 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/chipingress" ) +type messageWithCallback struct { + event *chipingress.CloudEventPb + callback func(error) +} + type Client struct { client chipingress.Client batchSize int maxConcurrentSends chan struct{} batchInterval time.Duration - maxPublishTimeout time.Duration + maxPublishTimeout time.Duration compressionType string - messageBuffer 
chan *chipingress.CloudEventPb + messageBuffer chan *messageWithCallback shutdownChan chan struct{} log *zap.SugaredLogger } @@ -28,9 +33,9 @@ func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) { client: client, batchSize: 1, maxConcurrentSends: make(chan struct{}, 1), - messageBuffer: make(chan *chipingress.CloudEventPb, 1000), + messageBuffer: make(chan *messageWithCallback, 1000), batchInterval: 100 * time.Millisecond, - maxPublishTimeout: 5 * time.Second, + maxPublishTimeout: 5 * time.Second, compressionType: "gzip", shutdownChan: make(chan struct{}), } @@ -44,7 +49,7 @@ func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) { func (b *Client) Start(ctx context.Context) { go func() { - batch := make([]*chipingress.CloudEventPb, 0, b.batchSize) + batch := make([]*messageWithCallback, 0, b.batchSize) timer := time.NewTimer(b.batchInterval) timer.Stop() @@ -57,23 +62,23 @@ func (b *Client) Start(ctx context.Context) { case <-b.shutdownChan: b.flush(batch) return - case event := <-b.messageBuffer: + case msg := <-b.messageBuffer: if len(batch) == 0 { timer.Reset(b.batchInterval) } - batch = append(batch, event) + batch = append(batch, msg) if len(batch) >= b.batchSize { batchToSend := batch - batch = make([]*chipingress.CloudEventPb, 0, b.batchSize) + batch = make([]*messageWithCallback, 0, b.batchSize) timer.Stop() b.sendBatch(ctx, batchToSend) } case <-timer.C: if len(batch) > 0 { batchToSend := batch - batch = make([]*chipingress.CloudEventPb, 0, b.batchSize) + batch = make([]*messageWithCallback, 0, b.batchSize) b.sendBatch(ctx, batchToSend) } } @@ -89,24 +94,32 @@ func (b *Client) Stop() { } } -// QueueMessage queues a single message to the batch client. +// QueueMessage queues a single message to the batch client with an optional callback. +// The callback will be invoked after the batch containing this message is sent. +// The callback receives an error parameter (nil on success). 
+// Callbacks are invoked from goroutines // Returns immediately with no blocking - drops message if channel is full. // Returns true if message was queued, false if it was dropped. -func (b *Client) QueueMessage(event *chipingress.CloudEventPb) bool { +func (b *Client) QueueMessage(event *chipingress.CloudEventPb, callback func(error)) bool { if event == nil { return false } + msg := &messageWithCallback{ + event: event, + callback: callback, + } + select { - case b.messageBuffer <- event: + case b.messageBuffer <- msg: return true default: return false } } -func (b *Client) sendBatch(ctx context.Context, events []*chipingress.CloudEventPb) { - if len(events) == 0 { +func (b *Client) sendBatch(ctx context.Context, messages []*messageWithCallback) { + if len(messages) == 0 { return } @@ -118,14 +131,26 @@ func (b *Client) sendBatch(ctx context.Context, events []*chipingress.CloudEvent ctxTimeout, cancel := context.WithTimeout(ctx, b.maxPublishTimeout) defer cancel() + events := make([]*chipingress.CloudEventPb, len(messages)) + for i, msg := range messages { + events[i] = msg.event + } + _, err := b.client.PublishBatch(ctxTimeout, &chipingress.CloudEventBatch{Events: events}) - if err != nil { + if err != nil && b.log != nil { b.log.Errorw("failed to publish batch", "error", err) } + + // Invoke callbacks for all messages in the batch + for _, msg := range messages { + if msg.callback != nil { + msg.callback(err) + } + } }() } -func (b *Client) flush(batch []*chipingress.CloudEventPb) { +func (b *Client) flush(batch []*messageWithCallback) { if len(batch) == 0 { return } @@ -162,6 +187,12 @@ func WithCompressionType(compressionType string) Opt { func WithMessageBuffer(messageBufferSize int) Opt { return func(c *Client) { - c.messageBuffer = make(chan *chipingress.CloudEventPb, messageBufferSize) + c.messageBuffer = make(chan *messageWithCallback, messageBufferSize) + } +} + +func WithLogger(log *zap.SugaredLogger) Opt { + return func(c *Client) { + c.log = log } } 
diff --git a/pkg/chipingress/batch/client_test.go b/pkg/chipingress/batch/client_test.go index 246051974..001bb5637 100644 --- a/pkg/chipingress/batch/client_test.go +++ b/pkg/chipingress/batch/client_test.go @@ -63,14 +63,14 @@ func TestQueueMessage(t *testing.T) { Type: "test.event.type", } - client.QueueMessage(event) + client.QueueMessage(event, nil) assert.Len(t, client.messageBuffer, 1) received := <-client.messageBuffer - assert.Equal(t, event.Id, received.Id) - assert.Equal(t, event.Source, received.Source) - assert.Equal(t, event.Type, received.Type) + assert.Equal(t, event.Id, received.event.Id) + assert.Equal(t, event.Source, received.event.Source) + assert.Equal(t, event.Type, received.event.Type) }) t.Run("drops message if buffer is full", func(t *testing.T) { @@ -84,22 +84,22 @@ func TestQueueMessage(t *testing.T) { Type: "test.event.type", } - client.QueueMessage(event) - client.QueueMessage(event) + client.QueueMessage(event, nil) + client.QueueMessage(event, nil) assert.Len(t, client.messageBuffer, 1) received := <-client.messageBuffer - assert.Equal(t, event.Id, received.Id) - assert.Equal(t, event.Source, received.Source) - assert.Equal(t, event.Type, received.Type) + assert.Equal(t, event.Id, received.event.Id) + assert.Equal(t, event.Source, received.event.Source) + assert.Equal(t, event.Type, received.event.Type) }) t.Run("handles nil event", func(t *testing.T) { client, err := NewBatchClient(nil, WithMessageBuffer(5)) require.NoError(t, err) - client.QueueMessage(nil) + client.QueueMessage(nil, nil) assert.Empty(t, client.messageBuffer) }) } @@ -126,13 +126,13 @@ func TestSendBatch(t *testing.T) { client, err := NewBatchClient(mockClient, WithMessageBuffer(5)) require.NoError(t, err) - events := []*chipingress.CloudEventPb{ - {Id: "test-id-1", Source: "test-source", Type: "test.event.type"}, - {Id: "test-id-2", Source: "test-source", Type: "test.event.type"}, - {Id: "test-id-3", Source: "test-source", Type: "test.event.type"}, + messages := 
[]*messageWithCallback{ + {event: &chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}}, + {event: &chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}}, + {event: &chipingress.CloudEventPb{Id: "test-id-3", Source: "test-source", Type: "test.event.type"}}, } - client.sendBatch(t.Context(), events) + client.sendBatch(t.Context(), messages) // wait for the internal goroutine to complete select { @@ -150,7 +150,7 @@ func TestSendBatch(t *testing.T) { client, err := NewBatchClient(mockClient, WithMessageBuffer(5)) require.NoError(t, err) - client.sendBatch(t.Context(), []*chipingress.CloudEventPb{}) + client.sendBatch(t.Context(), []*messageWithCallback{}) mockClient.AssertNotCalled(t, "PublishBatch", mock.Anything, mock.Anything) }) @@ -175,15 +175,15 @@ func TestSendBatch(t *testing.T) { client, err := NewBatchClient(mockClient, WithMessageBuffer(5)) require.NoError(t, err) - batch1 := []*chipingress.CloudEventPb{ - {Id: "batch1-id-1", Source: "test-source", Type: "test.event.type"}, + batch1 := []*messageWithCallback{ + {event: &chipingress.CloudEventPb{Id: "batch1-id-1", Source: "test-source", Type: "test.event.type"}}, } - batch2 := []*chipingress.CloudEventPb{ - {Id: "batch2-id-1", Source: "test-source", Type: "test.event.type"}, - {Id: "batch2-id-2", Source: "test-source", Type: "test.event.type"}, + batch2 := []*messageWithCallback{ + {event: &chipingress.CloudEventPb{Id: "batch2-id-1", Source: "test-source", Type: "test.event.type"}}, + {event: &chipingress.CloudEventPb{Id: "batch2-id-2", Source: "test-source", Type: "test.event.type"}}, } - batch3 := []*chipingress.CloudEventPb{ - {Id: "batch3-id-1", Source: "test-source", Type: "test.event.type"}, + batch3 := []*messageWithCallback{ + {event: &chipingress.CloudEventPb{Id: "batch3-id-1", Source: "test-source", Type: "test.event.type"}}, } client.sendBatch(context.Background(), batch1) @@ -228,9 +228,9 @@ func TestStart(t *testing.T) { 
client.Start(ctx) - client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}) - client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}) - client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-3", Source: "test-source", Type: "test.event.type"}) + client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}, nil) + client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}, nil) + client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-3", Source: "test-source", Type: "test.event.type"}, nil) select { case <-done: @@ -266,8 +266,8 @@ func TestStart(t *testing.T) { client.Start(ctx) - client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}) - client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}) + client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}, nil) + client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}, nil) select { case <-done: @@ -304,8 +304,8 @@ func TestStart(t *testing.T) { client.Start(ctx) - client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}) - client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}) + client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}, nil) + client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}, nil) time.Sleep(10 * time.Millisecond) @@ -342,8 +342,8 @@ func TestStart(t *testing.T) { client.Start(t.Context()) - queued1 := 
client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}) - queued2 := client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}) + queued1 := client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}, nil) + queued2 := client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}, nil) require.True(t, queued1) require.True(t, queued2) @@ -410,7 +410,7 @@ func TestStart(t *testing.T) { Id: "test-id-" + strconv.Itoa(i), Source: "test-source", Type: "test.event.type", - }) + }, nil) } select { @@ -422,3 +422,386 @@ func TestStart(t *testing.T) { mockClient.AssertExpectations(t) }) } + +func TestCallbacks(t *testing.T) { + t.Run("callback invoked on successful send", func(t *testing.T) { + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + callbackDone := make(chan error, 1) + + mockClient. + On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) == 1 && + batch.Events[0].Id == "test-id-1" + }), + ). + Return(&chipingress.PublishResponse{}, nil). + Run(func(args mock.Arguments) { close(done) }). 
+ Once() + + client, err := NewBatchClient(mockClient, WithBatchSize(1)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + client.Start(ctx) + + client.QueueMessage(&chipingress.CloudEventPb{ + Id: "test-id-1", + Source: "test-source", + Type: "test.event.type", + }, func(err error) { + callbackDone <- err + }) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for batch to be sent") + } + + // wait for callback + select { + case err := <-callbackDone: + assert.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("timeout waiting for callback") + } + + mockClient.AssertExpectations(t) + }) + + t.Run("callback receives error on failed send", func(t *testing.T) { + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + callbackDone := make(chan error, 1) + expectedErr := assert.AnError + + mockClient. + On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) == 1 && + batch.Events[0].Id == "test-id-1" + }), + ). + Return(&chipingress.PublishResponse{}, expectedErr). + Run(func(args mock.Arguments) { close(done) }). 
+ Once() + + client, err := NewBatchClient(mockClient, WithBatchSize(1)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + client.Start(ctx) + + client.QueueMessage(&chipingress.CloudEventPb{ + Id: "test-id-1", + Source: "test-source", + Type: "test.event.type", + }, func(err error) { + callbackDone <- err + }) + + // wait for batch to be sent + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for batch to be sent") + } + + // wait for callback to be invoked with error + select { + case err := <-callbackDone: + assert.Error(t, err) + assert.Equal(t, expectedErr, err) + case <-time.After(time.Second): + t.Fatal("timeout waiting for callback") + } + + mockClient.AssertExpectations(t) + }) + + t.Run("nil callback works without panic", func(t *testing.T) { + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + + mockClient. + On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) == 1 && + batch.Events[0].Id == "test-id-1" + }), + ). + Return(&chipingress.PublishResponse{}, nil). + Run(func(args mock.Arguments) { close(done) }). 
+ Once() + + client, err := NewBatchClient(mockClient, WithBatchSize(1)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + client.Start(ctx) + + // Queue message with nil callback - should not panic + client.QueueMessage(&chipingress.CloudEventPb{ + Id: "test-id-1", + Source: "test-source", + Type: "test.event.type", + }, nil) + + // wait for batch to be sent + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for batch to be sent") + } + + mockClient.AssertExpectations(t) + }) + + t.Run("multiple messages with different callbacks", func(t *testing.T) { + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + callback1Done := make(chan error, 1) + callback2Done := make(chan error, 1) + callback3Done := make(chan error, 1) + + mockClient. + On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) == 3 + }), + ). + Return(&chipingress.PublishResponse{}, nil). + Run(func(args mock.Arguments) { close(done) }). 
+ Once() + + client, err := NewBatchClient(mockClient, WithBatchSize(3)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + client.Start(ctx) + + client.QueueMessage(&chipingress.CloudEventPb{ + Id: "test-id-1", + Source: "test-source", + Type: "test.event.type", + }, func(err error) { + callback1Done <- err + }) + + client.QueueMessage(&chipingress.CloudEventPb{ + Id: "test-id-2", + Source: "test-source", + Type: "test.event.type", + }, func(err error) { + callback2Done <- err + }) + + client.QueueMessage(&chipingress.CloudEventPb{ + Id: "test-id-3", + Source: "test-source", + Type: "test.event.type", + }, func(err error) { + callback3Done <- err + }) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for batch to be sent") + } + + select { + case err := <-callback1Done: + assert.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("timeout waiting for callback 1") + } + + select { + case err := <-callback2Done: + assert.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("timeout waiting for callback 2") + } + + select { + case err := <-callback3Done: + assert.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("timeout waiting for callback 3") + } + + mockClient.AssertExpectations(t) + }) + + t.Run("callback invoked for timeout-triggered batch", func(t *testing.T) { + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + callbackDone := make(chan error, 1) + + mockClient. + On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) == 1 && + batch.Events[0].Id == "test-id-1" + }), + ). + Return(&chipingress.PublishResponse{}, nil). + Run(func(args mock.Arguments) { close(done) }). 
+ Once() + + client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchTimeout(50*time.Millisecond)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + client.Start(ctx) + + client.QueueMessage(&chipingress.CloudEventPb{ + Id: "test-id-1", + Source: "test-source", + Type: "test.event.type", + }, func(err error) { + callbackDone <- err + }) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for batch to be sent") + } + + // wait for callback + select { + case err := <-callbackDone: + assert.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("timeout waiting for callback") + } + + mockClient.AssertExpectations(t) + }) + + t.Run("callback invoked for size-triggered batch", func(t *testing.T) { + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + callbackDone := make(chan error, 1) + + mockClient. + On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) == 2 + }), + ). + Return(&chipingress.PublishResponse{}, nil). + Run(func(args mock.Arguments) { close(done) }). 
+ Once() + + client, err := NewBatchClient(mockClient, WithBatchSize(2), WithBatchTimeout(5*time.Second)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + client.Start(ctx) + + client.QueueMessage(&chipingress.CloudEventPb{ + Id: "test-id-1", + Source: "test-source", + Type: "test.event.type", + }, nil) + + client.QueueMessage(&chipingress.CloudEventPb{ + Id: "test-id-2", + Source: "test-source", + Type: "test.event.type", + }, func(err error) { + callbackDone <- err + }) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for batch to be sent") + } + + select { + case err := <-callbackDone: + assert.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("timeout waiting for callback") + } + + mockClient.AssertExpectations(t) + }) + + t.Run("callbacks invoked on context cancellation", func(t *testing.T) { + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + callbackDone := make(chan error, 1) + + mockClient. + On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) == 1 && + batch.Events[0].Id == "test-id-1" + }), + ). + Return(&chipingress.PublishResponse{}, nil). + Run(func(args mock.Arguments) { close(done) }). 
+ Once() + + client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchTimeout(5*time.Second)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + + client.Start(ctx) + + client.QueueMessage(&chipingress.CloudEventPb{ + Id: "test-id-1", + Source: "test-source", + Type: "test.event.type", + }, func(err error) { + callbackDone <- err + }) + + time.Sleep(10 * time.Millisecond) + + // cancel context to trigger flush + cancel() + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for flush on cancellation") + } + + select { + case err := <-callbackDone: + assert.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("timeout waiting for callback") + } + + mockClient.AssertExpectations(t) + }) +} From c717835f9b839cdfb8cdd652aa56b90b420cc812 Mon Sep 17 00:00:00 2001 From: hendoxc Date: Mon, 12 Jan 2026 10:48:34 -0700 Subject: [PATCH 09/19] fixes linting --- pkg/chipingress/batch/client_test.go | 15 +++++++-------- pkg/chipingress/client_test.go | 1 - 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/pkg/chipingress/batch/client_test.go b/pkg/chipingress/batch/client_test.go index 001bb5637..918cecb7d 100644 --- a/pkg/chipingress/batch/client_test.go +++ b/pkg/chipingress/batch/client_test.go @@ -466,7 +466,7 @@ func TestCallbacks(t *testing.T) { // wait for callback select { case err := <-callbackDone: - assert.NoError(t, err) + require.NoError(t, err) case <-time.After(time.Second): t.Fatal("timeout waiting for callback") } @@ -518,7 +518,6 @@ func TestCallbacks(t *testing.T) { // wait for callback to be invoked with error select { case err := <-callbackDone: - assert.Error(t, err) assert.Equal(t, expectedErr, err) case <-time.After(time.Second): t.Fatal("timeout waiting for callback") @@ -626,21 +625,21 @@ func TestCallbacks(t *testing.T) { select { case err := <-callback1Done: - assert.NoError(t, err) + require.NoError(t, err) case <-time.After(time.Second): t.Fatal("timeout 
waiting for callback 1") } select { case err := <-callback2Done: - assert.NoError(t, err) + require.NoError(t, err) case <-time.After(time.Second): t.Fatal("timeout waiting for callback 2") } select { case err := <-callback3Done: - assert.NoError(t, err) + require.NoError(t, err) case <-time.After(time.Second): t.Fatal("timeout waiting for callback 3") } @@ -690,7 +689,7 @@ func TestCallbacks(t *testing.T) { // wait for callback select { case err := <-callbackDone: - assert.NoError(t, err) + require.NoError(t, err) case <-time.After(time.Second): t.Fatal("timeout waiting for callback") } @@ -744,7 +743,7 @@ func TestCallbacks(t *testing.T) { select { case err := <-callbackDone: - assert.NoError(t, err) + require.NoError(t, err) case <-time.After(time.Second): t.Fatal("timeout waiting for callback") } @@ -797,7 +796,7 @@ func TestCallbacks(t *testing.T) { select { case err := <-callbackDone: - assert.NoError(t, err) + require.NoError(t, err) case <-time.After(time.Second): t.Fatal("timeout waiting for callback") } diff --git a/pkg/chipingress/client_test.go b/pkg/chipingress/client_test.go index ba8649f42..3166a1e4b 100644 --- a/pkg/chipingress/client_test.go +++ b/pkg/chipingress/client_test.go @@ -282,7 +282,6 @@ func TestProtoToEvent(t *testing.T) { t.Run("conversion with nil protobuf event", func(t *testing.T) { // Test with nil input convertedEvent, err := ProtoToEvent(nil) - assert.Error(t, err) assert.Equal(t, CloudEvent{}, convertedEvent) assert.Contains(t, err.Error(), "could not convert proto to event") }) From cc94df3d3e8a06f251175395ffd11e0f2db4a7e4 Mon Sep 17 00:00:00 2001 From: hendoxc Date: Tue, 13 Jan 2026 10:30:55 -0700 Subject: [PATCH 10/19] Move callback execution into separate goroutine stops semaphore blocking on callback execution to complete --- pkg/chipingress/batch/client.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/pkg/chipingress/batch/client.go b/pkg/chipingress/batch/client.go index 
4e0cd66cc..7799a88ca 100644 --- a/pkg/chipingress/batch/client.go +++ b/pkg/chipingress/batch/client.go @@ -128,6 +128,7 @@ func (b *Client) sendBatch(ctx context.Context, messages []*messageWithCallback) go func() { defer func() { <-b.maxConcurrentSends }() + // this is specifically to prevent long running network calls ctxTimeout, cancel := context.WithTimeout(ctx, b.maxPublishTimeout) defer cancel() @@ -141,12 +142,18 @@ func (b *Client) sendBatch(ctx context.Context, messages []*messageWithCallback) b.log.Errorw("failed to publish batch", "error", err) } - // Invoke callbacks for all messages in the batch - for _, msg := range messages { - if msg.callback != nil { - msg.callback(err) + go func() { + for _, msg := range messages { + select { + case <-ctx.Done(): + return + default: + if msg.callback != nil { + msg.callback(err) + } + } } - } + }() }() } From 69806053bd8da5deeffa415cc4faea887c164461 Mon Sep 17 00:00:00 2001 From: hendoxc Date: Tue, 13 Jan 2026 10:40:33 -0700 Subject: [PATCH 11/19] check for shutdown --- pkg/chipingress/batch/client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/chipingress/batch/client.go b/pkg/chipingress/batch/client.go index 7799a88ca..1f65f4c2b 100644 --- a/pkg/chipingress/batch/client.go +++ b/pkg/chipingress/batch/client.go @@ -147,6 +147,8 @@ func (b *Client) sendBatch(ctx context.Context, messages []*messageWithCallback) select { case <-ctx.Done(): return + case <-b.shutdownChan: + return default: if msg.callback != nil { msg.callback(err) From ad4482f85351bec33e447c3994904f916c959e42 Mon Sep 17 00:00:00 2001 From: hendoxc Date: Thu, 15 Jan 2026 10:29:19 -0700 Subject: [PATCH 12/19] Ensures all callbacks complete after .Stop - ensures no panic if .Stop is called multiple times --- pkg/chipingress/batch/client.go | 57 ++++++++++++++++++---------- pkg/chipingress/batch/client_test.go | 16 ++++++++ pkg/chipingress/go.mod | 4 +- 3 files changed, 54 insertions(+), 23 deletions(-) diff --git 
a/pkg/chipingress/batch/client.go b/pkg/chipingress/batch/client.go index 1f65f4c2b..fca10a116 100644 --- a/pkg/chipingress/batch/client.go +++ b/pkg/chipingress/batch/client.go @@ -2,6 +2,7 @@ package batch import ( "context" + "sync" "time" "go.uber.org/zap" @@ -24,6 +25,9 @@ type Client struct { messageBuffer chan *messageWithCallback shutdownChan chan struct{} log *zap.SugaredLogger + callbackWg sync.WaitGroup + shutdownTimeout time.Duration + shutdownOnce sync.Once } type Opt func(*Client) @@ -31,6 +35,7 @@ type Opt func(*Client) func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) { c := &Client{ client: client, + log: zap.NewNop().Sugar(), batchSize: 1, maxConcurrentSends: make(chan struct{}, 1), messageBuffer: make(chan *messageWithCallback, 1000), @@ -38,6 +43,8 @@ func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) { maxPublishTimeout: 5 * time.Second, compressionType: "gzip", shutdownChan: make(chan struct{}), + callbackWg: sync.WaitGroup{}, + shutdownTimeout: 5 * time.Second, } for _, opt := range opts { @@ -57,7 +64,6 @@ func (b *Client) Start(ctx context.Context) { select { case <-ctx.Done(): b.flush(batch) - close(b.shutdownChan) return case <-b.shutdownChan: b.flush(batch) @@ -87,11 +93,26 @@ func (b *Client) Start(ctx context.Context) { } func (b *Client) Stop() { - close(b.shutdownChan) - // wait for pending sends by getting all semaphore slots - for range cap(b.maxConcurrentSends) { - b.maxConcurrentSends <- struct{}{} - } + b.shutdownOnce.Do(func() { + close(b.shutdownChan) + // wait for pending sends by getting all semaphore slots + for range cap(b.maxConcurrentSends) { + b.maxConcurrentSends <- struct{}{} + } + // wait for all callbacks to complete with timeout + done := make(chan struct{}) + go func() { + b.callbackWg.Wait() + close(done) + }() + + select { + case <-done: + // All callbacks completed successfully + case <-time.After(b.shutdownTimeout): + b.log.Warnw("timed out waiting for 
callbacks to complete", "timeout", b.shutdownTimeout) + } + }) } // QueueMessage queues a single message to the batch client with an optional callback. @@ -119,15 +140,16 @@ func (b *Client) QueueMessage(event *chipingress.CloudEventPb, callback func(err } func (b *Client) sendBatch(ctx context.Context, messages []*messageWithCallback) { + if len(messages) == 0 { return } + // acquire semaphore, limiting concurrent sends b.maxConcurrentSends <- struct{}{} go func() { defer func() { <-b.maxConcurrentSends }() - // this is specifically to prevent long running network calls ctxTimeout, cancel := context.WithTimeout(ctx, b.maxPublishTimeout) defer cancel() @@ -138,25 +160,20 @@ func (b *Client) sendBatch(ctx context.Context, messages []*messageWithCallback) } _, err := b.client.PublishBatch(ctxTimeout, &chipingress.CloudEventBatch{Events: events}) - if err != nil && b.log != nil { + if err != nil { b.log.Errorw("failed to publish batch", "error", err) } - - go func() { + // the callbacks are placed in their own goroutine to not block releasing the semaphore + // we use a wait group, to ensure all callbacks are completed if .Stop() is called. 
+ b.callbackWg.Go(func() { for _, msg := range messages { - select { - case <-ctx.Done(): - return - case <-b.shutdownChan: - return - default: - if msg.callback != nil { - msg.callback(err) - } + if msg.callback != nil { + msg.callback(err) } } - }() + }) }() + } func (b *Client) flush(batch []*messageWithCallback) { diff --git a/pkg/chipingress/batch/client_test.go b/pkg/chipingress/batch/client_test.go index 918cecb7d..3ec1d312e 100644 --- a/pkg/chipingress/batch/client_test.go +++ b/pkg/chipingress/batch/client_test.go @@ -804,3 +804,19 @@ func TestCallbacks(t *testing.T) { mockClient.AssertExpectations(t) }) } + +func TestStop(t *testing.T) { + t.Run("can call Stop multiple times without panic", func(t *testing.T) { + mockClient := mocks.NewClient(t) + client, err := NewBatchClient(mockClient, WithBatchSize(10)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + client.Start(ctx) + client.Stop() + client.Stop() + client.Stop() + }) +} diff --git a/pkg/chipingress/go.mod b/pkg/chipingress/go.mod index 4ca89da49..d8d4a7db1 100644 --- a/pkg/chipingress/go.mod +++ b/pkg/chipingress/go.mod @@ -1,8 +1,6 @@ module github.com/smartcontractkit/chainlink-common/pkg/chipingress -go 1.24.5 - -toolchain go1.24.10 +go 1.25.3 require ( github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.16.1 From 2ca60d399fb60d9d0a53c33e5ea85b13159e7c67 Mon Sep 17 00:00:00 2001 From: hendoxc Date: Thu, 15 Jan 2026 10:59:15 -0700 Subject: [PATCH 13/19] Removes compression type --- pkg/chipingress/batch/client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/chipingress/batch/client.go b/pkg/chipingress/batch/client.go index fca10a116..2cd846ece 100644 --- a/pkg/chipingress/batch/client.go +++ b/pkg/chipingress/batch/client.go @@ -41,7 +41,6 @@ func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) { messageBuffer: make(chan *messageWithCallback, 1000), batchInterval: 100 * time.Millisecond, maxPublishTimeout: 
5 * time.Second, - compressionType: "gzip", shutdownChan: make(chan struct{}), callbackWg: sync.WaitGroup{}, shutdownTimeout: 5 * time.Second, From 5670f84ba40e58de881cb71a182c1bd3ab06aada Mon Sep 17 00:00:00 2001 From: hendoxc Date: Thu, 15 Jan 2026 11:59:53 -0700 Subject: [PATCH 14/19] Adjust configuration --- pkg/chipingress/batch/client.go | 17 +++++++++++------ pkg/chipingress/batch/client_test.go | 28 +++++++++++----------------- 2 files changed, 22 insertions(+), 23 deletions(-) diff --git a/pkg/chipingress/batch/client.go b/pkg/chipingress/batch/client.go index 2cd846ece..488255bdc 100644 --- a/pkg/chipingress/batch/client.go +++ b/pkg/chipingress/batch/client.go @@ -21,7 +21,6 @@ type Client struct { maxConcurrentSends chan struct{} batchInterval time.Duration maxPublishTimeout time.Duration - compressionType string messageBuffer chan *messageWithCallback shutdownChan chan struct{} log *zap.SugaredLogger @@ -36,9 +35,9 @@ func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) { c := &Client{ client: client, log: zap.NewNop().Sugar(), - batchSize: 1, + batchSize: 10, maxConcurrentSends: make(chan struct{}, 1), - messageBuffer: make(chan *messageWithCallback, 1000), + messageBuffer: make(chan *messageWithCallback, 200), batchInterval: 100 * time.Millisecond, maxPublishTimeout: 5 * time.Second, shutdownChan: make(chan struct{}), @@ -198,15 +197,15 @@ func WithMaxConcurrentSends(maxConcurrentSends int) Opt { } } -func WithBatchTimeout(batchTimeout time.Duration) Opt { +func WithBatchInterval(batchTimeout time.Duration) Opt { return func(c *Client) { c.batchInterval = batchTimeout } } -func WithCompressionType(compressionType string) Opt { +func WithShutdownTimeout(shutdownTimeout time.Duration) Opt { return func(c *Client) { - c.compressionType = compressionType + c.shutdownTimeout = shutdownTimeout } } @@ -216,6 +215,12 @@ func WithMessageBuffer(messageBufferSize int) Opt { } } +func WithMaxPublishTimeout(maxPublishTimeout time.Duration) 
Opt { + return func(c *Client) { + c.maxPublishTimeout = maxPublishTimeout + } +} + func WithLogger(log *zap.SugaredLogger) Opt { return func(c *Client) { c.log = log diff --git a/pkg/chipingress/batch/client_test.go b/pkg/chipingress/batch/client_test.go index 3ec1d312e..042bcfc2f 100644 --- a/pkg/chipingress/batch/client_test.go +++ b/pkg/chipingress/batch/client_test.go @@ -33,18 +33,12 @@ func TestNewBatchClient(t *testing.T) { assert.Equal(t, 10, cap(client.maxConcurrentSends)) }) - t.Run("WithBatchTimeout", func(t *testing.T) { - client, err := NewBatchClient(nil, WithBatchTimeout(100*time.Millisecond)) + t.Run("WithBatchInterval", func(t *testing.T) { + client, err := NewBatchClient(nil, WithBatchInterval(100*time.Millisecond)) require.NoError(t, err) assert.Equal(t, 100*time.Millisecond, client.batchInterval) }) - t.Run("WithCompressionType", func(t *testing.T) { - client, err := NewBatchClient(nil, WithCompressionType("gzip")) - require.NoError(t, err) - assert.Equal(t, "gzip", client.compressionType) - }) - t.Run("WithMessageBuffer", func(t *testing.T) { client, err := NewBatchClient(nil, WithMessageBuffer(1000)) require.NoError(t, err) @@ -220,7 +214,7 @@ func TestStart(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). Once() - client, err := NewBatchClient(mockClient, WithBatchSize(3), WithBatchTimeout(5*time.Second)) + client, err := NewBatchClient(mockClient, WithBatchSize(3), WithBatchInterval(5*time.Second)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -258,7 +252,7 @@ func TestStart(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). 
Once() - client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchTimeout(50*time.Millisecond)) + client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(50*time.Millisecond)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -297,7 +291,7 @@ func TestStart(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). Once() - client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchTimeout(5*time.Second)) + client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(5*time.Second)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -337,7 +331,7 @@ func TestStart(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). Once() - client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchTimeout(100*time.Millisecond), WithMessageBuffer(10)) + client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(100*time.Millisecond), WithMessageBuffer(10)) require.NoError(t, err) client.Start(t.Context()) @@ -361,7 +355,7 @@ func TestStart(t *testing.T) { t.Run("no flush when batch is empty", func(t *testing.T) { mockClient := mocks.NewClient(t) - client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchTimeout(5*time.Second)) + client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(5*time.Second)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -397,7 +391,7 @@ func TestStart(t *testing.T) { }). Times(3) - client, err := NewBatchClient(mockClient, WithBatchSize(2), WithBatchTimeout(5*time.Second)) + client, err := NewBatchClient(mockClient, WithBatchSize(2), WithBatchInterval(5*time.Second)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -664,7 +658,7 @@ func TestCallbacks(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). 
Once() - client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchTimeout(50*time.Millisecond)) + client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(50*time.Millisecond)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -713,7 +707,7 @@ func TestCallbacks(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). Once() - client, err := NewBatchClient(mockClient, WithBatchSize(2), WithBatchTimeout(5*time.Second)) + client, err := NewBatchClient(mockClient, WithBatchSize(2), WithBatchInterval(5*time.Second)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -768,7 +762,7 @@ func TestCallbacks(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). Once() - client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchTimeout(5*time.Second)) + client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(5*time.Second)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) From e8073bc3886a336bfaa139e326992efe1c9a679b Mon Sep 17 00:00:00 2001 From: hendoxc Date: Thu, 15 Jan 2026 12:06:52 -0700 Subject: [PATCH 15/19] Return errors from queue message --- pkg/chipingress/batch/client.go | 14 +++++++---- pkg/chipingress/batch/client_test.go | 35 ++++++++++++++++++++++++++-- 2 files changed, 42 insertions(+), 7 deletions(-) diff --git a/pkg/chipingress/batch/client.go b/pkg/chipingress/batch/client.go index 488255bdc..6d5ff8fca 100644 --- a/pkg/chipingress/batch/client.go +++ b/pkg/chipingress/batch/client.go @@ -2,6 +2,7 @@ package batch import ( "context" + "errors" "sync" "time" @@ -118,10 +119,11 @@ func (b *Client) Stop() { // The callback receives an error parameter (nil on success). // Callbacks are invoked from goroutines // Returns immediately with no blocking - drops message if channel is full. -// Returns true if message was queued, false if it was dropped. 
-func (b *Client) QueueMessage(event *chipingress.CloudEventPb, callback func(error)) bool { +// Returns an error if the message was dropped. +func (b *Client) QueueMessage(event *chipingress.CloudEventPb, callback func(error)) error { + if event == nil { - return false + return nil } msg := &messageWithCallback{ @@ -130,10 +132,12 @@ func (b *Client) QueueMessage(event *chipingress.CloudEventPb, callback func(err } select { + case <-b.shutdownChan: + return errors.New("client is shutdown") case b.messageBuffer <- msg: - return true + return nil default: - return false + return errors.New("message buffer is full") } } diff --git a/pkg/chipingress/batch/client_test.go b/pkg/chipingress/batch/client_test.go index 042bcfc2f..0271856e0 100644 --- a/pkg/chipingress/batch/client_test.go +++ b/pkg/chipingress/batch/client_test.go @@ -338,8 +338,8 @@ func TestStart(t *testing.T) { queued1 := client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}, nil) queued2 := client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}, nil) - require.True(t, queued1) - require.True(t, queued2) + require.NoError(t, queued1) + require.NoError(t, queued2) select { case <-done: @@ -813,4 +813,35 @@ func TestStop(t *testing.T) { client.Stop() client.Stop() }) + + t.Run("QueueMessage returns error after Stop", func(t *testing.T) { + mockClient := mocks.NewClient(t) + client, err := NewBatchClient(mockClient, WithBatchSize(10)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + client.Start(ctx) + + // Queue message before stop - should succeed + err = client.QueueMessage(&chipingress.CloudEventPb{ + Id: "test-id-1", + Source: "test-source", + Type: "test.event.type", + }, nil) + require.NoError(t, err) + + // Stop the client + client.Stop() + + // Queue message after stop - should fail + err = client.QueueMessage(&chipingress.CloudEventPb{ + 
Id: "test-id-2", + Source: "test-source", + Type: "test.event.type", + }, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "shutdown") + }) } From c7404c021dcf4fb85503261472a18855755970f6 Mon Sep 17 00:00:00 2001 From: hendoxc Date: Thu, 15 Jan 2026 13:43:57 -0700 Subject: [PATCH 16/19] Adds buffer --- pkg/chipingress/batch/buffer.go | 72 +++++++++++ pkg/chipingress/batch/buffer_test.go | 183 +++++++++++++++++++++++++++ 2 files changed, 255 insertions(+) create mode 100644 pkg/chipingress/batch/buffer.go create mode 100644 pkg/chipingress/batch/buffer_test.go diff --git a/pkg/chipingress/batch/buffer.go b/pkg/chipingress/batch/buffer.go new file mode 100644 index 000000000..91d3dade9 --- /dev/null +++ b/pkg/chipingress/batch/buffer.go @@ -0,0 +1,72 @@ +package batch + +import ( + "sync" + + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" +) + +// messageBatch is a thread-safe buffer for accumulating messages before sending as a batch +type messageBatch struct { + messages []*messageWithCallback + mu sync.Mutex +} + +type messageWithCallback struct { + event *chipingress.CloudEventPb + callback func(error) +} + +// newMessageBatch creates a new messageBatch with the given initial capacity +func newMessageBatch(capacity int) *messageBatch { + return &messageBatch{ + messages: make([]*messageWithCallback, 0, capacity), + } +} + +// Add appends a message to the batch +func (b *messageBatch) Add(msg *messageWithCallback) { + b.mu.Lock() + defer b.mu.Unlock() + b.messages = append(b.messages, msg) +} + +// Len returns the current number of messages in the batch +func (b *messageBatch) Len() int { + b.mu.Lock() + defer b.mu.Unlock() + return len(b.messages) +} + +// Clear removes all messages from the batch and returns a copy of them +func (b *messageBatch) Clear() []*messageWithCallback { + b.mu.Lock() + defer b.mu.Unlock() + + if len(b.messages) == 0 { + return nil + } + + // Make a copy + result := make([]*messageWithCallback, 
len(b.messages)) + copy(result, b.messages) + + // Reset the internal slice + b.messages = b.messages[:0] + + return result +} + +// Values returns a copy of the current messages without clearing them +func (b *messageBatch) Values() []*messageWithCallback { + b.mu.Lock() + defer b.mu.Unlock() + + if len(b.messages) == 0 { + return nil + } + + result := make([]*messageWithCallback, len(b.messages)) + copy(result, b.messages) + return result +} diff --git a/pkg/chipingress/batch/buffer_test.go b/pkg/chipingress/batch/buffer_test.go new file mode 100644 index 000000000..1fc6290c8 --- /dev/null +++ b/pkg/chipingress/batch/buffer_test.go @@ -0,0 +1,183 @@ +package batch + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" +) + +func TestMessageBatch(t *testing.T) { + t.Run("newMessageBatch creates empty batch", func(t *testing.T) { + batch := newMessageBatch(10) + require.NotNil(t, batch) + assert.Equal(t, 0, batch.Len()) + }) + + t.Run("Add appends messages", func(t *testing.T) { + batch := newMessageBatch(10) + + msg1 := &messageWithCallback{ + event: &chipingress.CloudEventPb{Id: "test-1"}, + } + msg2 := &messageWithCallback{ + event: &chipingress.CloudEventPb{Id: "test-2"}, + } + + batch.Add(msg1) + assert.Equal(t, 1, batch.Len()) + + batch.Add(msg2) + assert.Equal(t, 2, batch.Len()) + }) + + t.Run("Clear returns copy and empties batch", func(t *testing.T) { + batch := newMessageBatch(10) + + msg1 := &messageWithCallback{ + event: &chipingress.CloudEventPb{Id: "test-1"}, + } + msg2 := &messageWithCallback{ + event: &chipingress.CloudEventPb{Id: "test-2"}, + } + + batch.Add(msg1) + batch.Add(msg2) + + result := batch.Clear() + require.NotNil(t, result) + assert.Len(t, result, 2) + assert.Equal(t, "test-1", result[0].event.Id) + assert.Equal(t, "test-2", result[1].event.Id) + + assert.Equal(t, 0, batch.Len()) + }) + + t.Run("Clear on empty 
batch returns nil", func(t *testing.T) { + batch := newMessageBatch(10) + result := batch.Clear() + assert.Nil(t, result) + }) + + t.Run("Values returns copy without clearing", func(t *testing.T) { + batch := newMessageBatch(10) + + msg1 := &messageWithCallback{ + event: &chipingress.CloudEventPb{Id: "test-1"}, + } + msg2 := &messageWithCallback{ + event: &chipingress.CloudEventPb{Id: "test-2"}, + } + + batch.Add(msg1) + batch.Add(msg2) + + result := batch.Values() + require.NotNil(t, result) + assert.Len(t, result, 2) + assert.Equal(t, "test-1", result[0].event.Id) + assert.Equal(t, "test-2", result[1].event.Id) + + // Batch should still have messages after Values + assert.Equal(t, 2, batch.Len()) + }) + + t.Run("Values on empty batch returns nil", func(t *testing.T) { + batch := newMessageBatch(10) + result := batch.Values() + assert.Nil(t, result) + }) + + t.Run("Values returns slice copy with same pointers", func(t *testing.T) { + batch := newMessageBatch(10) + + msg1 := &messageWithCallback{ + event: &chipingress.CloudEventPb{Id: "test-1"}, + } + + batch.Add(msg1) + result := batch.Values() + require.Len(t, result, 1) + + assert.Equal(t, msg1, result[0]) + assert.Same(t, msg1.event, result[0].event) + + batch.Add(&messageWithCallback{ + event: &chipingress.CloudEventPb{Id: "test-2"}, + }) + assert.Len(t, result, 1) + assert.Equal(t, 2, batch.Len()) + }) + + t.Run("concurrent Add operations are safe", func(t *testing.T) { + batch := newMessageBatch(100) + var wg sync.WaitGroup + numGoroutines := 10 + messagesPerGoroutine := 10 + + for range numGoroutines { + wg.Go(func() { + for range messagesPerGoroutine { + batch.Add(&messageWithCallback{ + event: &chipingress.CloudEventPb{Id: "test"}, + }) + } + }) + } + + wg.Wait() + + assert.Equal(t, numGoroutines*messagesPerGoroutine, batch.Len()) + }) + + t.Run("concurrent Add and Clear operations are safe", func(t *testing.T) { + batch := newMessageBatch(100) + var wg sync.WaitGroup + numAdders := 5 + numClears := 3 + + 
for range numAdders { + wg.Go(func() { + for range 20 { + batch.Add(&messageWithCallback{ + event: &chipingress.CloudEventPb{Id: "test"}, + }) + } + }) + } + + // Start clearers + for range numClears { + wg.Go(func() { + for range 10 { + batch.Clear() + } + }) + } + + wg.Wait() + assert.GreaterOrEqual(t, batch.Len(), 0) + }) + + t.Run("concurrent Len operations are safe", func(t *testing.T) { + batch := newMessageBatch(100) + var wg sync.WaitGroup + for range 10 { + batch.Add(&messageWithCallback{ + event: &chipingress.CloudEventPb{Id: "test"}, + }) + } + + for range 100 { + wg.Go(func() { + length := batch.Len() + assert.GreaterOrEqual(t, length, 0) + }) + } + + wg.Wait() + }) +} From 13579fde85854677da88a4d6665751edb6d069d4 Mon Sep 17 00:00:00 2001 From: hendoxc Date: Thu, 15 Jan 2026 13:44:16 -0700 Subject: [PATCH 17/19] Correct shutdown protocol --- pkg/chipingress/batch/client.go | 65 ++++++++++++++++------------ pkg/chipingress/batch/client_test.go | 52 +++++++++++----------- 2 files changed, 64 insertions(+), 53 deletions(-) diff --git a/pkg/chipingress/batch/client.go b/pkg/chipingress/batch/client.go index 6d5ff8fca..b0d14d6e6 100644 --- a/pkg/chipingress/batch/client.go +++ b/pkg/chipingress/batch/client.go @@ -11,11 +11,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/chipingress" ) -type messageWithCallback struct { - event *chipingress.CloudEventPb - callback func(error) -} - type Client struct { client chipingress.Client batchSize int @@ -28,11 +23,12 @@ type Client struct { callbackWg sync.WaitGroup shutdownTimeout time.Duration shutdownOnce sync.Once + batch *messageBatch } type Opt func(*Client) -func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) { +func NewClient(client chipingress.Client, opts ...Opt) (*Client, error) { c := &Client{ client: client, log: zap.NewNop().Sugar(), @@ -44,6 +40,7 @@ func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) { shutdownChan: make(chan struct{}), 
callbackWg: sync.WaitGroup{}, shutdownTimeout: 5 * time.Second, + batch: newMessageBatch(10), } for _, opt := range opts { @@ -55,35 +52,36 @@ func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) { func (b *Client) Start(ctx context.Context) { go func() { - batch := make([]*messageWithCallback, 0, b.batchSize) timer := time.NewTimer(b.batchInterval) timer.Stop() for { select { case <-ctx.Done(): - b.flush(batch) + // ensure: + // - current batch is flushed + // - all current network calls are completed + // - all callbacks are completed + b.Stop() return - case <-b.shutdownChan: - b.flush(batch) + case <-b.shutdownChan: // this can only happen if .Stop() + // since this was called from Stop, the remaining batch will be flushed already return case msg := <-b.messageBuffer: - if len(batch) == 0 { + if b.batch.Len() == 0 { timer.Reset(b.batchInterval) } - batch = append(batch, msg) + b.batch.Add(msg) - if len(batch) >= b.batchSize { - batchToSend := batch - batch = make([]*messageWithCallback, 0, b.batchSize) + if b.batch.Len() >= b.batchSize { batchToSend := b.batch.Clear() timer.Stop() b.sendBatch(ctx, batchToSend) } case <-timer.C: - if len(batch) > 0 { - batchToSend := batch - batch = make([]*messageWithCallback, 0, b.batchSize) + if b.batch.Len() > 0 { + batchToSend := b.batch.Clear() b.sendBatch(ctx, batchToSend) } } @@ -91,25 +89,33 @@ func (b *Client) Start(ctx context.Context) { }() } +// Stop ensures: +// - current batch is flushed +// - all current network calls are completed +// - all callbacks are completed +// Forcibly shuts down after timeout if not completed. 
func (b *Client) Stop() { b.shutdownOnce.Do(func() { close(b.shutdownChan) - // wait for pending sends by getting all semaphore slots - for range cap(b.maxConcurrentSends) { - b.maxConcurrentSends <- struct{}{} - } - // wait for all callbacks to complete with timeout + done := make(chan struct{}) go func() { + // flush remaining batch + b.flush(b.batch.Clear()) + // wait for pending sends by getting all semaphore slots + for range cap(b.maxConcurrentSends) { + b.maxConcurrentSends <- struct{}{} + } + // wait for all callbacks to complete b.callbackWg.Wait() close(done) }() select { case <-done: - // All callbacks completed successfully + // All successfully shutdown case <-time.After(b.shutdownTimeout): - b.log.Warnw("timed out waiting for callbacks to complete", "timeout", b.shutdownTimeout) + b.log.Warnw("timed out waiting for shutdown to finish, force closing", "timeout", b.shutdownTimeout) } }) } @@ -126,14 +132,19 @@ func (b *Client) QueueMessage(event *chipingress.CloudEventPb, callback func(err return nil } + // Check shutdown first to avoid race with buffer send + select { + case <-b.shutdownChan: + return errors.New("client is shutdown") + default: + } + msg := &messageWithCallback{ event: event, callback: callback, } select { - case <-b.shutdownChan: - return errors.New("client is shutdown") case b.messageBuffer <- msg: return nil default: diff --git a/pkg/chipingress/batch/client_test.go b/pkg/chipingress/batch/client_test.go index 0271856e0..a9e94031e 100644 --- a/pkg/chipingress/batch/client_test.go +++ b/pkg/chipingress/batch/client_test.go @@ -16,31 +16,31 @@ import ( func TestNewBatchClient(t *testing.T) { t.Run("NewBatchClient", func(t *testing.T) { - client, err := NewBatchClient(nil) + client, err := NewClient(nil) require.NoError(t, err) assert.NotNil(t, client) }) t.Run("WithBatchSize", func(t *testing.T) { - client, err := NewBatchClient(nil, WithBatchSize(100)) + client, err := NewClient(nil, WithBatchSize(100)) require.NoError(t, err) 
assert.Equal(t, 100, client.batchSize) }) t.Run("WithMaxConcurrentSends", func(t *testing.T) { - client, err := NewBatchClient(nil, WithMaxConcurrentSends(10)) + client, err := NewClient(nil, WithMaxConcurrentSends(10)) require.NoError(t, err) assert.Equal(t, 10, cap(client.maxConcurrentSends)) }) t.Run("WithBatchInterval", func(t *testing.T) { - client, err := NewBatchClient(nil, WithBatchInterval(100*time.Millisecond)) + client, err := NewClient(nil, WithBatchInterval(100*time.Millisecond)) require.NoError(t, err) assert.Equal(t, 100*time.Millisecond, client.batchInterval) }) t.Run("WithMessageBuffer", func(t *testing.T) { - client, err := NewBatchClient(nil, WithMessageBuffer(1000)) + client, err := NewClient(nil, WithMessageBuffer(1000)) require.NoError(t, err) assert.Equal(t, 1000, cap(client.messageBuffer)) }) @@ -48,7 +48,7 @@ func TestNewBatchClient(t *testing.T) { func TestQueueMessage(t *testing.T) { t.Run("successfully queues a message", func(t *testing.T) { - client, err := NewBatchClient(nil, WithMessageBuffer(5)) + client, err := NewClient(nil, WithMessageBuffer(5)) require.NoError(t, err) event := &chipingress.CloudEventPb{ @@ -68,7 +68,7 @@ func TestQueueMessage(t *testing.T) { }) t.Run("drops message if buffer is full", func(t *testing.T) { - client, err := NewBatchClient(nil, WithMessageBuffer(1)) + client, err := NewClient(nil, WithMessageBuffer(1)) require.NoError(t, err) require.NotNil(t, client) @@ -90,7 +90,7 @@ func TestQueueMessage(t *testing.T) { }) t.Run("handles nil event", func(t *testing.T) { - client, err := NewBatchClient(nil, WithMessageBuffer(5)) + client, err := NewClient(nil, WithMessageBuffer(5)) require.NoError(t, err) client.QueueMessage(nil, nil) @@ -117,7 +117,7 @@ func TestSendBatch(t *testing.T) { Return(&chipingress.PublishResponse{}, nil).Run(func(args mock.Arguments) { close(done) }). 
Once() - client, err := NewBatchClient(mockClient, WithMessageBuffer(5)) + client, err := NewClient(mockClient, WithMessageBuffer(5)) require.NoError(t, err) messages := []*messageWithCallback{ @@ -141,7 +141,7 @@ func TestSendBatch(t *testing.T) { t.Run("doesn't publish empty batch", func(t *testing.T) { mockClient := mocks.NewClient(t) - client, err := NewBatchClient(mockClient, WithMessageBuffer(5)) + client, err := NewClient(mockClient, WithMessageBuffer(5)) require.NoError(t, err) client.sendBatch(t.Context(), []*messageWithCallback{}) @@ -166,7 +166,7 @@ func TestSendBatch(t *testing.T) { }). Times(3) - client, err := NewBatchClient(mockClient, WithMessageBuffer(5)) + client, err := NewClient(mockClient, WithMessageBuffer(5)) require.NoError(t, err) batch1 := []*messageWithCallback{ @@ -214,7 +214,7 @@ func TestStart(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). Once() - client, err := NewBatchClient(mockClient, WithBatchSize(3), WithBatchInterval(5*time.Second)) + client, err := NewClient(mockClient, WithBatchSize(3), WithBatchInterval(5*time.Second)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -252,7 +252,7 @@ func TestStart(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). Once() - client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(50*time.Millisecond)) + client, err := NewClient(mockClient, WithBatchSize(10), WithBatchInterval(50*time.Millisecond)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -291,7 +291,7 @@ func TestStart(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). 
Once() - client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(5*time.Second)) + client, err := NewClient(mockClient, WithBatchSize(10), WithBatchInterval(5*time.Second)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -331,7 +331,7 @@ func TestStart(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). Once() - client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(100*time.Millisecond), WithMessageBuffer(10)) + client, err := NewClient(mockClient, WithBatchSize(10), WithBatchInterval(100*time.Millisecond), WithMessageBuffer(10)) require.NoError(t, err) client.Start(t.Context()) @@ -355,7 +355,7 @@ func TestStart(t *testing.T) { t.Run("no flush when batch is empty", func(t *testing.T) { mockClient := mocks.NewClient(t) - client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(5*time.Second)) + client, err := NewClient(mockClient, WithBatchSize(10), WithBatchInterval(5*time.Second)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -391,7 +391,7 @@ func TestStart(t *testing.T) { }). Times(3) - client, err := NewBatchClient(mockClient, WithBatchSize(2), WithBatchInterval(5*time.Second)) + client, err := NewClient(mockClient, WithBatchSize(2), WithBatchInterval(5*time.Second)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -435,7 +435,7 @@ func TestCallbacks(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). Once() - client, err := NewBatchClient(mockClient, WithBatchSize(1)) + client, err := NewClient(mockClient, WithBatchSize(1)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -486,7 +486,7 @@ func TestCallbacks(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). 
Once() - client, err := NewBatchClient(mockClient, WithBatchSize(1)) + client, err := NewClient(mockClient, WithBatchSize(1)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -536,7 +536,7 @@ func TestCallbacks(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). Once() - client, err := NewBatchClient(mockClient, WithBatchSize(1)) + client, err := NewClient(mockClient, WithBatchSize(1)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -579,7 +579,7 @@ func TestCallbacks(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). Once() - client, err := NewBatchClient(mockClient, WithBatchSize(3)) + client, err := NewClient(mockClient, WithBatchSize(3)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -658,7 +658,7 @@ func TestCallbacks(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). Once() - client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(50*time.Millisecond)) + client, err := NewClient(mockClient, WithBatchSize(10), WithBatchInterval(50*time.Millisecond)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -707,7 +707,7 @@ func TestCallbacks(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). Once() - client, err := NewBatchClient(mockClient, WithBatchSize(2), WithBatchInterval(5*time.Second)) + client, err := NewClient(mockClient, WithBatchSize(2), WithBatchInterval(5*time.Second)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -762,7 +762,7 @@ func TestCallbacks(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). 
Once() - client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(5*time.Second)) + client, err := NewClient(mockClient, WithBatchSize(10), WithBatchInterval(5*time.Second)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -802,7 +802,7 @@ func TestCallbacks(t *testing.T) { func TestStop(t *testing.T) { t.Run("can call Stop multiple times without panic", func(t *testing.T) { mockClient := mocks.NewClient(t) - client, err := NewBatchClient(mockClient, WithBatchSize(10)) + client, err := NewClient(mockClient, WithBatchSize(10)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -816,7 +816,7 @@ func TestStop(t *testing.T) { t.Run("QueueMessage returns error after Stop", func(t *testing.T) { mockClient := mocks.NewClient(t) - client, err := NewBatchClient(mockClient, WithBatchSize(10)) + client, err := NewClient(mockClient, WithBatchSize(10)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) From fbd924621a0e65b9608d7e44128e740c704d2b03 Mon Sep 17 00:00:00 2001 From: hendoxc Date: Thu, 15 Jan 2026 13:55:13 -0700 Subject: [PATCH 18/19] Fix linting errors --- pkg/chipingress/batch/buffer.go | 1 + pkg/chipingress/batch/client.go | 14 ++++++- pkg/chipingress/batch/client_test.go | 60 ++++++++++++++-------------- 3 files changed, 43 insertions(+), 32 deletions(-) diff --git a/pkg/chipingress/batch/buffer.go b/pkg/chipingress/batch/buffer.go index 91d3dade9..74b4e2298 100644 --- a/pkg/chipingress/batch/buffer.go +++ b/pkg/chipingress/batch/buffer.go @@ -1,3 +1,4 @@ +// Package batch provides a thread-safe batching client for chip ingress messages. 
package batch import ( diff --git a/pkg/chipingress/batch/client.go b/pkg/chipingress/batch/client.go index b0d14d6e6..782784c68 100644 --- a/pkg/chipingress/batch/client.go +++ b/pkg/chipingress/batch/client.go @@ -11,6 +11,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/chipingress" ) +// Client is a batching client that accumulates messages and sends them in batches. type Client struct { client chipingress.Client batchSize int @@ -26,9 +27,11 @@ type Client struct { batch *messageBatch } +// Opt is a functional option for configuring the batch Client. type Opt func(*Client) -func NewClient(client chipingress.Client, opts ...Opt) (*Client, error) { +// NewBatchClient creates a new batching client with the given options. +func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) { c := &Client{ client: client, log: zap.NewNop().Sugar(), @@ -50,6 +53,7 @@ func NewClient(client chipingress.Client, opts ...Opt) (*Client, error) { return c, nil } +// Start begins processing messages from the queue and sending them in batches func (b *Client) Start(ctx context.Context) { go func() { timer := time.NewTimer(b.batchInterval) @@ -153,7 +157,6 @@ func (b *Client) QueueMessage(event *chipingress.CloudEventPb, callback func(err } func (b *Client) sendBatch(ctx context.Context, messages []*messageWithCallback) { - if len(messages) == 0 { return } @@ -200,42 +203,49 @@ func (b *Client) flush(batch []*messageWithCallback) { b.sendBatch(ctx, batch) } +// WithBatchSize sets the number of messages to accumulate before sending a batch func WithBatchSize(batchSize int) Opt { return func(c *Client) { c.batchSize = batchSize } } +// WithMaxConcurrentSends sets the maximum number of concurrent batch send operations func WithMaxConcurrentSends(maxConcurrentSends int) Opt { return func(c *Client) { c.maxConcurrentSends = make(chan struct{}, maxConcurrentSends) } } +// WithBatchInterval sets the maximum time to wait before sending an incomplete batch func 
WithBatchInterval(batchTimeout time.Duration) Opt { return func(c *Client) { c.batchInterval = batchTimeout } } +// WithShutdownTimeout sets the maximum time to wait for shutdown to complete func WithShutdownTimeout(shutdownTimeout time.Duration) Opt { return func(c *Client) { c.shutdownTimeout = shutdownTimeout } } +// WithMessageBuffer sets the size of the message queue buffer func WithMessageBuffer(messageBufferSize int) Opt { return func(c *Client) { c.messageBuffer = make(chan *messageWithCallback, messageBufferSize) } } +// WithMaxPublishTimeout sets the maximum time to wait for a batch publish operation func WithMaxPublishTimeout(maxPublishTimeout time.Duration) Opt { return func(c *Client) { c.maxPublishTimeout = maxPublishTimeout } } +// WithLogger sets the logger for the batch client func WithLogger(log *zap.SugaredLogger) Opt { return func(c *Client) { c.log = log diff --git a/pkg/chipingress/batch/client_test.go b/pkg/chipingress/batch/client_test.go index a9e94031e..d17eab955 100644 --- a/pkg/chipingress/batch/client_test.go +++ b/pkg/chipingress/batch/client_test.go @@ -16,31 +16,31 @@ import ( func TestNewBatchClient(t *testing.T) { t.Run("NewBatchClient", func(t *testing.T) { - client, err := NewClient(nil) + client, err := NewBatchClient(nil) require.NoError(t, err) assert.NotNil(t, client) }) t.Run("WithBatchSize", func(t *testing.T) { - client, err := NewClient(nil, WithBatchSize(100)) + client, err := NewBatchClient(nil, WithBatchSize(100)) require.NoError(t, err) assert.Equal(t, 100, client.batchSize) }) t.Run("WithMaxConcurrentSends", func(t *testing.T) { - client, err := NewClient(nil, WithMaxConcurrentSends(10)) + client, err := NewBatchClient(nil, WithMaxConcurrentSends(10)) require.NoError(t, err) assert.Equal(t, 10, cap(client.maxConcurrentSends)) }) t.Run("WithBatchInterval", func(t *testing.T) { - client, err := NewClient(nil, WithBatchInterval(100*time.Millisecond)) + client, err := NewBatchClient(nil, 
WithBatchInterval(100*time.Millisecond)) require.NoError(t, err) assert.Equal(t, 100*time.Millisecond, client.batchInterval) }) t.Run("WithMessageBuffer", func(t *testing.T) { - client, err := NewClient(nil, WithMessageBuffer(1000)) + client, err := NewBatchClient(nil, WithMessageBuffer(1000)) require.NoError(t, err) assert.Equal(t, 1000, cap(client.messageBuffer)) }) @@ -48,7 +48,7 @@ func TestNewBatchClient(t *testing.T) { func TestQueueMessage(t *testing.T) { t.Run("successfully queues a message", func(t *testing.T) { - client, err := NewClient(nil, WithMessageBuffer(5)) + client, err := NewBatchClient(nil, WithMessageBuffer(5)) require.NoError(t, err) event := &chipingress.CloudEventPb{ @@ -68,7 +68,7 @@ func TestQueueMessage(t *testing.T) { }) t.Run("drops message if buffer is full", func(t *testing.T) { - client, err := NewClient(nil, WithMessageBuffer(1)) + client, err := NewBatchClient(nil, WithMessageBuffer(1)) require.NoError(t, err) require.NotNil(t, client) @@ -78,8 +78,8 @@ func TestQueueMessage(t *testing.T) { Type: "test.event.type", } - client.QueueMessage(event, nil) - client.QueueMessage(event, nil) + _ = client.QueueMessage(event, nil) + _ = client.QueueMessage(event, nil) assert.Len(t, client.messageBuffer, 1) @@ -90,7 +90,7 @@ func TestQueueMessage(t *testing.T) { }) t.Run("handles nil event", func(t *testing.T) { - client, err := NewClient(nil, WithMessageBuffer(5)) + client, err := NewBatchClient(nil, WithMessageBuffer(5)) require.NoError(t, err) client.QueueMessage(nil, nil) @@ -114,10 +114,10 @@ func TestSendBatch(t *testing.T) { batch.Events[2].Id == "test-id-3" }), ). - Return(&chipingress.PublishResponse{}, nil).Run(func(args mock.Arguments) { close(done) }). + Return(&chipingress.PublishResponse{}, nil).Run(func(_ mock.Arguments) { close(done) }). 
Once() - client, err := NewClient(mockClient, WithMessageBuffer(5)) + client, err := NewBatchClient(mockClient, WithMessageBuffer(5)) require.NoError(t, err) messages := []*messageWithCallback{ @@ -141,7 +141,7 @@ func TestSendBatch(t *testing.T) { t.Run("doesn't publish empty batch", func(t *testing.T) { mockClient := mocks.NewClient(t) - client, err := NewClient(mockClient, WithMessageBuffer(5)) + client, err := NewBatchClient(mockClient, WithMessageBuffer(5)) require.NoError(t, err) client.sendBatch(t.Context(), []*messageWithCallback{}) @@ -158,7 +158,7 @@ func TestSendBatch(t *testing.T) { mockClient. On("PublishBatch", mock.Anything, mock.Anything). Return(&chipingress.PublishResponse{}, nil). - Run(func(args mock.Arguments) { + Run(func(_ mock.Arguments) { callCount++ if callCount == 3 { close(done) @@ -166,7 +166,7 @@ func TestSendBatch(t *testing.T) { }). Times(3) - client, err := NewClient(mockClient, WithMessageBuffer(5)) + client, err := NewBatchClient(mockClient, WithMessageBuffer(5)) require.NoError(t, err) batch1 := []*messageWithCallback{ @@ -214,7 +214,7 @@ func TestStart(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). Once() - client, err := NewClient(mockClient, WithBatchSize(3), WithBatchInterval(5*time.Second)) + client, err := NewBatchClient(mockClient, WithBatchSize(3), WithBatchInterval(5*time.Second)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -252,7 +252,7 @@ func TestStart(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). Once() - client, err := NewClient(mockClient, WithBatchSize(10), WithBatchInterval(50*time.Millisecond)) + client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(50*time.Millisecond)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -291,7 +291,7 @@ func TestStart(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). 
Once() - client, err := NewClient(mockClient, WithBatchSize(10), WithBatchInterval(5*time.Second)) + client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(5*time.Second)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -331,7 +331,7 @@ func TestStart(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). Once() - client, err := NewClient(mockClient, WithBatchSize(10), WithBatchInterval(100*time.Millisecond), WithMessageBuffer(10)) + client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(100*time.Millisecond), WithMessageBuffer(10)) require.NoError(t, err) client.Start(t.Context()) @@ -355,7 +355,7 @@ func TestStart(t *testing.T) { t.Run("no flush when batch is empty", func(t *testing.T) { mockClient := mocks.NewClient(t) - client, err := NewClient(mockClient, WithBatchSize(10), WithBatchInterval(5*time.Second)) + client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(5*time.Second)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -391,7 +391,7 @@ func TestStart(t *testing.T) { }). Times(3) - client, err := NewClient(mockClient, WithBatchSize(2), WithBatchInterval(5*time.Second)) + client, err := NewBatchClient(mockClient, WithBatchSize(2), WithBatchInterval(5*time.Second)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -435,7 +435,7 @@ func TestCallbacks(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). Once() - client, err := NewClient(mockClient, WithBatchSize(1)) + client, err := NewBatchClient(mockClient, WithBatchSize(1)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -486,7 +486,7 @@ func TestCallbacks(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). 
Once() - client, err := NewClient(mockClient, WithBatchSize(1)) + client, err := NewBatchClient(mockClient, WithBatchSize(1)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -536,7 +536,7 @@ func TestCallbacks(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). Once() - client, err := NewClient(mockClient, WithBatchSize(1)) + client, err := NewBatchClient(mockClient, WithBatchSize(1)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -579,7 +579,7 @@ func TestCallbacks(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). Once() - client, err := NewClient(mockClient, WithBatchSize(3)) + client, err := NewBatchClient(mockClient, WithBatchSize(3)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -658,7 +658,7 @@ func TestCallbacks(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). Once() - client, err := NewClient(mockClient, WithBatchSize(10), WithBatchInterval(50*time.Millisecond)) + client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(50*time.Millisecond)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -707,7 +707,7 @@ func TestCallbacks(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). Once() - client, err := NewClient(mockClient, WithBatchSize(2), WithBatchInterval(5*time.Second)) + client, err := NewBatchClient(mockClient, WithBatchSize(2), WithBatchInterval(5*time.Second)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -762,7 +762,7 @@ func TestCallbacks(t *testing.T) { Run(func(args mock.Arguments) { close(done) }). 
Once() - client, err := NewClient(mockClient, WithBatchSize(10), WithBatchInterval(5*time.Second)) + client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(5*time.Second)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -802,7 +802,7 @@ func TestCallbacks(t *testing.T) { func TestStop(t *testing.T) { t.Run("can call Stop multiple times without panic", func(t *testing.T) { mockClient := mocks.NewClient(t) - client, err := NewClient(mockClient, WithBatchSize(10)) + client, err := NewBatchClient(mockClient, WithBatchSize(10)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) @@ -816,7 +816,7 @@ func TestStop(t *testing.T) { t.Run("QueueMessage returns error after Stop", func(t *testing.T) { mockClient := mocks.NewClient(t) - client, err := NewClient(mockClient, WithBatchSize(10)) + client, err := NewBatchClient(mockClient, WithBatchSize(10)) require.NoError(t, err) ctx, cancel := context.WithCancel(t.Context()) From 86fb5df86ee5e336956106094d5038c45ba09840 Mon Sep 17 00:00:00 2001 From: hendoxc Date: Mon, 26 Jan 2026 08:23:40 -0700 Subject: [PATCH 19/19] Fixes linting --- pkg/chipingress/batch/client.go | 2 - pkg/chipingress/batch/client_test.go | 69 +++++++++++++++------------- 2 files changed, 37 insertions(+), 34 deletions(-) diff --git a/pkg/chipingress/batch/client.go b/pkg/chipingress/batch/client.go index 782784c68..96bfb6687 100644 --- a/pkg/chipingress/batch/client.go +++ b/pkg/chipingress/batch/client.go @@ -131,7 +131,6 @@ func (b *Client) Stop() { // Returns immediately with no blocking - drops message if channel is full. // Returns an error if the message was dropped. 
func (b *Client) QueueMessage(event *chipingress.CloudEventPb, callback func(error)) error { - if event == nil { return nil } @@ -189,7 +188,6 @@ func (b *Client) sendBatch(ctx context.Context, messages []*messageWithCallback) } }) }() - } func (b *Client) flush(batch []*messageWithCallback) { diff --git a/pkg/chipingress/batch/client_test.go b/pkg/chipingress/batch/client_test.go index d17eab955..c1ce8e440 100644 --- a/pkg/chipingress/batch/client_test.go +++ b/pkg/chipingress/batch/client_test.go @@ -57,7 +57,8 @@ func TestQueueMessage(t *testing.T) { Type: "test.event.type", } - client.QueueMessage(event, nil) + err = client.QueueMessage(event, nil) + require.NoError(t, err) assert.Len(t, client.messageBuffer, 1) @@ -93,7 +94,8 @@ func TestQueueMessage(t *testing.T) { client, err := NewBatchClient(nil, WithMessageBuffer(5)) require.NoError(t, err) - client.QueueMessage(nil, nil) + err = client.QueueMessage(nil, nil) + require.NoError(t, err) assert.Empty(t, client.messageBuffer) }) } @@ -211,7 +213,7 @@ func TestStart(t *testing.T) { }), ). Return(&chipingress.PublishResponse{}, nil). - Run(func(args mock.Arguments) { close(done) }). + Run(func(_ mock.Arguments) { close(done) }). 
Once() client, err := NewBatchClient(mockClient, WithBatchSize(3), WithBatchInterval(5*time.Second)) @@ -222,9 +224,12 @@ func TestStart(t *testing.T) { client.Start(ctx) - client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}, nil) - client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}, nil) - client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-3", Source: "test-source", Type: "test.event.type"}, nil) + err = client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}, nil) + require.NoError(t, err) + err = client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}, nil) + require.NoError(t, err) + err = client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-3", Source: "test-source", Type: "test.event.type"}, nil) + require.NoError(t, err) select { case <-done: @@ -249,7 +254,7 @@ func TestStart(t *testing.T) { }), ). Return(&chipingress.PublishResponse{}, nil). - Run(func(args mock.Arguments) { close(done) }). + Run(func(_ mock.Arguments) { close(done) }). Once() client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(50*time.Millisecond)) @@ -260,8 +265,8 @@ func TestStart(t *testing.T) { client.Start(ctx) - client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}, nil) - client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}, nil) + _ = client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}, nil) + _ = client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}, nil) select { case <-done: @@ -288,7 +293,7 @@ func TestStart(t *testing.T) { }), ). Return(&chipingress.PublishResponse{}, nil). 
- Run(func(args mock.Arguments) { close(done) }). + Run(func(_ mock.Arguments) { close(done) }). Once() client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(5*time.Second)) @@ -298,8 +303,8 @@ func TestStart(t *testing.T) { client.Start(ctx) - client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}, nil) - client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}, nil) + _ = client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}, nil) + _ = client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}, nil) time.Sleep(10 * time.Millisecond) @@ -328,7 +333,7 @@ func TestStart(t *testing.T) { }), ). Return(&chipingress.PublishResponse{}, nil). - Run(func(args mock.Arguments) { close(done) }). + Run(func(_ mock.Arguments) { close(done) }). Once() client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(100*time.Millisecond), WithMessageBuffer(10)) @@ -383,7 +388,7 @@ func TestStart(t *testing.T) { }), ). Return(&chipingress.PublishResponse{}, nil). - Run(func(args mock.Arguments) { + Run(func(_ mock.Arguments) { callCount++ if callCount == 3 { close(done) @@ -400,7 +405,7 @@ func TestStart(t *testing.T) { client.Start(ctx) for i := 1; i <= 6; i++ { - client.QueueMessage(&chipingress.CloudEventPb{ + _ = client.QueueMessage(&chipingress.CloudEventPb{ Id: "test-id-" + strconv.Itoa(i), Source: "test-source", Type: "test.event.type", @@ -432,7 +437,7 @@ func TestCallbacks(t *testing.T) { }), ). Return(&chipingress.PublishResponse{}, nil). - Run(func(args mock.Arguments) { close(done) }). + Run(func(_ mock.Arguments) { close(done) }). 
Once() client, err := NewBatchClient(mockClient, WithBatchSize(1)) @@ -443,7 +448,7 @@ func TestCallbacks(t *testing.T) { client.Start(ctx) - client.QueueMessage(&chipingress.CloudEventPb{ + _ = client.QueueMessage(&chipingress.CloudEventPb{ Id: "test-id-1", Source: "test-source", Type: "test.event.type", @@ -483,7 +488,7 @@ func TestCallbacks(t *testing.T) { }), ). Return(&chipingress.PublishResponse{}, expectedErr). - Run(func(args mock.Arguments) { close(done) }). + Run(func(_ mock.Arguments) { close(done) }). Once() client, err := NewBatchClient(mockClient, WithBatchSize(1)) @@ -494,7 +499,7 @@ func TestCallbacks(t *testing.T) { client.Start(ctx) - client.QueueMessage(&chipingress.CloudEventPb{ + _ = client.QueueMessage(&chipingress.CloudEventPb{ Id: "test-id-1", Source: "test-source", Type: "test.event.type", @@ -533,7 +538,7 @@ func TestCallbacks(t *testing.T) { }), ). Return(&chipingress.PublishResponse{}, nil). - Run(func(args mock.Arguments) { close(done) }). + Run(func(_ mock.Arguments) { close(done) }). Once() client, err := NewBatchClient(mockClient, WithBatchSize(1)) @@ -545,7 +550,7 @@ func TestCallbacks(t *testing.T) { client.Start(ctx) // Queue message with nil callback - should not panic - client.QueueMessage(&chipingress.CloudEventPb{ + _ = client.QueueMessage(&chipingress.CloudEventPb{ Id: "test-id-1", Source: "test-source", Type: "test.event.type", @@ -576,7 +581,7 @@ func TestCallbacks(t *testing.T) { }), ). Return(&chipingress.PublishResponse{}, nil). - Run(func(args mock.Arguments) { close(done) }). + Run(func(_ mock.Arguments) { close(done) }). 
Once() client, err := NewBatchClient(mockClient, WithBatchSize(3)) @@ -587,7 +592,7 @@ func TestCallbacks(t *testing.T) { client.Start(ctx) - client.QueueMessage(&chipingress.CloudEventPb{ + _ = client.QueueMessage(&chipingress.CloudEventPb{ Id: "test-id-1", Source: "test-source", Type: "test.event.type", @@ -595,7 +600,7 @@ func TestCallbacks(t *testing.T) { callback1Done <- err }) - client.QueueMessage(&chipingress.CloudEventPb{ + _ = client.QueueMessage(&chipingress.CloudEventPb{ Id: "test-id-2", Source: "test-source", Type: "test.event.type", @@ -603,7 +608,7 @@ func TestCallbacks(t *testing.T) { callback2Done <- err }) - client.QueueMessage(&chipingress.CloudEventPb{ + _ = client.QueueMessage(&chipingress.CloudEventPb{ Id: "test-id-3", Source: "test-source", Type: "test.event.type", @@ -655,7 +660,7 @@ func TestCallbacks(t *testing.T) { }), ). Return(&chipingress.PublishResponse{}, nil). - Run(func(args mock.Arguments) { close(done) }). + Run(func(_ mock.Arguments) { close(done) }). Once() client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(50*time.Millisecond)) @@ -666,7 +671,7 @@ func TestCallbacks(t *testing.T) { client.Start(ctx) - client.QueueMessage(&chipingress.CloudEventPb{ + _ = client.QueueMessage(&chipingress.CloudEventPb{ Id: "test-id-1", Source: "test-source", Type: "test.event.type", @@ -704,7 +709,7 @@ func TestCallbacks(t *testing.T) { }), ). Return(&chipingress.PublishResponse{}, nil). - Run(func(args mock.Arguments) { close(done) }). + Run(func(_ mock.Arguments) { close(done) }). 
Once() client, err := NewBatchClient(mockClient, WithBatchSize(2), WithBatchInterval(5*time.Second)) @@ -715,13 +720,13 @@ func TestCallbacks(t *testing.T) { client.Start(ctx) - client.QueueMessage(&chipingress.CloudEventPb{ + _ = client.QueueMessage(&chipingress.CloudEventPb{ Id: "test-id-1", Source: "test-source", Type: "test.event.type", }, nil) - client.QueueMessage(&chipingress.CloudEventPb{ + _ = client.QueueMessage(&chipingress.CloudEventPb{ Id: "test-id-2", Source: "test-source", Type: "test.event.type", @@ -759,7 +764,7 @@ func TestCallbacks(t *testing.T) { }), ). Return(&chipingress.PublishResponse{}, nil). - Run(func(args mock.Arguments) { close(done) }). + Run(func(_ mock.Arguments) { close(done) }). Once() client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(5*time.Second)) @@ -769,7 +774,7 @@ func TestCallbacks(t *testing.T) { client.Start(ctx) - client.QueueMessage(&chipingress.CloudEventPb{ + _ = client.QueueMessage(&chipingress.CloudEventPb{ Id: "test-id-1", Source: "test-source", Type: "test.event.type",