Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
c50e461
Adds chipingress batching client
hendoxc Jan 5, 2026
92e5a61
Adds unit-tests
hendoxc Jan 5, 2026
703498f
Removes copying of slice
hendoxc Jan 5, 2026
7ac8912
Adds zap logger
hendoxc Jan 5, 2026
24a80d6
Fixes mod file
hendoxc Jan 5, 2026
a005109
Merge branch 'main' into INFOPLAT-3099-chip-ingress-batching
hendoxc Jan 7, 2026
3cff2cc
Fixes linting
hendoxc Jan 7, 2026
21925ff
Merge branch 'main' into INFOPLAT-3099-chip-ingress-batching
hendoxc Jan 9, 2026
503b2a2
Merge branch 'main' into INFOPLAT-3099-chip-ingress-batching
hendoxc Jan 9, 2026
b432d18
Merge branch 'INFOPLAT-3099-chip-ingress-batching' of github.com:smar…
hendoxc Jan 12, 2026
f4fadd3
Rename timeout vars
hendoxc Jan 12, 2026
4f10dfa
Adds callback mechanism
hendoxc Jan 12, 2026
a19bff9
Merge branch 'main' into INFOPLAT-3099-chip-ingress-batching
hendoxc Jan 12, 2026
c717835
fixes linting
hendoxc Jan 12, 2026
cc94df3
Move callback execution into separate goroutine
hendoxc Jan 13, 2026
6980605
check for shutdown
hendoxc Jan 13, 2026
d476bab
Merge branch 'main' into INFOPLAT-3099-chip-ingress-batching
hendoxc Jan 14, 2026
5960cb9
Merge branch 'main' into INFOPLAT-3099-chip-ingress-batching
hendoxc Jan 14, 2026
9341854
Merge branch 'main' into INFOPLAT-3099-chip-ingress-batching
hendoxc Jan 15, 2026
45474e7
Merge branch 'INFOPLAT-3099-chip-ingress-batching' of github.com:smar…
hendoxc Jan 15, 2026
ad4482f
Ensures all callbacks complete after .Stop
hendoxc Jan 15, 2026
2ca60d3
Removes compression type
hendoxc Jan 15, 2026
5670f84
Adjust configuration
hendoxc Jan 15, 2026
e8073bc
Return errors from queue message
hendoxc Jan 15, 2026
c7404c0
Adds buffer
hendoxc Jan 15, 2026
13579fd
Correct shutdown protocol
hendoxc Jan 15, 2026
fbd9246
Fix linting errors
hendoxc Jan 15, 2026
cffe580
Merge branch 'main' into INFOPLAT-3099-chip-ingress-batching
hendoxc Jan 20, 2026
5745a0d
Merge branch 'main' into INFOPLAT-3099-chip-ingress-batching
hendoxc Jan 26, 2026
86fb5df
Fixes linting
hendoxc Jan 26, 2026
ab780a1
Merge branch 'INFOPLAT-3099-chip-ingress-batching' of github.com:smar…
hendoxc Jan 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions pkg/chipingress/batch/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Package batch provides a thread-safe batching client for chip ingress messages.
package batch

import (
	"slices"
	"sync"

	"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
)

// messageBatch is a thread-safe buffer for accumulating messages before sending as a batch
type messageBatch struct {
	messages []*messageWithCallback // accumulated messages; guarded by mu
	mu       sync.Mutex             // protects messages
}

// messageWithCallback pairs a cloud event with a per-message callback.
// NOTE(review): presumably the callback receives the batch send result
// (nil on success) — confirm against the batching client that flushes this buffer.
type messageWithCallback struct {
	event    *chipingress.CloudEventPb // event payload queued for the next batch
	callback func(error)               // invoked once the message's fate is known
}

// newMessageBatch creates a new messageBatch with the given initial capacity.
// The pre-sized backing slice avoids reallocation while the batch fills.
func newMessageBatch(capacity int) *messageBatch {
	b := &messageBatch{}
	b.messages = make([]*messageWithCallback, 0, capacity)
	return b
}

// Add appends a message to the batch.
// Safe for concurrent use.
func (b *messageBatch) Add(msg *messageWithCallback) {
	b.mu.Lock()
	b.messages = append(b.messages, msg)
	b.mu.Unlock()
}

// Len returns the current number of messages in the batch.
// Safe for concurrent use.
func (b *messageBatch) Len() int {
	b.mu.Lock()
	n := len(b.messages)
	b.mu.Unlock()
	return n
}

// Clear removes all messages from the batch and returns a copy of them
func (b *messageBatch) Clear() []*messageWithCallback {
b.mu.Lock()
defer b.mu.Unlock()

if len(b.messages) == 0 {
return nil
}

// Make a copy
result := make([]*messageWithCallback, len(b.messages))
copy(result, b.messages)
Comment on lines +52 to +53
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could clone here:

Suggested change
result := make([]*messageWithCallback, len(b.messages))
copy(result, b.messages)
result := slices.Clone(b.messages)


// Reset the internal slice
b.messages = b.messages[:0]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to consider freeing this memory in some cases so an outlier capacity doesn't leave a large buffer in the background indefinitely?


return result
}

