diff --git a/cmd/ingest.go b/cmd/ingest.go index 314ae3615..6db9f32c3 100644 --- a/cmd/ingest.go +++ b/cmd/ingest.go @@ -61,6 +61,38 @@ func (c *ingestCmd) Command() *cobra.Command { FlagDefault: "oldest_ingest_ledger", Required: true, }, + { + Name: "backfill-workers", + Usage: "Maximum concurrent workers for backfill processing. Defaults to number of CPUs. Lower values reduce RAM usage at cost of throughput.", + OptType: types.Int, + ConfigKey: &cfg.BackfillWorkers, + FlagDefault: 0, + Required: false, + }, + { + Name: "backfill-batch-size", + Usage: "Number of ledgers per batch during backfill. Defaults to 250. Lower values reduce RAM usage at cost of more DB transactions.", + OptType: types.Int, + ConfigKey: &cfg.BackfillBatchSize, + FlagDefault: 250, + Required: false, + }, + { + Name: "backfill-db-insert-batch-size", + Usage: "Number of ledgers to process before flushing buffer to DB during backfill. Defaults to 100. Lower values reduce RAM usage at cost of more DB transactions.", + OptType: types.Int, + ConfigKey: &cfg.BackfillDBInsertBatchSize, + FlagDefault: 100, + Required: false, + }, + { + Name: "catchup-threshold", + Usage: "Number of ledgers behind network tip that triggers fast catchup via backfilling. Defaults to 100.", + OptType: types.Int, + ConfigKey: &cfg.CatchupThreshold, + FlagDefault: 100, + Required: false, + }, { Name: "archive-url", Usage: "Archive URL for history archives", diff --git a/internal/data/ingest_store.go b/internal/data/ingest_store.go index f1126d5b7..c28de9295 100644 --- a/internal/data/ingest_store.go +++ b/internal/data/ingest_store.go @@ -27,17 +27,17 @@ func (m *IngestStoreModel) Get(ctx context.Context, cursorName string) (uint32, start := time.Now() err := m.DB.GetContext(ctx, &lastSyncedLedger, `SELECT value FROM ingest_store WHERE key = $1`, cursorName) duration := time.Since(start).Seconds() - m.MetricsService.ObserveDBQueryDuration("GetLatestLedgerSynced", "ingest_store", duration) + m.MetricsService.ObserveDBQueryDuration("Get", "ingest_store", duration) // First run, key does not exist yet if errors.Is(err, sql.ErrNoRows) { - m.MetricsService.IncDBQuery("GetLatestLedgerSynced", "ingest_store") + m.MetricsService.IncDBQuery("Get", "ingest_store") return 0, nil } if err != nil { - m.MetricsService.IncDBQueryError("GetLatestLedgerSynced", "ingest_store", utils.GetDBErrorType(err)) + m.MetricsService.IncDBQueryError("Get", "ingest_store", utils.GetDBErrorType(err)) return 0, fmt.Errorf("getting latest ledger synced for cursor %s: %w", cursorName, err) } - m.MetricsService.IncDBQuery("GetLatestLedgerSynced", "ingest_store") + m.MetricsService.IncDBQuery("Get", "ingest_store") return lastSyncedLedger, nil } @@ -50,13 +50,12 @@ func (m *IngestStoreModel) Update(ctx context.Context, dbTx db.Transaction, curs start := time.Now() _, err := dbTx.ExecContext(ctx, query, cursorName, ledger) duration := time.Since(start).Seconds() - m.MetricsService.ObserveDBQueryDuration("UpdateLatestLedgerSynced", "ingest_store", duration) + m.MetricsService.ObserveDBQueryDuration("Update", "ingest_store", duration) if err != nil { - m.MetricsService.IncDBQueryError("UpdateLatestLedgerSynced", "ingest_store", utils.GetDBErrorType(err)) + m.MetricsService.IncDBQueryError("Update", "ingest_store", utils.GetDBErrorType(err)) return fmt.Errorf("updating last synced ledger to %d: %w", ledger, err) } - m.MetricsService.IncDBQuery("UpdateLatestLedgerSynced", "ingest_store") - + m.MetricsService.IncDBQuery("Update", "ingest_store") return nil } diff --git 
a/internal/data/ingest_store_test.go b/internal/data/ingest_store_test.go index 695348e41..ef8bd907e 100644 --- a/internal/data/ingest_store_test.go +++ b/internal/data/ingest_store_test.go @@ -52,8 +52,8 @@ func Test_IngestStoreModel_GetLatestLedgerSynced(t *testing.T) { mockMetricsService := metrics.NewMockMetricsService() mockMetricsService. - On("ObserveDBQueryDuration", "GetLatestLedgerSynced", "ingest_store", mock.Anything).Return(). - On("IncDBQuery", "GetLatestLedgerSynced", "ingest_store").Return() + On("ObserveDBQueryDuration", "Get", "ingest_store", mock.Anything).Return(). + On("IncDBQuery", "Get", "ingest_store").Return() defer mockMetricsService.AssertExpectations(t) m := &IngestStoreModel{ @@ -109,8 +109,8 @@ func Test_IngestStoreModel_UpdateLatestLedgerSynced(t *testing.T) { mockMetricsService := metrics.NewMockMetricsService() mockMetricsService. - On("ObserveDBQueryDuration", "UpdateLatestLedgerSynced", "ingest_store", mock.Anything).Return().Once(). - On("IncDBQuery", "UpdateLatestLedgerSynced", "ingest_store").Return().Once() + On("ObserveDBQueryDuration", "Update", "ingest_store", mock.Anything).Return().Once(). + On("IncDBQuery", "Update", "ingest_store").Return().Once() defer mockMetricsService.AssertExpectations(t) m := &IngestStoreModel{ diff --git a/internal/data/operations.go b/internal/data/operations.go index 5d3990d60..02b88b18b 100644 --- a/internal/data/operations.go +++ b/internal/data/operations.go @@ -241,7 +241,7 @@ func (m *OperationModel) BatchGetByStateChangeIDs(ctx context.Context, scToIDs [ func (m *OperationModel) BatchInsert( ctx context.Context, sqlExecuter db.SQLExecuter, - operations []types.Operation, + operations []*types.Operation, stellarAddressesByOpID map[int64]set.Set[string], ) ([]int64, error) { if sqlExecuter == nil { diff --git a/internal/data/operations_test.go b/internal/data/operations_test.go index 1c4607134..a25e59dc3 100644 --- a/internal/data/operations_test.go +++ b/internal/data/operations_test.go @@ -60,7 +60,7 @@ func Test_OperationModel_BatchInsert(t *testing.T) { sqlxDB, err := dbConnectionPool.SqlxDB(ctx) require.NoError(t, err) txModel := &TransactionModel{DB: dbConnectionPool, MetricsService: metrics.NewMetricsService(sqlxDB)} - _, err = txModel.BatchInsert(ctx, nil, []types.Transaction{tx1, tx2}, map[string]set.Set[string]{ + _, err = txModel.BatchInsert(ctx, nil, []*types.Transaction{&tx1, &tx2}, map[string]set.Set[string]{ tx1.Hash: set.NewSet(kp1.Address()), tx2.Hash: set.NewSet(kp2.Address()), }) @@ -84,7 +84,7 @@ func Test_OperationModel_BatchInsert(t *testing.T) { testCases := []struct { name string useDBTx bool - operations []types.Operation + operations []*types.Operation stellarAddressesByOpID map[int64]set.Set[string] wantAccountLinks map[int64][]string wantErrContains string @@ -93,7 +93,7 @@ func Test_OperationModel_BatchInsert(t *testing.T) { { name: "🟢successful_insert_without_dbTx", useDBTx: false, - operations: []types.Operation{op1, op2}, + operations: []*types.Operation{&op1, &op2}, stellarAddressesByOpID: map[int64]set.Set[string]{op1.ID: set.NewSet(kp1.Address(), kp1.Address(), kp1.Address(), kp1.Address()), op2.ID: set.NewSet(kp2.Address(), kp2.Address())}, wantAccountLinks: map[int64][]string{op1.ID: {kp1.Address()}, op2.ID: {kp2.Address()}}, wantErrContains: "", @@ -102,7 +102,7 @@ func Test_OperationModel_BatchInsert(t *testing.T) { { name: "🟢successful_insert_with_dbTx", useDBTx: true, - operations: []types.Operation{op1}, + operations: []*types.Operation{&op1}, stellarAddressesByOpID: 
map[int64]set.Set[string]{op1.ID: set.NewSet(kp1.Address())}, wantAccountLinks: map[int64][]string{op1.ID: {kp1.Address()}}, wantErrContains: "", @@ -111,7 +111,7 @@ func Test_OperationModel_BatchInsert(t *testing.T) { { name: "🟢empty_input", useDBTx: false, - operations: []types.Operation{}, + operations: []*types.Operation{}, stellarAddressesByOpID: map[int64]set.Set[string]{}, wantAccountLinks: map[int64][]string{}, wantErrContains: "", @@ -120,7 +120,7 @@ func Test_OperationModel_BatchInsert(t *testing.T) { { name: "🟡duplicate_operation", useDBTx: false, - operations: []types.Operation{op1, op1}, + operations: []*types.Operation{&op1, &op1}, stellarAddressesByOpID: map[int64]set.Set[string]{op1.ID: set.NewSet(kp1.Address())}, wantAccountLinks: map[int64][]string{op1.ID: {kp1.Address()}}, wantErrContains: "", diff --git a/internal/data/statechanges_test.go b/internal/data/statechanges_test.go index ddb4dc7d3..d21f97afc 100644 --- a/internal/data/statechanges_test.go +++ b/internal/data/statechanges_test.go @@ -60,7 +60,7 @@ func TestStateChangeModel_BatchInsert(t *testing.T) { sqlxDB, err := dbConnectionPool.SqlxDB(ctx) require.NoError(t, err) txModel := &TransactionModel{DB: dbConnectionPool, MetricsService: metrics.NewMetricsService(sqlxDB)} - _, err = txModel.BatchInsert(ctx, nil, []types.Transaction{tx1, tx2}, map[string]set.Set[string]{ + _, err = txModel.BatchInsert(ctx, nil, []*types.Transaction{&tx1, &tx2}, map[string]set.Set[string]{ tx1.Hash: set.NewSet(kp1.Address()), tx2.Hash: set.NewSet(kp2.Address()), }) diff --git a/internal/data/transactions.go b/internal/data/transactions.go index 1d47a1562..1cefab84d 100644 --- a/internal/data/transactions.go +++ b/internal/data/transactions.go @@ -167,7 +167,7 @@ func (m *TransactionModel) BatchGetByStateChangeIDs(ctx context.Context, scToIDs func (m *TransactionModel) BatchInsert( ctx context.Context, sqlExecuter db.SQLExecuter, - txs []types.Transaction, + txs []*types.Transaction, stellarAddressesByTxHash map[string]set.Set[string], ) ([]string, error) { if sqlExecuter == nil { diff --git a/internal/data/transactions_test.go b/internal/data/transactions_test.go index 92d4f3bc8..5d56ae2df 100644 --- a/internal/data/transactions_test.go +++ b/internal/data/transactions_test.go @@ -58,7 +58,7 @@ func Test_TransactionModel_BatchInsert(t *testing.T) { testCases := []struct { name string useDBTx bool - txs []types.Transaction + txs []*types.Transaction stellarAddressesByHash map[string]set.Set[string] wantAccountLinks map[string][]string wantErrContains string @@ -67,7 +67,7 @@ func Test_TransactionModel_BatchInsert(t *testing.T) { { name: "🟢successful_insert_without_dbTx", useDBTx: false, - txs: []types.Transaction{tx1, tx2}, + txs: []*types.Transaction{&tx1, &tx2}, stellarAddressesByHash: map[string]set.Set[string]{tx1.Hash: set.NewSet(kp1.Address()), tx2.Hash: set.NewSet(kp2.Address())}, wantAccountLinks: map[string][]string{tx1.Hash: {kp1.Address()}, tx2.Hash: {kp2.Address()}}, wantErrContains: "", @@ -76,7 +76,7 @@ func Test_TransactionModel_BatchInsert(t *testing.T) { { name: "🟢successful_insert_with_dbTx", useDBTx: true, - txs: []types.Transaction{tx1}, + txs: []*types.Transaction{&tx1}, stellarAddressesByHash: map[string]set.Set[string]{tx1.Hash: set.NewSet(kp1.Address())}, wantAccountLinks: map[string][]string{tx1.Hash: {kp1.Address()}}, wantErrContains: "", @@ -85,7 +85,7 @@ func Test_TransactionModel_BatchInsert(t *testing.T) { { name: "🟢empty_input", useDBTx: false, - txs: []types.Transaction{}, + txs: []*types.Transaction{}, 
stellarAddressesByHash: map[string]set.Set[string]{}, wantAccountLinks: map[string][]string{}, wantErrContains: "", @@ -94,7 +94,7 @@ func Test_TransactionModel_BatchInsert(t *testing.T) { { name: "🟡duplicate_transaction", useDBTx: false, - txs: []types.Transaction{tx1, tx1}, + txs: []*types.Transaction{&tx1, &tx1}, stellarAddressesByHash: map[string]set.Set[string]{tx1.Hash: set.NewSet(kp1.Address())}, wantAccountLinks: map[string][]string{tx1.Hash: {kp1.Address()}}, wantErrContains: "", diff --git a/internal/indexer/indexer.go b/internal/indexer/indexer.go index 8d58e1180..e41c78f36 100644 --- a/internal/indexer/indexer.go +++ b/internal/indexer/indexer.go @@ -33,14 +33,15 @@ type IndexerBufferInterface interface { GetAllParticipants() []string GetNumberOfTransactions() int GetNumberOfOperations() int - GetTransactions() []types.Transaction - GetOperations() []types.Operation + GetTransactions() []*types.Transaction + GetOperations() []*types.Operation GetStateChanges() []types.StateChange GetTrustlineChanges() []types.TrustlineChange GetContractChanges() []types.ContractChange PushContractChange(contractChange types.ContractChange) PushTrustlineChange(trustlineChange types.TrustlineChange) - MergeBuffer(other IndexerBufferInterface) + Merge(other IndexerBufferInterface) + Clear() } type TokenTransferProcessorInterface interface { @@ -123,7 +124,7 @@ func (i *Indexer) ProcessLedgerTransactions(ctx context.Context, transactions [] // Merge buffers and count participants totalParticipants := 0 for idx, buffer := range txnBuffers { - ledgerBuffer.MergeBuffer(buffer) + ledgerBuffer.Merge(buffer) totalParticipants += participantCounts[idx] } diff --git a/internal/indexer/indexer_buffer.go b/internal/indexer/indexer_buffer.go index aae3f7c13..c03502a25 100644 --- a/internal/indexer/indexer_buffer.go +++ b/internal/indexer/indexer_buffer.go @@ -127,13 +127,13 @@ func (b *IndexerBuffer) GetNumberOfOperations() int { // GetTransactions returns all unique transactions. // Thread-safe: uses read lock. -func (b *IndexerBuffer) GetTransactions() []types.Transaction { +func (b *IndexerBuffer) GetTransactions() []*types.Transaction { b.mu.RLock() defer b.mu.RUnlock() - txs := make([]types.Transaction, 0, len(b.txByHash)) + txs := make([]*types.Transaction, 0, len(b.txByHash)) for _, txPtr := range b.txByHash { - txs = append(txs, *txPtr) + txs = append(txs, txPtr) } return txs @@ -195,15 +195,14 @@ func (b *IndexerBuffer) PushOperation(participant string, operation types.Operat } // GetOperations returns all unique operations from the canonical storage. -// Returns values (not pointers) for API compatibility. // Thread-safe: uses read lock. -func (b *IndexerBuffer) GetOperations() []types.Operation { +func (b *IndexerBuffer) GetOperations() []*types.Operation { b.mu.RLock() defer b.mu.RUnlock() - ops := make([]types.Operation, 0, len(b.opByID)) + ops := make([]*types.Operation, 0, len(b.opByID)) for _, opPtr := range b.opByID { - ops = append(ops, *opPtr) + ops = append(ops, opPtr) } return ops } @@ -260,7 +259,17 @@ func (b *IndexerBuffer) GetStateChanges() []types.StateChange { return b.stateChanges } -// MergeBuffer merges another IndexerBuffer into this buffer. This is used to combine +// GetAllParticipants returns all unique participants (Stellar addresses) that have been +// recorded during transaction, operation, and state change processing. +// Thread-safe: uses read lock. 
+func (b *IndexerBuffer) GetAllParticipants() []string { + b.mu.RLock() + defer b.mu.RUnlock() + + return b.allParticipants.ToSlice() +} + +// Merge merges another IndexerBuffer into this buffer. This is used to combine // per-ledger or per-transaction buffers into a single buffer for batch DB insertion. // // MERGE STRATEGY: @@ -280,7 +289,7 @@ func (b *IndexerBuffer) GetStateChanges() []types.StateChange { // Zero temporary allocations - uses direct map/set manipulation. // // Thread-safe: acquires write lock on this buffer, read lock on other buffer. -func (b *IndexerBuffer) MergeBuffer(other IndexerBufferInterface) { +func (b *IndexerBuffer) Merge(other IndexerBufferInterface) { b.mu.Lock() defer b.mu.Unlock() @@ -296,24 +305,28 @@ func (b *IndexerBuffer) MergeBuffer(other IndexerBufferInterface) { // Merge transactions (canonical storage) - this establishes our canonical pointers maps.Copy(b.txByHash, otherBuffer.txByHash) for txHash, otherParticipants := range otherBuffer.participantsByTxHash { - if _, exists := b.participantsByTxHash[txHash]; !exists { - b.participantsByTxHash[txHash] = set.NewSet[string]() - } - // Iterate other's set, add participants from OUR txByHash - for participant := range otherParticipants.Iter() { - b.participantsByTxHash[txHash].Add(participant) // O(1) Add + if existing, exists := b.participantsByTxHash[txHash]; exists { + // Merge into existing set - iterate and add (Union creates new set) + for participant := range otherParticipants.Iter() { + existing.Add(participant) + } + } else { + // Clone the set instead of creating empty + iterating + b.participantsByTxHash[txHash] = otherParticipants.Clone() } } // Merge operations (canonical storage) maps.Copy(b.opByID, otherBuffer.opByID) for opID, otherParticipants := range otherBuffer.participantsByOpID { - if _, exists := b.participantsByOpID[opID]; !exists { - b.participantsByOpID[opID] = set.NewSet[string]() - } - // Iterate other's set, add canonical pointers from OUR opByID - for participant := range otherParticipants.Iter() { - b.participantsByOpID[opID].Add(participant) // O(1) Add + if existing, exists := b.participantsByOpID[opID]; exists { + // Merge into existing set - iterate and add (Union creates new set) + for participant := range otherParticipants.Iter() { + existing.Add(participant) + } + } else { + // Clone the set instead of creating empty + iterating + b.participantsByOpID[opID] = otherParticipants.Clone() } } @@ -332,12 +345,24 @@ func (b *IndexerBuffer) MergeBuffer(other IndexerBufferInterface) { } } -// GetAllParticipants returns all unique participants (Stellar addresses) that have been -// recorded during transaction, operation, and state change processing. -// Thread-safe: uses read lock. -func (b *IndexerBuffer) GetAllParticipants() []string { - b.mu.RLock() - defer b.mu.RUnlock() +// Clear resets the buffer to its initial empty state while preserving allocated capacity. +// Use this to reuse the buffer after flushing data to the database during backfill. +// Thread-safe: acquires write lock. 
+func (b *IndexerBuffer) Clear() { + b.mu.Lock() + defer b.mu.Unlock() - return b.allParticipants.ToSlice() + // Clear maps (keep allocated backing arrays) + clear(b.txByHash) + clear(b.participantsByTxHash) + clear(b.opByID) + clear(b.participantsByOpID) + + // Reset slices (reuse underlying arrays by slicing to zero) + b.stateChanges = b.stateChanges[:0] + b.trustlineChanges = b.trustlineChanges[:0] + b.contractChanges = b.contractChanges[:0] + + // Clear all participants set + b.allParticipants.Clear() } diff --git a/internal/indexer/indexer_buffer_test.go b/internal/indexer/indexer_buffer_test.go index 80568efad..12991ea57 100644 --- a/internal/indexer/indexer_buffer_test.go +++ b/internal/indexer/indexer_buffer_test.go @@ -45,7 +45,7 @@ func TestIndexerBuffer_PushTransaction(t *testing.T) { assert.Equal(t, 2, indexerBuffer.GetNumberOfTransactions()) // Assert GetAllTransactions - assert.ElementsMatch(t, []types.Transaction{tx1, tx2}, indexerBuffer.GetTransactions()) + assert.ElementsMatch(t, []*types.Transaction{&tx1, &tx2}, indexerBuffer.GetTransactions()) }) t.Run("🟢 concurrent pushes", func(t *testing.T) { @@ -268,7 +268,7 @@ func TestIndexerBuffer_GetAllTransactions(t *testing.T) { allTxs := indexerBuffer.GetTransactions() require.Len(t, allTxs, 2) - assert.ElementsMatch(t, []types.Transaction{tx1, tx2}, allTxs) + assert.ElementsMatch(t, []*types.Transaction{&tx1, &tx2}, allTxs) }) } @@ -303,7 +303,7 @@ func TestIndexerBuffer_GetAllOperations(t *testing.T) { allOps := indexerBuffer.GetOperations() require.Len(t, allOps, 2) - assert.ElementsMatch(t, []types.Operation{op1, op2}, allOps) + assert.ElementsMatch(t, []*types.Operation{&op1, &op2}, allOps) }) } @@ -428,12 +428,12 @@ func TestIndexerBuffer_GetAllParticipants(t *testing.T) { }) } -func TestIndexerBuffer_MergeBuffer(t *testing.T) { +func TestIndexerBuffer_Merge(t *testing.T) { t.Run("🟢 merge empty buffers", func(t *testing.T) { buffer1 := NewIndexerBuffer() buffer2 := NewIndexerBuffer() - buffer1.MergeBuffer(buffer2) + buffer1.Merge(buffer2) assert.Equal(t, 0, buffer1.GetNumberOfTransactions()) assert.Len(t, buffer1.GetStateChanges(), 0) }) @@ -448,12 +448,12 @@ func TestIndexerBuffer_MergeBuffer(t *testing.T) { buffer1.PushTransaction("alice", tx1) buffer2.PushTransaction("bob", tx2) - buffer1.MergeBuffer(buffer2) + buffer1.Merge(buffer2) // Verify transactions allTxs := buffer1.GetTransactions() assert.Len(t, allTxs, 2) - assert.ElementsMatch(t, []types.Transaction{tx1, tx2}, allTxs) + assert.ElementsMatch(t, []*types.Transaction{&tx1, &tx2}, allTxs) // Verify transaction participants txParticipants := buffer1.GetTransactionsParticipants() @@ -472,12 +472,12 @@ func TestIndexerBuffer_MergeBuffer(t *testing.T) { buffer1.PushOperation("alice", op1, tx1) buffer2.PushOperation("bob", op2, tx1) - buffer1.MergeBuffer(buffer2) + buffer1.Merge(buffer2) // Verify operations allOps := buffer1.GetOperations() assert.Len(t, allOps, 2) - assert.ElementsMatch(t, []types.Operation{op1, op2}, allOps) + assert.ElementsMatch(t, []*types.Operation{&op1, &op2}, allOps) // Verify operation participants opParticipants := buffer1.GetOperationsParticipants() @@ -498,7 +498,7 @@ func TestIndexerBuffer_MergeBuffer(t *testing.T) { buffer1.PushStateChange(tx, op, sc1) buffer2.PushStateChange(tx, op, sc2) - buffer1.MergeBuffer(buffer2) + buffer1.Merge(buffer2) // Verify state changes allStateChanges := buffer1.GetStateChanges() @@ -524,7 +524,7 @@ func TestIndexerBuffer_MergeBuffer(t *testing.T) { buffer2.PushTransaction("charlie", tx2) 
buffer2.PushOperation("bob", op1, tx1) - buffer1.MergeBuffer(buffer2) + buffer1.Merge(buffer2) // Verify transactions allTxs := buffer1.GetTransactions() @@ -552,7 +552,7 @@ func TestIndexerBuffer_MergeBuffer(t *testing.T) { buffer2.PushOperation("bob", op1, tx1) buffer2.PushStateChange(tx1, op1, sc1) - buffer1.MergeBuffer(buffer2) + buffer1.Merge(buffer2) assert.Equal(t, 1, buffer1.GetNumberOfTransactions()) assert.Len(t, buffer1.GetOperations(), 1) @@ -566,7 +566,7 @@ func TestIndexerBuffer_MergeBuffer(t *testing.T) { tx1 := types.Transaction{Hash: "tx_hash_1"} buffer1.PushTransaction("alice", tx1) - buffer1.MergeBuffer(buffer2) + buffer1.Merge(buffer2) assert.Equal(t, 1, buffer1.GetNumberOfTransactions()) }) @@ -589,12 +589,12 @@ func TestIndexerBuffer_MergeBuffer(t *testing.T) { go func() { defer wg.Done() - buffer1.MergeBuffer(buffer2) + buffer1.Merge(buffer2) }() go func() { defer wg.Done() - buffer1.MergeBuffer(buffer3) + buffer1.Merge(buffer3) }() wg.Wait() @@ -624,7 +624,7 @@ func TestIndexerBuffer_MergeBuffer(t *testing.T) { buffer2.PushOperation("bob", op2, tx2) buffer2.PushStateChange(tx2, op2, sc2) - buffer1.MergeBuffer(buffer2) + buffer1.Merge(buffer2) // Verify transactions allTxs := buffer1.GetTransactions() @@ -662,7 +662,7 @@ func TestIndexerBuffer_MergeBuffer(t *testing.T) { buffer2.PushTransaction("charlie", tx2) buffer2.PushTransaction("dave", tx2) - buffer1.MergeBuffer(buffer2) + buffer1.Merge(buffer2) // Verify all participants merged allParticipants := buffer1.GetAllParticipants() diff --git a/internal/ingest/ingest.go b/internal/ingest/ingest.go index 216d34576..6f3df4944 100644 --- a/internal/ingest/ingest.go +++ b/internal/ingest/ingest.go @@ -83,6 +83,18 @@ type Configs struct { // EnableParticipantFiltering controls whether to filter ingested data by pre-registered accounts. // When false (default), all data is stored. When true, only data for pre-registered accounts is stored. EnableParticipantFiltering bool + // BackfillWorkers limits concurrent batch processing during backfill. + // Defaults to runtime.NumCPU(). Lower values reduce RAM usage. + BackfillWorkers int + // BackfillBatchSize is the number of ledgers processed per batch during backfill. + // Defaults to 250. Lower values reduce RAM usage at cost of more DB transactions. + BackfillBatchSize int + // BackfillDBInsertBatchSize is the number of ledgers to process before flushing to DB. + // Defaults to 50. Lower values reduce RAM usage at cost of more DB transactions. + BackfillDBInsertBatchSize int + // CatchupThreshold is the number of ledgers behind network tip that triggers fast catchup. + // Defaults to 100. + CatchupThreshold int } func Ingest(cfg Configs) error { @@ -208,6 +220,10 @@ func setupDeps(cfg Configs) (services.IngestService, error) { SkipTxMeta: cfg.SkipTxMeta, SkipTxEnvelope: cfg.SkipTxEnvelope, EnableParticipantFiltering: cfg.EnableParticipantFiltering, + BackfillWorkers: cfg.BackfillWorkers, + BackfillBatchSize: cfg.BackfillBatchSize, + BackfillDBInsertBatchSize: cfg.BackfillDBInsertBatchSize, + CatchupThreshold: cfg.CatchupThreshold, }) if err != nil { return nil, fmt.Errorf("instantiating ingest service: %w", err) diff --git a/internal/integrationtests/catchup_test.go b/internal/integrationtests/catchup_test.go new file mode 100644 index 000000000..39e8ad2e7 --- /dev/null +++ b/internal/integrationtests/catchup_test.go @@ -0,0 +1,102 @@ +// catchup_test.go tests the catchup backfilling during live ingestion. 
+package integrationtests + +import ( + "context" + "time" + + "github.com/stellar/go/support/log" + "github.com/stretchr/testify/suite" + + "github.com/stellar/wallet-backend/internal/integrationtests/infrastructure" +) + +// CatchupTestSuite tests the automatic catchup backfilling that occurs when +// the ingest service falls behind the network tip by more than the catchup threshold. +type CatchupTestSuite struct { + suite.Suite + testEnv *infrastructure.TestEnvironment +} + +// TestCatchupDuringLiveIngestion validates that when the ingest service is behind +// the network by more than the catchup threshold, it triggers parallel backfilling +// to catch up efficiently. +func (suite *CatchupTestSuite) TestCatchupDuringLiveIngestion() { + ctx := context.Background() + containers := suite.testEnv.Containers + + // Phase 1: Record initial state (live ingest is already running) + log.Ctx(ctx).Info("Phase 1: Recording initial state") + + initialOldest, err := containers.GetIngestCursor(ctx, "oldest_ingest_ledger") + suite.Require().NoError(err, "failed to get initial oldest_ingest_ledger cursor") + + initialLatest, err := containers.GetIngestCursor(ctx, "latest_ingest_ledger") + suite.Require().NoError(err, "failed to get initial latest_ingest_ledger cursor") + + log.Ctx(ctx).Infof("Initial state: oldest=%d, latest=%d", initialOldest, initialLatest) + + // Phase 2: Stop ingest container to simulate falling behind + log.Ctx(ctx).Info("Phase 2: Stopping ingest container") + + err = containers.StopIngestContainer(ctx) + suite.Require().NoError(err, "failed to stop ingest container") + + // Phase 3: Wait for network to advance beyond catchup threshold + // We'll use a low threshold of 5 ledgers for faster testing + log.Ctx(ctx).Info("Phase 3: Waiting for network to advance") + + targetLedger := initialLatest + 20 // Need to be behind by more than catchup threshold (5) + err = containers.WaitForNetworkAdvance(ctx, suite.testEnv.RPCService, targetLedger, 2*time.Minute) + suite.Require().NoError(err, "network did not advance to target ledger") + + log.Ctx(ctx).Infof("Network advanced to ledger %d", targetLedger) + + // Phase 4: Restart ingest container with low catchup threshold + log.Ctx(ctx).Info("Phase 4: Restarting ingest container with low catchup threshold") + + err = containers.RestartIngestContainer(ctx, map[string]string{ + "CATCHUP_THRESHOLD": "5", // Trigger catchup if behind by 5+ ledgers + }) + suite.Require().NoError(err, "failed to restart ingest container") + + // Phase 5: Wait for catchup to complete + log.Ctx(ctx).Info("Phase 5: Waiting for catchup to complete") + + err = containers.WaitForLatestLedgerToReach(ctx, targetLedger, 3*time.Minute) + suite.Require().NoError(err, "catchup did not complete in time") + + log.Ctx(ctx).Info("Catchup completed") + + // Phase 6: Validate results + log.Ctx(ctx).Info("Phase 6: Validating results") + + // Verify catchup was triggered by checking logs + logs, err := containers.GetIngestContainerLogs(ctx) + suite.Require().NoError(err, "failed to get ingest container logs") + suite.Assert().Contains(logs, "Doing optimized catchup to the tip", + "should see catchup triggered log message") + + // Verify oldest cursor stayed the same (catchup doesn't change oldest) + newOldest, err := containers.GetIngestCursor(ctx, "oldest_ingest_ledger") + suite.Require().NoError(err, "failed to get new oldest_ingest_ledger cursor") + suite.Assert().Equal(initialOldest, newOldest, + "oldest cursor should not change during catchup") + + // Verify latest cursor caught 
up + newLatest, err := containers.GetIngestCursor(ctx, "latest_ingest_ledger") + suite.Require().NoError(err, "failed to get new latest_ingest_ledger cursor") + suite.Assert().GreaterOrEqual(newLatest, targetLedger, + "should have caught up to target ledger") + + // Verify no gaps exist in the ledger range we caught up + gapCount, err := containers.GetLedgerGapCount(ctx, initialLatest, newLatest) + suite.Require().NoError(err, "failed to get ledger gap count") + suite.Assert().Equal(0, gapCount, + "should have no gaps after catchup") + + log.Ctx(ctx).Infof("Validation passed: oldest=%d, latest=%d (was %d), gaps=%d", + newOldest, newLatest, initialLatest, gapCount) + + log.Ctx(ctx).Info("All catchup validations passed successfully!") +} diff --git a/internal/integrationtests/infrastructure/backfill_helpers.go b/internal/integrationtests/infrastructure/backfill_helpers.go index a393cdb39..ec5532dc7 100644 --- a/internal/integrationtests/infrastructure/backfill_helpers.go +++ b/internal/integrationtests/infrastructure/backfill_helpers.go @@ -147,6 +147,59 @@ func (s *SharedContainers) GetStateChangeCountForLedgerRange(ctx context.Context return count, nil } +// GetLedgerGapCount counts the number of missing ledgers (gaps) in the transactions table +// within the specified ledger range. Returns 0 if there are no gaps. +func (s *SharedContainers) GetLedgerGapCount(ctx context.Context, startLedger, endLedger uint32) (int, error) { + dbURL, err := s.GetWalletDBConnectionString(ctx) + if err != nil { + return 0, fmt.Errorf("getting database connection string: %w", err) + } + db, err := sql.Open("postgres", dbURL) + if err != nil { + return 0, fmt.Errorf("opening database connection: %w", err) + } + defer db.Close() //nolint:errcheck + + // Count the number of distinct ledgers that have transactions in the range + // Then compare with expected count to find gaps + var distinctLedgers int + query := ` + SELECT COUNT(DISTINCT ledger_number) + FROM transactions + WHERE ledger_number BETWEEN $1 AND $2 + ` + err = db.QueryRowContext(ctx, query, startLedger, endLedger).Scan(&distinctLedgers) + if err != nil { + return 0, fmt.Errorf("counting distinct ledgers: %w", err) + } + + // Note: Not all ledgers will have transactions, so we can't simply compare + // against expected range. Instead, use window function to find actual gaps. + var gapCount int + gapQuery := ` + WITH ledger_sequence AS ( + SELECT DISTINCT ledger_number + FROM transactions + WHERE ledger_number BETWEEN $1 AND $2 + ORDER BY ledger_number + ), + gaps AS ( + SELECT + ledger_number, + LEAD(ledger_number) OVER (ORDER BY ledger_number) AS next_ledger, + LEAD(ledger_number) OVER (ORDER BY ledger_number) - ledger_number - 1 AS gap_size + FROM ledger_sequence + ) + SELECT COALESCE(SUM(gap_size), 0) FROM gaps WHERE gap_size > 0 + ` + err = db.QueryRowContext(ctx, gapQuery, startLedger, endLedger).Scan(&gapCount) + if err != nil { + return 0, fmt.Errorf("counting ledger gaps: %w", err) + } + + return gapCount, nil +} + // WaitForBackfillCompletion polls until the oldest_ingest_ledger cursor reaches the expected value. 
func (s *SharedContainers) WaitForBackfillCompletion(ctx context.Context, expectedOldestLedger uint32, timeout time.Duration) error { ticker := time.NewTicker(2 * time.Second) diff --git a/internal/integrationtests/infrastructure/restart_ingest.go b/internal/integrationtests/infrastructure/restart_ingest.go index 8fe679a49..c5a47097b 100644 --- a/internal/integrationtests/infrastructure/restart_ingest.go +++ b/internal/integrationtests/infrastructure/restart_ingest.go @@ -3,7 +3,13 @@ package infrastructure import ( "context" "fmt" + "io" "sync/atomic" + "time" + + "github.com/stellar/go/support/log" + + "github.com/stellar/wallet-backend/internal/services" ) // restartCounter is used to generate unique container names for restarted ingest containers @@ -37,3 +43,97 @@ func (s *SharedContainers) RestartIngestContainer(ctx context.Context, extraEnv return nil } + +// StopIngestContainer stops the ingest container without terminating it. +// This allows the network to advance while the container is stopped. +func (s *SharedContainers) StopIngestContainer(ctx context.Context) error { + if s.WalletBackendContainer.Ingest == nil { + return fmt.Errorf("ingest container is not running") + } + + if err := s.WalletBackendContainer.Ingest.Stop(ctx, nil); err != nil { + return fmt.Errorf("stopping ingest container: %w", err) + } + + log.Ctx(ctx).Info("🛑 Stopped ingest container") + return nil +} + +// GetIngestContainerLogs returns the logs from the ingest container. +func (s *SharedContainers) GetIngestContainerLogs(ctx context.Context) (string, error) { + if s.WalletBackendContainer.Ingest == nil { + return "", fmt.Errorf("ingest container is not running") + } + + logsReader, err := s.WalletBackendContainer.Ingest.Logs(ctx) + if err != nil { + return "", fmt.Errorf("getting ingest container logs: %w", err) + } + defer logsReader.Close() //nolint:errcheck + + logBytes, err := io.ReadAll(logsReader) + if err != nil { + return "", fmt.Errorf("reading ingest container logs: %w", err) + } + + return string(logBytes), nil +} + +// WaitForNetworkAdvance polls the RPC service until the network reaches the target ledger. +func (s *SharedContainers) WaitForNetworkAdvance(ctx context.Context, rpcService services.RPCService, targetLedger uint32, timeout time.Duration) error { + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + timeoutChan := time.After(timeout) + + for { + select { + case <-ctx.Done(): + return fmt.Errorf("context cancelled: %w", ctx.Err()) + case <-timeoutChan: + return fmt.Errorf("network did not reach ledger %d within %v", targetLedger, timeout) + case <-ticker.C: + health, err := rpcService.GetHealth() + if err != nil { + log.Ctx(ctx).Warnf("Error getting RPC health during network advance wait: %v", err) + continue + } + + currentLedger := health.LatestLedger + log.Ctx(ctx).Infof("Network advance progress: current=%d, target=%d", currentLedger, targetLedger) + + if currentLedger >= targetLedger { + log.Ctx(ctx).Infof("Network reached target ledger %d", targetLedger) + return nil + } + } + } +} + +// WaitForLatestLedgerToReach polls the database until the latest_ingest_ledger cursor reaches the target. 
+func (s *SharedContainers) WaitForLatestLedgerToReach(ctx context.Context, targetLedger uint32, timeout time.Duration) error { + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + timeoutChan := time.After(timeout) + + for { + select { + case <-ctx.Done(): + return fmt.Errorf("context cancelled: %w", ctx.Err()) + case <-timeoutChan: + return fmt.Errorf("latest_ingest_ledger did not reach %d within %v", targetLedger, timeout) + case <-ticker.C: + latest, err := s.GetIngestCursor(ctx, "latest_ingest_ledger") + if err != nil { + log.Ctx(ctx).Warnf("Error getting latest cursor during catchup wait: %v", err) + continue + } + + log.Ctx(ctx).Infof("Catchup progress: latest=%d, target=%d", latest, targetLedger) + + if latest >= targetLedger { + log.Ctx(ctx).Infof("Catchup completed: latest_ingest_ledger=%d reached target=%d", latest, targetLedger) + return nil + } + } + } +} diff --git a/internal/integrationtests/main_test.go b/internal/integrationtests/main_test.go index e1a2aedf8..fc0859e1b 100644 --- a/internal/integrationtests/main_test.go +++ b/internal/integrationtests/main_test.go @@ -42,6 +42,13 @@ func TestIntegrationTests(t *testing.T) { }) }) + // Test catchup backfilling during live ingestion + t.Run("CatchupTestSuite", func(t *testing.T) { + suite.Run(t, &CatchupTestSuite{ + testEnv: testEnv, + }) + }) + // Phase 1: Validate balances from checkpoint before fixture transactions t.Run("AccountBalancesAfterCheckpointTestSuite", func(t *testing.T) { suite.Run(t, &AccountBalancesAfterCheckpointTestSuite{ diff --git a/internal/serve/httphandler/health_test.go b/internal/serve/httphandler/health_test.go index f51d326a1..d294336aa 100644 --- a/internal/serve/httphandler/health_test.go +++ b/internal/serve/httphandler/health_test.go @@ -31,8 +31,8 @@ func TestHealthHandler_GetHealth(t *testing.T) { defer dbConnectionPool.Close() mockMetricsService := metrics.NewMockMetricsService() - mockMetricsService.On("ObserveDBQueryDuration", "GetLatestLedgerSynced", "ingest_store", mock.AnythingOfType("float64")).Return() - mockMetricsService.On("IncDBQuery", "GetLatestLedgerSynced", "ingest_store").Return() + mockMetricsService.On("ObserveDBQueryDuration", "Get", "ingest_store", mock.AnythingOfType("float64")).Return() + mockMetricsService.On("IncDBQuery", "Get", "ingest_store").Return() defer mockMetricsService.AssertExpectations(t) models, err := data.NewModels(dbConnectionPool, mockMetricsService) diff --git a/internal/services/ingest.go b/internal/services/ingest.go index 6c7305e22..a34e4be4d 100644 --- a/internal/services/ingest.go +++ b/internal/services/ingest.go @@ -6,6 +6,7 @@ import ( "fmt" "hash/fnv" "io" + "runtime" "sort" "strings" "time" @@ -31,8 +32,6 @@ import ( var ErrAlreadyInSync = errors.New("ingestion is already in sync") const ( - // BackfillBatchSize is the number of ledgers processed per parallel batch during backfill. - BackfillBatchSize uint32 = 250 // HistoricalBufferLedgers is the number of ledgers to keep before latestRPCLedger // to avoid racing with live finalization during parallel processing. HistoricalBufferLedgers uint32 = 5 @@ -71,6 +70,10 @@ type IngestServiceConfig struct { SkipTxMeta bool SkipTxEnvelope bool EnableParticipantFiltering bool + BackfillWorkers int + BackfillBatchSize int + BackfillDBInsertBatchSize int + CatchupThreshold int } // BackfillBatch represents a contiguous range of ledgers to process as a unit. 
@@ -144,6 +147,9 @@ type ingestService struct { archive historyarchive.ArchiveInterface enableParticipantFiltering bool backfillPool pond.Pool + backfillBatchSize uint32 + backfillDBInsertBatchSize uint32 + catchupThreshold uint32 } func NewIngestService(cfg IngestServiceConfig) (*ingestService, error) { @@ -151,7 +157,13 @@ func NewIngestService(cfg IngestServiceConfig) (*ingestService, error) { ledgerIndexerPool := pond.NewPool(0) cfg.MetricsService.RegisterPoolMetrics("ledger_indexer", ledgerIndexerPool) - backfillPool := pond.NewPool(0) + // Create backfill pool with bounded size to control memory usage. + // Default to NumCPU if not specified. + backfillWorkers := cfg.BackfillWorkers + if backfillWorkers <= 0 { + backfillWorkers = runtime.NumCPU() + } + backfillPool := pond.NewPool(backfillWorkers) cfg.MetricsService.RegisterPoolMetrics("backfill", backfillPool) return &ingestService{ @@ -174,6 +186,9 @@ func NewIngestService(cfg IngestServiceConfig) (*ingestService, error) { archive: cfg.Archive, enableParticipantFiltering: cfg.EnableParticipantFiltering, backfillPool: backfillPool, + backfillBatchSize: uint32(cfg.BackfillBatchSize), + backfillDBInsertBatchSize: uint32(cfg.BackfillDBInsertBatchSize), + catchupThreshold: uint32(cfg.CatchupThreshold), }, nil } @@ -185,7 +200,7 @@ func (m *ingestService) Run(ctx context.Context, startLedger uint32, endLedger u case IngestionModeLive: return m.startLiveIngestion(ctx) case IngestionModeBackfill: - return m.startBackfilling(ctx, startLedger, endLedger) + return m.startBackfilling(ctx, startLedger, endLedger, BackfillModeHistorical) default: return fmt.Errorf("unsupported ingestion mode %q, must be %q or %q", m.ingestionMode, IngestionModeLive, IngestionModeBackfill) } @@ -290,9 +305,9 @@ func (m *ingestService) getLedgerTransactions(ctx context.Context, xdrLedgerClos // filteredIngestionData holds the filtered data for ingestion type filteredIngestionData struct { - txs []types.Transaction + txs []*types.Transaction txParticipants map[string]set.Set[string] - ops []types.Operation + ops []*types.Operation opParticipants map[int64]set.Set[string] stateChanges []types.StateChange } @@ -302,9 +317,9 @@ type filteredIngestionData struct { // If a transaction/operation has ANY registered participant, it is included with ALL its participants. func (m *ingestService) filterByRegisteredAccounts( ctx context.Context, - txs []types.Transaction, + txs []*types.Transaction, txParticipants map[string]set.Set[string], - ops []types.Operation, + ops []*types.Operation, opParticipants map[int64]set.Set[string], stateChanges []types.StateChange, allParticipants []string, @@ -329,7 +344,7 @@ func (m *ingestService) filterByRegisteredAccounts( } } - filteredTxs := make([]types.Transaction, 0, txHashesToInclude.Cardinality()) + filteredTxs := make([]*types.Transaction, 0, txHashesToInclude.Cardinality()) filteredTxParticipants := make(map[string]set.Set[string]) for _, tx := range txs { if txHashesToInclude.Contains(tx.Hash) { @@ -349,7 +364,7 @@ func (m *ingestService) filterByRegisteredAccounts( } } - filteredOps := make([]types.Operation, 0, opIDsToInclude.Cardinality()) + filteredOps := make([]*types.Operation, 0, opIDsToInclude.Cardinality()) filteredOpParticipants := make(map[int64]set.Set[string]) for _, op := range ops { if opIDsToInclude.Contains(op.ID) { @@ -434,7 +449,7 @@ func (m *ingestService) ingestProcessedData(ctx context.Context, indexerBuffer i } // insertTransactions batch inserts transactions with their participants into the database. 
-func (m *ingestService) insertTransactions(ctx context.Context, dbTx db.Transaction, txs []types.Transaction, stellarAddressesByTxHash map[string]set.Set[string]) error { +func (m *ingestService) insertTransactions(ctx context.Context, dbTx db.Transaction, txs []*types.Transaction, stellarAddressesByTxHash map[string]set.Set[string]) error { if len(txs) == 0 { return nil } @@ -447,7 +462,7 @@ func (m *ingestService) insertTransactions(ctx context.Context, dbTx db.Transact } // insertOperations batch inserts operations with their participants into the database. -func (m *ingestService) insertOperations(ctx context.Context, dbTx db.Transaction, ops []types.Operation, stellarAddressesByOpID map[int64]set.Set[string]) error { +func (m *ingestService) insertOperations(ctx context.Context, dbTx db.Transaction, ops []*types.Operation, stellarAddressesByOpID map[int64]set.Set[string]) error { if len(ops) == 0 { return nil } @@ -491,7 +506,7 @@ func (m *ingestService) recordStateChangeMetrics(stateChanges []types.StateChang } // unlockChannelAccounts unlocks the channel accounts associated with the given transaction XDRs. -func (m *ingestService) unlockChannelAccounts(ctx context.Context, dbTx db.Transaction, txs []types.Transaction) error { +func (m *ingestService) unlockChannelAccounts(ctx context.Context, dbTx db.Transaction, txs []*types.Transaction) error { if len(txs) == 0 { return nil } diff --git a/internal/services/ingest_backfill.go b/internal/services/ingest_backfill.go index 6fb1e5fb4..44e722e1b 100644 --- a/internal/services/ingest_backfill.go +++ b/internal/services/ingest_backfill.go @@ -3,19 +3,32 @@ package services import ( "context" "fmt" - "sync" "time" "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/support/log" + "github.com/stellar/go/xdr" "github.com/stellar/wallet-backend/internal/data" "github.com/stellar/wallet-backend/internal/db" + "github.com/stellar/wallet-backend/internal/indexer" ) -// startBackfilling processes historical ledgers in the specified range, -// identifying gaps and processing them in parallel batches. -func (m *ingestService) startBackfilling(ctx context.Context, startLedger, endLedger uint32) error { +// BackfillMode indicates the purpose of backfilling. +type BackfillMode int + +const ( + // BackfillModeHistorical fills gaps within already-ingested ledger range. + BackfillModeHistorical BackfillMode = iota + // BackfillModeCatchup fills forward gaps to catch up to network tip. + BackfillModeCatchup +) + +// startBackfilling processes ledgers in the specified range, identifying gaps +// and processing them in parallel batches. 
The mode parameter determines: +// - BackfillModeHistorical: fills gaps within already-ingested range +// - BackfillModeCatchup: catches up to network tip from latest ingested ledger +func (m *ingestService) startBackfilling(ctx context.Context, startLedger, endLedger uint32, mode BackfillMode) error { if startLedger > endLedger { return fmt.Errorf("start ledger cannot be greater than end ledger") } @@ -24,20 +37,36 @@ func (m *ingestService) startBackfilling(ctx context.Context, startLedger, endLe if err != nil { return fmt.Errorf("getting latest ledger cursor: %w", err) } - if endLedger > latestIngestedLedger { - return fmt.Errorf("end ledger %d cannot be greater than latest ingested ledger %d for backfilling", endLedger, latestIngestedLedger) + + // Validate based on mode + switch mode { + case BackfillModeHistorical: + if endLedger > latestIngestedLedger { + return fmt.Errorf("end ledger %d cannot be greater than latest ingested ledger %d for backfilling", endLedger, latestIngestedLedger) + } + case BackfillModeCatchup: + if startLedger != latestIngestedLedger+1 { + return fmt.Errorf("catchup must start from ledger %d (latestIngestedLedger + 1), got %d", latestIngestedLedger+1, startLedger) + } } - gaps, err := m.calculateBackfillGaps(ctx, startLedger, endLedger) - if err != nil { - return fmt.Errorf("calculating backfill gaps: %w", err) + // Determine gaps to fill based on mode + var gaps []data.LedgerRange + if mode == BackfillModeCatchup { + // For catchup, treat entire range as a single gap (no existing data in this range) + gaps = []data.LedgerRange{{GapStart: startLedger, GapEnd: endLedger}} + } else { + gaps, err = m.calculateBackfillGaps(ctx, startLedger, endLedger) + if err != nil { + return fmt.Errorf("calculating backfill gaps: %w", err) + } } if len(gaps) == 0 { log.Ctx(ctx).Infof("No gaps to backfill in range [%d - %d]", startLedger, endLedger) return nil } - backfillBatches := m.splitGapsIntoBatches(gaps, BackfillBatchSize) + backfillBatches := m.splitGapsIntoBatches(gaps) startTime := time.Now() results := m.processBackfillBatchesParallel(ctx, backfillBatches) duration := time.Since(startTime) @@ -47,9 +76,16 @@ func (m *ingestService) startBackfilling(ctx context.Context, startLedger, endLe return fmt.Errorf("backfilling failed: %d/%d batches failed", len(analysis.failedBatches), len(backfillBatches)) } - // Update oldest ledger cursor on success if configured - if err := m.updateOldestLedgerCursor(ctx, startLedger); err != nil { - return fmt.Errorf("updating cursor: %w", err) + // Update cursors based on mode + switch mode { + case BackfillModeHistorical: + if err := m.updateOldestLedgerCursor(ctx, startLedger); err != nil { + return fmt.Errorf("updating oldest cursor: %w", err) + } + case BackfillModeCatchup: + if err := m.updateLatestLedgerCursorAfterCatchup(ctx, endLedger); err != nil { + return fmt.Errorf("updating latest cursor after catchup: %w", err) + } } log.Ctx(ctx).Infof("Backfilling completed in %v: %d batches, %d ledgers", duration, analysis.successCount, analysis.totalLedgers) @@ -119,13 +155,13 @@ func (m *ingestService) calculateBackfillGaps(ctx context.Context, startLedger, } // splitGapsIntoBatches divides ledger gaps into fixed-size batches for parallel processing. 
-func (m *ingestService) splitGapsIntoBatches(gaps []data.LedgerRange, batchSize uint32) []BackfillBatch { +func (m *ingestService) splitGapsIntoBatches(gaps []data.LedgerRange) []BackfillBatch { var batches []BackfillBatch for _, gap := range gaps { start := gap.GapStart for start <= gap.GapEnd { - end := min(start+batchSize-1, gap.GapEnd) + end := min(start+m.backfillBatchSize-1, gap.GapEnd) batches = append(batches, BackfillBatch{ StartLedger: start, EndLedger: end, @@ -141,16 +177,11 @@ func (m *ingestService) splitGapsIntoBatches(gaps []data.LedgerRange, batchSize func (m *ingestService) processBackfillBatchesParallel(ctx context.Context, batches []BackfillBatch) []BackfillResult { results := make([]BackfillResult, len(batches)) group := m.backfillPool.NewGroupContext(ctx) - var mu sync.Mutex for i, batch := range batches { - idx := i - b := batch group.Submit(func() { - result := m.processSingleBatch(ctx, b) - mu.Lock() - results[idx] = result - mu.Unlock() + result := m.processSingleBatch(ctx, batch) + results[i] = result }) } @@ -187,6 +218,11 @@ func (m *ingestService) processSingleBatch(ctx context.Context, batch BackfillBa return result } + // Process each ledger in the batch using a single shared buffer. + // Periodically flush to DB to control memory usage. + batchBuffer := indexer.NewIndexerBuffer() + ledgersInBuffer := uint32(0) + // Process each ledger in the batch sequentially for ledgerSeq := batch.StartLedger; ledgerSeq <= batch.EndLedger; ledgerSeq++ { ledgerMeta, err := m.getLedgerWithRetry(ctx, backend, ledgerSeq) @@ -196,24 +232,61 @@ func (m *ingestService) processSingleBatch(ctx context.Context, batch BackfillBa return result } - err = m.processLedger(ctx, ledgerMeta) + err = m.processBackfillLedger(ctx, ledgerMeta, batchBuffer) if err != nil { result.Error = fmt.Errorf("processing ledger %d: %w", ledgerSeq, err) result.Duration = time.Since(start) return result } - result.LedgersCount++ + ledgersInBuffer++ + + // Flush buffer periodically to control memory usage + if ledgersInBuffer >= m.backfillDBInsertBatchSize { + if err := m.ingestProcessedData(ctx, batchBuffer); err != nil { + result.Error = fmt.Errorf("ingesting data for ledgers ending at %d: %w", ledgerSeq, err) + result.Duration = time.Since(start) + return result + } + batchBuffer.Clear() + ledgersInBuffer = 0 + } + } + + // Flush remaining data in buffer + if ledgersInBuffer > 0 { + if err := m.ingestProcessedData(ctx, batchBuffer); err != nil { + result.Error = fmt.Errorf("ingesting final data for batch [%d - %d]: %w", batch.StartLedger, batch.EndLedger, err) + result.Duration = time.Since(start) + return result + } } result.Duration = time.Since(start) - m.metricsService.ObserveIngestionPhaseDuration("backfill_batch", result.Duration.Seconds()) - log.Ctx(ctx).Infof("Batch [%d-%d] completed: %d ledgers in %v", + log.Ctx(ctx).Infof("Batch [%d - %d] completed: %d ledgers in %v", batch.StartLedger, batch.EndLedger, result.LedgersCount, result.Duration) return result } +// processBackfillLedger processes a ledger and populates the provided buffer. 
+func (m *ingestService) processBackfillLedger(ctx context.Context, ledgerMeta xdr.LedgerCloseMeta, buffer *indexer.IndexerBuffer) error { + ledgerSeq := ledgerMeta.LedgerSequence() + + // Get transactions from ledger + transactions, err := m.getLedgerTransactions(ctx, ledgerMeta) + if err != nil { + return fmt.Errorf("getting transactions for ledger %d: %w", ledgerSeq, err) + } + + // Process transactions and populate buffer (combined collection + processing) + _, err = m.ledgerIndexer.ProcessLedgerTransactions(ctx, transactions, buffer) + if err != nil { + return fmt.Errorf("processing transactions for ledger %d: %w", ledgerSeq, err) + } + return nil +} + // updateOldestLedgerCursor updates the oldest ledger cursor during backfill with metrics tracking. func (m *ingestService) updateOldestLedgerCursor(ctx context.Context, currentLedger uint32) error { cursorStart := time.Now() @@ -230,3 +303,20 @@ func (m *ingestService) updateOldestLedgerCursor(ctx context.Context, currentLed m.metricsService.ObserveIngestionPhaseDuration("oldest_cursor_update", time.Since(cursorStart).Seconds()) return nil } + +// updateLatestLedgerCursorAfterCatchup updates the latest ledger cursor after catchup completes. +func (m *ingestService) updateLatestLedgerCursorAfterCatchup(ctx context.Context, ledger uint32) error { + cursorStart := time.Now() + err := db.RunInTransaction(ctx, m.models.DB, nil, func(dbTx db.Transaction) error { + if updateErr := m.models.IngestStore.Update(ctx, dbTx, m.latestLedgerCursorName, ledger); updateErr != nil { + return fmt.Errorf("updating latest synced ledger: %w", updateErr) + } + m.metricsService.SetLatestLedgerIngested(float64(ledger)) + return nil + }) + if err != nil { + return fmt.Errorf("updating cursors: %w", err) + } + m.metricsService.ObserveIngestionPhaseDuration("catchup_cursor_update", time.Since(cursorStart).Seconds()) + return nil +} diff --git a/internal/services/ingest_live.go b/internal/services/ingest_live.go index 293e8d784..b09e55445 100644 --- a/internal/services/ingest_live.go +++ b/internal/services/ingest_live.go @@ -50,6 +50,23 @@ func (m *ingestService) startLiveIngestion(ctx context.Context) error { if err != nil { return fmt.Errorf("initializing cursors: %w", err) } + } else { + // If we already have data in the DB, we will do an optimized catchup by parallely backfilling the ledgers. + health, err := m.rpcService.GetHealth() + if err != nil { + return fmt.Errorf("getting health check result from RPC: %w", err) + } + + networkLatestLedger := health.LatestLedger + if networkLatestLedger > startLedger && (networkLatestLedger-startLedger) >= m.catchupThreshold { + log.Ctx(ctx).Infof("Wallet backend has fallen behind network tip by %d ledgers. 
Doing optimized catchup to the tip: %d", networkLatestLedger-startLedger, networkLatestLedger) + err := m.startBackfilling(ctx, startLedger, networkLatestLedger, BackfillModeCatchup) + if err != nil { + return fmt.Errorf("catching up to network tip: %w", err) + } + // Update startLedger to continue from where catchup ended + startLedger = networkLatestLedger + 1 + } } // Start unbounded ingestion from latest ledger ingested onwards diff --git a/internal/services/ingest_test.go b/internal/services/ingest_test.go index 8e4c7c26e..dd5d5da0c 100644 --- a/internal/services/ingest_test.go +++ b/internal/services/ingest_test.go @@ -2,8 +2,10 @@ package services import ( "context" + "fmt" "testing" + "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/network" "github.com/stellar/go/xdr" "github.com/stretchr/testify/assert" @@ -238,7 +240,8 @@ func Test_ingestService_splitGapsIntoBatches(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - result := svc.splitGapsIntoBatches(tc.gaps, tc.batchSize) + svc.backfillBatchSize = tc.batchSize + result := svc.splitGapsIntoBatches(tc.gaps) assert.Equal(t, tc.expected, result) }) } @@ -405,3 +408,136 @@ func Test_ingestService_calculateBackfillGaps(t *testing.T) { }) } } + +func Test_BackfillMode_Validation(t *testing.T) { + dbt := dbtest.Open(t) + defer dbt.Close() + dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) + require.NoError(t, err) + defer dbConnectionPool.Close() + + ctx := context.Background() + + testCases := []struct { + name string + mode BackfillMode + startLedger uint32 + endLedger uint32 + latestIngested uint32 + expectValidationError bool + errorContains string + }{ + { + name: "historical_mode_valid_range", + mode: BackfillModeHistorical, + startLedger: 50, + endLedger: 80, + latestIngested: 100, + expectValidationError: false, + }, + { + name: "historical_mode_end_exceeds_latest", + mode: BackfillModeHistorical, + startLedger: 50, + endLedger: 150, + latestIngested: 100, + expectValidationError: true, + errorContains: "end ledger 150 cannot be greater than latest ingested ledger 100", + }, + { + name: "catchup_mode_valid_range", + mode: BackfillModeCatchup, + startLedger: 101, + endLedger: 150, + latestIngested: 100, + expectValidationError: false, + }, + { + name: "catchup_mode_start_not_next_ledger", + mode: BackfillModeCatchup, + startLedger: 105, + endLedger: 150, + latestIngested: 100, + expectValidationError: true, + errorContains: "catchup must start from ledger 101", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Clean up + _, err := dbConnectionPool.ExecContext(ctx, "DELETE FROM transactions") + require.NoError(t, err) + _, err = dbConnectionPool.ExecContext(ctx, "DELETE FROM ingest_store") + require.NoError(t, err) + + // Set up latest ingested ledger cursor + _, err = dbConnectionPool.ExecContext(ctx, + `INSERT INTO ingest_store (key, value) VALUES ('latest_ledger_cursor', $1)`, + tc.latestIngested) + require.NoError(t, err) + _, err = dbConnectionPool.ExecContext(ctx, + `INSERT INTO ingest_store (key, value) VALUES ('oldest_ledger_cursor', $1)`, + tc.latestIngested) + require.NoError(t, err) + + mockMetricsService := metrics.NewMockMetricsService() + mockMetricsService.On("RegisterPoolMetrics", "ledger_indexer", mock.Anything).Return() + mockMetricsService.On("RegisterPoolMetrics", "backfill", mock.Anything).Return() + mockMetricsService.On("ObserveDBQueryDuration", mock.Anything, mock.Anything, mock.Anything).Return().Maybe() + 
mockMetricsService.On("IncDBQuery", mock.Anything, mock.Anything).Return().Maybe() + defer mockMetricsService.AssertExpectations(t) + + models, err := data.NewModels(dbConnectionPool, mockMetricsService) + require.NoError(t, err) + + mockAppTracker := apptracker.MockAppTracker{} + mockRPCService := RPCServiceMock{} + mockRPCService.On("NetworkPassphrase").Return(network.TestNetworkPassphrase).Maybe() + mockChAccStore := &store.ChannelAccountStoreMock{} + mockLedgerBackend := &LedgerBackendMock{} + mockArchive := &HistoryArchiveMock{} + + // Create a mock ledger backend factory that returns an error immediately + // This allows validation to pass but stops batch processing early + mockBackendFactory := func(ctx context.Context) (ledgerbackend.LedgerBackend, error) { + return nil, fmt.Errorf("mock backend factory error") + } + + svc, err := NewIngestService(IngestServiceConfig{ + IngestionMode: IngestionModeBackfill, + Models: models, + LatestLedgerCursorName: "latest_ledger_cursor", + OldestLedgerCursorName: "oldest_ledger_cursor", + AppTracker: &mockAppTracker, + RPCService: &mockRPCService, + LedgerBackend: mockLedgerBackend, + LedgerBackendFactory: mockBackendFactory, + ChannelAccountStore: mockChAccStore, + MetricsService: mockMetricsService, + GetLedgersLimit: defaultGetLedgersLimit, + Network: network.TestNetworkPassphrase, + NetworkPassphrase: network.TestNetworkPassphrase, + Archive: mockArchive, + SkipTxMeta: false, + SkipTxEnvelope: false, + BackfillBatchSize: 100, + }) + require.NoError(t, err) + + err = svc.startBackfilling(ctx, tc.startLedger, tc.endLedger, tc.mode) + if tc.expectValidationError { + require.Error(t, err) + assert.Contains(t, err.Error(), tc.errorContains) + } else { + // For valid cases, validation passes but batch processing fails + // The error will be wrapped as "backfilling failed: X/Y batches failed" + require.Error(t, err) + assert.Contains(t, err.Error(), "backfilling failed") + // Ensure it's NOT a validation error + assert.NotContains(t, err.Error(), "cannot be greater than latest ingested ledger") + assert.NotContains(t, err.Error(), "catchup must start from ledger") + } + }) + } +}