From 7e50b176e5e03eed2a7fb4fe1c917be7a15ae812 Mon Sep 17 00:00:00 2001 From: "Simon B.Robert" Date: Wed, 5 Nov 2025 13:16:52 -0500 Subject: [PATCH 1/4] Change indexer storage interface to use native time --- indexer/pkg/api/handlers/v1/ccv_data.go | 5 ++--- indexer/pkg/api/handlers/v1/messages.go | 2 +- indexer/pkg/api/handlers/v1/types.go | 9 ++++++++ indexer/pkg/common/storage.go | 3 ++- indexer/pkg/storage/condition.go | 10 ++++----- indexer/pkg/storage/in_memory.go | 6 ++--- indexer/pkg/storage/in_memory_test.go | 22 +++++++++---------- indexer/pkg/storage/postgres.go | 6 +++-- indexer/pkg/storage/sink.go | 5 +++-- indexer/pkg/storage/sink_test.go | 18 +++++++-------- .../ccvstreamer/indexer_storage_streamer.go | 8 +++---- integration/storageaccess/indexer_adapter.go | 12 +++++----- integration/storageaccess/types.go | 5 +++-- protocol/request_types.go | 6 +++-- 14 files changed, 66 insertions(+), 51 deletions(-) diff --git a/indexer/pkg/api/handlers/v1/ccv_data.go b/indexer/pkg/api/handlers/v1/ccv_data.go index 7e0ec41ab..cc22d37c6 100644 --- a/indexer/pkg/api/handlers/v1/ccv_data.go +++ b/indexer/pkg/api/handlers/v1/ccv_data.go @@ -8,7 +8,6 @@ import ( "github.com/smartcontractkit/chainlink-ccv/indexer/pkg/api/utils" "github.com/smartcontractkit/chainlink-ccv/indexer/pkg/common" - "github.com/smartcontractkit/chainlink-ccv/integration/storageaccess" "github.com/smartcontractkit/chainlink-ccv/protocol" "github.com/smartcontractkit/chainlink-common/pkg/logger" ) @@ -28,7 +27,7 @@ func NewCCVDataV1Handler(storage common.IndexerStorage, lggr logger.Logger, moni } func (h *CCVDataV1Handler) Handle(c *gin.Context) { - req := storageaccess.VerifierResultsRequest{ + req := VerifierResultsV1Request{ Start: 0, End: time.Now().UnixMilli(), SourceChainSelectors: []protocol.ChainSelector{}, @@ -53,7 +52,7 @@ func (h *CCVDataV1Handler) Handle(c *gin.Context) { req.SourceChainSelectors = sourceChainSelectors req.DestChainSelectors = destChainSelectors - ccvData, err := h.storage.QueryCCVData(c.Request.Context(), req.Start, req.End, req.SourceChainSelectors, req.DestChainSelectors, req.Limit, req.Offset) + ccvData, err := h.storage.QueryCCVData(c.Request.Context(), time.UnixMilli(req.Start), time.UnixMilli(req.End), req.SourceChainSelectors, req.DestChainSelectors, req.Limit, req.Offset) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return diff --git a/indexer/pkg/api/handlers/v1/messages.go b/indexer/pkg/api/handlers/v1/messages.go index 57d274e15..6aa7778ed 100644 --- a/indexer/pkg/api/handlers/v1/messages.go +++ b/indexer/pkg/api/handlers/v1/messages.go @@ -45,7 +45,7 @@ func (h *MessagesV1Handler) Handle(c *gin.Context) { req.SourceChainSelectors = sourceChainSelectors req.DestChainSelectors = destChainSelectors - verifications, err := h.storage.QueryCCVData(c.Request.Context(), req.Start, req.End, req.SourceChainSelectors, req.DestChainSelectors, req.Limit, req.Offset) + verifications, err := h.storage.QueryCCVData(c.Request.Context(), time.UnixMilli(req.Start), time.UnixMilli(req.End), req.SourceChainSelectors, req.DestChainSelectors, req.Limit, req.Offset) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return diff --git a/indexer/pkg/api/handlers/v1/types.go b/indexer/pkg/api/handlers/v1/types.go index b67bced00..876f0ebbd 100644 --- a/indexer/pkg/api/handlers/v1/types.go +++ b/indexer/pkg/api/handlers/v1/types.go @@ -15,3 +15,12 @@ type MessagesV1Request struct { Limit uint64 `form:"limit"` Offset uint64 `form:"offset"` } + +type VerifierResultsV1Request struct { + SourceChainSelectors []protocol.ChainSelector // Excluded from form due to gin parsing + DestChainSelectors []protocol.ChainSelector // Excluded from form due to gin parsing + Start int64 `form:"start"` + End int64 `form:"end"` + Limit uint64 `form:"limit"` + Offset uint64 `form:"offset"` +} diff --git a/indexer/pkg/common/storage.go b/indexer/pkg/common/storage.go index 7d3842534..5b610f547 100644 --- a/indexer/pkg/common/storage.go +++ b/indexer/pkg/common/storage.go @@ -2,6 +2,7 @@ package common import ( "context" + "time" "github.com/smartcontractkit/chainlink-ccv/protocol" ) @@ -17,7 +18,7 @@ type IndexerStorageReader interface { // GetCCVData using the messageID for a o(1) lookup GetCCVData(ctx context.Context, messageID protocol.Bytes32) ([]protocol.CCVData, error) // QueryCCVData retrieves all CCVData that matches the filter set - QueryCCVData(ctx context.Context, start, end int64, sourceChainSelectors, destChainSelectors []protocol.ChainSelector, limit, offset uint64) (map[string][]protocol.CCVData, error) + QueryCCVData(ctx context.Context, start, end time.Time, sourceChainSelectors, destChainSelectors []protocol.ChainSelector, limit, offset uint64) (map[string][]protocol.CCVData, error) } type IndexerStorageWriter interface { diff --git a/indexer/pkg/storage/condition.go b/indexer/pkg/storage/condition.go index b1182fcf6..81a08df05 100644 --- a/indexer/pkg/storage/condition.go +++ b/indexer/pkg/storage/condition.go @@ -73,7 +73,7 @@ func RecentRead(duration time.Duration) ReadCondition { // shouldRead determines if this storage should be read based on the condition and query parameters. // For queries without time range (like GetCCVData), start and end should both be nil. -func (rc ReadCondition) shouldRead(queryStart, queryEnd *int64) bool { +func (rc ReadCondition) shouldRead(queryStart, queryEnd *time.Time) bool { switch rc.Type { case ReadAlways: return true @@ -90,12 +90,12 @@ func (rc ReadCondition) shouldRead(queryStart, queryEnd *int64) bool { // Storage range: [rc.StartUnix, rc.EndUnix] // If storage has a start time and query has an end time, check if query ends before storage starts - if rc.StartUnix != nil && queryEnd != nil && *queryEnd < *rc.StartUnix { + if rc.StartUnix != nil && queryEnd != nil && queryEnd.Before(time.UnixMilli(*rc.StartUnix)) { return false } // If storage has an end time and query has a start time, check if query starts after storage ends - if rc.EndUnix != nil && queryStart != nil && *queryStart > *rc.EndUnix { + if rc.EndUnix != nil && queryStart != nil && queryStart.After(time.UnixMilli(*rc.EndUnix)) { return false } @@ -117,12 +117,12 @@ func (rc ReadCondition) shouldRead(queryStart, queryEnd *int64) bool { // Recent range: [recentStart, now] // If query ends before recent period starts, skip this storage - if queryEnd != nil && *queryEnd < recentStart { + if queryEnd != nil && queryEnd.Before(time.UnixMilli(recentStart)) { return false } // If query starts after now, skip this storage (querying future data) - if queryStart != nil && *queryStart > now { + if queryStart != nil && queryStart.After(time.UnixMilli(now)) { return false } diff --git a/indexer/pkg/storage/in_memory.go b/indexer/pkg/storage/in_memory.go index 7fb0e9a53..76b4a1179 100644 --- a/indexer/pkg/storage/in_memory.go +++ b/indexer/pkg/storage/in_memory.go @@ -111,7 +111,7 @@ func (i *InMemoryStorage) GetCCVData(ctx context.Context, messageID protocol.Byt } // QueryCCVData retrieves all CCVData that matches the filter set. -func (i *InMemoryStorage) QueryCCVData(ctx context.Context, start, end int64, sourceChainSelectors, destChainSelectors []protocol.ChainSelector, limit, offset uint64) (map[string][]protocol.CCVData, error) { +func (i *InMemoryStorage) QueryCCVData(ctx context.Context, start, end time.Time, sourceChainSelectors, destChainSelectors []protocol.ChainSelector, limit, offset uint64) (map[string][]protocol.CCVData, error) { startQueryMetric := time.Now() i.mu.RLock() defer i.mu.RUnlock() @@ -126,8 +126,8 @@ func (i *InMemoryStorage) QueryCCVData(ctx context.Context, start, end int64, so } // Binary search for timestamp range - startIdx := i.findTimestampIndex(time.UnixMilli(start), func(ts, target int64) bool { return ts >= target }) - endIdx := i.findTimestampIndex(time.UnixMilli(end), func(ts, target int64) bool { return ts > target }) + startIdx := i.findTimestampIndex(start, func(ts, target int64) bool { return ts >= target }) + endIdx := i.findTimestampIndex(end, func(ts, target int64) bool { return ts > target }) if startIdx >= endIdx { return make(map[string][]protocol.CCVData), nil } diff --git a/indexer/pkg/storage/in_memory_test.go b/indexer/pkg/storage/in_memory_test.go index 33088a0ce..09c8ead68 100644 --- a/indexer/pkg/storage/in_memory_test.go +++ b/indexer/pkg/storage/in_memory_test.go @@ -88,7 +88,7 @@ func TestQueryCCVDataTimestampRange(t *testing.T) { } // Query for timestamp range 1500-3500 - results, err := storage.QueryCCVData(ctx, 1500, 3500, nil, nil, 100, 0) + results, err := storage.QueryCCVData(ctx, time.UnixMilli(1500), time.UnixMilli(3500), nil, nil, 100, 0) require.NoError(t, err) // Should return ccvData2 and ccvData3 @@ -113,7 +113,7 @@ func TestQueryCCVDataWithSourceChainFilter(t *testing.T) { // Query for source chain 1 sourceChains := []protocol.ChainSelector{1} - results, err := storage.QueryCCVData(ctx, 0, time.Now().UnixMilli(), sourceChains, []protocol.ChainSelector{}, 100, 0) + results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(time.Now().UnixMilli()), sourceChains, []protocol.ChainSelector{}, 100, 0) require.NoError(t, err) // Should return ccvData1 and ccvData3 @@ -138,7 +138,7 @@ func TestQueryCCVDataWithDestChainFilter(t *testing.T) { // Query for dest chain 2 destChains := []protocol.ChainSelector{2} - results, err := storage.QueryCCVData(ctx, 0, time.Now().UnixMilli(), []protocol.ChainSelector{}, destChains, 100, 0) + results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(time.Now().UnixMilli()), []protocol.ChainSelector{}, destChains, 100, 0) require.NoError(t, err) // Should return ccvData1 and ccvData3 @@ -165,7 +165,7 @@ func TestQueryCCVDataWithBothChainFilters(t *testing.T) { // Query for source chain 1 AND dest chain 2 sourceChains := []protocol.ChainSelector{1} destChains := []protocol.ChainSelector{2} - results, err := storage.QueryCCVData(ctx, 0, 9999, sourceChains, destChains, 100, 0) + results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(9999), sourceChains, destChains, 100, 0) require.NoError(t, err) // Should return ccvData1 and ccvData4 @@ -186,7 +186,7 @@ func TestQueryCCVDataPagination(t *testing.T) { } // Test pagination: limit=2, offset=1 - results, err := storage.QueryCCVData(ctx, 0, 9999, nil, nil, 2, 1) + results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(9999), nil, nil, 2, 1) require.NoError(t, err) // Should return 2 records (indices 1 and 2) @@ -198,7 +198,7 @@ func TestQueryCCVDataEmptyResult(t *testing.T) { ctx := context.Background() // Query with no data - results, err := storage.QueryCCVData(ctx, 0, 9999, nil, nil, 100, 0) + results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(9999), nil, nil, 100, 0) require.NoError(t, err) assert.Empty(t, results) } @@ -213,7 +213,7 @@ func TestQueryCCVDataNoMatchingTimestamp(t *testing.T) { require.NoError(t, err) // Query for timestamp range 2000-3000 (no matches) - results, err := storage.QueryCCVData(ctx, 2000, 3000, nil, nil, 100, 0) + results, err := storage.QueryCCVData(ctx, time.UnixMilli(2000), time.UnixMilli(3000), nil, nil, 100, 0) require.NoError(t, err) assert.Empty(t, results) } @@ -229,7 +229,7 @@ func TestQueryCCVDataNoMatchingChainSelector(t *testing.T) { // Query for source chain 5 (no matches) sourceChains := []protocol.ChainSelector{5} - results, err := storage.QueryCCVData(ctx, 0, 9999, nil, sourceChains, 100, 0) + results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(9999), nil, sourceChains, 100, 0) require.NoError(t, err) assert.Empty(t, results) } @@ -255,7 +255,7 @@ func TestConcurrentAccess(t *testing.T) { } // Verify all data was inserted - results, err := storage.QueryCCVData(ctx, 0, 9999, nil, nil, 100, 0) + results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(9999), nil, nil, 100, 0) require.NoError(t, err) assert.Len(t, results, 10) } @@ -351,7 +351,7 @@ func BenchmarkQueryCCVDataTimestampRange(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - storage.QueryCCVData(ctx, 1500, 2500, nil, nil, 100, 0) + storage.QueryCCVData(ctx, time.UnixMilli(1500), time.UnixMilli(2500), nil, nil, 100, 0) } } @@ -372,7 +372,7 @@ func BenchmarkQueryCCVDataWithChainFilter(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - storage.QueryCCVData(ctx, 0, 9999, destChains, sourceChains, 100, 0) + storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(9999), destChains, sourceChains, 100, 0) } } diff --git a/indexer/pkg/storage/postgres.go b/indexer/pkg/storage/postgres.go index abe287939..004d83863 100644 --- a/indexer/pkg/storage/postgres.go +++ b/indexer/pkg/storage/postgres.go @@ -95,7 +95,7 @@ func (d *PostgresStorage) GetCCVData(ctx context.Context, messageID protocol.Byt // QueryCCVData retrieves all CCVData that matches the filter set with pagination. func (d *PostgresStorage) QueryCCVData( ctx context.Context, - start, end int64, + start, end time.Time, sourceChainSelectors, destChainSelectors []protocol.ChainSelector, limit, offset uint64, ) (map[string][]protocol.CCVData, error) { @@ -103,6 +103,8 @@ func (d *PostgresStorage) QueryCCVData( d.mu.RLock() defer d.mu.RUnlock() + fmt.Printf("QueryCCVData called with start=%s, end=%s, sourceChainSelectors=%v, destChainSelectors=%v, limit=%d, offset=%d\n", start.Format(time.RFC3339), end.Format(time.RFC3339), sourceChainSelectors, destChainSelectors, limit, offset) + // Build dynamic query with filters query := ` SELECT @@ -121,7 +123,7 @@ func (d *PostgresStorage) QueryCCVData( WHERE timestamp >= $1 AND timestamp <= $2 ` - args := []any{time.UnixMilli(start), time.UnixMilli(end)} + args := []any{start, end} argCounter := 3 // Add source chain selector filter if provided diff --git a/indexer/pkg/storage/sink.go b/indexer/pkg/storage/sink.go index 676c727b6..cfaa3f629 100644 --- a/indexer/pkg/storage/sink.go +++ b/indexer/pkg/storage/sink.go @@ -3,6 +3,7 @@ package storage import ( "context" "fmt" + "time" "github.com/smartcontractkit/chainlink-ccv/indexer/pkg/common" "github.com/smartcontractkit/chainlink-ccv/protocol" @@ -119,7 +120,7 @@ func (d *Sink) GetCCVData(ctx context.Context, messageID protocol.Bytes32) ([]pr // overlaps with each storage's configured time range. func (d *Sink) QueryCCVData( ctx context.Context, - start, end int64, + start, end time.Time, sourceChainSelectors, destChainSelectors []protocol.ChainSelector, limit, offset uint64, ) (map[string][]protocol.CCVData, error) { @@ -179,7 +180,7 @@ func (d *Sink) QueryCCVData( // If we didn't attempt any storages, return a specific error if attemptedCount == 0 { - return nil, fmt.Errorf("no storages eligible for read based on conditions (query time range: %d-%d)", start, end) + return nil, fmt.Errorf("no storages eligible for read based on conditions (query time range: %s-%s)", start.Format(time.RFC3339), end.Format(time.RFC3339)) } // All eligible storages failed, return the last error diff --git a/indexer/pkg/storage/sink_test.go b/indexer/pkg/storage/sink_test.go index e53ea85b1..ec8749e9b 100644 --- a/indexer/pkg/storage/sink_test.go +++ b/indexer/pkg/storage/sink_test.go @@ -171,7 +171,7 @@ func TestStorageSink_QueryFromFirstStorage(t *testing.T) { require.NoError(t, err) // Query from chain - results, err := chain.QueryCCVData(ctx, 900, 1100, nil, nil, 10, 0) + results, err := chain.QueryCCVData(ctx, time.UnixMilli(900), time.UnixMilli(1100), nil, nil, 10, 0) require.NoError(t, err) assert.Len(t, results, 1) } @@ -276,12 +276,12 @@ func TestStorageSink_TimeRangeCondition_RecentDataOnly(t *testing.T) { require.NoError(t, err) // Query for recent data - should use hot storage - results, err := chain.QueryCCVData(ctx, 9000, 10000, nil, nil, 10, 0) + results, err := chain.QueryCCVData(ctx, time.UnixMilli(9000), time.UnixMilli(10000), nil, nil, 10, 0) require.NoError(t, err) assert.Len(t, results, 1, "Should find recent data in hot storage") // Query for old data - should skip hot storage and use cold storage - results, err = chain.QueryCCVData(ctx, 7000, 8500, nil, nil, 10, 0) + results, err = chain.QueryCCVData(ctx, time.UnixMilli(7000), time.UnixMilli(8500), nil, nil, 10, 0) require.NoError(t, err) assert.Len(t, results, 1, "Should find old data in cold storage") } @@ -341,12 +341,12 @@ func TestStorageSink_TimeRangeCondition_HotAndCold(t *testing.T) { require.NoError(t, err) // Query recent time range - should only check hot storage - results, err := chain.QueryCCVData(ctx, now-200, now, nil, nil, 10, 0) + results, err := chain.QueryCCVData(ctx, time.UnixMilli(now-200), time.UnixMilli(now), nil, nil, 10, 0) require.NoError(t, err) assert.Len(t, results, 1, "Should find data in hot storage") // Query old time range - should only check cold storage - results, err = chain.QueryCCVData(ctx, now-800000, now-650000, nil, nil, 10, 0) + results, err = chain.QueryCCVData(ctx, time.UnixMilli(now-800000), time.UnixMilli(now-650000), nil, nil, 10, 0) require.NoError(t, err) assert.Len(t, results, 1, "Should find data in cold storage") } @@ -460,14 +460,14 @@ func TestStorageSink_RecentReadCondition(t *testing.T) { // Query for recent data (last 45 minutes) - should use hot storage queryStart := now - 45*60*1000 // 45 minutes ago in milliseconds queryEnd := now // Current time in milliseconds - results, err := chain.QueryCCVData(ctx, queryStart, queryEnd, nil, nil, 10, 0) + results, err := chain.QueryCCVData(ctx, time.UnixMilli(queryStart), time.UnixMilli(queryEnd), nil, nil, 10, 0) require.NoError(t, err) assert.Len(t, results, 1, "Should find recent data in hot storage") // Query for old data (2-3 hours ago) - should skip hot storage and use cold storage queryStart = now - 3*60*60*1000 // 3 hours ago in milliseconds queryEnd = now - 90*60*1000 // 90 minutes ago in milliseconds - results, err = chain.QueryCCVData(ctx, queryStart, queryEnd, nil, nil, 10, 0) + results, err = chain.QueryCCVData(ctx, time.UnixMilli(queryStart), time.UnixMilli(queryEnd), nil, nil, 10, 0) require.NoError(t, err) assert.Len(t, results, 1, "Should find old data in cold storage") } @@ -525,12 +525,12 @@ func TestStorageSink_RecentReadCondition_ShortDuration(t *testing.T) { require.NoError(t, err) // Query for very recent data (last 45 seconds) - should use very hot storage - results, err := chain.QueryCCVData(ctx, now-45*1000, now, nil, nil, 10, 0) + results, err := chain.QueryCCVData(ctx, time.UnixMilli(now-45*1000), time.UnixMilli(now), nil, nil, 10, 0) require.NoError(t, err) assert.Len(t, results, 1, "Should find very recent data in very hot storage") // Query for slightly old data (5-10 minutes ago) - should skip very hot and use warm storage - results, err = chain.QueryCCVData(ctx, now-10*60*1000, now-4*60*1000, nil, nil, 10, 0) + results, err = chain.QueryCCVData(ctx, time.UnixMilli(now-10*60*1000), time.UnixMilli(now-4*60*1000), nil, nil, 10, 0) require.NoError(t, err) assert.Len(t, results, 1, "Should find slightly old data in warm storage") } diff --git a/integration/pkg/ccvstreamer/indexer_storage_streamer.go b/integration/pkg/ccvstreamer/indexer_storage_streamer.go index 7d87a4b38..cb06b24d7 100644 --- a/integration/pkg/ccvstreamer/indexer_storage_streamer.go +++ b/integration/pkg/ccvstreamer/indexer_storage_streamer.go @@ -32,7 +32,7 @@ func NewIndexerStorageStreamer( reader: indexerConfig.IndexerClient, lggr: lggr, queryLimit: indexerConfig.QueryLimit, - lastQueryTime: indexerConfig.LastQueryTime, + lastQueryTime: time.UnixMilli(indexerConfig.LastQueryTime), pollingInterval: indexerConfig.PollingInterval, backoff: indexerConfig.Backoff, } @@ -41,7 +41,7 @@ func NewIndexerStorageStreamer( type IndexerStorageStreamer struct { reader executor.MessageReader lggr logger.Logger - lastQueryTime int64 + lastQueryTime time.Time pollingInterval time.Duration backoff time.Duration queryLimit uint64 @@ -78,7 +78,7 @@ func (oss *IndexerStorageStreamer) Start( oss.running = false oss.mu.Unlock() }() - newtime := time.Now().UnixMilli() + newtime := time.Now() offset := uint64(0) for { select { @@ -149,7 +149,7 @@ func (oss *IndexerStorageStreamer) Start( } // Update time for next iteration - newtime = time.Now().UnixMilli() + newtime = time.Now() } } }() diff --git a/integration/storageaccess/indexer_adapter.go b/integration/storageaccess/indexer_adapter.go index b9e946ce3..82d646056 100644 --- a/integration/storageaccess/indexer_adapter.go +++ b/integration/storageaccess/indexer_adapter.go @@ -40,8 +40,8 @@ func NewIndexerAPIReader(lggr logger.Logger, indexerURI string) *IndexerAPIReade type queryParams struct { SourceChainSelectors []protocol.ChainSelector DestChainSelectors []protocol.ChainSelector - Start int64 - End int64 + Start time.Time + End time.Time Limit uint64 Offset uint64 } @@ -60,11 +60,11 @@ func (i *IndexerAPIReader) makeRequest(ctx context.Context, endpoint string, par } queryValues := url.Values{} - if params.Start != 0 { - queryValues.Add("start", strconv.FormatInt(params.Start, 10)) + if !params.Start.IsZero() { + queryValues.Add("start", strconv.FormatInt(params.Start.UnixMilli(), 10)) } - if params.End != 0 { - queryValues.Add("end", strconv.FormatInt(params.End, 10)) + if !params.End.IsZero() { + queryValues.Add("end", strconv.FormatInt(params.End.UnixMilli(), 10)) } if params.Limit != 0 { queryValues.Add("limit", strconv.FormatUint(params.Limit, 10)) diff --git a/integration/storageaccess/types.go b/integration/storageaccess/types.go index 8fd35c7fc..82f8611c0 100644 --- a/integration/storageaccess/types.go +++ b/integration/storageaccess/types.go @@ -2,6 +2,7 @@ package storageaccess import ( "context" + "time" "github.com/smartcontractkit/chainlink-ccv/protocol" ) @@ -16,8 +17,8 @@ type IndexerAPI interface { type VerifierResultsRequest struct { SourceChainSelectors []protocol.ChainSelector // Excluded from form due to gin parsing DestChainSelectors []protocol.ChainSelector // Excluded from form due to gin parsing - Start int64 `form:"start"` - End int64 `form:"end"` + Start time.Time `form:"start"` + End time.Time `form:"end"` Limit uint64 `form:"limit"` Offset uint64 `form:"offset"` } diff --git a/protocol/request_types.go b/protocol/request_types.go index 5a2d12b1e..fcbf5d4cb 100644 --- a/protocol/request_types.go +++ b/protocol/request_types.go @@ -1,10 +1,12 @@ package protocol +import "time" + type MessagesV1Request struct { SourceChainSelectors []ChainSelector // Excluded from form due to gin parsing DestChainSelectors []ChainSelector // Excluded from form due to gin parsing - Start int64 `form:"start"` - End int64 `form:"end"` + Start time.Time `form:"start"` + End time.Time `form:"end"` Limit uint64 `form:"limit"` Offset uint64 `form:"offset"` } From 68b3107d296147b1362bf8dd4f29c5f608df057d Mon Sep 17 00:00:00 2001 From: "Simon B.Robert" Date: Wed, 5 Nov 2025 13:20:51 -0500 Subject: [PATCH 2/4] Linting --- indexer/pkg/storage/postgres.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/indexer/pkg/storage/postgres.go b/indexer/pkg/storage/postgres.go index 004d83863..5e1cdadf7 100644 --- a/indexer/pkg/storage/postgres.go +++ b/indexer/pkg/storage/postgres.go @@ -103,8 +103,6 @@ func (d *PostgresStorage) QueryCCVData( d.mu.RLock() defer d.mu.RUnlock() - fmt.Printf("QueryCCVData called with start=%s, end=%s, sourceChainSelectors=%v, destChainSelectors=%v, limit=%d, offset=%d\n", start.Format(time.RFC3339), end.Format(time.RFC3339), sourceChainSelectors, destChainSelectors, limit, offset) - // Build dynamic query with filters query := ` SELECT From fb408c4896717864648b46c8e1466e287a70ddd4 Mon Sep 17 00:00:00 2001 From: "Simon B.Robert" Date: Wed, 5 Nov 2025 13:34:35 -0500 Subject: [PATCH 3/4] Fix build issue --- indexer/pkg/scanner/scanner_race_test.go | 8 ++++---- indexer/pkg/storage/in_memory_race_test.go | 18 +++++++++--------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/indexer/pkg/scanner/scanner_race_test.go b/indexer/pkg/scanner/scanner_race_test.go index e1862a4f6..d49a9896b 100644 --- a/indexer/pkg/scanner/scanner_race_test.go +++ b/indexer/pkg/scanner/scanner_race_test.go @@ -49,7 +49,7 @@ func TestScanner_Race_MultipleReadersConcurrent(t *testing.T) { // Verify messages were stored by querying all data assert.Eventually(t, func() bool { - results, err := setup.Storage.QueryCCVData(context.Background(), 0, time.Now().UnixMilli()+1000000, nil, nil, 1000, 0) + results, err := setup.Storage.QueryCCVData(context.Background(), time.UnixMilli(0), time.UnixMilli(time.Now().UnixMilli()+1000000), nil, nil, 1000, 0) if err != nil { return false } @@ -113,7 +113,7 @@ func TestScanner_Race_StorageWriteConcurrency(t *testing.T) { // If there's a race condition, the test will fail with -race flag expectedMessages := numReaders * messagesPerReader require.Eventually(t, func() bool { - results, err := setup.Storage.QueryCCVData(context.Background(), 0, time.Now().UnixMilli()+1000000, nil, nil, 1000, 0) + results, err := setup.Storage.QueryCCVData(context.Background(), time.UnixMilli(0), time.UnixMilli(time.Now().UnixMilli()+1000000), nil, nil, 1000, 0) if err != nil { return false } @@ -226,7 +226,7 @@ func TestScanner_Race_ChannelOperations(t *testing.T) { // Wait for readers to start processing and produce messages // Readers may disconnect quickly, so check for message storage instead require.Eventually(t, func() bool { - results, err := setup.Storage.QueryCCVData(context.Background(), 0, time.Now().UnixMilli()+1000000, nil, nil, 100, 0) + results, err := setup.Storage.QueryCCVData(context.Background(), time.UnixMilli(0), time.UnixMilli(time.Now().UnixMilli()+1000000), nil, nil, 100, 0) if err != nil { return false } @@ -351,7 +351,7 @@ func TestScanner_Race_ContextCancellationDuringProcessing(t *testing.T) { // Wait for readers to start processing and produce some messages // (readers may disconnect quickly, so check for message storage) require.Eventually(t, func() bool { - results, err := setup.Storage.QueryCCVData(context.Background(), 0, time.Now().UnixMilli()+1000000, nil, nil, 100, 0) + results, err := setup.Storage.QueryCCVData(context.Background(), time.UnixMilli(0), time.UnixMilli(time.Now().UnixMilli()+1000000), nil, nil, 100, 0) if err != nil { return false } diff --git a/indexer/pkg/storage/in_memory_race_test.go b/indexer/pkg/storage/in_memory_race_test.go index 3620aec43..9ee6e74de 100644 --- a/indexer/pkg/storage/in_memory_race_test.go +++ b/indexer/pkg/storage/in_memory_race_test.go @@ -44,7 +44,7 @@ func TestInMemoryStorage_Race_ConcurrentInserts(t *testing.T) { wg.Wait() // Verify all data was inserted correctly - results, err := storage.QueryCCVData(ctx, 0, 99999, nil, nil, 1000, 0) + results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(99999), nil, nil, 1000, 0) require.NoError(t, err) assert.Equal(t, numGoroutines*insertsPerGoroutine, len(results)) } @@ -126,7 +126,7 @@ func TestInMemoryStorage_Race_ConcurrentQueries(t *testing.T) { destChains = []protocol.ChainSelector{protocol.ChainSelector((goroutineID + 1) % 5)} } - results, err := storage.QueryCCVData(ctx, start, end, sourceChains, destChains, 50, 0) + results, err := storage.QueryCCVData(ctx, time.UnixMilli(start), time.UnixMilli(end), sourceChains, destChains, 50, 0) assert.NoError(t, err) assert.NotNil(t, results) } @@ -167,7 +167,7 @@ func TestInMemoryStorage_Race_MixedReadsAndWrites(t *testing.T) { assert.NoError(t, err) } else { // Read - results, err := storage.QueryCCVData(ctx, 0, 99999, nil, nil, 10, uint64(goroutineID%5)) + results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(99999), nil, nil, 10, uint64(goroutineID%5)) assert.NoError(t, err) assert.NotNil(t, results) } @@ -178,7 +178,7 @@ func TestInMemoryStorage_Race_MixedReadsAndWrites(t *testing.T) { wg.Wait() // Verify storage is still consistent - results, err := storage.QueryCCVData(ctx, 0, 99999, nil, nil, 1000, 0) + results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(99999), nil, nil, 1000, 0) require.NoError(t, err) assert.NotEmpty(t, results) } @@ -224,7 +224,7 @@ func TestInMemoryStorage_Race_HeavyConcurrentLoad(t *testing.T) { start := int64(1000 + readerID*50) end := start + 5000 sourceChains := []protocol.ChainSelector{protocol.ChainSelector(readerID % 5)} - results, err := storage.QueryCCVData(ctx, start, end, sourceChains, nil, 20, 0) + results, err := storage.QueryCCVData(ctx, time.UnixMilli(start), time.UnixMilli(end), sourceChains, nil, 20, 0) assert.NoError(t, err) assert.NotNil(t, results) } else { @@ -241,7 +241,7 @@ func TestInMemoryStorage_Race_HeavyConcurrentLoad(t *testing.T) { wg.Wait() // Final verification - results, err := storage.QueryCCVData(ctx, 0, 99999, nil, nil, 1000, 0) + results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(99999), nil, nil, 1000, 0) require.NoError(t, err) assert.NotEmpty(t, results) } @@ -345,7 +345,7 @@ func TestInMemoryStorage_Race_ChainSelectorIndexesUnderConcurrency(t *testing.T) // Verify chain selector queries work correctly for chainID := 0; chainID < 5; chainID++ { sourceChains := []protocol.ChainSelector{protocol.ChainSelector(chainID)} - results, err := storage.QueryCCVData(ctx, 0, 99999, sourceChains, nil, 1000, 0) + results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(99999), sourceChains, nil, 1000, 0) require.NoError(t, err) assert.NotEmpty(t, results, "Should find results for chain selector %d", chainID) } @@ -386,7 +386,7 @@ func TestInMemoryStorage_Race_PaginationUnderConcurrency(t *testing.T) { for j := 0; j < 10; j++ { limit := uint64(5) offset := uint64(j * 5) - results, err := storage.QueryCCVData(ctx, 0, 99999, nil, nil, limit, offset) + results, err := storage.QueryCCVData(ctx, time.UnixMilli(0), time.UnixMilli(99999), nil, nil, limit, offset) assert.NoError(t, err) assert.NotNil(t, results) // Results can vary in size due to concurrent inserts, which is expected @@ -432,7 +432,7 @@ func TestInMemoryStorage_Race_ContextCancellation(t *testing.T) { // Verify storage is still in a consistent state freshCtx := context.Background() - results, err := storage.QueryCCVData(freshCtx, 0, 99999, nil, nil, 1000, 0) + results, err := storage.QueryCCVData(freshCtx, time.UnixMilli(0), time.UnixMilli(99999), nil, nil, 1000, 0) require.NoError(t, err) assert.NotNil(t, results) } From af34459134807987624a45485363b740fe307f27 Mon Sep 17 00:00:00 2001 From: "Simon B.Robert" Date: Wed, 5 Nov 2025 13:36:43 -0500 Subject: [PATCH 4/4] Linting --- indexer/pkg/scanner/scanner_race_test.go | 1 - indexer/pkg/storage/in_memory_race_test.go | 1 - 2 files changed, 2 deletions(-) diff --git a/indexer/pkg/scanner/scanner_race_test.go b/indexer/pkg/scanner/scanner_race_test.go index d49a9896b..bd1619901 100644 --- a/indexer/pkg/scanner/scanner_race_test.go +++ b/indexer/pkg/scanner/scanner_race_test.go @@ -1,5 +1,4 @@ //go:build race -// +build race package scanner diff --git a/indexer/pkg/storage/in_memory_race_test.go b/indexer/pkg/storage/in_memory_race_test.go index 9ee6e74de..5f5b902dd 100644 --- a/indexer/pkg/storage/in_memory_race_test.go +++ b/indexer/pkg/storage/in_memory_race_test.go @@ -1,5 +1,4 @@ //go:build race -// +build race package storage