// Values returns a copy of the current messages without clearing them
func (b *messageBatch) Values() []*messageWithCallback {
b.mu.Lock()
defer b.mu.Unlock()

if len(b.messages) == 0 {
return nil
}

result := make([]*messageWithCallback, len(b.messages))
copy(result, b.messages)
return result
Comment on lines +70 to +72
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
result := make([]*messageWithCallback, len(b.messages))
copy(result, b.messages)
return result
return slices.Clone(b.messages)

}
183 changes: 183 additions & 0 deletions pkg/chipingress/batch/buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package batch

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
)

func TestMessageBatch(t *testing.T) {
	// newMsg builds a messageWithCallback wrapping a CloudEventPb with the given id.
	newMsg := func(id string) *messageWithCallback {
		return &messageWithCallback{event: &chipingress.CloudEventPb{Id: id}}
	}

	t.Run("newMessageBatch creates empty batch", func(t *testing.T) {
		b := newMessageBatch(10)
		require.NotNil(t, b)
		assert.Equal(t, 0, b.Len())
	})

	t.Run("Add appends messages", func(t *testing.T) {
		b := newMessageBatch(10)

		b.Add(newMsg("test-1"))
		assert.Equal(t, 1, b.Len())

		b.Add(newMsg("test-2"))
		assert.Equal(t, 2, b.Len())
	})

	t.Run("Clear returns copy and empties batch", func(t *testing.T) {
		b := newMessageBatch(10)
		b.Add(newMsg("test-1"))
		b.Add(newMsg("test-2"))

		drained := b.Clear()
		require.NotNil(t, drained)
		assert.Len(t, drained, 2)
		assert.Equal(t, "test-1", drained[0].event.Id)
		assert.Equal(t, "test-2", drained[1].event.Id)

		// Draining must leave the batch empty.
		assert.Equal(t, 0, b.Len())
	})

	t.Run("Clear on empty batch returns nil", func(t *testing.T) {
		assert.Nil(t, newMessageBatch(10).Clear())
	})

	t.Run("Values returns copy without clearing", func(t *testing.T) {
		b := newMessageBatch(10)
		b.Add(newMsg("test-1"))
		b.Add(newMsg("test-2"))

		snapshot := b.Values()
		require.NotNil(t, snapshot)
		assert.Len(t, snapshot, 2)
		assert.Equal(t, "test-1", snapshot[0].event.Id)
		assert.Equal(t, "test-2", snapshot[1].event.Id)

		// Batch should still have messages after Values
		assert.Equal(t, 2, b.Len())
	})

	t.Run("Values on empty batch returns nil", func(t *testing.T) {
		assert.Nil(t, newMessageBatch(10).Values())
	})

	t.Run("Values returns slice copy with same pointers", func(t *testing.T) {
		b := newMessageBatch(10)

		original := newMsg("test-1")
		b.Add(original)

		snapshot := b.Values()
		require.Len(t, snapshot, 1)

		// The slice is copied, but its elements are the same pointers.
		assert.Equal(t, original, snapshot[0])
		assert.Same(t, original.event, snapshot[0].event)

		// Mutating the batch afterwards must not grow the snapshot.
		b.Add(newMsg("test-2"))
		assert.Len(t, snapshot, 1)
		assert.Equal(t, 2, b.Len())
	})

	t.Run("concurrent Add operations are safe", func(t *testing.T) {
		const (
			numGoroutines        = 10
			messagesPerGoroutine = 10
		)
		b := newMessageBatch(100)

		var wg sync.WaitGroup
		for i := 0; i < numGoroutines; i++ {
			wg.Go(func() {
				for j := 0; j < messagesPerGoroutine; j++ {
					b.Add(newMsg("test"))
				}
			})
		}
		wg.Wait()

		assert.Equal(t, numGoroutines*messagesPerGoroutine, b.Len())
	})

	t.Run("concurrent Add and Clear operations are safe", func(t *testing.T) {
		const (
			numAdders = 5
			numClears = 3
		)
		b := newMessageBatch(100)

		var wg sync.WaitGroup
		for i := 0; i < numAdders; i++ {
			wg.Go(func() {
				for j := 0; j < 20; j++ {
					b.Add(newMsg("test"))
				}
			})
		}
		// Start clearers
		for i := 0; i < numClears; i++ {
			wg.Go(func() {
				for j := 0; j < 10; j++ {
					b.Clear()
				}
			})
		}
		wg.Wait()

		assert.GreaterOrEqual(t, b.Len(), 0)
	})

	t.Run("concurrent Len operations are safe", func(t *testing.T) {
		b := newMessageBatch(100)
		for i := 0; i < 10; i++ {
			b.Add(newMsg("test"))
		}

		var wg sync.WaitGroup
		for i := 0; i < 100; i++ {
			wg.Go(func() {
				assert.GreaterOrEqual(t, b.Len(), 0)
			})
		}
		wg.Wait()
	})
}
Loading
Loading