Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions cmd/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,38 @@ func (c *ingestCmd) Command() *cobra.Command {
FlagDefault: "oldest_ingest_ledger",
Required: true,
},
{
Name: "backfill-workers",
Usage: "Maximum concurrent workers for backfill processing. Defaults to number of CPUs. Lower values reduce RAM usage at cost of throughput.",
OptType: types.Int,
ConfigKey: &cfg.BackfillWorkers,
FlagDefault: 0,
Required: false,
},
{
Name: "backfill-batch-size",
Usage: "Number of ledgers per batch during backfill. Defaults to 250. Lower values reduce RAM usage at cost of more DB transactions.",
OptType: types.Int,
ConfigKey: &cfg.BackfillBatchSize,
FlagDefault: 250,
Required: false,
},
{
Name: "backfill-db-insert-batch-size",
Usage: "Number of ledgers to process before flushing buffer to DB during backfill. Defaults to 100. Lower values reduce RAM usage at cost of more DB transactions.",
OptType: types.Int,
ConfigKey: &cfg.BackfillDBInsertBatchSize,
FlagDefault: 100,
Required: false,
},
{
Name: "catchup-threshold",
Usage: "Number of ledgers behind network tip that triggers fast catchup via backfilling. Defaults to 100.",
OptType: types.Int,
ConfigKey: &cfg.CatchupThreshold,
FlagDefault: 100,
Required: false,
},
{
Name: "archive-url",
Usage: "Archive URL for history archives",
Expand Down
15 changes: 7 additions & 8 deletions internal/data/ingest_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ func (m *IngestStoreModel) Get(ctx context.Context, cursorName string) (uint32,
start := time.Now()
err := m.DB.GetContext(ctx, &lastSyncedLedger, `SELECT value FROM ingest_store WHERE key = $1`, cursorName)
duration := time.Since(start).Seconds()
m.MetricsService.ObserveDBQueryDuration("GetLatestLedgerSynced", "ingest_store", duration)
m.MetricsService.ObserveDBQueryDuration("Get", "ingest_store", duration)
// First run, key does not exist yet
if errors.Is(err, sql.ErrNoRows) {
m.MetricsService.IncDBQuery("GetLatestLedgerSynced", "ingest_store")
m.MetricsService.IncDBQuery("Get", "ingest_store")
return 0, nil
}
if err != nil {
m.MetricsService.IncDBQueryError("GetLatestLedgerSynced", "ingest_store", utils.GetDBErrorType(err))
m.MetricsService.IncDBQueryError("Get", "ingest_store", utils.GetDBErrorType(err))
return 0, fmt.Errorf("getting latest ledger synced for cursor %s: %w", cursorName, err)
}
m.MetricsService.IncDBQuery("GetLatestLedgerSynced", "ingest_store")
m.MetricsService.IncDBQuery("Get", "ingest_store")

return lastSyncedLedger, nil
}
Expand All @@ -50,13 +50,12 @@ func (m *IngestStoreModel) Update(ctx context.Context, dbTx db.Transaction, curs
start := time.Now()
_, err := dbTx.ExecContext(ctx, query, cursorName, ledger)
duration := time.Since(start).Seconds()
m.MetricsService.ObserveDBQueryDuration("UpdateLatestLedgerSynced", "ingest_store", duration)
m.MetricsService.ObserveDBQueryDuration("Update", "ingest_store", duration)
if err != nil {
m.MetricsService.IncDBQueryError("UpdateLatestLedgerSynced", "ingest_store", utils.GetDBErrorType(err))
m.MetricsService.IncDBQueryError("Update", "ingest_store", utils.GetDBErrorType(err))
return fmt.Errorf("updating last synced ledger to %d: %w", ledger, err)
}
m.MetricsService.IncDBQuery("UpdateLatestLedgerSynced", "ingest_store")

m.MetricsService.IncDBQuery("Update", "ingest_store")
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions internal/data/ingest_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func Test_IngestStoreModel_GetLatestLedgerSynced(t *testing.T) {

mockMetricsService := metrics.NewMockMetricsService()
mockMetricsService.
On("ObserveDBQueryDuration", "GetLatestLedgerSynced", "ingest_store", mock.Anything).Return().
On("IncDBQuery", "GetLatestLedgerSynced", "ingest_store").Return()
On("ObserveDBQueryDuration", "Get", "ingest_store", mock.Anything).Return().
On("IncDBQuery", "Get", "ingest_store").Return()
defer mockMetricsService.AssertExpectations(t)

m := &IngestStoreModel{
Expand Down Expand Up @@ -109,8 +109,8 @@ func Test_IngestStoreModel_UpdateLatestLedgerSynced(t *testing.T) {

mockMetricsService := metrics.NewMockMetricsService()
mockMetricsService.
On("ObserveDBQueryDuration", "UpdateLatestLedgerSynced", "ingest_store", mock.Anything).Return().Once().
On("IncDBQuery", "UpdateLatestLedgerSynced", "ingest_store").Return().Once()
On("ObserveDBQueryDuration", "Update", "ingest_store", mock.Anything).Return().Once().
On("IncDBQuery", "Update", "ingest_store").Return().Once()
defer mockMetricsService.AssertExpectations(t)

m := &IngestStoreModel{
Expand Down
2 changes: 1 addition & 1 deletion internal/data/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (m *OperationModel) BatchGetByStateChangeIDs(ctx context.Context, scToIDs [
func (m *OperationModel) BatchInsert(
ctx context.Context,
sqlExecuter db.SQLExecuter,
operations []types.Operation,
operations []*types.Operation,
stellarAddressesByOpID map[int64]set.Set[string],
) ([]int64, error) {
if sqlExecuter == nil {
Expand Down
12 changes: 6 additions & 6 deletions internal/data/operations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func Test_OperationModel_BatchInsert(t *testing.T) {
sqlxDB, err := dbConnectionPool.SqlxDB(ctx)
require.NoError(t, err)
txModel := &TransactionModel{DB: dbConnectionPool, MetricsService: metrics.NewMetricsService(sqlxDB)}
_, err = txModel.BatchInsert(ctx, nil, []types.Transaction{tx1, tx2}, map[string]set.Set[string]{
_, err = txModel.BatchInsert(ctx, nil, []*types.Transaction{&tx1, &tx2}, map[string]set.Set[string]{
tx1.Hash: set.NewSet(kp1.Address()),
tx2.Hash: set.NewSet(kp2.Address()),
})
Expand All @@ -84,7 +84,7 @@ func Test_OperationModel_BatchInsert(t *testing.T) {
testCases := []struct {
name string
useDBTx bool
operations []types.Operation
operations []*types.Operation
stellarAddressesByOpID map[int64]set.Set[string]
wantAccountLinks map[int64][]string
wantErrContains string
Expand All @@ -93,7 +93,7 @@ func Test_OperationModel_BatchInsert(t *testing.T) {
{
name: "🟢successful_insert_without_dbTx",
useDBTx: false,
operations: []types.Operation{op1, op2},
operations: []*types.Operation{&op1, &op2},
stellarAddressesByOpID: map[int64]set.Set[string]{op1.ID: set.NewSet(kp1.Address(), kp1.Address(), kp1.Address(), kp1.Address()), op2.ID: set.NewSet(kp2.Address(), kp2.Address())},
wantAccountLinks: map[int64][]string{op1.ID: {kp1.Address()}, op2.ID: {kp2.Address()}},
wantErrContains: "",
Expand All @@ -102,7 +102,7 @@ func Test_OperationModel_BatchInsert(t *testing.T) {
{
name: "🟢successful_insert_with_dbTx",
useDBTx: true,
operations: []types.Operation{op1},
operations: []*types.Operation{&op1},
stellarAddressesByOpID: map[int64]set.Set[string]{op1.ID: set.NewSet(kp1.Address())},
wantAccountLinks: map[int64][]string{op1.ID: {kp1.Address()}},
wantErrContains: "",
Expand All @@ -111,7 +111,7 @@ func Test_OperationModel_BatchInsert(t *testing.T) {
{
name: "🟢empty_input",
useDBTx: false,
operations: []types.Operation{},
operations: []*types.Operation{},
stellarAddressesByOpID: map[int64]set.Set[string]{},
wantAccountLinks: map[int64][]string{},
wantErrContains: "",
Expand All @@ -120,7 +120,7 @@ func Test_OperationModel_BatchInsert(t *testing.T) {
{
name: "🟡duplicate_operation",
useDBTx: false,
operations: []types.Operation{op1, op1},
operations: []*types.Operation{&op1, &op1},
stellarAddressesByOpID: map[int64]set.Set[string]{op1.ID: set.NewSet(kp1.Address())},
wantAccountLinks: map[int64][]string{op1.ID: {kp1.Address()}},
wantErrContains: "",
Expand Down
2 changes: 1 addition & 1 deletion internal/data/statechanges_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestStateChangeModel_BatchInsert(t *testing.T) {
sqlxDB, err := dbConnectionPool.SqlxDB(ctx)
require.NoError(t, err)
txModel := &TransactionModel{DB: dbConnectionPool, MetricsService: metrics.NewMetricsService(sqlxDB)}
_, err = txModel.BatchInsert(ctx, nil, []types.Transaction{tx1, tx2}, map[string]set.Set[string]{
_, err = txModel.BatchInsert(ctx, nil, []*types.Transaction{&tx1, &tx2}, map[string]set.Set[string]{
tx1.Hash: set.NewSet(kp1.Address()),
tx2.Hash: set.NewSet(kp2.Address()),
})
Expand Down
2 changes: 1 addition & 1 deletion internal/data/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (m *TransactionModel) BatchGetByStateChangeIDs(ctx context.Context, scToIDs
func (m *TransactionModel) BatchInsert(
ctx context.Context,
sqlExecuter db.SQLExecuter,
txs []types.Transaction,
txs []*types.Transaction,
stellarAddressesByTxHash map[string]set.Set[string],
) ([]string, error) {
if sqlExecuter == nil {
Expand Down
10 changes: 5 additions & 5 deletions internal/data/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func Test_TransactionModel_BatchInsert(t *testing.T) {
testCases := []struct {
name string
useDBTx bool
txs []types.Transaction
txs []*types.Transaction
stellarAddressesByHash map[string]set.Set[string]
wantAccountLinks map[string][]string
wantErrContains string
Expand All @@ -67,7 +67,7 @@ func Test_TransactionModel_BatchInsert(t *testing.T) {
{
name: "🟢successful_insert_without_dbTx",
useDBTx: false,
txs: []types.Transaction{tx1, tx2},
txs: []*types.Transaction{&tx1, &tx2},
stellarAddressesByHash: map[string]set.Set[string]{tx1.Hash: set.NewSet(kp1.Address()), tx2.Hash: set.NewSet(kp2.Address())},
wantAccountLinks: map[string][]string{tx1.Hash: {kp1.Address()}, tx2.Hash: {kp2.Address()}},
wantErrContains: "",
Expand All @@ -76,7 +76,7 @@ func Test_TransactionModel_BatchInsert(t *testing.T) {
{
name: "🟢successful_insert_with_dbTx",
useDBTx: true,
txs: []types.Transaction{tx1},
txs: []*types.Transaction{&tx1},
stellarAddressesByHash: map[string]set.Set[string]{tx1.Hash: set.NewSet(kp1.Address())},
wantAccountLinks: map[string][]string{tx1.Hash: {kp1.Address()}},
wantErrContains: "",
Expand All @@ -85,7 +85,7 @@ func Test_TransactionModel_BatchInsert(t *testing.T) {
{
name: "🟢empty_input",
useDBTx: false,
txs: []types.Transaction{},
txs: []*types.Transaction{},
stellarAddressesByHash: map[string]set.Set[string]{},
wantAccountLinks: map[string][]string{},
wantErrContains: "",
Expand All @@ -94,7 +94,7 @@ func Test_TransactionModel_BatchInsert(t *testing.T) {
{
name: "🟡duplicate_transaction",
useDBTx: false,
txs: []types.Transaction{tx1, tx1},
txs: []*types.Transaction{&tx1, &tx1},
stellarAddressesByHash: map[string]set.Set[string]{tx1.Hash: set.NewSet(kp1.Address())},
wantAccountLinks: map[string][]string{tx1.Hash: {kp1.Address()}},
wantErrContains: "",
Expand Down
9 changes: 5 additions & 4 deletions internal/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ type IndexerBufferInterface interface {
GetAllParticipants() []string
GetNumberOfTransactions() int
GetNumberOfOperations() int
GetTransactions() []types.Transaction
GetOperations() []types.Operation
GetTransactions() []*types.Transaction
GetOperations() []*types.Operation
GetStateChanges() []types.StateChange
GetTrustlineChanges() []types.TrustlineChange
GetContractChanges() []types.ContractChange
PushContractChange(contractChange types.ContractChange)
PushTrustlineChange(trustlineChange types.TrustlineChange)
MergeBuffer(other IndexerBufferInterface)
Merge(other IndexerBufferInterface)
Clear()
}

type TokenTransferProcessorInterface interface {
Expand Down Expand Up @@ -123,7 +124,7 @@ func (i *Indexer) ProcessLedgerTransactions(ctx context.Context, transactions []
// Merge buffers and count participants
totalParticipants := 0
for idx, buffer := range txnBuffers {
ledgerBuffer.MergeBuffer(buffer)
ledgerBuffer.Merge(buffer)
totalParticipants += participantCounts[idx]
}

Expand Down
81 changes: 53 additions & 28 deletions internal/indexer/indexer_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ func (b *IndexerBuffer) GetNumberOfOperations() int {

// GetTransactions returns all unique transactions.
// Thread-safe: uses read lock.
func (b *IndexerBuffer) GetTransactions() []types.Transaction {
func (b *IndexerBuffer) GetTransactions() []*types.Transaction {
b.mu.RLock()
defer b.mu.RUnlock()

txs := make([]types.Transaction, 0, len(b.txByHash))
txs := make([]*types.Transaction, 0, len(b.txByHash))
for _, txPtr := range b.txByHash {
txs = append(txs, *txPtr)
txs = append(txs, txPtr)
}

return txs
Expand Down Expand Up @@ -195,15 +195,14 @@ func (b *IndexerBuffer) PushOperation(participant string, operation types.Operat
}

// GetOperations returns all unique operations from the canonical storage.
// Returns values (not pointers) for API compatibility.
// Thread-safe: uses read lock.
func (b *IndexerBuffer) GetOperations() []types.Operation {
func (b *IndexerBuffer) GetOperations() []*types.Operation {
b.mu.RLock()
defer b.mu.RUnlock()

ops := make([]types.Operation, 0, len(b.opByID))
ops := make([]*types.Operation, 0, len(b.opByID))
for _, opPtr := range b.opByID {
ops = append(ops, *opPtr)
ops = append(ops, opPtr)
}
return ops
}
Expand Down Expand Up @@ -260,7 +259,17 @@ func (b *IndexerBuffer) GetStateChanges() []types.StateChange {
return b.stateChanges
}

// MergeBuffer merges another IndexerBuffer into this buffer. This is used to combine
// GetAllParticipants returns, as a slice, the participants (Stellar addresses)
// accumulated in the buffer's allParticipants set.
// Thread-safe: holds the read lock while the set is converted to a slice.
func (b *IndexerBuffer) GetAllParticipants() []string {
	b.mu.RLock()
	defer b.mu.RUnlock()

	participants := b.allParticipants.ToSlice()
	return participants
}

// Merge merges another IndexerBuffer into this buffer. This is used to combine
// per-ledger or per-transaction buffers into a single buffer for batch DB insertion.
//
// MERGE STRATEGY:
Expand All @@ -280,7 +289,7 @@ func (b *IndexerBuffer) GetStateChanges() []types.StateChange {
// Zero temporary allocations - uses direct map/set manipulation.
//
// Thread-safe: acquires write lock on this buffer, read lock on other buffer.
func (b *IndexerBuffer) MergeBuffer(other IndexerBufferInterface) {
func (b *IndexerBuffer) Merge(other IndexerBufferInterface) {
b.mu.Lock()
defer b.mu.Unlock()

Expand All @@ -296,24 +305,28 @@ func (b *IndexerBuffer) MergeBuffer(other IndexerBufferInterface) {
// Merge transactions (canonical storage) - this establishes our canonical pointers
maps.Copy(b.txByHash, otherBuffer.txByHash)
for txHash, otherParticipants := range otherBuffer.participantsByTxHash {
if _, exists := b.participantsByTxHash[txHash]; !exists {
b.participantsByTxHash[txHash] = set.NewSet[string]()
}
// Iterate other's set, add participants from OUR txByHash
for participant := range otherParticipants.Iter() {
b.participantsByTxHash[txHash].Add(participant) // O(1) Add
if existing, exists := b.participantsByTxHash[txHash]; exists {
// Add to the existing set in place; calling Union here would create a new set.
for participant := range otherParticipants.Iter() {
existing.Add(participant)
}
} else {
// Clone the set instead of creating empty + iterating
b.participantsByTxHash[txHash] = otherParticipants.Clone()
}
}

// Merge operations (canonical storage)
maps.Copy(b.opByID, otherBuffer.opByID)
for opID, otherParticipants := range otherBuffer.participantsByOpID {
if _, exists := b.participantsByOpID[opID]; !exists {
b.participantsByOpID[opID] = set.NewSet[string]()
}
// Iterate other's set, add canonical pointers from OUR opByID
for participant := range otherParticipants.Iter() {
b.participantsByOpID[opID].Add(participant) // O(1) Add
if existing, exists := b.participantsByOpID[opID]; exists {
// Add to the existing set in place; calling Union here would create a new set.
for participant := range otherParticipants.Iter() {
existing.Add(participant)
}
} else {
// Clone the set instead of creating empty + iterating
b.participantsByOpID[opID] = otherParticipants.Clone()
}
}

Expand All @@ -332,12 +345,24 @@ func (b *IndexerBuffer) MergeBuffer(other IndexerBufferInterface) {
}
}

// GetAllParticipants returns all unique participants (Stellar addresses) that have been
// recorded during transaction, operation, and state change processing.
// Thread-safe: uses read lock.
func (b *IndexerBuffer) GetAllParticipants() []string {
b.mu.RLock()
defer b.mu.RUnlock()
// Clear resets the buffer to its initial empty state while preserving allocated capacity.
// Use this to reuse the buffer after flushing data to the database during backfill.
// Thread-safe: acquires write lock.
func (b *IndexerBuffer) Clear() {
b.mu.Lock()
defer b.mu.Unlock()

return b.allParticipants.ToSlice()
// Clear maps (keep allocated backing arrays)
clear(b.txByHash)
clear(b.participantsByTxHash)
clear(b.opByID)
clear(b.participantsByOpID)

// Reset slices (reuse underlying arrays by slicing to zero)
b.stateChanges = b.stateChanges[:0]
b.trustlineChanges = b.trustlineChanges[:0]
b.contractChanges = b.contractChanges[:0]

// Clear all participants set
b.allParticipants.Clear()
}
Loading
Loading