diff --git a/pkg/chipingress/batch/buffer.go b/pkg/chipingress/batch/buffer.go new file mode 100644 index 000000000..74b4e2298 --- /dev/null +++ b/pkg/chipingress/batch/buffer.go @@ -0,0 +1,73 @@ +// Package batch provides a thread-safe batching client for chip ingress messages. +package batch + +import ( + "sync" + + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" +) + +// messageBatch is a thread-safe buffer for accumulating messages before sending as a batch +type messageBatch struct { + messages []*messageWithCallback + mu sync.Mutex +} + +type messageWithCallback struct { + event *chipingress.CloudEventPb + callback func(error) +} + +// newMessageBatch creates a new messageBatch with the given initial capacity +func newMessageBatch(capacity int) *messageBatch { + return &messageBatch{ + messages: make([]*messageWithCallback, 0, capacity), + } +} + +// Add appends a message to the batch +func (b *messageBatch) Add(msg *messageWithCallback) { + b.mu.Lock() + defer b.mu.Unlock() + b.messages = append(b.messages, msg) +} + +// Len returns the current number of messages in the batch +func (b *messageBatch) Len() int { + b.mu.Lock() + defer b.mu.Unlock() + return len(b.messages) +} + +// Clear removes all messages from the batch and returns a copy of them +func (b *messageBatch) Clear() []*messageWithCallback { + b.mu.Lock() + defer b.mu.Unlock() + + if len(b.messages) == 0 { + return nil + } + + // Make a copy + result := make([]*messageWithCallback, len(b.messages)) + copy(result, b.messages) + + // Reset the internal slice + b.messages = b.messages[:0] + + return result +} + +// Values returns a copy of the current messages without clearing them +func (b *messageBatch) Values() []*messageWithCallback { + b.mu.Lock() + defer b.mu.Unlock() + + if len(b.messages) == 0 { + return nil + } + + result := make([]*messageWithCallback, len(b.messages)) + copy(result, b.messages) + return result +} diff --git a/pkg/chipingress/batch/buffer_test.go b/pkg/chipingress/batch/buffer_test.go new file mode 100644 index 000000000..1fc6290c8 --- /dev/null +++ b/pkg/chipingress/batch/buffer_test.go @@ -0,0 +1,183 @@ +package batch + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" +) + +func TestMessageBatch(t *testing.T) { + t.Run("newMessageBatch creates empty batch", func(t *testing.T) { + batch := newMessageBatch(10) + require.NotNil(t, batch) + assert.Equal(t, 0, batch.Len()) + }) + + t.Run("Add appends messages", func(t *testing.T) { + batch := newMessageBatch(10) + + msg1 := &messageWithCallback{ + event: &chipingress.CloudEventPb{Id: "test-1"}, + } + msg2 := &messageWithCallback{ + event: &chipingress.CloudEventPb{Id: "test-2"}, + } + + batch.Add(msg1) + assert.Equal(t, 1, batch.Len()) + + batch.Add(msg2) + assert.Equal(t, 2, batch.Len()) + }) + + t.Run("Clear returns copy and empties batch", func(t *testing.T) { + batch := newMessageBatch(10) + + msg1 := &messageWithCallback{ + event: &chipingress.CloudEventPb{Id: "test-1"}, + } + msg2 := &messageWithCallback{ + event: &chipingress.CloudEventPb{Id: "test-2"}, + } + + batch.Add(msg1) + batch.Add(msg2) + + result := batch.Clear() + require.NotNil(t, result) + assert.Len(t, result, 2) + assert.Equal(t, "test-1", result[0].event.Id) + assert.Equal(t, "test-2", result[1].event.Id) + + assert.Equal(t, 0, batch.Len()) + }) + + t.Run("Clear on empty batch returns nil", func(t *testing.T) { + batch := 
newMessageBatch(10) + result := batch.Clear() + assert.Nil(t, result) + }) + + t.Run("Values returns copy without clearing", func(t *testing.T) { + batch := newMessageBatch(10) + + msg1 := &messageWithCallback{ + event: &chipingress.CloudEventPb{Id: "test-1"}, + } + msg2 := &messageWithCallback{ + event: &chipingress.CloudEventPb{Id: "test-2"}, + } + + batch.Add(msg1) + batch.Add(msg2) + + result := batch.Values() + require.NotNil(t, result) + assert.Len(t, result, 2) + assert.Equal(t, "test-1", result[0].event.Id) + assert.Equal(t, "test-2", result[1].event.Id) + + // Batch should still have messages after Values + assert.Equal(t, 2, batch.Len()) + }) + + t.Run("Values on empty batch returns nil", func(t *testing.T) { + batch := newMessageBatch(10) + result := batch.Values() + assert.Nil(t, result) + }) + + t.Run("Values returns slice copy with same pointers", func(t *testing.T) { + batch := newMessageBatch(10) + + msg1 := &messageWithCallback{ + event: &chipingress.CloudEventPb{Id: "test-1"}, + } + + batch.Add(msg1) + result := batch.Values() + require.Len(t, result, 1) + + assert.Equal(t, msg1, result[0]) + assert.Same(t, msg1.event, result[0].event) + + batch.Add(&messageWithCallback{ + event: &chipingress.CloudEventPb{Id: "test-2"}, + }) + assert.Len(t, result, 1) + assert.Equal(t, 2, batch.Len()) + }) + + t.Run("concurrent Add operations are safe", func(t *testing.T) { + batch := newMessageBatch(100) + var wg sync.WaitGroup + numGoroutines := 10 + messagesPerGoroutine := 10 + + for range numGoroutines { + wg.Go(func() { + for range messagesPerGoroutine { + batch.Add(&messageWithCallback{ + event: &chipingress.CloudEventPb{Id: "test"}, + }) + } + }) + } + + wg.Wait() + + assert.Equal(t, numGoroutines*messagesPerGoroutine, batch.Len()) + }) + + t.Run("concurrent Add and Clear operations are safe", func(t *testing.T) { + batch := newMessageBatch(100) + var wg sync.WaitGroup + numAdders := 5 + numClears := 3 + + for range numAdders { + wg.Go(func() { + for range 20 { + batch.Add(&messageWithCallback{ + event: &chipingress.CloudEventPb{Id: "test"}, + }) + } + }) + } + + // Start clearers + for range numClears { + wg.Go(func() { + for range 10 { + batch.Clear() + } + }) + } + + wg.Wait() + assert.GreaterOrEqual(t, batch.Len(), 0) + }) + + t.Run("concurrent Len operations are safe", func(t *testing.T) { + batch := newMessageBatch(100) + var wg sync.WaitGroup + for range 10 { + batch.Add(&messageWithCallback{ + event: &chipingress.CloudEventPb{Id: "test"}, + }) + } + + for range 100 { + wg.Go(func() { + length := batch.Len() + assert.GreaterOrEqual(t, length, 0) + }) + } + + wg.Wait() + }) +} diff --git a/pkg/chipingress/batch/client.go b/pkg/chipingress/batch/client.go new file mode 100644 index 000000000..96bfb6687 --- /dev/null +++ b/pkg/chipingress/batch/client.go @@ -0,0 +1,251 @@ +package batch + +import ( + "context" + "errors" + "sync" + "time" + + "go.uber.org/zap" + + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" +) + +// Client is a batching client that accumulates messages and sends them in batches. +type Client struct { + client chipingress.Client + batchSize int + maxConcurrentSends chan struct{} + batchInterval time.Duration + maxPublishTimeout time.Duration + messageBuffer chan *messageWithCallback + shutdownChan chan struct{} + log *zap.SugaredLogger + callbackWg sync.WaitGroup + shutdownTimeout time.Duration + shutdownOnce sync.Once + batch *messageBatch +} + +// Opt is a functional option for configuring the batch Client. 
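+// Options are applied in the order they are given, so a later option
+// overrides an earlier one.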
+type Opt func(*Client)
+
+// NewBatchClient creates a new batching client with the given options.
+func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) {
+	c := &Client{
+		client:             client,
+		log:                zap.NewNop().Sugar(),
+		batchSize:          10,
+		maxConcurrentSends: make(chan struct{}, 1),
+		messageBuffer:      make(chan *messageWithCallback, 200),
+		batchInterval:      100 * time.Millisecond,
+		maxPublishTimeout:  5 * time.Second,
+		shutdownChan:       make(chan struct{}),
+		callbackWg:         sync.WaitGroup{},
+		shutdownTimeout:    5 * time.Second,
+		batch:              newMessageBatch(10),
+	}
+
+	for _, opt := range opts {
+		opt(c)
+	}
+
+	return c, nil
+}
+
+// Start begins processing messages from the queue and sending them in batches.
+func (b *Client) Start(ctx context.Context) {
+	go func() {
+		timer := time.NewTimer(b.batchInterval)
+		timer.Stop()
+
+		for {
+			select {
+			case <-ctx.Done():
+				// ensure:
+				// - the current batch is flushed
+				// - all in-flight network calls complete
+				// - all callbacks complete
+				b.Stop()
+				return
+			case <-b.shutdownChan: // closed only by Stop
+				// Stop has already flushed the remaining batch, so just exit.
+				return
+			case msg := <-b.messageBuffer:
+				if b.batch.Len() == 0 {
+					timer.Reset(b.batchInterval)
+				}
+
+				b.batch.Add(msg)
+
+				if b.batch.Len() >= b.batchSize {
+					batchToSend := b.batch.Clear()
+					timer.Stop()
+					b.sendBatch(ctx, batchToSend)
+				}
+			case <-timer.C:
+				if b.batch.Len() > 0 {
+					batchToSend := b.batch.Clear()
+					b.sendBatch(ctx, batchToSend)
+				}
+			}
+		}
+	}()
+}
+
+// Stop ensures:
+// - the current batch is flushed
+// - all in-flight network calls complete
+// - all callbacks complete
+// It forcibly shuts down after shutdownTimeout if that has not happened.
+func (b *Client) Stop() {
+	b.shutdownOnce.Do(func() {
+		close(b.shutdownChan)
+
+		done := make(chan struct{})
+		go func() {
+			// flush the remaining batch
+			b.flush(b.batch.Clear())
+			// wait for pending sends by acquiring every semaphore slot
+			for range cap(b.maxConcurrentSends) {
+				b.maxConcurrentSends <- struct{}{}
+			}
+			// wait for all callbacks to complete
+			b.callbackWg.Wait()
+			close(done)
+		}()
+
+		select {
+		case <-done:
+			// everything shut down cleanly
+		case <-time.After(b.shutdownTimeout):
+			b.log.Warnw("timed out waiting for shutdown to finish, force closing", "timeout", b.shutdownTimeout)
+		}
+	})
+}
+
+// QueueMessage queues a single message to the batch client with an optional callback.
+// The callback is invoked from a separate goroutine after the batch containing this
+// message has been sent, and receives the send error (nil on success).
+// A nil event is a no-op and returns nil.
+// QueueMessage never blocks: if the buffer is full, the message is dropped and an
+// error is returned.
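+//
+// A minimal usage sketch (base stands in for any chipingress.Client and ctx for
+// the caller's context; the option values are illustrative):
+//
+//	bc, _ := NewBatchClient(base, WithBatchSize(50), WithBatchInterval(time.Second))
+//	bc.Start(ctx)
+//	defer bc.Stop()
+//	_ = bc.QueueMessage(&chipingress.CloudEventPb{Id: "evt-1"}, func(err error) {
+//		// runs after the batch containing this event was published
+//	})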
+func (b *Client) QueueMessage(event *chipingress.CloudEventPb, callback func(error)) error {
+	if event == nil {
+		return nil
+	}
+
+	// Check shutdown first to avoid racing with the buffer send below.
+	select {
+	case <-b.shutdownChan:
+		return errors.New("client is shutdown")
+	default:
+	}
+
+	msg := &messageWithCallback{
+		event:    event,
+		callback: callback,
+	}
+
+	select {
+	case b.messageBuffer <- msg:
+		return nil
+	default:
+		return errors.New("message buffer is full")
+	}
+}
+
+func (b *Client) sendBatch(ctx context.Context, messages []*messageWithCallback) {
+	if len(messages) == 0 {
+		return
+	}
+
+	// acquire the semaphore, limiting concurrent sends
+	b.maxConcurrentSends <- struct{}{}
+
+	go func() {
+		defer func() { <-b.maxConcurrentSends }()
+		// bound the publish so a slow network call cannot hold the semaphore indefinitely
+		ctxTimeout, cancel := context.WithTimeout(ctx, b.maxPublishTimeout)
+		defer cancel()
+
+		events := make([]*chipingress.CloudEventPb, len(messages))
+		for i, msg := range messages {
+			events[i] = msg.event
+		}
+
+		_, err := b.client.PublishBatch(ctxTimeout, &chipingress.CloudEventBatch{Events: events})
+		if err != nil {
+			b.log.Errorw("failed to publish batch", "error", err)
+		}
+		// Callbacks run in their own goroutine so they cannot delay releasing the
+		// semaphore; the wait group lets Stop wait for all of them to complete.
+		b.callbackWg.Go(func() {
+			for _, msg := range messages {
+				if msg.callback != nil {
+					msg.callback(err)
+				}
+			}
+		})
+	}()
+}
+
+func (b *Client) flush(batch []*messageWithCallback) {
+	if len(batch) == 0 {
+		return
+	}
+
+	// sendBatch returns before the publish finishes and applies maxPublishTimeout
+	// itself, so do not create a context here and cancel it on return: that would
+	// cancel the still-in-flight publish. Hand it a background context instead.
+	b.sendBatch(context.Background(), batch)
+}
+
+// WithBatchSize sets the number of messages to accumulate before sending a batch.
+func WithBatchSize(batchSize int) Opt {
+	return func(c *Client) {
+		c.batchSize = batchSize
+	}
+}
+
+// WithMaxConcurrentSends sets the maximum number of concurrent batch send operations.
+func WithMaxConcurrentSends(maxConcurrentSends int) Opt {
+	return func(c *Client) {
+		c.maxConcurrentSends = make(chan struct{}, maxConcurrentSends)
+	}
+}
+
+// WithBatchInterval sets the maximum time to wait before sending an incomplete batch.
+func WithBatchInterval(batchTimeout time.Duration) Opt {
+	return func(c *Client) {
+		c.batchInterval = batchTimeout
+	}
+}
+
+// WithShutdownTimeout sets the maximum time to wait for shutdown to complete.
+func WithShutdownTimeout(shutdownTimeout time.Duration) Opt {
+	return func(c *Client) {
+		c.shutdownTimeout = shutdownTimeout
+	}
+}
+
+// WithMessageBuffer sets the size of the message queue buffer.
+func WithMessageBuffer(messageBufferSize int) Opt {
+	return func(c *Client) {
+		c.messageBuffer = make(chan *messageWithCallback, messageBufferSize)
+	}
+}
+
+// WithMaxPublishTimeout sets the maximum time to wait for a batch publish operation.
+func WithMaxPublishTimeout(maxPublishTimeout time.Duration) Opt {
+	return func(c *Client) {
+		c.maxPublishTimeout = maxPublishTimeout
+	}
+}
+
+// WithLogger sets the logger for the batch client.
+func WithLogger(log *zap.SugaredLogger) Opt {
+	return func(c *Client) {
+		c.log = log
+	}
+}
diff --git a/pkg/chipingress/batch/client_test.go b/pkg/chipingress/batch/client_test.go
new file mode 100644
index 000000000..c1ce8e440
--- /dev/null
+++ b/pkg/chipingress/batch/client_test.go
@@ -0,0 +1,852 @@
+package batch
+
+import (
+	"context"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	
"github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks" +) + +func TestNewBatchClient(t *testing.T) { + t.Run("NewBatchClient", func(t *testing.T) { + client, err := NewBatchClient(nil) + require.NoError(t, err) + assert.NotNil(t, client) + }) + + t.Run("WithBatchSize", func(t *testing.T) { + client, err := NewBatchClient(nil, WithBatchSize(100)) + require.NoError(t, err) + assert.Equal(t, 100, client.batchSize) + }) + + t.Run("WithMaxConcurrentSends", func(t *testing.T) { + client, err := NewBatchClient(nil, WithMaxConcurrentSends(10)) + require.NoError(t, err) + assert.Equal(t, 10, cap(client.maxConcurrentSends)) + }) + + t.Run("WithBatchInterval", func(t *testing.T) { + client, err := NewBatchClient(nil, WithBatchInterval(100*time.Millisecond)) + require.NoError(t, err) + assert.Equal(t, 100*time.Millisecond, client.batchInterval) + }) + + t.Run("WithMessageBuffer", func(t *testing.T) { + client, err := NewBatchClient(nil, WithMessageBuffer(1000)) + require.NoError(t, err) + assert.Equal(t, 1000, cap(client.messageBuffer)) + }) +} + +func TestQueueMessage(t *testing.T) { + t.Run("successfully queues a message", func(t *testing.T) { + client, err := NewBatchClient(nil, WithMessageBuffer(5)) + require.NoError(t, err) + + event := &chipingress.CloudEventPb{ + Id: "test-id-1", + Source: "test-source", + Type: "test.event.type", + } + + err = client.QueueMessage(event, nil) + require.NoError(t, err) + + assert.Len(t, client.messageBuffer, 1) + + received := <-client.messageBuffer + assert.Equal(t, event.Id, received.event.Id) + assert.Equal(t, event.Source, received.event.Source) + assert.Equal(t, event.Type, received.event.Type) + }) + + t.Run("drops message if buffer is full", func(t *testing.T) { + client, err := NewBatchClient(nil, WithMessageBuffer(1)) + require.NoError(t, err) + require.NotNil(t, client) + + event := &chipingress.CloudEventPb{ + Id: "test-id-1", + Source: "test-source", + Type: "test.event.type", + } + + _ = client.QueueMessage(event, nil) + _ = client.QueueMessage(event, nil) + + assert.Len(t, client.messageBuffer, 1) + + received := <-client.messageBuffer + assert.Equal(t, event.Id, received.event.Id) + assert.Equal(t, event.Source, received.event.Source) + assert.Equal(t, event.Type, received.event.Type) + }) + + t.Run("handles nil event", func(t *testing.T) { + client, err := NewBatchClient(nil, WithMessageBuffer(5)) + require.NoError(t, err) + + err = client.QueueMessage(nil, nil) + require.NoError(t, err) + assert.Empty(t, client.messageBuffer) + }) +} + +func TestSendBatch(t *testing.T) { + t.Run("successfully sends a batch", func(t *testing.T) { + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + + mockClient. + On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + // verify the batch contains the expected events + return len(batch.Events) == 3 && + batch.Events[0].Id == "test-id-1" && + batch.Events[1].Id == "test-id-2" && + batch.Events[2].Id == "test-id-3" + }), + ). + Return(&chipingress.PublishResponse{}, nil).Run(func(_ mock.Arguments) { close(done) }). 
+ Once() + + client, err := NewBatchClient(mockClient, WithMessageBuffer(5)) + require.NoError(t, err) + + messages := []*messageWithCallback{ + {event: &chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}}, + {event: &chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}}, + {event: &chipingress.CloudEventPb{Id: "test-id-3", Source: "test-source", Type: "test.event.type"}}, + } + + client.sendBatch(t.Context(), messages) + + // wait for the internal goroutine to complete + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for batch to be sent") + } + + mockClient.AssertExpectations(t) + }) + + t.Run("doesn't publish empty batch", func(t *testing.T) { + mockClient := mocks.NewClient(t) + + client, err := NewBatchClient(mockClient, WithMessageBuffer(5)) + require.NoError(t, err) + + client.sendBatch(t.Context(), []*messageWithCallback{}) + + mockClient.AssertNotCalled(t, "PublishBatch", mock.Anything, mock.Anything) + }) + + t.Run("sends multiple batches successfully", func(t *testing.T) { + mockClient := mocks.NewClient(t) + + done := make(chan struct{}) + callCount := 0 + + mockClient. + On("PublishBatch", mock.Anything, mock.Anything). + Return(&chipingress.PublishResponse{}, nil). + Run(func(_ mock.Arguments) { + callCount++ + if callCount == 3 { + close(done) + } + }). + Times(3) + + client, err := NewBatchClient(mockClient, WithMessageBuffer(5)) + require.NoError(t, err) + + batch1 := []*messageWithCallback{ + {event: &chipingress.CloudEventPb{Id: "batch1-id-1", Source: "test-source", Type: "test.event.type"}}, + } + batch2 := []*messageWithCallback{ + {event: &chipingress.CloudEventPb{Id: "batch2-id-1", Source: "test-source", Type: "test.event.type"}}, + {event: &chipingress.CloudEventPb{Id: "batch2-id-2", Source: "test-source", Type: "test.event.type"}}, + } + batch3 := []*messageWithCallback{ + {event: &chipingress.CloudEventPb{Id: "batch3-id-1", Source: "test-source", Type: "test.event.type"}}, + } + + client.sendBatch(context.Background(), batch1) + client.sendBatch(context.Background(), batch2) + client.sendBatch(context.Background(), batch3) + + // wait for the internal goroutines to complete + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("timeout waiting for multiple batches to be sent") + } + + mockClient.AssertExpectations(t) + }) +} + +func TestStart(t *testing.T) { + t.Run("batch size trigger", func(t *testing.T) { + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + + mockClient. + On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) == 3 && + batch.Events[0].Id == "test-id-1" && + batch.Events[1].Id == "test-id-2" && + batch.Events[2].Id == "test-id-3" + }), + ). + Return(&chipingress.PublishResponse{}, nil). + Run(func(_ mock.Arguments) { close(done) }). 
+ Once() + + client, err := NewBatchClient(mockClient, WithBatchSize(3), WithBatchInterval(5*time.Second)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + client.Start(ctx) + + err = client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}, nil) + require.NoError(t, err) + err = client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}, nil) + require.NoError(t, err) + err = client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-3", Source: "test-source", Type: "test.event.type"}, nil) + require.NoError(t, err) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for batch to be sent") + } + + mockClient.AssertExpectations(t) + }) + + t.Run("timeout trigger", func(t *testing.T) { + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + + mockClient. + On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) == 2 && + batch.Events[0].Id == "test-id-1" && + batch.Events[1].Id == "test-id-2" + }), + ). + Return(&chipingress.PublishResponse{}, nil). + Run(func(_ mock.Arguments) { close(done) }). + Once() + + client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(50*time.Millisecond)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + client.Start(ctx) + + _ = client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}, nil) + _ = client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}, nil) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for batch to be sent after timeout") + } + + mockClient.AssertExpectations(t) + }) + + t.Run("context cancellation flushes pending batch", func(t *testing.T) { + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + + mockClient. + On("PublishBatch", + mock.MatchedBy(func(ctx context.Context) bool { + return ctx != nil + }), + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) == 2 && + batch.Events[0].Id == "test-id-1" && + batch.Events[1].Id == "test-id-2" + }), + ). + Return(&chipingress.PublishResponse{}, nil). + Run(func(_ mock.Arguments) { close(done) }). + Once() + + client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(5*time.Second)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + + client.Start(ctx) + + _ = client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}, nil) + _ = client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}, nil) + + time.Sleep(10 * time.Millisecond) + + cancel() + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for flush on context cancellation") + } + + mockClient.AssertExpectations(t) + }) + + t.Run("stop flushes pending batch", func(t *testing.T) { + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + + mockClient. + On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) == 2 && + batch.Events[0].Id == "test-id-1" && + batch.Events[1].Id == "test-id-2" + }), + ). 
+ Return(&chipingress.PublishResponse{}, nil). + Run(func(_ mock.Arguments) { close(done) }). + Once() + + client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(100*time.Millisecond), WithMessageBuffer(10)) + require.NoError(t, err) + + client.Start(t.Context()) + + queued1 := client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}, nil) + queued2 := client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}, nil) + require.NoError(t, queued1) + require.NoError(t, queued2) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for batch timeout to trigger") + } + + client.Stop() + + mockClient.AssertExpectations(t) + }) + + t.Run("no flush when batch is empty", func(t *testing.T) { + mockClient := mocks.NewClient(t) + + client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(5*time.Second)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + client.Start(ctx) + + time.Sleep(10 * time.Millisecond) + + cancel() + + time.Sleep(50 * time.Millisecond) + + mockClient.AssertNotCalled(t, "PublishBatch") + }) + + t.Run("multiple batches via size trigger", func(t *testing.T) { + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + callCount := 0 + + mockClient. + On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) == 2 + }), + ). + Return(&chipingress.PublishResponse{}, nil). + Run(func(_ mock.Arguments) { + callCount++ + if callCount == 3 { + close(done) + } + }). + Times(3) + + client, err := NewBatchClient(mockClient, WithBatchSize(2), WithBatchInterval(5*time.Second)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + client.Start(ctx) + + for i := 1; i <= 6; i++ { + _ = client.QueueMessage(&chipingress.CloudEventPb{ + Id: "test-id-" + strconv.Itoa(i), + Source: "test-source", + Type: "test.event.type", + }, nil) + } + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for multiple batches to be sent") + } + + mockClient.AssertExpectations(t) + }) +} + +func TestCallbacks(t *testing.T) { + t.Run("callback invoked on successful send", func(t *testing.T) { + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + callbackDone := make(chan error, 1) + + mockClient. + On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) == 1 && + batch.Events[0].Id == "test-id-1" + }), + ). + Return(&chipingress.PublishResponse{}, nil). + Run(func(_ mock.Arguments) { close(done) }). 
+ Once() + + client, err := NewBatchClient(mockClient, WithBatchSize(1)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + client.Start(ctx) + + _ = client.QueueMessage(&chipingress.CloudEventPb{ + Id: "test-id-1", + Source: "test-source", + Type: "test.event.type", + }, func(err error) { + callbackDone <- err + }) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for batch to be sent") + } + + // wait for callback + select { + case err := <-callbackDone: + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("timeout waiting for callback") + } + + mockClient.AssertExpectations(t) + }) + + t.Run("callback receives error on failed send", func(t *testing.T) { + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + callbackDone := make(chan error, 1) + expectedErr := assert.AnError + + mockClient. + On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) == 1 && + batch.Events[0].Id == "test-id-1" + }), + ). + Return(&chipingress.PublishResponse{}, expectedErr). + Run(func(_ mock.Arguments) { close(done) }). + Once() + + client, err := NewBatchClient(mockClient, WithBatchSize(1)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + client.Start(ctx) + + _ = client.QueueMessage(&chipingress.CloudEventPb{ + Id: "test-id-1", + Source: "test-source", + Type: "test.event.type", + }, func(err error) { + callbackDone <- err + }) + + // wait for batch to be sent + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for batch to be sent") + } + + // wait for callback to be invoked with error + select { + case err := <-callbackDone: + assert.Equal(t, expectedErr, err) + case <-time.After(time.Second): + t.Fatal("timeout waiting for callback") + } + + mockClient.AssertExpectations(t) + }) + + t.Run("nil callback works without panic", func(t *testing.T) { + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + + mockClient. + On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) == 1 && + batch.Events[0].Id == "test-id-1" + }), + ). + Return(&chipingress.PublishResponse{}, nil). + Run(func(_ mock.Arguments) { close(done) }). + Once() + + client, err := NewBatchClient(mockClient, WithBatchSize(1)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + client.Start(ctx) + + // Queue message with nil callback - should not panic + _ = client.QueueMessage(&chipingress.CloudEventPb{ + Id: "test-id-1", + Source: "test-source", + Type: "test.event.type", + }, nil) + + // wait for batch to be sent + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for batch to be sent") + } + + mockClient.AssertExpectations(t) + }) + + t.Run("multiple messages with different callbacks", func(t *testing.T) { + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + callback1Done := make(chan error, 1) + callback2Done := make(chan error, 1) + callback3Done := make(chan error, 1) + + mockClient. + On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) == 3 + }), + ). + Return(&chipingress.PublishResponse{}, nil). + Run(func(_ mock.Arguments) { close(done) }). 
+ Once() + + client, err := NewBatchClient(mockClient, WithBatchSize(3)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + client.Start(ctx) + + _ = client.QueueMessage(&chipingress.CloudEventPb{ + Id: "test-id-1", + Source: "test-source", + Type: "test.event.type", + }, func(err error) { + callback1Done <- err + }) + + _ = client.QueueMessage(&chipingress.CloudEventPb{ + Id: "test-id-2", + Source: "test-source", + Type: "test.event.type", + }, func(err error) { + callback2Done <- err + }) + + _ = client.QueueMessage(&chipingress.CloudEventPb{ + Id: "test-id-3", + Source: "test-source", + Type: "test.event.type", + }, func(err error) { + callback3Done <- err + }) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for batch to be sent") + } + + select { + case err := <-callback1Done: + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("timeout waiting for callback 1") + } + + select { + case err := <-callback2Done: + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("timeout waiting for callback 2") + } + + select { + case err := <-callback3Done: + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("timeout waiting for callback 3") + } + + mockClient.AssertExpectations(t) + }) + + t.Run("callback invoked for timeout-triggered batch", func(t *testing.T) { + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + callbackDone := make(chan error, 1) + + mockClient. + On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) == 1 && + batch.Events[0].Id == "test-id-1" + }), + ). + Return(&chipingress.PublishResponse{}, nil). + Run(func(_ mock.Arguments) { close(done) }). + Once() + + client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(50*time.Millisecond)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + client.Start(ctx) + + _ = client.QueueMessage(&chipingress.CloudEventPb{ + Id: "test-id-1", + Source: "test-source", + Type: "test.event.type", + }, func(err error) { + callbackDone <- err + }) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for batch to be sent") + } + + // wait for callback + select { + case err := <-callbackDone: + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("timeout waiting for callback") + } + + mockClient.AssertExpectations(t) + }) + + t.Run("callback invoked for size-triggered batch", func(t *testing.T) { + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + callbackDone := make(chan error, 1) + + mockClient. + On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) == 2 + }), + ). + Return(&chipingress.PublishResponse{}, nil). + Run(func(_ mock.Arguments) { close(done) }). 
+ Once() + + client, err := NewBatchClient(mockClient, WithBatchSize(2), WithBatchInterval(5*time.Second)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + client.Start(ctx) + + _ = client.QueueMessage(&chipingress.CloudEventPb{ + Id: "test-id-1", + Source: "test-source", + Type: "test.event.type", + }, nil) + + _ = client.QueueMessage(&chipingress.CloudEventPb{ + Id: "test-id-2", + Source: "test-source", + Type: "test.event.type", + }, func(err error) { + callbackDone <- err + }) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for batch to be sent") + } + + select { + case err := <-callbackDone: + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("timeout waiting for callback") + } + + mockClient.AssertExpectations(t) + }) + + t.Run("callbacks invoked on context cancellation", func(t *testing.T) { + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + callbackDone := make(chan error, 1) + + mockClient. + On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) == 1 && + batch.Events[0].Id == "test-id-1" + }), + ). + Return(&chipingress.PublishResponse{}, nil). + Run(func(_ mock.Arguments) { close(done) }). + Once() + + client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(5*time.Second)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + + client.Start(ctx) + + _ = client.QueueMessage(&chipingress.CloudEventPb{ + Id: "test-id-1", + Source: "test-source", + Type: "test.event.type", + }, func(err error) { + callbackDone <- err + }) + + time.Sleep(10 * time.Millisecond) + + // cancel context to trigger flush + cancel() + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for flush on cancellation") + } + + select { + case err := <-callbackDone: + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("timeout waiting for callback") + } + + mockClient.AssertExpectations(t) + }) +} + +func TestStop(t *testing.T) { + t.Run("can call Stop multiple times without panic", func(t *testing.T) { + mockClient := mocks.NewClient(t) + client, err := NewBatchClient(mockClient, WithBatchSize(10)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + client.Start(ctx) + client.Stop() + client.Stop() + client.Stop() + }) + + t.Run("QueueMessage returns error after Stop", func(t *testing.T) { + mockClient := mocks.NewClient(t) + client, err := NewBatchClient(mockClient, WithBatchSize(10)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + client.Start(ctx) + + // Queue message before stop - should succeed + err = client.QueueMessage(&chipingress.CloudEventPb{ + Id: "test-id-1", + Source: "test-source", + Type: "test.event.type", + }, nil) + require.NoError(t, err) + + // Stop the client + client.Stop() + + // Queue message after stop - should fail + err = client.QueueMessage(&chipingress.CloudEventPb{ + Id: "test-id-2", + Source: "test-source", + Type: "test.event.type", + }, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "shutdown") + }) +} diff --git a/pkg/chipingress/client_test.go b/pkg/chipingress/client_test.go index dfd843e80..6b259460a 100644 --- a/pkg/chipingress/client_test.go +++ b/pkg/chipingress/client_test.go @@ -117,7 +117,7 @@ func TestNewEvent(t *testing.T) { assert.Equal(t, "example-subject", 
event.Subject()) assert.Equal(t, attributes["time"].(time.Time).UTC(), event.Time()) assert.NotEmpty(t, event.Extensions()["recordedtime"]) - assert.True(t, event.Extensions()["recordedtime"].(ce.Timestamp).Time.After(attributes["time"].(time.Time))) + assert.True(t, event.Extensions()["recordedtime"].(ce.Timestamp).After(attributes["time"].(time.Time))) // Assert the event data was set as expected var resultProto pb.PingResponse diff --git a/pkg/chipingress/go.mod b/pkg/chipingress/go.mod index 5599238cb..d8d4a7db1 100644 --- a/pkg/chipingress/go.mod +++ b/pkg/chipingress/go.mod @@ -1,8 +1,6 @@ module github.com/smartcontractkit/chainlink-common/pkg/chipingress -go 1.24.5 - -toolchain go1.24.10 +go 1.25.3 require ( github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.16.1 @@ -12,6 +10,7 @@ require ( go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0 go.opentelemetry.io/otel/metric v1.38.0 go.opentelemetry.io/otel/trace v1.38.0 + go.uber.org/zap v1.27.0 google.golang.org/grpc v1.75.0 google.golang.org/protobuf v1.36.8 ) @@ -28,7 +27,6 @@ require ( go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/otel v1.38.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect golang.org/x/net v0.43.0 // indirect golang.org/x/sys v0.35.0 // indirect golang.org/x/text v0.28.0 // indirect @@ -36,4 +34,5 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect ) +// Retracted due to initial development versions retract [v1.0.0, v1.0.1]
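Reviewer note (not part of the diff): because QueueMessage deliberately drops
rather than blocks when the buffer is full, callers that cannot afford drops
need their own backpressure. A sketch of one option, under the assumption that
the caller owns the retry policy; the helper name and the 10ms backoff are
illustrative, and a real caller would also distinguish the "client is shutdown"
error and stop retrying:

	package example

	import (
		"context"
		"time"

		"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
		"github.com/smartcontractkit/chainlink-common/pkg/chipingress/batch"
	)

	// QueueWithBackoff retries QueueMessage while the buffer is full,
	// giving up only when the caller's context is done.
	func QueueWithBackoff(ctx context.Context, bc *batch.Client, evt *chipingress.CloudEventPb) error {
		for {
			if err := bc.QueueMessage(evt, nil); err == nil {
				return nil
			}
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(10 * time.Millisecond):
				// buffer was full (or client shut down); back off and retry
			}
		}
	}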