diff --git a/internal/data/accounts.go b/internal/data/accounts.go index 9dc18f96a..54fd55c5f 100644 --- a/internal/data/accounts.go +++ b/internal/data/accounts.go @@ -154,23 +154,23 @@ func (m *AccountModel) IsAccountFeeBumpEligible(ctx context.Context, address str return exists, nil } -// BatchGetByTxHashes gets the accounts that are associated with the given transaction hashes. -func (m *AccountModel) BatchGetByTxHashes(ctx context.Context, txHashes []string, columns string) ([]*types.AccountWithTxHash, error) { +// BatchGetByToIDs gets the accounts that are associated with the given transaction ToIDs. +func (m *AccountModel) BatchGetByToIDs(ctx context.Context, toIDs []int64, columns string) ([]*types.AccountWithToID, error) { query := ` - SELECT account_id AS stellar_address, tx_hash + SELECT account_id AS stellar_address, tx_to_id FROM transactions_accounts - WHERE tx_hash = ANY($1)` - var accounts []*types.AccountWithTxHash + WHERE tx_to_id = ANY($1)` + var accounts []*types.AccountWithToID start := time.Now() - err := m.DB.SelectContext(ctx, &accounts, query, pq.Array(txHashes)) + err := m.DB.SelectContext(ctx, &accounts, query, pq.Array(toIDs)) duration := time.Since(start).Seconds() - m.MetricsService.ObserveDBQueryDuration("BatchGetByTxHashes", "accounts", duration) - m.MetricsService.ObserveDBBatchSize("BatchGetByTxHashes", "accounts", len(txHashes)) + m.MetricsService.ObserveDBQueryDuration("BatchGetByToIDs", "accounts", duration) + m.MetricsService.ObserveDBBatchSize("BatchGetByToIDs", "accounts", len(toIDs)) if err != nil { - m.MetricsService.IncDBQueryError("BatchGetByTxHashes", "accounts", utils.GetDBErrorType(err)) - return nil, fmt.Errorf("getting accounts by transaction hashes: %w", err) + m.MetricsService.IncDBQueryError("BatchGetByToIDs", "accounts", utils.GetDBErrorType(err)) + return nil, fmt.Errorf("getting accounts by transaction ToIDs: %w", err) } - m.MetricsService.IncDBQuery("BatchGetByTxHashes", "accounts") + m.MetricsService.IncDBQuery("BatchGetByToIDs", "accounts") return accounts, nil } diff --git a/internal/data/accounts_test.go b/internal/data/accounts_test.go index 507e0425f..6a3641fe3 100644 --- a/internal/data/accounts_test.go +++ b/internal/data/accounts_test.go @@ -230,7 +230,7 @@ func TestAccountModelGet(t *testing.T) { assert.Equal(t, address, account.StellarAddress) } -func TestAccountModelBatchGetByTxHashes(t *testing.T) { +func TestAccountModelBatchGetByToIDs(t *testing.T) { dbt := dbtest.Open(t) defer dbt.Close() dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) @@ -238,9 +238,9 @@ func TestAccountModelBatchGetByTxHashes(t *testing.T) { defer dbConnectionPool.Close() mockMetricsService := metrics.NewMockMetricsService() - mockMetricsService.On("ObserveDBQueryDuration", "BatchGetByTxHashes", "accounts", mock.Anything).Return() - mockMetricsService.On("IncDBQuery", "BatchGetByTxHashes", "accounts").Return() - mockMetricsService.On("ObserveDBBatchSize", "BatchGetByTxHashes", "accounts", mock.Anything).Return().Maybe() + mockMetricsService.On("ObserveDBQueryDuration", "BatchGetByToIDs", "accounts", mock.Anything).Return() + mockMetricsService.On("IncDBQuery", "BatchGetByToIDs", "accounts").Return() + mockMetricsService.On("ObserveDBBatchSize", "BatchGetByToIDs", "accounts", mock.Anything).Return().Maybe() defer mockMetricsService.AssertExpectations(t) m := &AccountModel{ @@ -251,33 +251,33 @@ func TestAccountModelBatchGetByTxHashes(t *testing.T) { ctx := context.Background() address1 := keypair.MustRandom().Address() address2 := 
keypair.MustRandom().Address() - txHash1 := "tx1" - txHash2 := "tx2" + toID1 := int64(1) + toID2 := int64(2) // Insert test accounts _, err = m.DB.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1), ($2)", address1, address2) require.NoError(t, err) // Insert test transactions first - _, err = m.DB.ExecContext(ctx, "INSERT INTO transactions (hash, to_id, envelope_xdr, fee_charged, result_code, meta_xdr, ledger_number, ledger_created_at) VALUES ($1, 1, 'env1', 100, 'TransactionResultCodeTxSuccess', 'meta1', 1, NOW()), ($2, 2, 'env2', 200, 'TransactionResultCodeTxSuccess', 'meta2', 2, NOW())", txHash1, txHash2) + _, err = m.DB.ExecContext(ctx, "INSERT INTO transactions (hash, to_id, envelope_xdr, fee_charged, result_code, meta_xdr, ledger_number, ledger_created_at) VALUES ('tx1', $1, 'env1', 100, 'TransactionResultCodeTxSuccess', 'meta1', 1, NOW()), ('tx2', $2, 'env2', 200, 'TransactionResultCodeTxSuccess', 'meta2', 2, NOW())", toID1, toID2) require.NoError(t, err) // Insert test transactions_accounts links - _, err = m.DB.ExecContext(ctx, "INSERT INTO transactions_accounts (tx_hash, account_id) VALUES ($1, $2), ($3, $4)", txHash1, address1, txHash2, address2) + _, err = m.DB.ExecContext(ctx, "INSERT INTO transactions_accounts (tx_to_id, account_id) VALUES ($1, $2), ($3, $4)", toID1, address1, toID2, address2) require.NoError(t, err) - // Test BatchGetByTxHash function - accounts, err := m.BatchGetByTxHashes(ctx, []string{txHash1, txHash2}, "") + // Test BatchGetByToIDs function + accounts, err := m.BatchGetByToIDs(ctx, []int64{toID1, toID2}, "") require.NoError(t, err) assert.Len(t, accounts, 2) - // Verify accounts are returned with correct tx_hash - addressSet := make(map[string]string) + // Verify accounts are returned with correct to_id + addressSet := make(map[string]int64) for _, acc := range accounts { - addressSet[acc.StellarAddress] = acc.TxHash + addressSet[acc.StellarAddress] = acc.ToID } - assert.Equal(t, txHash1, addressSet[address1]) - assert.Equal(t, txHash2, addressSet[address2]) + assert.Equal(t, toID1, addressSet[address1]) + assert.Equal(t, toID2, addressSet[address2]) } func TestAccountModelBatchGetByOperationIDs(t *testing.T) { @@ -313,7 +313,7 @@ func TestAccountModelBatchGetByOperationIDs(t *testing.T) { require.NoError(t, err) // Insert test operations first - _, err = m.DB.ExecContext(ctx, "INSERT INTO operations (id, tx_hash, operation_type, operation_xdr, result_code, successful, ledger_number, ledger_created_at) VALUES ($1, 'tx1', 'payment', 'xdr1', 'op_success', true, 1, NOW()), ($2, 'tx2', 'payment', 'xdr2', 'op_success', true, 2, NOW())", operationID1, operationID2) + _, err = m.DB.ExecContext(ctx, "INSERT INTO operations (id, tx_hash, operation_type, operation_xdr, result_code, successful, ledger_number, ledger_created_at) VALUES ($1, 'tx1', 'PAYMENT', 'xdr1', 'op_success', true, 1, NOW()), ($2, 'tx2', 'PAYMENT', 'xdr2', 'op_success', true, 2, NOW())", operationID1, operationID2) require.NoError(t, err) // Insert test operations_accounts links @@ -404,7 +404,7 @@ func TestAccountModelBatchGetByStateChangeIDs(t *testing.T) { require.NoError(t, err) // Insert test operations first - _, err = m.DB.ExecContext(ctx, "INSERT INTO operations (id, tx_hash, operation_type, operation_xdr, result_code, successful, ledger_number, ledger_created_at) VALUES (1, 'tx1', 'payment', 'xdr1', 'op_success', true, 1, NOW()), (2, 'tx2', 'payment', 'xdr2', 'op_success', true, 2, NOW())") + _, err = m.DB.ExecContext(ctx, "INSERT INTO operations (id, tx_hash, 
operation_type, operation_xdr, result_code, successful, ledger_number, ledger_created_at) VALUES (1, 'tx1', 'PAYMENT', 'xdr1', 'op_success', true, 1, NOW()), (2, 'tx2', 'PAYMENT', 'xdr2', 'op_success', true, 2, NOW())") require.NoError(t, err) // Insert test state changes that reference the accounts @@ -413,8 +413,8 @@ func TestAccountModelBatchGetByStateChangeIDs(t *testing.T) { to_id, state_change_order, state_change_category, ledger_created_at, ledger_number, account_id, operation_id, tx_hash ) VALUES - ($1, $2, 'CREDIT', NOW(), 1, $3, 1, 'tx1'), - ($4, $5, 'DEBIT', NOW(), 2, $6, 2, 'tx2') + ($1, $2, 'BALANCE', NOW(), 1, $3, 1, 'tx1'), + ($4, $5, 'BALANCE', NOW(), 2, $6, 2, 'tx2') `, toID1, stateChangeOrder1, address1, toID2, stateChangeOrder2, address2) require.NoError(t, err) diff --git a/internal/data/operations_test.go b/internal/data/operations_test.go index 2edba8631..66417a27a 100644 --- a/internal/data/operations_test.go +++ b/internal/data/operations_test.go @@ -91,9 +91,9 @@ func Test_OperationModel_BatchInsert(t *testing.T) { sqlxDB, err := dbConnectionPool.SqlxDB(ctx) require.NoError(t, err) txModel := &TransactionModel{DB: dbConnectionPool, MetricsService: metrics.NewMetricsService(sqlxDB)} - _, err = txModel.BatchInsert(ctx, nil, []*types.Transaction{&tx1, &tx2}, map[string]set.Set[string]{ - tx1.Hash: set.NewSet(kp1.Address()), - tx2.Hash: set.NewSet(kp2.Address()), + _, err = txModel.BatchInsert(ctx, nil, []*types.Transaction{&tx1, &tx2}, map[int64]set.Set[string]{ + tx1.ToID: set.NewSet(kp1.Address()), + tx2.ToID: set.NewSet(kp2.Address()), }) require.NoError(t, err) @@ -278,9 +278,9 @@ func Test_OperationModel_BatchCopy(t *testing.T) { sqlxDB, err := dbConnectionPool.SqlxDB(ctx) require.NoError(t, err) txModel := &TransactionModel{DB: dbConnectionPool, MetricsService: metrics.NewMetricsService(sqlxDB)} - _, err = txModel.BatchInsert(ctx, nil, []*types.Transaction{&tx1, &tx2}, map[string]set.Set[string]{ - tx1.Hash: set.NewSet(kp1.Address()), - tx2.Hash: set.NewSet(kp2.Address()), + _, err = txModel.BatchInsert(ctx, nil, []*types.Transaction{&tx1, &tx2}, map[int64]set.Set[string]{ + tx1.ToID: set.NewSet(kp1.Address()), + tx2.ToID: set.NewSet(kp2.Address()), }) require.NoError(t, err) @@ -518,9 +518,9 @@ func TestOperationModel_GetAll(t *testing.T) { _, err = dbConnectionPool.ExecContext(ctx, ` INSERT INTO operations (id, tx_hash, operation_type, operation_xdr, result_code, successful, ledger_number, ledger_created_at) VALUES - (1, 'tx1', 'payment', 'xdr1', 'op_success', true, 1, $1), - (2, 'tx2', 'create_account', 'xdr2', 'op_success', true, 2, $1), - (3, 'tx3', 'payment', 'xdr3', 'op_success', true, 3, $1) + (1, 'tx1', 'PAYMENT', 'xdr1', 'op_success', true, 1, $1), + (2, 'tx2', 'CREATE_ACCOUNT', 'xdr2', 'op_success', true, 2, $1), + (3, 'tx3', 'PAYMENT', 'xdr3', 'op_success', true, 3, $1) `, now) require.NoError(t, err) @@ -565,12 +565,12 @@ func TestOperationModel_BatchGetByTxHashes(t *testing.T) { _, err = dbConnectionPool.ExecContext(ctx, ` INSERT INTO operations (id, tx_hash, operation_type, operation_xdr, result_code, successful, ledger_number, ledger_created_at) VALUES - (1, 'tx1', 'payment', 'xdr1', 'op_success', true, 1, $1), - (2, 'tx2', 'create_account', 'xdr2', 'op_success', true, 2, $1), - (3, 'tx1', 'payment', 'xdr3', 'op_success', true, 3, $1), - (4, 'tx1', 'manage_offer', 'xdr4', 'op_success', true, 4, $1), - (5, 'tx2', 'payment', 'xdr5', 'op_success', true, 5, $1), - (6, 'tx3', 'trust_line', 'xdr6', 'op_success', true, 6, $1) + (1, 'tx1', 'PAYMENT', 
'xdr1', 'op_success', true, 1, $1), + (2, 'tx2', 'CREATE_ACCOUNT', 'xdr2', 'op_success', true, 2, $1), + (3, 'tx1', 'PAYMENT', 'xdr3', 'op_success', true, 3, $1), + (4, 'tx1', 'MANAGE_SELL_OFFER', 'xdr4', 'op_success', true, 4, $1), + (5, 'tx2', 'PAYMENT', 'xdr5', 'op_success', true, 5, $1), + (6, 'tx3', 'CHANGE_TRUST', 'xdr6', 'op_success', true, 6, $1) `, now) require.NoError(t, err) @@ -752,9 +752,9 @@ func TestOperationModel_BatchGetByTxHash(t *testing.T) { _, err = dbConnectionPool.ExecContext(ctx, ` INSERT INTO operations (id, tx_hash, operation_type, operation_xdr, result_code, successful, ledger_number, ledger_created_at) VALUES - (1, 'tx1', 'payment', 'xdr1', 'op_success', true, 1, $1), - (2, 'tx2', 'create_account', 'xdr2', 'op_success', true, 2, $1), - (3, 'tx1', 'payment', 'xdr3', 'op_success', true, 3, $1) + (1, 'tx1', 'PAYMENT', 'xdr1', 'op_success', true, 1, $1), + (2, 'tx2', 'CREATE_ACCOUNT', 'xdr2', 'op_success', true, 2, $1), + (3, 'tx1', 'PAYMENT', 'xdr3', 'op_success', true, 3, $1) `, now) require.NoError(t, err) @@ -806,9 +806,9 @@ func TestOperationModel_BatchGetByAccountAddresses(t *testing.T) { _, err = dbConnectionPool.ExecContext(ctx, ` INSERT INTO operations (id, tx_hash, operation_type, operation_xdr, result_code, successful, ledger_number, ledger_created_at) VALUES - (1, 'tx1', 'payment', 'xdr1', 'op_success', true, 1, $1), - (2, 'tx2', 'create_account', 'xdr2', 'op_success', true, 2, $1), - (3, 'tx3', 'payment', 'xdr3', 'op_success', true, 3, $1) + (1, 'tx1', 'PAYMENT', 'xdr1', 'op_success', true, 1, $1), + (2, 'tx2', 'CREATE_ACCOUNT', 'xdr2', 'op_success', true, 2, $1), + (3, 'tx3', 'PAYMENT', 'xdr3', 'op_success', true, 3, $1) `, now) require.NoError(t, err) @@ -853,8 +853,8 @@ func TestOperationModel_GetByID(t *testing.T) { _, err = dbConnectionPool.ExecContext(ctx, ` INSERT INTO operations (id, tx_hash, operation_type, operation_xdr, result_code, successful, ledger_number, ledger_created_at) VALUES - (1, 'tx1', 'payment', 'xdr1', 'op_success', true, 1, $1), - (2, 'tx2', 'create_account', 'xdr2', 'op_success', true, 2, $1) + (1, 'tx1', 'PAYMENT', 'xdr1', 'op_success', true, 1, $1), + (2, 'tx2', 'CREATE_ACCOUNT', 'xdr2', 'op_success', true, 2, $1) `, now) require.NoError(t, err) @@ -917,19 +917,19 @@ func TestOperationModel_BatchGetByStateChangeIDs(t *testing.T) { _, err = dbConnectionPool.ExecContext(ctx, ` INSERT INTO operations (id, tx_hash, operation_type, operation_xdr, result_code, successful, ledger_number, ledger_created_at) VALUES - (1, 'tx1', 'payment', 'xdr1', 'op_success', true, 1, $1), - (2, 'tx2', 'create_account', 'xdr2', 'op_success', true, 2, $1), - (3, 'tx3', 'payment', 'xdr3', 'op_success', true, 3, $1) + (1, 'tx1', 'PAYMENT', 'xdr1', 'op_success', true, 1, $1), + (2, 'tx2', 'CREATE_ACCOUNT', 'xdr2', 'op_success', true, 2, $1), + (3, 'tx3', 'PAYMENT', 'xdr3', 'op_success', true, 3, $1) `, now) require.NoError(t, err) // Create test state changes _, err = dbConnectionPool.ExecContext(ctx, ` INSERT INTO state_changes (to_id, state_change_order, state_change_category, ledger_created_at, ledger_number, account_id, operation_id, tx_hash) - VALUES - (1, 1, 'credit', $1, 1, $2, 1, 'tx1'), - (2, 1, 'debit', $1, 2, $2, 2, 'tx2'), - (3, 1, 'credit', $1, 3, $2, 1, 'tx3') + VALUES + (1, 1, 'BALANCE', $1, 1, $2, 1, 'tx1'), + (2, 1, 'BALANCE', $1, 2, $2, 2, 'tx2'), + (3, 1, 'BALANCE', $1, 3, $2, 1, 'tx3') `, now, address) require.NoError(t, err) diff --git a/internal/data/statechanges_test.go b/internal/data/statechanges_test.go index 
93f75e90c..e9cd92a88 100644 --- a/internal/data/statechanges_test.go +++ b/internal/data/statechanges_test.go @@ -107,9 +107,9 @@ func TestStateChangeModel_BatchInsert(t *testing.T) { sqlxDB, err := dbConnectionPool.SqlxDB(ctx) require.NoError(t, err) txModel := &TransactionModel{DB: dbConnectionPool, MetricsService: metrics.NewMetricsService(sqlxDB)} - _, err = txModel.BatchInsert(ctx, nil, []*types.Transaction{&tx1, &tx2}, map[string]set.Set[string]{ - tx1.Hash: set.NewSet(kp1.Address()), - tx2.Hash: set.NewSet(kp2.Address()), + _, err = txModel.BatchInsert(ctx, nil, []*types.Transaction{&tx1, &tx2}, map[int64]set.Set[string]{ + tx1.ToID: set.NewSet(kp1.Address()), + tx2.ToID: set.NewSet(kp2.Address()), }) require.NoError(t, err) @@ -265,9 +265,9 @@ func TestStateChangeModel_BatchCopy(t *testing.T) { sqlxDB, err := dbConnectionPool.SqlxDB(ctx) require.NoError(t, err) txModel := &TransactionModel{DB: dbConnectionPool, MetricsService: metrics.NewMetricsService(sqlxDB)} - _, err = txModel.BatchInsert(ctx, nil, []*types.Transaction{&tx1, &tx2}, map[string]set.Set[string]{ - tx1.Hash: set.NewSet(kp1.Address()), - tx2.Hash: set.NewSet(kp2.Address()), + _, err = txModel.BatchInsert(ctx, nil, []*types.Transaction{&tx1, &tx2}, map[int64]set.Set[string]{ + tx1.ToID: set.NewSet(kp1.Address()), + tx2.ToID: set.NewSet(kp2.Address()), }) require.NoError(t, err) @@ -503,9 +503,9 @@ func TestStateChangeModel_BatchGetByAccountAddress(t *testing.T) { _, err = dbConnectionPool.ExecContext(ctx, ` INSERT INTO state_changes (to_id, state_change_order, state_change_category, ledger_created_at, ledger_number, account_id, operation_id, tx_hash) VALUES - (1, 1, 'credit', $1, 1, $2, 123, 'tx1'), - (2, 1, 'debit', $1, 2, $2, 456, 'tx2'), - (3, 1, 'credit', $1, 3, $3, 789, 'tx3') + (1, 1, 'BALANCE', $1, 1, $2, 123, 'tx1'), + (2, 1, 'BALANCE', $1, 2, $2, 456, 'tx2'), + (3, 1, 'BALANCE', $1, 3, $3, 789, 'tx3') `, now, address1, address2) require.NoError(t, err) @@ -807,9 +807,9 @@ func TestStateChangeModel_GetAll(t *testing.T) { _, err = dbConnectionPool.ExecContext(ctx, ` INSERT INTO state_changes (to_id, state_change_order, state_change_category, ledger_created_at, ledger_number, account_id, operation_id, tx_hash) VALUES - (1, 1, 'credit', $1, 1, $2, 123, 'tx1'), - (2, 1, 'debit', $1, 2, $2, 456, 'tx2'), - (3, 1, 'credit', $1, 3, $2, 789, 'tx3') + (1, 1, 'BALANCE', $1, 1, $2, 123, 'tx1'), + (2, 1, 'BALANCE', $1, 2, $2, 456, 'tx2'), + (3, 1, 'BALANCE', $1, 3, $2, 789, 'tx3') `, now, address) require.NoError(t, err) @@ -854,13 +854,13 @@ func TestStateChangeModel_BatchGetByTxHashes(t *testing.T) { _, err = dbConnectionPool.ExecContext(ctx, ` INSERT INTO state_changes (to_id, state_change_order, state_change_category, ledger_created_at, ledger_number, account_id, operation_id, tx_hash) VALUES - (1, 1, 'credit', $1, 1, $2, 123, 'tx1'), - (2, 1, 'debit', $1, 2, $2, 456, 'tx2'), - (3, 1, 'credit', $1, 3, $2, 789, 'tx1'), - (4, 1, 'debit', $1, 4, $2, 101, 'tx1'), - (5, 1, 'credit', $1, 5, $2, 102, 'tx2'), - (6, 1, 'debit', $1, 6, $2, 103, 'tx3'), - (7, 1, 'credit', $1, 7, $2, 104, 'tx2') + (1, 1, 'BALANCE', $1, 1, $2, 123, 'tx1'), + (2, 1, 'BALANCE', $1, 2, $2, 456, 'tx2'), + (3, 1, 'BALANCE', $1, 3, $2, 789, 'tx1'), + (4, 1, 'BALANCE', $1, 4, $2, 101, 'tx1'), + (5, 1, 'BALANCE', $1, 5, $2, 102, 'tx2'), + (6, 1, 'BALANCE', $1, 6, $2, 103, 'tx3'), + (7, 1, 'BALANCE', $1, 7, $2, 104, 'tx2') `, now, address) require.NoError(t, err) @@ -1066,9 +1066,9 @@ func TestStateChangeModel_BatchGetByOperationIDs(t *testing.T) { _, err = 
dbConnectionPool.ExecContext(ctx, ` INSERT INTO state_changes (to_id, state_change_order, state_change_category, ledger_created_at, ledger_number, account_id, operation_id, tx_hash) VALUES - (1, 1, 'credit', $1, 1, $2, 123, 'tx1'), - (2, 1, 'debit', $1, 2, $2, 456, 'tx2'), - (3, 1, 'credit', $1, 3, $2, 123, 'tx3') + (1, 1, 'BALANCE', $1, 1, $2, 123, 'tx1'), + (2, 1, 'BALANCE', $1, 2, $2, 456, 'tx2'), + (3, 1, 'BALANCE', $1, 3, $2, 123, 'tx3') `, now, address) require.NoError(t, err) @@ -1125,10 +1125,10 @@ func TestStateChangeModel_BatchGetByTxHash(t *testing.T) { _, err = dbConnectionPool.ExecContext(ctx, ` INSERT INTO state_changes (to_id, state_change_order, state_change_category, ledger_created_at, ledger_number, account_id, operation_id, tx_hash) VALUES - (1, 1, 'credit', $1, 1, $2, 123, 'tx1'), - (2, 1, 'debit', $1, 2, $2, 124, 'tx1'), - (3, 1, 'credit', $1, 3, $2, 125, 'tx1'), - (4, 1, 'debit', $1, 4, $2, 456, 'tx2') + (1, 1, 'BALANCE', $1, 1, $2, 123, 'tx1'), + (2, 1, 'BALANCE', $1, 2, $2, 124, 'tx1'), + (3, 1, 'BALANCE', $1, 3, $2, 125, 'tx1'), + (4, 1, 'BALANCE', $1, 4, $2, 456, 'tx2') `, now, address) require.NoError(t, err) diff --git a/internal/data/transactions.go b/internal/data/transactions.go index 6258b3821..e002db767 100644 --- a/internal/data/transactions.go +++ b/internal/data/transactions.go @@ -88,7 +88,7 @@ func (m *TransactionModel) BatchGetByAccountAddress(ctx context.Context, account TableName: "transactions", CursorColumn: "to_id", JoinTable: "transactions_accounts", - JoinCondition: "transactions_accounts.tx_hash = transactions.hash", + JoinCondition: "transactions_accounts.tx_to_id = transactions.to_id", Columns: columns, AccountAddress: accountAddress, Limit: limit, @@ -170,7 +170,7 @@ func (m *TransactionModel) BatchInsert( ctx context.Context, sqlExecuter db.SQLExecuter, txs []*types.Transaction, - stellarAddressesByTxHash map[string]set.Set[string], + stellarAddressesByToID map[int64]set.Set[string], ) ([]string, error) { if sqlExecuter == nil { sqlExecuter = m.DB @@ -199,11 +199,12 @@ func (m *TransactionModel) BatchInsert( isFeeBumps[i] = t.IsFeeBump } - // 2. Flatten the stellarAddressesByTxHash into parallel slices - var txHashes, stellarAddresses []string - for txHash, addresses := range stellarAddressesByTxHash { + // 2. 
Flatten the stellarAddressesByToID into parallel slices + var txToIDs []int64 + var stellarAddresses []string + for toID, addresses := range stellarAddressesByToID { for address := range addresses.Iter() { - txHashes = append(txHashes, txHash) + txToIDs = append(txToIDs, toID) stellarAddresses = append(stellarAddresses, address) } } @@ -229,19 +230,19 @@ func (m *TransactionModel) BatchInsert( UNNEST($8::timestamptz[]) AS ledger_created_at, UNNEST($9::boolean[]) AS is_fee_bump ) t - ON CONFLICT (hash) DO NOTHING + ON CONFLICT (to_id) DO NOTHING RETURNING hash ), -- Insert transactions_accounts links inserted_transactions_accounts AS ( INSERT INTO transactions_accounts - (tx_hash, account_id) + (tx_to_id, account_id) SELECT - ta.tx_hash, ta.account_id + ta.tx_to_id, ta.account_id FROM ( SELECT - UNNEST($10::text[]) AS tx_hash, + UNNEST($10::bigint[]) AS tx_to_id, UNNEST($11::text[]) AS account_id ) ta ON CONFLICT DO NOTHING @@ -263,7 +264,7 @@ func (m *TransactionModel) BatchInsert( pq.Array(ledgerNumbers), pq.Array(ledgerCreatedAts), pq.Array(isFeeBumps), - pq.Array(txHashes), + pq.Array(txToIDs), pq.Array(stellarAddresses), ) duration := time.Since(start).Seconds() @@ -298,7 +299,7 @@ func (m *TransactionModel) BatchCopy( ctx context.Context, pgxTx pgx.Tx, txs []*types.Transaction, - stellarAddressesByTxHash map[string]set.Set[string], + stellarAddressesByToID map[int64]set.Set[string], ) (int, error) { if len(txs) == 0 { return 0, nil @@ -335,19 +336,19 @@ func (m *TransactionModel) BatchCopy( } // COPY transactions_accounts using pgx binary format with native pgtype types - if len(stellarAddressesByTxHash) > 0 { + if len(stellarAddressesByToID) > 0 { var taRows [][]any - for txHash, addresses := range stellarAddressesByTxHash { - txHashPgtype := pgtype.Text{String: txHash, Valid: true} + for toID, addresses := range stellarAddressesByToID { + toIDPgtype := pgtype.Int8{Int64: toID, Valid: true} for _, addr := range addresses.ToSlice() { - taRows = append(taRows, []any{txHashPgtype, pgtype.Text{String: addr, Valid: true}}) + taRows = append(taRows, []any{toIDPgtype, pgtype.Text{String: addr, Valid: true}}) } } _, err = pgxTx.CopyFrom( ctx, pgx.Identifier{"transactions_accounts"}, - []string{"tx_hash", "account_id"}, + []string{"tx_to_id", "account_id"}, pgx.CopyFromRows(taRows), ) if err != nil { diff --git a/internal/data/transactions_test.go b/internal/data/transactions_test.go index f36f4acb3..9d113f501 100644 --- a/internal/data/transactions_test.go +++ b/internal/data/transactions_test.go @@ -9,6 +9,7 @@ import ( set "github.com/deckarep/golang-set/v2" "github.com/jackc/pgx/v5" "github.com/stellar/go-stellar-sdk/keypair" + "github.com/stellar/go-stellar-sdk/toid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -20,32 +21,36 @@ import ( ) // generateTestTransactions creates n test transactions for benchmarking. -func generateTestTransactions(n int, startToID int64) ([]*types.Transaction, map[string]set.Set[string]) { +// Uses toid.New to generate realistic ToIDs based on ledger sequence and transaction index. 
+func generateTestTransactions(n int, startLedger int32) ([]*types.Transaction, map[int64]set.Set[string]) { txs := make([]*types.Transaction, n) - addressesByHash := make(map[string]set.Set[string]) + addressesByToID := make(map[int64]set.Set[string]) now := time.Now() for i := 0; i < n; i++ { - hash := fmt.Sprintf("e76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48760-%d", startToID+int64(i)) + ledgerSeq := startLedger + int32(i) + txIndex := int32(1) // First transaction in each ledger + toID := toid.New(ledgerSeq, txIndex, 0).ToInt64() + hash := fmt.Sprintf("e76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48760-%d", toID) envelope := "AAAAAgAAAAB/NpQ+s+cP+ztX7ryuKgXrxowZPHd4qAxhseOye/JeUgAehIAC2NL/AAflugAAAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAIAAAAAAAAAAwAAAAFQQUxMAAAAAKHc4IKbcW8HPPgy3zOhuqv851y72nfLGa0HVXxIRNzHAAAAAAAAAAAAQ3FwMxshxQAfwV8AAAAAYTGQ3QAAAAAAAAAMAAAAAAAAAAFQQUxMAAAAAKHc4IKbcW8HPPgy3zOhuqv851y72nfLGa0HVXxIRNzHAAAAAAAGXwFksiHwAEXz8QAAAABhoaQjAAAAAAAAAAF78l5SAAAAQD7LgvZA8Pdvfh5L2b9B9RC7DlacGBJuOchuZDHQdVD1P0bn6nGQJXxDDI4oN76J49JxB7bIgDVim39MU43MOgE=" meta := "AAAAAwAAAAAAAAAEAAAAAwM6nhwAAAAAAAAAAJjy0MY1CPlZ/co80nzufVmo4gd7NqWMb+RiGiPhiviJAAAAC4SozKUDMWgAAAAAAAAAAAAAAAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAAAAAAAAAAQM6nhwAAAAAAAAAAJjy0MY1CPlZ/co80nzufVmo4gd7NqWMb+RiGiPhiviJAAAAC4SozKUDMWgAAAAAAAAAAAAAAAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAAAAAAAAAAwM6LTkAAAAAAAAAAKl6DQcpepRdTbO/Vw4hYBENfE/95GevM7SNA0ftK0gtAAAAA8Kuf0AC+zZCAAAATAAAAAMAAAABAAAAAMRxxkNwYslQaok0LlOKGtpATS9Bzx06JV9DIffG4OF1AAAAAAAAAAlsb2JzdHIuY28AAAABAAAAAAAAAAAAAAEAAAAAAAAAAAAAAAAAAAAAAAAAAgAAAAAAAAAAAAAAAAAAAAMAAAAAAyZ54QAAAABmrTXCAAAAAAAAAAEDOp4cAAAAAAAAAACpeg0HKXqUXU2zv1cOIWARDXxP/eRnrzO0jQNH7StILQAAAAPCrn9AAvs2QgAAAE0AAAADAAAAAQAAAADEccZDcGLJUGqJNC5TihraQE0vQc8dOiVfQyH3xuDhdQAAAAAAAAAJbG9ic3RyLmNvAAAAAQAAAAAAAAAAAAABAAAAAAAAAAAAAAAAAAAAAAAAAAIAAAAAAAAAAAAAAAAAAAADAAAAAAM6nhwAAAAAZyGdHwAAAAAAAAABAAAABAAAAAMDOp4cAAAAAAAAAACpeg0HKXqUXU2zv1cOIWARDXxP/eRnrzO0jQNH7StILQAAAAPCrn9AAvs2QgAAAE0AAAADAAAAAQAAAADEccZDcGLJUGqJNC5TihraQE0vQc8dOiVfQyH3xuDhdQAAAAAAAAAJbG9ic3RyLmNvAAAAAQAAAAAAAAAAAAABAAAAAAAAAAAAAAAAAAAAAAAAAAIAAAAAAAAAAAAAAAAAAAADAAAAAAM6nhwAAAAAZyGdHwAAAAAAAAABAzqeHAAAAAAAAAAAqXoNByl6lF1Ns79XDiFgEQ18T/3kZ68ztI0DR+0rSC0AAAACmKiNQAL7NkIAAABNAAAAAwAAAAEAAAAAxHHGQ3BiyVBqiTQuU4oa2kBNL0HPHTolX0Mh98bg4XUAAAAAAAAACWxvYnN0ci5jbwAAAAEAAAAAAAAAAAAAAQAAAAAAAAAAAAAAAAAAAAAAAAACAAAAAAAAAAAAAAAAAAAAAwAAAAADOp4cAAAAAGchnR8AAAAAAAAAAwM6nZoAAAAAAAAAALKxMozkOH3rgpz3/u3+93wsR4p6z4K82HmJ5NTuaZbYAAACZaqAwoIBqycyAABVlQAAAAAAAAAAAAAAAAAAAAABAAAAAAAAAAAAAAEAAAAAAAAAAAAAAAAAAAAAAAAAAgAAAAAAAAAAAAAAAAAAAAMAAAAAAzqSNgAAAABnIVdaAAAAAAAAAAEDOp4cAAAAAAAAAACysTKM5Dh964Kc9/7t/vd8LEeKes+CvNh5ieTU7mmW2AAAAmbUhrSCAasnMgAAVZUAAAAAAAAAAAAAAAAAAAAAAQAAAAAAAAAAAAABAAAAAAAAAAAAAAAAAAAAAAAAAAIAAAAAAAAAAAAAAAAAAAADAAAAAAM6kjYAAAAAZyFXWgAAAAAAAAAAAAAAAA==" address := keypair.MustRandom().Address() txs[i] = &types.Transaction{ Hash: hash, - ToID: startToID + int64(i), + ToID: toID, EnvelopeXDR: &envelope, FeeCharged: int64(100 * (i + 1)), ResultCode: "TransactionResultCodeTxSuccess", MetaXDR: &meta, - LedgerNumber: uint32(i + 1), + LedgerNumber: uint32(ledgerSeq), LedgerCreatedAt: now, IsFeeBump: false, } - addressesByHash[hash] = set.NewSet(address) + addressesByToID[toID] = set.NewSet(address) } - return txs, addressesByHash + return txs, addressesByToID } func Test_TransactionModel_BatchInsert(t *testing.T) { @@ -94,8 +99,8 @@ func Test_TransactionModel_BatchInsert(t *testing.T) { name string useDBTx bool txs []*types.Transaction - 
stellarAddressesByHash map[string]set.Set[string] - wantAccountLinks map[string][]string + stellarAddressesByToID map[int64]set.Set[string] + wantAccountLinks map[int64][]string wantErrContains string wantHashes []string }{ @@ -103,8 +108,8 @@ func Test_TransactionModel_BatchInsert(t *testing.T) { name: "🟢successful_insert_without_dbTx", useDBTx: false, txs: []*types.Transaction{&tx1, &tx2}, - stellarAddressesByHash: map[string]set.Set[string]{tx1.Hash: set.NewSet(kp1.Address()), tx2.Hash: set.NewSet(kp2.Address())}, - wantAccountLinks: map[string][]string{tx1.Hash: {kp1.Address()}, tx2.Hash: {kp2.Address()}}, + stellarAddressesByToID: map[int64]set.Set[string]{tx1.ToID: set.NewSet(kp1.Address()), tx2.ToID: set.NewSet(kp2.Address())}, + wantAccountLinks: map[int64][]string{tx1.ToID: {kp1.Address()}, tx2.ToID: {kp2.Address()}}, wantErrContains: "", wantHashes: []string{tx1.Hash, tx2.Hash}, }, @@ -112,8 +117,8 @@ func Test_TransactionModel_BatchInsert(t *testing.T) { name: "🟢successful_insert_with_dbTx", useDBTx: true, txs: []*types.Transaction{&tx1}, - stellarAddressesByHash: map[string]set.Set[string]{tx1.Hash: set.NewSet(kp1.Address())}, - wantAccountLinks: map[string][]string{tx1.Hash: {kp1.Address()}}, + stellarAddressesByToID: map[int64]set.Set[string]{tx1.ToID: set.NewSet(kp1.Address())}, + wantAccountLinks: map[int64][]string{tx1.ToID: {kp1.Address()}}, wantErrContains: "", wantHashes: []string{tx1.Hash}, }, @@ -121,8 +126,8 @@ func Test_TransactionModel_BatchInsert(t *testing.T) { name: "🟢empty_input", useDBTx: false, txs: []*types.Transaction{}, - stellarAddressesByHash: map[string]set.Set[string]{}, - wantAccountLinks: map[string][]string{}, + stellarAddressesByToID: map[int64]set.Set[string]{}, + wantAccountLinks: map[int64][]string{}, wantErrContains: "", wantHashes: nil, }, @@ -130,8 +135,8 @@ func Test_TransactionModel_BatchInsert(t *testing.T) { name: "🟡duplicate_transaction", useDBTx: false, txs: []*types.Transaction{&tx1, &tx1}, - stellarAddressesByHash: map[string]set.Set[string]{tx1.Hash: set.NewSet(kp1.Address())}, - wantAccountLinks: map[string][]string{tx1.Hash: {kp1.Address()}}, + stellarAddressesByToID: map[int64]set.Set[string]{tx1.ToID: set.NewSet(kp1.Address())}, + wantAccountLinks: map[int64][]string{tx1.ToID: {kp1.Address()}}, wantErrContains: "", wantHashes: []string{tx1.Hash}, }, @@ -170,7 +175,7 @@ func Test_TransactionModel_BatchInsert(t *testing.T) { sqlExecuter = tx } - gotInsertedHashes, err := m.BatchInsert(ctx, sqlExecuter, tc.txs, tc.stellarAddressesByHash) + gotInsertedHashes, err := m.BatchInsert(ctx, sqlExecuter, tc.txs, tc.stellarAddressesByToID) if tc.wantErrContains != "" { require.Error(t, err) @@ -189,24 +194,24 @@ func Test_TransactionModel_BatchInsert(t *testing.T) { // Verify the account links if len(tc.wantAccountLinks) > 0 { var accountLinks []struct { - TxHash string `db:"tx_hash"` + TxToID int64 `db:"tx_to_id"` AccountID string `db:"account_id"` } - err = sqlExecuter.SelectContext(ctx, &accountLinks, "SELECT tx_hash, account_id FROM transactions_accounts ORDER BY tx_hash, account_id") + err = sqlExecuter.SelectContext(ctx, &accountLinks, "SELECT tx_to_id, account_id FROM transactions_accounts ORDER BY tx_to_id, account_id") require.NoError(t, err) - // Create a map of tx_hash -> set of account_ids for O(1) lookups - accountLinksMap := make(map[string][]string) + // Create a map of tx_to_id -> set of account_ids for O(1) lookups + accountLinksMap := make(map[int64][]string) for _, link := range accountLinks { - 
accountLinksMap[link.TxHash] = append(accountLinksMap[link.TxHash], link.AccountID) + accountLinksMap[link.TxToID] = append(accountLinksMap[link.TxToID], link.AccountID) } // Verify each transaction has its expected account links require.Equal(t, len(tc.wantAccountLinks), len(accountLinksMap), "number of elements in the maps don't match") for key, expectedSlice := range tc.wantAccountLinks { actualSlice, exists := accountLinksMap[key] - require.True(t, exists, "key %s not found in actual map", key) - assert.ElementsMatch(t, expectedSlice, actualSlice, "slices for key %s don't match", key) + require.True(t, exists, "key %d not found in actual map", key) + assert.ElementsMatch(t, expectedSlice, actualSlice, "slices for key %d don't match", key) } } }) @@ -270,32 +275,32 @@ func Test_TransactionModel_BatchCopy(t *testing.T) { testCases := []struct { name string txs []*types.Transaction - stellarAddressesByHash map[string]set.Set[string] + stellarAddressesByToID map[int64]set.Set[string] wantCount int wantErrContains string }{ { name: "🟢successful_insert_multiple", txs: []*types.Transaction{&tx1, &tx2}, - stellarAddressesByHash: map[string]set.Set[string]{tx1.Hash: set.NewSet(kp1.Address()), tx2.Hash: set.NewSet(kp2.Address())}, + stellarAddressesByToID: map[int64]set.Set[string]{tx1.ToID: set.NewSet(kp1.Address()), tx2.ToID: set.NewSet(kp2.Address())}, wantCount: 2, }, { name: "🟢empty_input", txs: []*types.Transaction{}, - stellarAddressesByHash: map[string]set.Set[string]{}, + stellarAddressesByToID: map[int64]set.Set[string]{}, wantCount: 0, }, { name: "🟢nullable_fields", txs: []*types.Transaction{&tx3}, - stellarAddressesByHash: map[string]set.Set[string]{tx3.Hash: set.NewSet(kp1.Address())}, + stellarAddressesByToID: map[int64]set.Set[string]{tx3.ToID: set.NewSet(kp1.Address())}, wantCount: 1, }, { name: "🟢no_participants", txs: []*types.Transaction{&tx1}, - stellarAddressesByHash: map[string]set.Set[string]{}, + stellarAddressesByToID: map[int64]set.Set[string]{}, wantCount: 1, }, } @@ -321,7 +326,7 @@ func Test_TransactionModel_BatchCopy(t *testing.T) { On("ObserveDBBatchSize", "BatchCopy", "transactions", mock.Anything).Return().Once() mockMetricsService. On("IncDBQuery", "BatchCopy", "transactions").Return().Once() - if len(tc.stellarAddressesByHash) > 0 { + if len(tc.stellarAddressesByToID) > 0 { mockMetricsService. 
On("IncDBQuery", "BatchCopy", "transactions_accounts").Return().Once() } @@ -337,7 +342,7 @@ func Test_TransactionModel_BatchCopy(t *testing.T) { pgxTx, err := conn.Begin(ctx) require.NoError(t, err) - gotCount, err := m.BatchCopy(ctx, pgxTx, tc.txs, tc.stellarAddressesByHash) + gotCount, err := m.BatchCopy(ctx, pgxTx, tc.txs, tc.stellarAddressesByToID) if tc.wantErrContains != "" { require.Error(t, err) @@ -357,24 +362,24 @@ func Test_TransactionModel_BatchCopy(t *testing.T) { assert.Len(t, dbInsertedHashes, tc.wantCount) // Verify account links if expected - if len(tc.stellarAddressesByHash) > 0 && tc.wantCount > 0 { + if len(tc.stellarAddressesByToID) > 0 && tc.wantCount > 0 { var accountLinks []struct { - TxHash string `db:"tx_hash"` + TxToID int64 `db:"tx_to_id"` AccountID string `db:"account_id"` } - err = dbConnectionPool.SelectContext(ctx, &accountLinks, "SELECT tx_hash, account_id FROM transactions_accounts ORDER BY tx_hash, account_id") + err = dbConnectionPool.SelectContext(ctx, &accountLinks, "SELECT tx_to_id, account_id FROM transactions_accounts ORDER BY tx_to_id, account_id") require.NoError(t, err) - // Create a map of tx_hash -> set of account_ids - accountLinksMap := make(map[string][]string) + // Create a map of tx_to_id -> set of account_ids + accountLinksMap := make(map[int64][]string) for _, link := range accountLinks { - accountLinksMap[link.TxHash] = append(accountLinksMap[link.TxHash], link.AccountID) + accountLinksMap[link.TxToID] = append(accountLinksMap[link.TxToID], link.AccountID) } // Verify each expected transaction has its account links - for txHash, expectedAddresses := range tc.stellarAddressesByHash { - actualAddresses := accountLinksMap[txHash] - assert.ElementsMatch(t, expectedAddresses.ToSlice(), actualAddresses, "account links for tx %s don't match", txHash) + for toID, expectedAddresses := range tc.stellarAddressesByToID { + actualAddresses := accountLinksMap[toID] + assert.ElementsMatch(t, expectedAddresses.ToSlice(), actualAddresses, "account links for tx %d don't match", toID) } } }) @@ -415,8 +420,8 @@ func Test_TransactionModel_BatchCopy_DuplicateFails(t *testing.T) { sqlxDB, err := dbConnectionPool.SqlxDB(ctx) require.NoError(t, err) txModel := &TransactionModel{DB: dbConnectionPool, MetricsService: metrics.NewMetricsService(sqlxDB)} - _, err = txModel.BatchInsert(ctx, nil, []*types.Transaction{&tx1}, map[string]set.Set[string]{ - tx1.Hash: set.NewSet(kp1.Address()), + _, err = txModel.BatchInsert(ctx, nil, []*types.Transaction{&tx1}, map[int64]set.Set[string]{ + tx1.ToID: set.NewSet(kp1.Address()), }) require.NoError(t, err) @@ -444,8 +449,8 @@ func Test_TransactionModel_BatchCopy_DuplicateFails(t *testing.T) { pgxTx, err := conn.Begin(ctx) require.NoError(t, err) - _, err = m.BatchCopy(ctx, pgxTx, []*types.Transaction{&tx1}, map[string]set.Set[string]{ - tx1.Hash: set.NewSet(kp1.Address()), + _, err = m.BatchCopy(ctx, pgxTx, []*types.Transaction{&tx1}, map[int64]set.Set[string]{ + tx1.ToID: set.NewSet(kp1.Address()), }) // BatchCopy should fail with a unique constraint violation @@ -578,11 +583,11 @@ func TestTransactionModel_BatchGetByAccountAddress(t *testing.T) { // Create test transactions_accounts links _, err = dbConnectionPool.ExecContext(ctx, ` - INSERT INTO transactions_accounts (tx_hash, account_id) + INSERT INTO transactions_accounts (tx_to_id, account_id) VALUES - ('tx1', $1), - ('tx2', $1), - ('tx3', $2) + (1, $1), + (2, $1), + (3, $2) `, address1, address2) require.NoError(t, err) @@ -630,9 +635,9 @@ func 
TestTransactionModel_BatchGetByOperationIDs(t *testing.T) { _, err = dbConnectionPool.ExecContext(ctx, ` INSERT INTO operations (id, tx_hash, operation_type, operation_xdr, result_code, successful, ledger_number, ledger_created_at) VALUES - (1, 'tx1', 'payment', 'xdr1', 'op_success', true, 1, $1), - (2, 'tx2', 'create_account', 'xdr2', 'op_success', true, 2, $1), - (3, 'tx1', 'payment', 'xdr3', 'op_success', true, 3, $1) + (1, 'tx1', 'PAYMENT', 'xdr1', 'op_success', true, 1, $1), + (2, 'tx2', 'CREATE_ACCOUNT', 'xdr2', 'op_success', true, 2, $1), + (3, 'tx1', 'PAYMENT', 'xdr3', 'op_success', true, 3, $1) `, now) require.NoError(t, err) @@ -691,9 +696,9 @@ func TestTransactionModel_BatchGetByStateChangeIDs(t *testing.T) { _, err = dbConnectionPool.ExecContext(ctx, ` INSERT INTO state_changes (to_id, state_change_order, state_change_category, ledger_created_at, ledger_number, account_id, operation_id, tx_hash) VALUES - (1, 1, 'credit', $1, 1, $2, 1, 'tx1'), - (2, 1, 'debit', $1, 2, $2, 2, 'tx2'), - (3, 1, 'credit', $1, 3, $2, 3, 'tx1') + (1, 1, 'BALANCE', $1, 1, $2, 1, 'tx1'), + (2, 1, 'BALANCE', $1, 2, $2, 2, 'tx2'), + (3, 1, 'BALANCE', $1, 3, $2, 3, 'tx1') `, now, address) require.NoError(t, err) @@ -745,10 +750,10 @@ func BenchmarkTransactionModel_BatchInsert(b *testing.B) { //nolint:errcheck // truncate is best-effort cleanup in benchmarks dbConnectionPool.ExecContext(ctx, "TRUNCATE transactions, transactions_accounts CASCADE") // Generate fresh test data for each iteration - txs, addressesByHash := generateTestTransactions(size, int64(i*size)) + txs, addressesByToID := generateTestTransactions(size, int32(i*size)) b.StartTimer() - _, err := m.BatchInsert(ctx, nil, txs, addressesByHash) + _, err := m.BatchInsert(ctx, nil, txs, addressesByToID) if err != nil { b.Fatalf("BatchInsert failed: %v", err) } @@ -801,7 +806,7 @@ func BenchmarkTransactionModel_BatchCopy(b *testing.B) { } // Generate fresh test data for each iteration - txs, addressesByHash := generateTestTransactions(size, int64(i*size)) + txs, addressesByToID := generateTestTransactions(size, int32(i*size)) // Start a pgx transaction pgxTx, err := conn.Begin(ctx) @@ -810,7 +815,7 @@ func BenchmarkTransactionModel_BatchCopy(b *testing.B) { } b.StartTimer() - _, err = m.BatchCopy(ctx, pgxTx, txs, addressesByHash) + _, err = m.BatchCopy(ctx, pgxTx, txs, addressesByToID) if err != nil { pgxTx.Rollback(ctx) b.Fatalf("BatchCopy failed: %v", err) diff --git a/internal/db/migrations/2025-06-10.2-create_indexer_table_transactions.sql b/internal/db/migrations/2025-06-10.2-transactions.sql similarity index 80% rename from internal/db/migrations/2025-06-10.2-create_indexer_table_transactions.sql rename to internal/db/migrations/2025-06-10.2-transactions.sql index 252b4e260..efab39832 100644 --- a/internal/db/migrations/2025-06-10.2-create_indexer_table_transactions.sql +++ b/internal/db/migrations/2025-06-10.2-transactions.sql @@ -2,8 +2,8 @@ -- Table: transactions CREATE TABLE transactions ( - hash TEXT PRIMARY KEY, - to_id BIGINT NOT NULL, + to_id BIGINT PRIMARY KEY, + hash TEXT NOT NULL UNIQUE, envelope_xdr TEXT, fee_charged BIGINT NOT NULL, result_code TEXT NOT NULL, @@ -18,10 +18,10 @@ CREATE INDEX idx_transactions_ledger_created_at ON transactions(ledger_created_a -- Table: transactions_accounts CREATE TABLE transactions_accounts ( - tx_hash TEXT NOT NULL REFERENCES transactions(hash) ON DELETE CASCADE, + tx_to_id BIGINT NOT NULL REFERENCES transactions(to_id) ON DELETE CASCADE, account_id TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL 
DEFAULT NOW(), - PRIMARY KEY (account_id, tx_hash) + PRIMARY KEY (account_id, tx_to_id) ); -- +migrate Down diff --git a/internal/db/migrations/2025-06-10.3-create_indexer_table_operations.sql b/internal/db/migrations/2025-06-10.3-create_indexer_table_operations.sql index 7915bcf19..09f2bd5b5 100644 --- a/internal/db/migrations/2025-06-10.3-create_indexer_table_operations.sql +++ b/internal/db/migrations/2025-06-10.3-create_indexer_table_operations.sql @@ -3,8 +3,21 @@ -- Table: operations CREATE TABLE operations ( id BIGINT PRIMARY KEY, - tx_hash TEXT NOT NULL REFERENCES transactions(hash), - operation_type TEXT NOT NULL, + tx_hash TEXT NOT NULL REFERENCES transactions(hash) ON DELETE CASCADE, + operation_type TEXT NOT NULL CHECK ( + operation_type IN ( + 'CREATE_ACCOUNT', 'PAYMENT', 'PATH_PAYMENT_STRICT_RECEIVE', + 'MANAGE_SELL_OFFER', 'CREATE_PASSIVE_SELL_OFFER', 'SET_OPTIONS', + 'CHANGE_TRUST', 'ALLOW_TRUST', 'ACCOUNT_MERGE', 'INFLATION', + 'MANAGE_DATA', 'BUMP_SEQUENCE', 'MANAGE_BUY_OFFER', + 'PATH_PAYMENT_STRICT_SEND', 'CREATE_CLAIMABLE_BALANCE', + 'CLAIM_CLAIMABLE_BALANCE', 'BEGIN_SPONSORING_FUTURE_RESERVES', + 'END_SPONSORING_FUTURE_RESERVES', 'REVOKE_SPONSORSHIP', + 'CLAWBACK', 'CLAWBACK_CLAIMABLE_BALANCE', 'SET_TRUST_LINE_FLAGS', + 'LIQUIDITY_POOL_DEPOSIT', 'LIQUIDITY_POOL_WITHDRAW', + 'INVOKE_HOST_FUNCTION', 'EXTEND_FOOTPRINT_TTL', 'RESTORE_FOOTPRINT' + ) + ), operation_xdr TEXT, result_code TEXT NOT NULL, successful BOOLEAN NOT NULL, diff --git a/internal/db/migrations/2025-06-10.4-create_indexer_table_state_changes.sql b/internal/db/migrations/2025-06-10.4-create_indexer_table_state_changes.sql index 52b4c375b..73f14f704 100644 --- a/internal/db/migrations/2025-06-10.4-create_indexer_table_state_changes.sql +++ b/internal/db/migrations/2025-06-10.4-create_indexer_table_state_changes.sql @@ -3,15 +3,27 @@ -- Table: state_changes CREATE TABLE state_changes ( to_id BIGINT NOT NULL, - state_change_order BIGINT NOT NULL, - state_change_category TEXT NOT NULL, - state_change_reason TEXT, + state_change_order BIGINT NOT NULL CHECK (state_change_order >= 1), + state_change_category TEXT NOT NULL CHECK ( + state_change_category IN ( + 'BALANCE', 'ACCOUNT', 'SIGNER', 'SIGNATURE_THRESHOLD', + 'METADATA', 'FLAGS', 'TRUSTLINE', 'RESERVES', + 'BALANCE_AUTHORIZATION', 'AUTHORIZATION' + ) + ), + state_change_reason TEXT CHECK ( + state_change_reason IS NULL OR state_change_reason IN ( + 'CREATE', 'MERGE', 'DEBIT', 'CREDIT', 'MINT', 'BURN', + 'ADD', 'REMOVE', 'UPDATE', 'LOW', 'MEDIUM', 'HIGH', + 'HOME_DOMAIN', 'SET', 'CLEAR', 'DATA_ENTRY', 'SPONSOR', 'UNSPONSOR' + ) + ), ingested_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), ledger_created_at TIMESTAMPTZ NOT NULL, ledger_number INTEGER NOT NULL, account_id TEXT NOT NULL, operation_id BIGINT NOT NULL, - tx_hash TEXT NOT NULL REFERENCES transactions(hash), + tx_hash TEXT NOT NULL REFERENCES transactions(hash) ON DELETE CASCADE, token_id TEXT, amount TEXT, signer_account_id TEXT, diff --git a/internal/indexer/indexer.go b/internal/indexer/indexer.go index bdc64b8eb..c435e3c81 100644 --- a/internal/indexer/indexer.go +++ b/internal/indexer/indexer.go @@ -25,7 +25,7 @@ type IndexerBufferInterface interface { PushTransaction(participant string, transaction types.Transaction) PushOperation(participant string, operation types.Operation, transaction types.Transaction) PushStateChange(transaction types.Transaction, operation types.Operation, stateChange types.StateChange) - GetTransactionsParticipants() map[string]set.Set[string] + 
GetTransactionsParticipants() map[int64]set.Set[string] GetOperationsParticipants() map[int64]set.Set[string] GetAllParticipants() []string GetNumberOfTransactions() int diff --git a/internal/indexer/indexer_buffer.go b/internal/indexer/indexer_buffer.go index 32389438f..6594e8fa7 100644 --- a/internal/indexer/indexer_buffer.go +++ b/internal/indexer/indexer_buffer.go @@ -59,7 +59,7 @@ type SACBalanceChangeKey struct { type IndexerBuffer struct { mu sync.RWMutex txByHash map[string]*types.Transaction - participantsByTxHash map[string]set.Set[string] + participantsByToID map[int64]set.Set[string] opByID map[int64]*types.Operation participantsByOpID map[int64]set.Set[string] stateChanges []types.StateChange @@ -78,7 +78,7 @@ type IndexerBuffer struct { func NewIndexerBuffer() *IndexerBuffer { return &IndexerBuffer{ txByHash: make(map[string]*types.Transaction), - participantsByTxHash: make(map[string]set.Set[string]), + participantsByToID: make(map[int64]set.Set[string]), opByID: make(map[int64]*types.Operation), participantsByOpID: make(map[int64]set.Set[string]), stateChanges: make([]types.StateChange, 0), @@ -110,7 +110,7 @@ func (b *IndexerBuffer) PushTransaction(participant string, transaction types.Tr // 1. Check if transaction already exists in txByHash // 2. If not, store the transaction pointer // 3. Add participant to the global participants set -// 4. Add participant to this transaction's participant set in participantsByTxHash +// 4. Add participant to this transaction's participant set in participantsByToID // // Caller must hold write lock. func (b *IndexerBuffer) pushTransactionUnsafe(participant string, transaction *types.Transaction) { @@ -119,13 +119,14 @@ func (b *IndexerBuffer) pushTransactionUnsafe(participant string, transaction *t b.txByHash[txHash] = transaction } - // Track this participant globally - if _, exists := b.participantsByTxHash[txHash]; !exists { - b.participantsByTxHash[txHash] = set.NewSet[string]() + // Track this participant by ToID + toID := transaction.ToID + if _, exists := b.participantsByToID[toID]; !exists { + b.participantsByToID[toID] = set.NewSet[string]() } // Add participant - O(1) with automatic deduplication - b.participantsByTxHash[txHash].Add(participant) + b.participantsByToID[toID].Add(participant) // Track participant in global set for batch account insertion if participant != "" { @@ -165,12 +166,12 @@ func (b *IndexerBuffer) GetTransactions() []*types.Transaction { return txs } -// GetTransactionsParticipants returns a map of transaction hashes to its participants. +// GetTransactionsParticipants returns a map of transaction ToIDs to their participants. func (b *IndexerBuffer) GetTransactionsParticipants() map[int64]set.Set[string] { b.mu.RLock() defer b.mu.RUnlock() - return b.participantsByTxHash + return b.participantsByToID } // PushTrustlineChange adds a trustline change to the buffer and tracks unique assets.
@@ -442,15 +443,15 @@ func (b *IndexerBuffer) Merge(other IndexerBufferInterface) { // Merge transactions (canonical storage) - this establishes our canonical pointers maps.Copy(b.txByHash, otherBuffer.txByHash) - for txHash, otherParticipants := range otherBuffer.participantsByTxHash { - if existing, exists := b.participantsByTxHash[txHash]; exists { + for toID, otherParticipants := range otherBuffer.participantsByToID { + if existing, exists := b.participantsByToID[toID]; exists { // Merge into existing set - iterate and add (Union creates new set) for participant := range otherParticipants.Iter() { existing.Add(participant) } } else { // Clone the set instead of creating empty + iterating - b.participantsByTxHash[txHash] = otherParticipants.Clone() + b.participantsByToID[toID] = otherParticipants.Clone() } } @@ -553,7 +554,7 @@ func (b *IndexerBuffer) Clear() { // Clear maps (keep allocated backing arrays) clear(b.txByHash) - clear(b.participantsByTxHash) + clear(b.participantsByToID) clear(b.opByID) clear(b.participantsByOpID) clear(b.uniqueTrustlineAssets) diff --git a/internal/indexer/indexer_buffer_test.go b/internal/indexer/indexer_buffer_test.go index 59fce805f..f7f79742d 100644 --- a/internal/indexer/indexer_buffer_test.go +++ b/internal/indexer/indexer_buffer_test.go @@ -28,8 +28,8 @@ func TestIndexerBuffer_PushTransaction(t *testing.T) { t.Run("🟢 sequential pushes", func(t *testing.T) { indexerBuffer := NewIndexerBuffer() - tx1 := types.Transaction{Hash: "tx_hash_1"} - tx2 := types.Transaction{Hash: "tx_hash_2"} + tx1 := types.Transaction{Hash: "tx_hash_1", ToID: 1} + tx2 := types.Transaction{Hash: "tx_hash_2", ToID: 2} indexerBuffer.PushTransaction("alice", tx1) indexerBuffer.PushTransaction("alice", tx2) @@ -38,8 +38,8 @@ func TestIndexerBuffer_PushTransaction(t *testing.T) { // Assert participants by transaction txParticipants := indexerBuffer.GetTransactionsParticipants() - assert.Equal(t, set.NewSet("alice"), txParticipants[tx1.Hash]) - assert.Equal(t, set.NewSet("alice", "bob"), txParticipants[tx2.Hash]) + assert.Equal(t, set.NewSet("alice"), txParticipants[tx1.ToID]) + assert.Equal(t, set.NewSet("alice", "bob"), txParticipants[tx2.ToID]) // Assert GetNumberOfTransactions assert.Equal(t, 2, indexerBuffer.GetNumberOfTransactions()) @@ -51,8 +51,8 @@ func TestIndexerBuffer_PushTransaction(t *testing.T) { t.Run("🟢 concurrent pushes", func(t *testing.T) { indexerBuffer := NewIndexerBuffer() - tx1 := types.Transaction{Hash: "tx_hash_1"} - tx2 := types.Transaction{Hash: "tx_hash_2"} + tx1 := types.Transaction{Hash: "tx_hash_1", ToID: 1} + tx2 := types.Transaction{Hash: "tx_hash_2", ToID: 2} wg := sync.WaitGroup{} wg.Add(4) @@ -76,8 +76,8 @@ func TestIndexerBuffer_PushTransaction(t *testing.T) { // Assert participants by transaction txParticipants := indexerBuffer.GetTransactionsParticipants() - assert.Equal(t, set.NewSet("alice"), txParticipants[tx1.Hash]) - assert.Equal(t, set.NewSet("alice", "bob"), txParticipants[tx2.Hash]) + assert.Equal(t, set.NewSet("alice"), txParticipants[tx1.ToID]) + assert.Equal(t, set.NewSet("alice", "bob"), txParticipants[tx2.ToID]) // Assert GetNumberOfTransactions assert.Equal(t, 2, indexerBuffer.GetNumberOfTransactions()) @@ -88,8 +88,8 @@ func TestIndexerBuffer_PushOperation(t *testing.T) { t.Run("🟢 sequential pushes", func(t *testing.T) { indexerBuffer := NewIndexerBuffer() - tx1 := types.Transaction{Hash: "tx_hash_1"} - tx2 := types.Transaction{Hash: "tx_hash_2"} + tx1 := types.Transaction{Hash: "tx_hash_1", ToID: 1} + tx2 := 
types.Transaction{Hash: "tx_hash_2", ToID: 2} op1 := types.Operation{ID: 1} op2 := types.Operation{ID: 2} @@ -105,15 +105,15 @@ func TestIndexerBuffer_PushOperation(t *testing.T) { // Assert transactions were also added txParticipants := indexerBuffer.GetTransactionsParticipants() - assert.Equal(t, set.NewSet("alice"), txParticipants[tx1.Hash]) - assert.Equal(t, set.NewSet("bob", "chuck"), txParticipants[tx2.Hash]) + assert.Equal(t, set.NewSet("alice"), txParticipants[tx1.ToID]) + assert.Equal(t, set.NewSet("bob", "chuck"), txParticipants[tx2.ToID]) }) t.Run("🟢 concurrent pushes", func(t *testing.T) { indexerBuffer := NewIndexerBuffer() - tx1 := types.Transaction{Hash: "tx_hash_1"} - tx2 := types.Transaction{Hash: "tx_hash_2"} + tx1 := types.Transaction{Hash: "tx_hash_1", ToID: 1} + tx2 := types.Transaction{Hash: "tx_hash_2", ToID: 2} op1 := types.Operation{ID: 1} op2 := types.Operation{ID: 2} @@ -223,8 +223,8 @@ func TestIndexerBuffer_PushStateChange(t *testing.T) { // Assert transaction participants txParticipants := indexerBuffer.GetTransactionsParticipants() - assert.Equal(t, set.NewSet("someone", "alice"), txParticipants[tx1.Hash]) - assert.Equal(t, set.NewSet("someone", "alice", "eve", "bob"), txParticipants[tx2.Hash]) + assert.Equal(t, set.NewSet("someone", "alice"), txParticipants[tx1.ToID]) + assert.Equal(t, set.NewSet("someone", "alice", "eve", "bob"), txParticipants[tx2.ToID]) // Assert operation participants opParticipants := indexerBuffer.GetOperationsParticipants() @@ -240,8 +240,8 @@ func TestIndexerBuffer_GetNumberOfTransactions(t *testing.T) { assert.Equal(t, 0, indexerBuffer.GetNumberOfTransactions()) - tx1 := types.Transaction{Hash: "tx_hash_1"} - tx2 := types.Transaction{Hash: "tx_hash_2"} + tx1 := types.Transaction{Hash: "tx_hash_1", ToID: 1} + tx2 := types.Transaction{Hash: "tx_hash_2", ToID: 2} indexerBuffer.PushTransaction("alice", tx1) assert.Equal(t, 1, indexerBuffer.GetNumberOfTransactions()) @@ -259,8 +259,8 @@ func TestIndexerBuffer_GetAllTransactions(t *testing.T) { t.Run("🟢 returns all unique transactions", func(t *testing.T) { indexerBuffer := NewIndexerBuffer() - tx1 := types.Transaction{Hash: "tx_hash_1", LedgerNumber: 100} - tx2 := types.Transaction{Hash: "tx_hash_2", LedgerNumber: 101} + tx1 := types.Transaction{Hash: "tx_hash_1", ToID: 1, LedgerNumber: 100} + tx2 := types.Transaction{Hash: "tx_hash_2", ToID: 2, LedgerNumber: 101} indexerBuffer.PushTransaction("alice", tx1) indexerBuffer.PushTransaction("bob", tx2) @@ -276,16 +276,16 @@ func TestIndexerBuffer_GetAllTransactionsParticipants(t *testing.T) { t.Run("🟢 returns correct participants mapping", func(t *testing.T) { indexerBuffer := NewIndexerBuffer() - tx1 := types.Transaction{Hash: "tx_hash_1"} - tx2 := types.Transaction{Hash: "tx_hash_2"} + tx1 := types.Transaction{Hash: "tx_hash_1", ToID: 1} + tx2 := types.Transaction{Hash: "tx_hash_2", ToID: 2} indexerBuffer.PushTransaction("alice", tx1) indexerBuffer.PushTransaction("bob", tx1) indexerBuffer.PushTransaction("alice", tx2) txParticipants := indexerBuffer.GetTransactionsParticipants() - assert.Equal(t, set.NewSet("alice", "bob"), txParticipants[tx1.Hash]) - assert.Equal(t, set.NewSet("alice"), txParticipants[tx2.Hash]) + assert.Equal(t, set.NewSet("alice", "bob"), txParticipants[tx1.ToID]) + assert.Equal(t, set.NewSet("alice"), txParticipants[tx2.ToID]) }) } @@ -293,7 +293,7 @@ func TestIndexerBuffer_GetAllOperations(t *testing.T) { t.Run("🟢 returns all unique operations", func(t *testing.T) { indexerBuffer := NewIndexerBuffer() - tx1 := 
types.Transaction{Hash: "tx_hash_1"} + tx1 := types.Transaction{Hash: "tx_hash_1", ToID: 1} op1 := types.Operation{ID: 1, TxHash: tx1.Hash} op2 := types.Operation{ID: 2, TxHash: tx1.Hash} @@ -311,7 +311,7 @@ func TestIndexerBuffer_GetAllOperationsParticipants(t *testing.T) { t.Run("🟢 returns correct participants mapping", func(t *testing.T) { indexerBuffer := NewIndexerBuffer() - tx1 := types.Transaction{Hash: "tx_hash_1"} + tx1 := types.Transaction{Hash: "tx_hash_1", ToID: 1} op1 := types.Operation{ID: 1, TxHash: tx1.Hash} op2 := types.Operation{ID: 2, TxHash: tx1.Hash} @@ -355,8 +355,8 @@ func TestIndexerBuffer_GetAllParticipants(t *testing.T) { t.Run("🟢 collects participants from transactions", func(t *testing.T) { indexerBuffer := NewIndexerBuffer() - tx1 := types.Transaction{Hash: "tx_hash_1"} - tx2 := types.Transaction{Hash: "tx_hash_2"} + tx1 := types.Transaction{Hash: "tx_hash_1", ToID: 1} + tx2 := types.Transaction{Hash: "tx_hash_2", ToID: 2} indexerBuffer.PushTransaction("alice", tx1) indexerBuffer.PushTransaction("bob", tx2) @@ -369,7 +369,7 @@ func TestIndexerBuffer_GetAllParticipants(t *testing.T) { t.Run("🟢 collects participants from operations", func(t *testing.T) { indexerBuffer := NewIndexerBuffer() - tx := types.Transaction{Hash: "tx_hash_1"} + tx := types.Transaction{Hash: "tx_hash_1", ToID: 1} op1 := types.Operation{ID: 1, TxHash: tx.Hash} op2 := types.Operation{ID: 2, TxHash: tx.Hash} @@ -419,7 +419,7 @@ func TestIndexerBuffer_GetAllParticipants(t *testing.T) { t.Run("🟢 ignores empty participants", func(t *testing.T) { indexerBuffer := NewIndexerBuffer() - tx := types.Transaction{Hash: "tx_hash_1"} + tx := types.Transaction{Hash: "tx_hash_1", ToID: 1} indexerBuffer.PushTransaction("", tx) // empty participant indexerBuffer.PushTransaction("alice", tx) @@ -442,8 +442,8 @@ func TestIndexerBuffer_Merge(t *testing.T) { buffer1 := NewIndexerBuffer() buffer2 := NewIndexerBuffer() - tx1 := types.Transaction{Hash: "tx_hash_1"} - tx2 := types.Transaction{Hash: "tx_hash_2"} + tx1 := types.Transaction{Hash: "tx_hash_1", ToID: 1} + tx2 := types.Transaction{Hash: "tx_hash_2", ToID: 2} buffer1.PushTransaction("alice", tx1) buffer2.PushTransaction("bob", tx2) @@ -457,15 +457,15 @@ func TestIndexerBuffer_Merge(t *testing.T) { // Verify transaction participants txParticipants := buffer1.GetTransactionsParticipants() - assert.Equal(t, set.NewSet("alice"), txParticipants[tx1.Hash]) - assert.Equal(t, set.NewSet("bob"), txParticipants[tx2.Hash]) + assert.Equal(t, set.NewSet("alice"), txParticipants[tx1.ToID]) + assert.Equal(t, set.NewSet("bob"), txParticipants[tx2.ToID]) }) t.Run("🟢 merge operations only", func(t *testing.T) { buffer1 := NewIndexerBuffer() buffer2 := NewIndexerBuffer() - tx1 := types.Transaction{Hash: "tx_hash_1"} + tx1 := types.Transaction{Hash: "tx_hash_1", ToID: 1} op1 := types.Operation{ID: 1, TxHash: tx1.Hash} op2 := types.Operation{ID: 2, TxHash: tx1.Hash} @@ -511,8 +511,8 @@ func TestIndexerBuffer_Merge(t *testing.T) { buffer1 := NewIndexerBuffer() buffer2 := NewIndexerBuffer() - tx1 := types.Transaction{Hash: "tx_hash_1"} - tx2 := types.Transaction{Hash: "tx_hash_2"} + tx1 := types.Transaction{Hash: "tx_hash_1", ToID: 1} + tx2 := types.Transaction{Hash: "tx_hash_2", ToID: 2} op1 := types.Operation{ID: 1, TxHash: tx1.Hash} // Buffer1 has tx1 with alice @@ -532,8 +532,8 @@ func TestIndexerBuffer_Merge(t *testing.T) { // Verify tx1 has both alice and bob as participants txParticipants := buffer1.GetTransactionsParticipants() - assert.Equal(t, set.NewSet("alice", 
"bob"), txParticipants[tx1.Hash]) - assert.Equal(t, set.NewSet("charlie"), txParticipants[tx2.Hash]) + assert.Equal(t, set.NewSet("alice", "bob"), txParticipants[tx1.ToID]) + assert.Equal(t, set.NewSet("charlie"), txParticipants[tx2.ToID]) // Verify operation participants merged opParticipants := buffer1.GetOperationsParticipants() @@ -544,7 +544,7 @@ func TestIndexerBuffer_Merge(t *testing.T) { buffer1 := NewIndexerBuffer() buffer2 := NewIndexerBuffer() - tx1 := types.Transaction{Hash: "tx_hash_1"} + tx1 := types.Transaction{Hash: "tx_hash_1", ToID: 1} op1 := types.Operation{ID: 1, TxHash: tx1.Hash} sc1 := types.StateChange{ToID: 1, StateChangeOrder: 1, AccountID: "alice"} @@ -563,7 +563,7 @@ func TestIndexerBuffer_Merge(t *testing.T) { buffer1 := NewIndexerBuffer() buffer2 := NewIndexerBuffer() - tx1 := types.Transaction{Hash: "tx_hash_1"} + tx1 := types.Transaction{Hash: "tx_hash_1", ToID: 1} buffer1.PushTransaction("alice", tx1) buffer1.Merge(buffer2) @@ -576,9 +576,9 @@ func TestIndexerBuffer_Merge(t *testing.T) { buffer2 := NewIndexerBuffer() buffer3 := NewIndexerBuffer() - tx1 := types.Transaction{Hash: "tx_hash_1"} - tx2 := types.Transaction{Hash: "tx_hash_2"} - tx3 := types.Transaction{Hash: "tx_hash_3"} + tx1 := types.Transaction{Hash: "tx_hash_1", ToID: 1} + tx2 := types.Transaction{Hash: "tx_hash_2", ToID: 2} + tx3 := types.Transaction{Hash: "tx_hash_3", ToID: 3} buffer1.PushTransaction("alice", tx1) buffer2.PushTransaction("bob", tx2) @@ -642,8 +642,8 @@ func TestIndexerBuffer_Merge(t *testing.T) { // Verify participants mappings txParticipants := buffer1.GetTransactionsParticipants() - assert.Equal(t, set.NewSet("alice"), txParticipants[tx1.Hash]) - assert.Equal(t, set.NewSet("bob"), txParticipants[tx2.Hash]) + assert.Equal(t, set.NewSet("alice"), txParticipants[tx1.ToID]) + assert.Equal(t, set.NewSet("bob"), txParticipants[tx2.ToID]) opParticipants := buffer1.GetOperationsParticipants() assert.Equal(t, set.NewSet("alice"), opParticipants[int64(1)]) diff --git a/internal/indexer/indexer_test.go b/internal/indexer/indexer_test.go index 825bcec0f..e54c312de 100644 --- a/internal/indexer/indexer_test.go +++ b/internal/indexer/indexer_test.go @@ -226,9 +226,9 @@ func TestIndexer_ProcessLedgerTransactions(t *testing.T) { // Verify transaction participants txParticipantsMap := buffer.GetTransactionsParticipants() - txHash := "0102030000000000000000000000000000000000000000000000000000000000" - assert.True(t, txParticipantsMap[txHash].Contains("alice"), "alice should be in tx participants") - assert.True(t, txParticipantsMap[txHash].Contains("bob"), "bob should be in tx participants") + toID := allTxs[0].ToID + assert.True(t, txParticipantsMap[toID].Contains("alice"), "alice should be in tx participants") + assert.True(t, txParticipantsMap[toID].Contains("bob"), "bob should be in tx participants") // Verify operations allOps := buffer.GetOperations() diff --git a/internal/indexer/types/types.go b/internal/indexer/types/types.go index f5da98203..42be608a8 100644 --- a/internal/indexer/types/types.go +++ b/internal/indexer/types/types.go @@ -122,9 +122,9 @@ type Account struct { CreatedAt time.Time `json:"createdAt,omitempty" db:"created_at"` } -type AccountWithTxHash struct { +type AccountWithToID struct { Account - TxHash string `json:"txHash,omitempty" db:"tx_hash"` + ToID int64 `json:"toId,omitempty" db:"tx_to_id"` } type AccountWithOperationID struct { diff --git a/internal/integrationtests/infrastructure/backfill_helpers.go 
diff --git a/internal/integrationtests/infrastructure/backfill_helpers.go b/internal/integrationtests/infrastructure/backfill_helpers.go
index dc2563627..9448175ad 100644
--- a/internal/integrationtests/infrastructure/backfill_helpers.go
+++ b/internal/integrationtests/infrastructure/backfill_helpers.go
@@ -55,7 +55,7 @@ func (s *SharedContainers) GetTransactionCountForAccount(ctx context.Context, ac
 	query := `
 		SELECT COUNT(DISTINCT t.hash)
 		FROM transactions t
-		INNER JOIN transactions_accounts ta ON t.hash = ta.tx_hash
+		INNER JOIN transactions_accounts ta ON t.to_id = ta.tx_to_id
 		WHERE ta.account_id = $1
 		AND t.ledger_number BETWEEN $2 AND $3
 	`
@@ -113,7 +113,7 @@ func (s *SharedContainers) GetTransactionAccountLinkCount(ctx context.Context, a
 	query := `
 		SELECT COUNT(*)
 		FROM transactions_accounts ta
-		INNER JOIN transactions t ON ta.tx_hash = t.hash
+		INNER JOIN transactions t ON ta.tx_to_id = t.to_id
 		WHERE ta.account_id = $1
 		AND t.ledger_number BETWEEN $2 AND $3
 	`
diff --git a/internal/serve/graphql/dataloaders/account_loaders.go b/internal/serve/graphql/dataloaders/account_loaders.go
index d072f74a5..e6f0c6cd0 100644
--- a/internal/serve/graphql/dataloaders/account_loaders.go
+++ b/internal/serve/graphql/dataloaders/account_loaders.go
@@ -11,32 +11,32 @@ import (
 )
 
 type AccountColumnsKey struct {
-	TxHash        string
+	ToID          int64
 	OperationID   int64
 	StateChangeID string
 	Columns       string
 }
 
-// accountsByTxHashLoader creates a dataloader for fetching accounts by transaction hash
+// accountsByToIDLoader creates a dataloader for fetching accounts by transaction ToID
 // This prevents N+1 queries when multiple transactions request their accounts
-// The loader batches multiple transaction hashes into a single database query
-func accountsByTxHashLoader(models *data.Models) *dataloadgen.Loader[AccountColumnsKey, []*types.Account] {
+// The loader batches multiple transaction ToIDs into a single database query
+func accountsByToIDLoader(models *data.Models) *dataloadgen.Loader[AccountColumnsKey, []*types.Account] {
 	return newOneToManyLoader(
-		func(ctx context.Context, keys []AccountColumnsKey) ([]*types.AccountWithTxHash, error) {
-			txHashes := make([]string, len(keys))
+		func(ctx context.Context, keys []AccountColumnsKey) ([]*types.AccountWithToID, error) {
+			toIDs := make([]int64, len(keys))
 			columns := keys[0].Columns
 			for i, key := range keys {
-				txHashes[i] = key.TxHash
+				toIDs[i] = key.ToID
 			}
-			return models.Account.BatchGetByTxHashes(ctx, txHashes, columns)
+			return models.Account.BatchGetByToIDs(ctx, toIDs, columns)
 		},
-		func(item *types.AccountWithTxHash) string {
-			return item.TxHash
+		func(item *types.AccountWithToID) int64 {
+			return item.ToID
 		},
-		func(key AccountColumnsKey) string {
-			return key.TxHash
+		func(key AccountColumnsKey) int64 {
+			return key.ToID
 		},
-		func(item *types.AccountWithTxHash) types.Account {
+		func(item *types.AccountWithToID) types.Account {
 			return item.Account
 		},
 	)
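newOneToManyLoader's implementation is not shown in this diff; judging from the four callbacks it receives (batch fetch, row key, request key, unwrap), its batch step must group the fetched rows per key roughly as below, so every requested ToID is answered from a single SQL round trip. The names here are illustrative, not the repository's actual helper:

package main

import "fmt"

type Account struct{ StellarAddress string }

type AccountWithToID struct {
	Account
	ToID int64
}

// groupRows buckets each fetched row under its join key and strips the key,
// so the loader can hand each caller a plain []*Account for its ToID.
func groupRows(rows []*AccountWithToID) map[int64][]*Account {
	grouped := make(map[int64][]*Account)
	for _, row := range rows {
		acc := row.Account // copy; the loader returns plain Accounts
		grouped[row.ToID] = append(grouped[row.ToID], &acc)
	}
	return grouped
}

func main() {
	rows := []*AccountWithToID{
		{Account{"GALICE"}, 1}, {Account{"GBOB"}, 1}, {Account{"GCHARLIE"}, 2},
	}
	grouped := groupRows(rows)
	fmt.Println(len(grouped[1]), len(grouped[2])) // 2 1
}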
diff --git a/internal/serve/graphql/dataloaders/loaders.go b/internal/serve/graphql/dataloaders/loaders.go
index bb3df52cb..0c0d0e5db 100644
--- a/internal/serve/graphql/dataloaders/loaders.go
+++ b/internal/serve/graphql/dataloaders/loaders.go
@@ -21,9 +21,9 @@ type Dataloaders struct {
 	// Used by Transaction.operations field resolver to prevent N+1 queries
 	OperationsByTxHashLoader *dataloadgen.Loader[OperationColumnsKey, []*types.OperationWithCursor]
 
-	// AccountsByTxHashLoader batches requests for accounts by transaction hash
+	// AccountsByToIDLoader batches requests for accounts by transaction ToID
 	// Used by Transaction.accounts field resolver to prevent N+1 queries
-	AccountsByTxHashLoader *dataloadgen.Loader[AccountColumnsKey, []*types.Account]
+	AccountsByToIDLoader *dataloadgen.Loader[AccountColumnsKey, []*types.Account]
 
 	// StateChangesByTxHashLoader batches requests for state changes by transaction hash
 	// Used by Transaction.stateChanges field resolver to prevent N+1 queries
@@ -66,7 +66,7 @@ func NewDataloaders(models *data.Models) *Dataloaders {
 		TransactionsByOperationIDLoader: transactionByOperationIDLoader(models),
 		StateChangesByTxHashLoader:      stateChangesByTxHashLoader(models),
 		StateChangesByOperationIDLoader: stateChangesByOperationIDLoader(models),
-		AccountsByTxHashLoader:          accountsByTxHashLoader(models),
+		AccountsByToIDLoader:            accountsByToIDLoader(models),
 		AccountsByOperationIDLoader:     accountsByOperationIDLoader(models),
 		AccountByStateChangeIDLoader:    accountByStateChangeIDLoader(models),
 	}
diff --git a/internal/serve/graphql/resolvers/test_utils.go b/internal/serve/graphql/resolvers/test_utils.go
index 5d1aeec62..27c7b1e47 100644
--- a/internal/serve/graphql/resolvers/test_utils.go
+++ b/internal/serve/graphql/resolvers/test_utils.go
@@ -71,7 +71,7 @@ func setupDB(ctx context.Context, t *testing.T, dbConnectionPool db.ConnectionPo
 			ops = append(ops, &types.Operation{
 				ID:            toid.New(testLedger, int32(i+1), int32(j+1)).ToInt64(),
 				TxHash:        txn.Hash,
-				OperationType: "payment",
+				OperationType: "PAYMENT",
 				OperationXDR:  fmt.Sprintf("opxdr%d", opIdx),
 				ResultCode:    "op_success",
 				Successful:    true,
@@ -139,8 +139,8 @@ func setupDB(ctx context.Context, t *testing.T, dbConnectionPool db.ConnectionPo
 		require.NoError(t, err)
 
 		_, err = tx.ExecContext(ctx,
-			`INSERT INTO transactions_accounts (tx_hash, account_id) VALUES ($1, $2)`,
-			txn.Hash, parentAccount.StellarAddress)
+			`INSERT INTO transactions_accounts (tx_to_id, account_id) VALUES ($1, $2)`,
+			txn.ToID, parentAccount.StellarAddress)
 		require.NoError(t, err)
 	}
diff --git a/internal/serve/graphql/resolvers/transaction.resolvers.go b/internal/serve/graphql/resolvers/transaction.resolvers.go
index ab97f04af..86369c535 100644
--- a/internal/serve/graphql/resolvers/transaction.resolvers.go
+++ b/internal/serve/graphql/resolvers/transaction.resolvers.go
@@ -69,10 +69,10 @@ func (r *transactionResolver) Accounts(ctx context.Context, obj *types.Transacti
 	// This prevents N+1 queries when multiple transactions request their accounts
 	// The loader groups multiple requests and executes them in a single database query
 	loaderKey := dataloaders.AccountColumnsKey{
-		TxHash:  obj.Hash,
+		ToID:    obj.ToID,
 		Columns: strings.Join(dbColumns, ", "),
 	}
-	accounts, err := loaders.AccountsByTxHashLoader.Load(ctx, loaderKey)
+	accounts, err := loaders.AccountsByToIDLoader.Load(ctx, loaderKey)
 	if err != nil {
 		return nil, err
 	}
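For readers unfamiliar with the loader mechanics these resolver comments refer to: dataloadgen coalesces concurrent Load calls into one batch fetch. This is not the repository's wiring, just a minimal standalone demonstration, assuming dataloadgen's NewLoader/Load API as used in this package:

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"

	"github.com/vikstrous/dataloadgen"
)

func main() {
	var fetches atomic.Int32
	loader := dataloadgen.NewLoader(func(ctx context.Context, keys []int64) ([][]string, []error) {
		// In the real code this is one BatchGetByToIDs query: WHERE tx_to_id = ANY(keys).
		fetches.Add(1)
		results := make([][]string, len(keys))
		for i, k := range keys {
			results[i] = []string{fmt.Sprintf("account-for-%d", k)}
		}
		return results, make([]error, len(keys))
	})

	// Simulate three transaction resolvers each asking for their accounts.
	var wg sync.WaitGroup
	for _, id := range []int64{1, 2, 3} {
		wg.Add(1)
		go func(id int64) {
			defer wg.Done()
			if _, err := loader.Load(context.Background(), id); err != nil {
				panic(err)
			}
		}(id)
	}
	wg.Wait()
	fmt.Println("batch fetches:", fetches.Load()) // typically 1: concurrent keys are coalesced
}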
mockMetricsService.On("IncDBQuery", "BatchGetByToIDs", "accounts").Return() + mockMetricsService.On("ObserveDBQueryDuration", "BatchGetByToIDs", "accounts", mock.Anything).Return() + mockMetricsService.On("ObserveDBBatchSize", "BatchGetByToIDs", "accounts", mock.Anything).Return() defer mockMetricsService.AssertExpectations(t) resolver := &transactionResolver{&Resolver{ @@ -167,7 +167,7 @@ func TestTransactionResolver_Accounts(t *testing.T) { }, }, }} - parentTx := &types.Transaction{Hash: "tx1"} + parentTx := &types.Transaction{ToID: toid.New(1000, 1, 0).ToInt64(), Hash: "tx1"} t.Run("success", func(t *testing.T) { loaders := dataloaders.NewDataloaders(resolver.models) diff --git a/internal/services/ingest.go b/internal/services/ingest.go index d50920ffe..1dcf4c412 100644 --- a/internal/services/ingest.go +++ b/internal/services/ingest.go @@ -225,7 +225,7 @@ func (m *ingestService) processLedger(ctx context.Context, ledgerMeta xdr.Ledger // data ready for database insertion after participant filtering. type filteredIngestionData struct { txs []*types.Transaction - txParticipants map[string]set.Set[string] + txParticipants map[int64]set.Set[string] ops []*types.Operation opParticipants map[int64]set.Set[string] stateChanges []types.StateChange @@ -248,7 +248,7 @@ func (m *ingestService) filterByRegisteredAccounts( ctx context.Context, dbTx pgx.Tx, txs []*types.Transaction, - txParticipants map[string]set.Set[string], + txParticipants map[int64]set.Set[string], ops []*types.Operation, opParticipants map[int64]set.Set[string], stateChanges []types.StateChange, @@ -264,19 +264,19 @@ func (m *ingestService) filterByRegisteredAccounts( log.Ctx(ctx).Infof("filtering enabled: %d/%d participants are registered", len(existing), len(allParticipants)) // Filter transactions: include if ANY participant is registered - txHashesToInclude := set.NewSet[string]() - for txHash, participants := range txParticipants { + toIDsToInclude := set.NewSet[int64]() + for toID, participants := range txParticipants { if hasRegisteredParticipant(participants, registeredAccounts) { - txHashesToInclude.Add(txHash) + toIDsToInclude.Add(toID) } } - filteredTxs := make([]*types.Transaction, 0, txHashesToInclude.Cardinality()) - filteredTxParticipants := make(map[string]set.Set[string]) + filteredTxs := make([]*types.Transaction, 0, toIDsToInclude.Cardinality()) + filteredTxParticipants := make(map[int64]set.Set[string]) for _, tx := range txs { - if txHashesToInclude.Contains(tx.Hash) { + if toIDsToInclude.Contains(tx.ToID) { filteredTxs = append(filteredTxs, tx) - filteredTxParticipants[tx.Hash] = txParticipants[tx.Hash] + filteredTxParticipants[tx.ToID] = txParticipants[tx.ToID] } } @@ -363,11 +363,11 @@ func (m *ingestService) insertIntoDB(ctx context.Context, dbTx pgx.Tx, data *fil } // insertTransactions batch inserts transactions with their participants into the database. 
diff --git a/internal/services/ingest.go b/internal/services/ingest.go
index d50920ffe..1dcf4c412 100644
--- a/internal/services/ingest.go
+++ b/internal/services/ingest.go
@@ -225,7 +225,7 @@ func (m *ingestService) processLedger(ctx context.Context, ledgerMeta xdr.Ledger
 // data ready for database insertion after participant filtering.
 type filteredIngestionData struct {
 	txs            []*types.Transaction
-	txParticipants map[string]set.Set[string]
+	txParticipants map[int64]set.Set[string]
 	ops            []*types.Operation
 	opParticipants map[int64]set.Set[string]
 	stateChanges   []types.StateChange
@@ -248,7 +248,7 @@ func (m *ingestService) filterByRegisteredAccounts(
 	ctx context.Context,
 	dbTx pgx.Tx,
 	txs []*types.Transaction,
-	txParticipants map[string]set.Set[string],
+	txParticipants map[int64]set.Set[string],
 	ops []*types.Operation,
 	opParticipants map[int64]set.Set[string],
 	stateChanges []types.StateChange,
@@ -264,19 +264,19 @@ func (m *ingestService) filterByRegisteredAccounts(
 	log.Ctx(ctx).Infof("filtering enabled: %d/%d participants are registered", len(existing), len(allParticipants))
 
 	// Filter transactions: include if ANY participant is registered
-	txHashesToInclude := set.NewSet[string]()
-	for txHash, participants := range txParticipants {
+	toIDsToInclude := set.NewSet[int64]()
+	for toID, participants := range txParticipants {
 		if hasRegisteredParticipant(participants, registeredAccounts) {
-			txHashesToInclude.Add(txHash)
+			toIDsToInclude.Add(toID)
 		}
 	}
-	filteredTxs := make([]*types.Transaction, 0, txHashesToInclude.Cardinality())
-	filteredTxParticipants := make(map[string]set.Set[string])
+	filteredTxs := make([]*types.Transaction, 0, toIDsToInclude.Cardinality())
+	filteredTxParticipants := make(map[int64]set.Set[string])
 	for _, tx := range txs {
-		if txHashesToInclude.Contains(tx.Hash) {
+		if toIDsToInclude.Contains(tx.ToID) {
 			filteredTxs = append(filteredTxs, tx)
-			filteredTxParticipants[tx.Hash] = txParticipants[tx.Hash]
+			filteredTxParticipants[tx.ToID] = txParticipants[tx.ToID]
 		}
 	}
@@ -363,11 +363,11 @@ func (m *ingestService) insertIntoDB(ctx context.Context, dbTx pgx.Tx, data *fil
 }
 
 // insertTransactions batch inserts transactions with their participants into the database.
-func (m *ingestService) insertTransactions(ctx context.Context, pgxTx pgx.Tx, txs []*types.Transaction, stellarAddressesByTxHash map[string]set.Set[string]) error {
+func (m *ingestService) insertTransactions(ctx context.Context, pgxTx pgx.Tx, txs []*types.Transaction, stellarAddressesByToID map[int64]set.Set[string]) error {
 	if len(txs) == 0 {
 		return nil
 	}
-	_, err := m.models.Transactions.BatchCopy(ctx, pgxTx, txs, stellarAddressesByTxHash)
+	_, err := m.models.Transactions.BatchCopy(ctx, pgxTx, txs, stellarAddressesByToID)
 	if err != nil {
 		return fmt.Errorf("batch inserting transactions: %w", err)
 	}
diff --git a/internal/services/ingest_test.go b/internal/services/ingest_test.go
index 28f6d9db8..a06932e17 100644
--- a/internal/services/ingest_test.go
+++ b/internal/services/ingest_test.go
@@ -1395,7 +1395,7 @@ func Test_ingestService_filterParticipantData(t *testing.T) {
 			wantStateChangeCount: 1,
 			verifyParticipants: func(t *testing.T, filtered *filteredIngestionData) {
 				// Verify ALL participants are preserved (not just registered ones)
-				participants := filtered.txParticipants["tx_hash_1"]
+				participants := filtered.txParticipants[int64(1)]
 				assert.Equal(t, 2, participants.Cardinality())
 				assert.True(t, participants.Contains("GABC1111111111111111111111111111111111111111111111111"))
 				assert.True(t, participants.Contains("GXYZ9999999999999999999999999999999999999999999999999"))
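hasRegisteredParticipant's body sits outside this diff; the filtering loop above only needs an any-overlap test between a transaction's participant set and the registered accounts, which presumably reduces to something like the sketch below (the signature is assumed; the registered-accounts argument may well be a different collection type in the real code):

package services

import set "github.com/deckarep/golang-set/v2"

// hasRegisteredParticipant reports whether any member of participants is
// also in registered. Probing one set while walking the other avoids
// materializing an intersection set per transaction.
func hasRegisteredParticipant(participants, registered set.Set[string]) bool {
	found := false
	participants.Each(func(p string) bool {
		if registered.Contains(p) {
			found = true
			return true // stop iterating
		}
		return false
	})
	return found
}

Because a transaction is kept when ANY participant is registered, the unfiltered participant sets must be carried through unchanged, which is exactly what the ingest_test.go case above asserts.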