Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions indexer/pkg/api/handlers/v1/ccv_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/smartcontractkit/chainlink-ccv/indexer/pkg/api/utils"
"github.com/smartcontractkit/chainlink-ccv/indexer/pkg/common"
"github.com/smartcontractkit/chainlink-ccv/integration/storageaccess"
"github.com/smartcontractkit/chainlink-ccv/protocol"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
)
Expand All @@ -28,7 +27,7 @@ func NewCCVDataV1Handler(storage common.IndexerStorage, lggr logger.Logger, moni
}

func (h *CCVDataV1Handler) Handle(c *gin.Context) {
req := storageaccess.VerifierResultsRequest{
req := VerifierResultsV1Request{
Start: 0,
End: time.Now().UnixMilli(),
SourceChainSelectors: []protocol.ChainSelector{},
Expand All @@ -53,7 +52,7 @@ func (h *CCVDataV1Handler) Handle(c *gin.Context) {
req.SourceChainSelectors = sourceChainSelectors
req.DestChainSelectors = destChainSelectors

ccvData, err := h.storage.QueryCCVData(c.Request.Context(), req.Start, req.End, req.SourceChainSelectors, req.DestChainSelectors, req.Limit, req.Offset)
ccvData, err := h.storage.QueryCCVData(c.Request.Context(), time.UnixMilli(req.Start), time.UnixMilli(req.End), req.SourceChainSelectors, req.DestChainSelectors, req.Limit, req.Offset)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
Expand Down
2 changes: 1 addition & 1 deletion indexer/pkg/api/handlers/v1/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (h *MessagesV1Handler) Handle(c *gin.Context) {
req.SourceChainSelectors = sourceChainSelectors
req.DestChainSelectors = destChainSelectors

verifications, err := h.storage.QueryCCVData(c.Request.Context(), req.Start, req.End, req.SourceChainSelectors, req.DestChainSelectors, req.Limit, req.Offset)
verifications, err := h.storage.QueryCCVData(c.Request.Context(), time.UnixMilli(req.Start), time.UnixMilli(req.End), req.SourceChainSelectors, req.DestChainSelectors, req.Limit, req.Offset)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
Expand Down
9 changes: 9 additions & 0 deletions indexer/pkg/api/handlers/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,12 @@ type MessagesV1Request struct {
Limit uint64 `form:"limit"`
Offset uint64 `form:"offset"`
}

type VerifierResultsV1Request struct {
SourceChainSelectors []protocol.ChainSelector // Excluded from form due to gin parsing
DestChainSelectors []protocol.ChainSelector // Excluded from form due to gin parsing
Start int64 `form:"start"`
End int64 `form:"end"`
Limit uint64 `form:"limit"`
Offset uint64 `form:"offset"`
}
3 changes: 2 additions & 1 deletion indexer/pkg/common/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package common

import (
"context"
"time"

"github.com/smartcontractkit/chainlink-ccv/protocol"
)
Expand All @@ -17,7 +18,7 @@ type IndexerStorageReader interface {
// GetCCVData using the messageID for a o(1) lookup
GetCCVData(ctx context.Context, messageID protocol.Bytes32) ([]protocol.CCVData, error)
// QueryCCVData retrieves all CCVData that matches the filter set
QueryCCVData(ctx context.Context, start, end int64, sourceChainSelectors, destChainSelectors []protocol.ChainSelector, limit, offset uint64) (map[string][]protocol.CCVData, error)
QueryCCVData(ctx context.Context, start, end time.Time, sourceChainSelectors, destChainSelectors []protocol.ChainSelector, limit, offset uint64) (map[string][]protocol.CCVData, error)
}

type IndexerStorageWriter interface {
Expand Down
9 changes: 4 additions & 5 deletions indexer/pkg/scanner/scanner_race_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
//go:build race
// +build race

package scanner

Expand Down Expand Up @@ -49,7 +48,7 @@ func TestScanner_Race_MultipleReadersConcurrent(t *testing.T) {

// Verify messages were stored by querying all data
assert.Eventually(t, func() bool {
results, err := setup.Storage.QueryCCVData(context.Background(), 0, time.Now().UnixMilli()+1000000, nil, nil, 1000, 0)
results, err := setup.Storage.QueryCCVData(context.Background(), time.UnixMilli(0), time.UnixMilli(time.Now().UnixMilli()+1000000), nil, nil, 1000, 0)
if err != nil {
return false
}
Expand Down Expand Up @@ -113,7 +112,7 @@ func TestScanner_Race_StorageWriteConcurrency(t *testing.T) {
// If there's a race condition, the test will fail with -race flag
expectedMessages := numReaders * messagesPerReader
require.Eventually(t, func() bool {
results, err := setup.Storage.QueryCCVData(context.Background(), 0, time.Now().UnixMilli()+1000000, nil, nil, 1000, 0)
results, err := setup.Storage.QueryCCVData(context.Background(), time.UnixMilli(0), time.UnixMilli(time.Now().UnixMilli()+1000000), nil, nil, 1000, 0)
if err != nil {
return false
}
Expand Down Expand Up @@ -226,7 +225,7 @@ func TestScanner_Race_ChannelOperations(t *testing.T) {
// Wait for readers to start processing and produce messages
// Readers may disconnect quickly, so check for message storage instead
require.Eventually(t, func() bool {
results, err := setup.Storage.QueryCCVData(context.Background(), 0, time.Now().UnixMilli()+1000000, nil, nil, 100, 0)
results, err := setup.Storage.QueryCCVData(context.Background(), time.UnixMilli(0), time.UnixMilli(time.Now().UnixMilli()+1000000), nil, nil, 100, 0)
if err != nil {
return false
}
Expand Down Expand Up @@ -351,7 +350,7 @@ func TestScanner_Race_ContextCancellationDuringProcessing(t *testing.T) {
// Wait for readers to start processing and produce some messages
// (readers may disconnect quickly, so check for message storage)
require.Eventually(t, func() bool {
results, err := setup.Storage.QueryCCVData(context.Background(), 0, time.Now().UnixMilli()+1000000, nil, nil, 100, 0)
results, err := setup.Storage.QueryCCVData(context.Background(), time.UnixMilli(0), time.UnixMilli(time.Now().UnixMilli()+1000000), nil, nil, 100, 0)
if err != nil {
return false
}
Expand Down
10 changes: 5 additions & 5 deletions indexer/pkg/storage/condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func RecentRead(duration time.Duration) ReadCondition {

// shouldRead determines if this storage should be read based on the condition and query parameters.
// For queries without time range (like GetCCVData), start and end should both be nil.
func (rc ReadCondition) shouldRead(queryStart, queryEnd *int64) bool {
func (rc ReadCondition) shouldRead(queryStart, queryEnd *time.Time) bool {
switch rc.Type {
case ReadAlways:
return true
Expand All @@ -90,12 +90,12 @@ func (rc ReadCondition) shouldRead(queryStart, queryEnd *int64) bool {
// Storage range: [rc.StartUnix, rc.EndUnix]

// If storage has a start time and query has an end time, check if query ends before storage starts
if rc.StartUnix != nil && queryEnd != nil && *queryEnd < *rc.StartUnix {
if rc.StartUnix != nil && queryEnd != nil && queryEnd.Before(time.UnixMilli(*rc.StartUnix)) {
return false
}

// If storage has an end time and query has a start time, check if query starts after storage ends
if rc.EndUnix != nil && queryStart != nil && *queryStart > *rc.EndUnix {
if rc.EndUnix != nil && queryStart != nil && queryStart.After(time.UnixMilli(*rc.EndUnix)) {
return false
}

Expand All @@ -117,12 +117,12 @@ func (rc ReadCondition) shouldRead(queryStart, queryEnd *int64) bool {
// Recent range: [recentStart, now]

// If query ends before recent period starts, skip this storage
if queryEnd != nil && *queryEnd < recentStart {
if queryEnd != nil && queryEnd.Before(time.UnixMilli(recentStart)) {
return false
}

// If query starts after now, skip this storage (querying future data)
if queryStart != nil && *queryStart > now {
if queryStart != nil && queryStart.After(time.UnixMilli(now)) {
return false
}

Expand Down
6 changes: 3 additions & 3 deletions indexer/pkg/storage/in_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (i *InMemoryStorage) GetCCVData(ctx context.Context, messageID protocol.Byt
}

// QueryCCVData retrieves all CCVData that matches the filter set.
func (i *InMemoryStorage) QueryCCVData(ctx context.Context, start, end int64, sourceChainSelectors, destChainSelectors []protocol.ChainSelector, limit, offset uint64) (map[string][]protocol.CCVData, error) {
func (i *InMemoryStorage) QueryCCVData(ctx context.Context, start, end time.Time, sourceChainSelectors, destChainSelectors []protocol.ChainSelector, limit, offset uint64) (map[string][]protocol.CCVData, error) {
startQueryMetric := time.Now()
i.mu.RLock()
defer i.mu.RUnlock()
Expand All @@ -126,8 +126,8 @@ func (i *InMemoryStorage) QueryCCVData(ctx context.Context, start, end int64, so
}

// Binary search for timestamp range
startIdx := i.findTimestampIndex(time.UnixMilli(start), func(ts, target int64) bool { return ts >= target })
endIdx := i.findTimestampIndex(time.UnixMilli(end), func(ts, target int64) bool { return ts > target })
startIdx := i.findTimestampIndex(start, func(ts, target int64) bool { return ts >= target })
endIdx := i.findTimestampIndex(end, func(ts, target int64) bool { return ts > target })
if startIdx >= endIdx {
return make(map[string][]protocol.CCVData), nil
}
Expand Down
19 changes: 9 additions & 10 deletions indexer/pkg/storage/in_memory_race_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
//go:build race
// +build race

package storage

Expand Down Expand Up @@ -44,7 +43,7 @@ func TestInMemoryStorage_Race_ConcurrentInserts(t *testing.T) {
wg.Wait()

// Verify all data was inserted correctly
results, err := storage.QueryCCVData(ctx, 0, 99999, nil, nil, 1000, 0)
results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(99999), nil, nil, 1000, 0)
require.NoError(t, err)
assert.Equal(t, numGoroutines*insertsPerGoroutine, len(results))
}
Expand Down Expand Up @@ -126,7 +125,7 @@ func TestInMemoryStorage_Race_ConcurrentQueries(t *testing.T) {
destChains = []protocol.ChainSelector{protocol.ChainSelector((goroutineID + 1) % 5)}
}

results, err := storage.QueryCCVData(ctx, start, end, sourceChains, destChains, 50, 0)
results, err := storage.QueryCCVData(ctx, time.UnixMilli(start), time.UnixMilli(end), sourceChains, destChains, 50, 0)
assert.NoError(t, err)
assert.NotNil(t, results)
}
Expand Down Expand Up @@ -167,7 +166,7 @@ func TestInMemoryStorage_Race_MixedReadsAndWrites(t *testing.T) {
assert.NoError(t, err)
} else {
// Read
results, err := storage.QueryCCVData(ctx, 0, 99999, nil, nil, 10, uint64(goroutineID%5))
results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(99999), nil, nil, 10, uint64(goroutineID%5))
assert.NoError(t, err)
assert.NotNil(t, results)
}
Expand All @@ -178,7 +177,7 @@ func TestInMemoryStorage_Race_MixedReadsAndWrites(t *testing.T) {
wg.Wait()

// Verify storage is still consistent
results, err := storage.QueryCCVData(ctx, 0, 99999, nil, nil, 1000, 0)
results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(99999), nil, nil, 1000, 0)
require.NoError(t, err)
assert.NotEmpty(t, results)
}
Expand Down Expand Up @@ -224,7 +223,7 @@ func TestInMemoryStorage_Race_HeavyConcurrentLoad(t *testing.T) {
start := int64(1000 + readerID*50)
end := start + 5000
sourceChains := []protocol.ChainSelector{protocol.ChainSelector(readerID % 5)}
results, err := storage.QueryCCVData(ctx, start, end, sourceChains, nil, 20, 0)
results, err := storage.QueryCCVData(ctx, time.UnixMilli(start), time.UnixMilli(end), sourceChains, nil, 20, 0)
assert.NoError(t, err)
assert.NotNil(t, results)
} else {
Expand All @@ -241,7 +240,7 @@ func TestInMemoryStorage_Race_HeavyConcurrentLoad(t *testing.T) {
wg.Wait()

// Final verification
results, err := storage.QueryCCVData(ctx, 0, 99999, nil, nil, 1000, 0)
results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(99999), nil, nil, 1000, 0)
require.NoError(t, err)
assert.NotEmpty(t, results)
}
Expand Down Expand Up @@ -345,7 +344,7 @@ func TestInMemoryStorage_Race_ChainSelectorIndexesUnderConcurrency(t *testing.T)
// Verify chain selector queries work correctly
for chainID := 0; chainID < 5; chainID++ {
sourceChains := []protocol.ChainSelector{protocol.ChainSelector(chainID)}
results, err := storage.QueryCCVData(ctx, 0, 99999, sourceChains, nil, 1000, 0)
results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(99999), sourceChains, nil, 1000, 0)
require.NoError(t, err)
assert.NotEmpty(t, results, "Should find results for chain selector %d", chainID)
}
Expand Down Expand Up @@ -386,7 +385,7 @@ func TestInMemoryStorage_Race_PaginationUnderConcurrency(t *testing.T) {
for j := 0; j < 10; j++ {
limit := uint64(5)
offset := uint64(j * 5)
results, err := storage.QueryCCVData(ctx, 0, 99999, nil, nil, limit, offset)
results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(99999), nil, nil, limit, offset)
assert.NoError(t, err)
assert.NotNil(t, results)
// Results can vary in size due to concurrent inserts, which is expected
Expand Down Expand Up @@ -432,7 +431,7 @@ func TestInMemoryStorage_Race_ContextCancellation(t *testing.T) {

// Verify storage is still in a consistent state
freshCtx := context.Background()
results, err := storage.QueryCCVData(freshCtx, 0, 99999, nil, nil, 1000, 0)
results, err := storage.QueryCCVData(freshCtx, time.UnixMilli(0), time.UnixMilli(99999), nil, nil, 1000, 0)
require.NoError(t, err)
assert.NotNil(t, results)
}
22 changes: 11 additions & 11 deletions indexer/pkg/storage/in_memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestQueryCCVDataTimestampRange(t *testing.T) {
}

// Query for timestamp range 1500-3500
results, err := storage.QueryCCVData(ctx, 1500, 3500, nil, nil, 100, 0)
results, err := storage.QueryCCVData(ctx, time.UnixMilli(1500), time.UnixMilli(3500), nil, nil, 100, 0)
require.NoError(t, err)

// Should return ccvData2 and ccvData3
Expand All @@ -113,7 +113,7 @@ func TestQueryCCVDataWithSourceChainFilter(t *testing.T) {

// Query for source chain 1
sourceChains := []protocol.ChainSelector{1}
results, err := storage.QueryCCVData(ctx, 0, time.Now().UnixMilli(), sourceChains, []protocol.ChainSelector{}, 100, 0)
results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(time.Now().UnixMilli()), sourceChains, []protocol.ChainSelector{}, 100, 0)
require.NoError(t, err)

// Should return ccvData1 and ccvData3
Expand All @@ -138,7 +138,7 @@ func TestQueryCCVDataWithDestChainFilter(t *testing.T) {

// Query for dest chain 2
destChains := []protocol.ChainSelector{2}
results, err := storage.QueryCCVData(ctx, 0, time.Now().UnixMilli(), []protocol.ChainSelector{}, destChains, 100, 0)
results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(time.Now().UnixMilli()), []protocol.ChainSelector{}, destChains, 100, 0)
require.NoError(t, err)

// Should return ccvData1 and ccvData3
Expand All @@ -165,7 +165,7 @@ func TestQueryCCVDataWithBothChainFilters(t *testing.T) {
// Query for source chain 1 AND dest chain 2
sourceChains := []protocol.ChainSelector{1}
destChains := []protocol.ChainSelector{2}
results, err := storage.QueryCCVData(ctx, 0, 9999, sourceChains, destChains, 100, 0)
results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(9999), sourceChains, destChains, 100, 0)
require.NoError(t, err)

// Should return ccvData1 and ccvData4
Expand All @@ -186,7 +186,7 @@ func TestQueryCCVDataPagination(t *testing.T) {
}

// Test pagination: limit=2, offset=1
results, err := storage.QueryCCVData(ctx, 0, 9999, nil, nil, 2, 1)
results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(9999), nil, nil, 2, 1)
require.NoError(t, err)

// Should return 2 records (indices 1 and 2)
Expand All @@ -198,7 +198,7 @@ func TestQueryCCVDataEmptyResult(t *testing.T) {
ctx := context.Background()

// Query with no data
results, err := storage.QueryCCVData(ctx, 0, 9999, nil, nil, 100, 0)
results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(9999), nil, nil, 100, 0)
require.NoError(t, err)
assert.Empty(t, results)
}
Expand All @@ -213,7 +213,7 @@ func TestQueryCCVDataNoMatchingTimestamp(t *testing.T) {
require.NoError(t, err)

// Query for timestamp range 2000-3000 (no matches)
results, err := storage.QueryCCVData(ctx, 2000, 3000, nil, nil, 100, 0)
results, err := storage.QueryCCVData(ctx, time.UnixMilli(2000), time.UnixMilli(3000), nil, nil, 100, 0)
require.NoError(t, err)
assert.Empty(t, results)
}
Expand All @@ -229,7 +229,7 @@ func TestQueryCCVDataNoMatchingChainSelector(t *testing.T) {

// Query for source chain 5 (no matches)
sourceChains := []protocol.ChainSelector{5}
results, err := storage.QueryCCVData(ctx, 0, 9999, nil, sourceChains, 100, 0)
results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(9999), nil, sourceChains, 100, 0)
require.NoError(t, err)
assert.Empty(t, results)
}
Expand All @@ -255,7 +255,7 @@ func TestConcurrentAccess(t *testing.T) {
}

// Verify all data was inserted
results, err := storage.QueryCCVData(ctx, 0, 9999, nil, nil, 100, 0)
results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(9999), nil, nil, 100, 0)
require.NoError(t, err)
assert.Len(t, results, 10)
}
Expand Down Expand Up @@ -351,7 +351,7 @@ func BenchmarkQueryCCVDataTimestampRange(b *testing.B) {

b.ResetTimer()
for i := 0; i < b.N; i++ {
storage.QueryCCVData(ctx, 1500, 2500, nil, nil, 100, 0)
storage.QueryCCVData(ctx, time.UnixMilli(1500), time.UnixMilli(2500), nil, nil, 100, 0)
}
}

Expand All @@ -372,7 +372,7 @@ func BenchmarkQueryCCVDataWithChainFilter(b *testing.B) {

b.ResetTimer()
for i := 0; i < b.N; i++ {
storage.QueryCCVData(ctx, 0, 9999, destChains, sourceChains, 100, 0)
storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(9999), destChains, sourceChains, 100, 0)
}
}

Expand Down
4 changes: 2 additions & 2 deletions indexer/pkg/storage/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (d *PostgresStorage) GetCCVData(ctx context.Context, messageID protocol.Byt
// QueryCCVData retrieves all CCVData that matches the filter set with pagination.
func (d *PostgresStorage) QueryCCVData(
ctx context.Context,
start, end int64,
start, end time.Time,
sourceChainSelectors, destChainSelectors []protocol.ChainSelector,
limit, offset uint64,
) (map[string][]protocol.CCVData, error) {
Expand All @@ -121,7 +121,7 @@ func (d *PostgresStorage) QueryCCVData(
WHERE timestamp >= $1 AND timestamp <= $2
`

args := []any{time.UnixMilli(start), time.UnixMilli(end)}
args := []any{start, end}
argCounter := 3

// Add source chain selector filter if provided
Expand Down
5 changes: 3 additions & 2 deletions indexer/pkg/storage/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package storage
import (
"context"
"fmt"
"time"

"github.com/smartcontractkit/chainlink-ccv/indexer/pkg/common"
"github.com/smartcontractkit/chainlink-ccv/protocol"
Expand Down Expand Up @@ -119,7 +120,7 @@ func (d *Sink) GetCCVData(ctx context.Context, messageID protocol.Bytes32) ([]pr
// overlaps with each storage's configured time range.
func (d *Sink) QueryCCVData(
ctx context.Context,
start, end int64,
start, end time.Time,
sourceChainSelectors, destChainSelectors []protocol.ChainSelector,
limit, offset uint64,
) (map[string][]protocol.CCVData, error) {
Expand Down Expand Up @@ -179,7 +180,7 @@ func (d *Sink) QueryCCVData(

// If we didn't attempt any storages, return a specific error
if attemptedCount == 0 {
return nil, fmt.Errorf("no storages eligible for read based on conditions (query time range: %d-%d)", start, end)
return nil, fmt.Errorf("no storages eligible for read based on conditions (query time range: %s-%s)", start.Format(time.RFC3339), end.Format(time.RFC3339))
}

// All eligible storages failed, return the last error
Expand Down
Loading
Loading