diff --git a/internal/data/accounts.go b/internal/data/accounts.go index 89187fee7..d2450c242 100644 --- a/internal/data/accounts.go +++ b/internal/data/accounts.go @@ -36,7 +36,7 @@ func (m *AccountModel) Get(ctx context.Context, address string) (*types.Account, const query = `SELECT * FROM accounts WHERE stellar_address = $1` var account types.Account start := time.Now() - err := m.DB.GetContext(ctx, &account, query, address) + err := m.DB.GetContext(ctx, &account, query, types.AddressBytea(address)) duration := time.Since(start).Seconds() m.MetricsService.ObserveDBQueryDuration("Get", "accounts", duration) if err != nil { @@ -50,8 +50,8 @@ func (m *AccountModel) Get(ctx context.Context, address string) (*types.Account, func (m *AccountModel) GetAll(ctx context.Context) ([]string, error) { const query = `SELECT stellar_address FROM accounts` start := time.Now() - accounts := []string{} - err := m.DB.SelectContext(ctx, &accounts, query) + var addresses []types.AddressBytea + err := m.DB.SelectContext(ctx, &addresses, query) duration := time.Since(start).Seconds() m.MetricsService.ObserveDBQueryDuration("GetAll", "accounts", duration) if err != nil { @@ -59,13 +59,18 @@ func (m *AccountModel) GetAll(ctx context.Context) ([]string, error) { return nil, fmt.Errorf("getting all accounts: %w", err) } m.MetricsService.IncDBQuery("GetAll", "accounts") - return accounts, nil + // Convert []AddressBytea to []string + result := make([]string, len(addresses)) + for i, addr := range addresses { + result[i] = string(addr) + } + return result, nil } func (m *AccountModel) Insert(ctx context.Context, address string) error { const query = `INSERT INTO accounts (stellar_address) VALUES ($1)` start := time.Now() - _, err := m.DB.ExecContext(ctx, query, address) + _, err := m.DB.ExecContext(ctx, query, types.AddressBytea(address)) duration := time.Since(start).Seconds() m.MetricsService.ObserveDBQueryDuration("Insert", "accounts", duration) if err != nil { @@ -82,7 +87,7 @@ func (m *AccountModel) Insert(ctx context.Context, address string) error { func (m *AccountModel) Delete(ctx context.Context, address string) error { const query = `DELETE FROM accounts WHERE stellar_address = $1` start := time.Now() - result, err := m.DB.ExecContext(ctx, query, address) + result, err := m.DB.ExecContext(ctx, query, types.AddressBytea(address)) duration := time.Since(start).Seconds() m.MetricsService.ObserveDBQueryDuration("Delete", "accounts", duration) if err != nil { @@ -104,25 +109,53 @@ func (m *AccountModel) Delete(ctx context.Context, address string) error { return nil } +// BatchGetByIDs returns the subset of provided account IDs that exist in the accounts table. // BatchGetByIDs returns the subset of provided account IDs that exist in the accounts table. func (m *AccountModel) BatchGetByIDs(ctx context.Context, dbTx pgx.Tx, accountIDs []string) ([]string, error) { if len(accountIDs) == 0 { return []string{}, nil } + // Convert string addresses to [][]byte for BYTEA array comparison + byteAddresses := make([][]byte, len(accountIDs)) + for i, addr := range accountIDs { + addrBytes, err := types.AddressBytea(addr).Value() + if err != nil { + return nil, fmt.Errorf("converting address %s to bytes: %w", addr, err) + } + if addrBytes == nil { + return nil, fmt.Errorf("address %s converted to nil", addr) + } + byteAddresses[i] = addrBytes.([]byte) + } + const query = `SELECT stellar_address FROM accounts WHERE stellar_address = ANY($1)` start := time.Now() - var existingAccounts []string - rows, err := dbTx.Query(ctx, query, accountIDs) + rows, err := dbTx.Query(ctx, query, byteAddresses) if err != nil { m.MetricsService.IncDBQueryError("BatchGetByIDs", "accounts", utils.GetDBErrorType(err)) return nil, fmt.Errorf("querying accounts by IDs: %w", err) } - existingAccounts, err = pgx.CollectRows(rows, pgx.RowTo[string]) - if err != nil { + defer rows.Close() + + var existingAccounts []string + for rows.Next() { + var addrBytes []byte + if err := rows.Scan(&addrBytes); err != nil { + m.MetricsService.IncDBQueryError("BatchGetByIDs", "accounts", utils.GetDBErrorType(err)) + return nil, fmt.Errorf("scanning address: %w", err) + } + var addr types.AddressBytea + if err := addr.Scan(addrBytes); err != nil { + return nil, fmt.Errorf("converting address bytes: %w", err) + } + existingAccounts = append(existingAccounts, string(addr)) + } + if err := rows.Err(); err != nil { m.MetricsService.IncDBQueryError("BatchGetByIDs", "accounts", utils.GetDBErrorType(err)) - return nil, fmt.Errorf("collecting rows: %w", err) + return nil, fmt.Errorf("iterating rows: %w", err) } + duration := time.Since(start).Seconds() m.MetricsService.ObserveDBQueryDuration("BatchGetByIDs", "accounts", duration) m.MetricsService.ObserveDBBatchSize("BatchGetByIDs", "accounts", len(accountIDs)) @@ -133,17 +166,17 @@ func (m *AccountModel) BatchGetByIDs(ctx context.Context, dbTx pgx.Tx, accountID // IsAccountFeeBumpEligible checks whether an account is eligible to have its transaction fee-bumped. Channel Accounts should be // eligible because some of the transactions will have the channel accounts as the source account (i. e. create account sponsorship). func (m *AccountModel) IsAccountFeeBumpEligible(ctx context.Context, address string) (bool, error) { + // accounts.stellar_address is BYTEA, channel_accounts.public_key is VARCHAR + // Use separate EXISTS checks to avoid type mismatch in UNION const query = ` SELECT - EXISTS( - SELECT stellar_address FROM accounts WHERE stellar_address = $1 - UNION - SELECT public_key FROM channel_accounts WHERE public_key = $1 - ) + EXISTS(SELECT 1 FROM accounts WHERE stellar_address = $1) + OR + EXISTS(SELECT 1 FROM channel_accounts WHERE public_key = $2) ` var exists bool start := time.Now() - err := m.DB.GetContext(ctx, &exists, query, address) + err := m.DB.GetContext(ctx, &exists, query, types.AddressBytea(address), address) duration := time.Since(start).Seconds() m.MetricsService.ObserveDBQueryDuration("IsAccountFeeBumpEligible", "accounts", duration) if err != nil { diff --git a/internal/data/accounts_test.go b/internal/data/accounts_test.go index 97fbb5deb..0dac677ee 100644 --- a/internal/data/accounts_test.go +++ b/internal/data/accounts_test.go @@ -13,6 +13,7 @@ import ( "github.com/stellar/wallet-backend/internal/db" "github.com/stellar/wallet-backend/internal/db/dbtest" + "github.com/stellar/wallet-backend/internal/indexer/types" "github.com/stellar/wallet-backend/internal/metrics" ) @@ -33,6 +34,12 @@ func TestAccountModel_BatchGetByIDs(t *testing.T) { ctx := context.Background() + // Generate test addresses + account1 := keypair.MustRandom().Address() + account2 := keypair.MustRandom().Address() + nonexistent1 := keypair.MustRandom().Address() + nonexistent2 := keypair.MustRandom().Address() + t.Run("empty input returns empty result", func(t *testing.T) { var result []string err := db.RunInPgxTransaction(ctx, dbConnectionPool, func(tx pgx.Tx) error { @@ -44,8 +51,9 @@ func TestAccountModel_BatchGetByIDs(t *testing.T) { }) t.Run("returns existing accounts only", func(t *testing.T) { - // Insert some test accounts - _, err := dbConnectionPool.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1), ($2)", "account1", "account2") + // Insert some test accounts using StellarAddress for BYTEA conversion + _, err := dbConnectionPool.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1), ($2)", + types.AddressBytea(account1), types.AddressBytea(account2)) require.NoError(t, err) // Test with mix of existing and non-existing accounts @@ -55,17 +63,17 @@ func TestAccountModel_BatchGetByIDs(t *testing.T) { var result []string err = db.RunInPgxTransaction(ctx, dbConnectionPool, func(tx pgx.Tx) error { - result, err = accountModel.BatchGetByIDs(ctx, tx, []string{"account1", "nonexistent", "account2", "another_nonexistent"}) + result, err = accountModel.BatchGetByIDs(ctx, tx, []string{account1, nonexistent1, account2, nonexistent2}) return err }) require.NoError(t, err) // Should only return the existing accounts assert.Len(t, result, 2) - assert.Contains(t, result, "account1") - assert.Contains(t, result, "account2") - assert.NotContains(t, result, "nonexistent") - assert.NotContains(t, result, "another_nonexistent") + assert.Contains(t, result, account1) + assert.Contains(t, result, account2) + assert.NotContains(t, result, nonexistent1) + assert.NotContains(t, result, nonexistent2) }) t.Run("returns empty when no accounts exist", func(t *testing.T) { @@ -79,7 +87,7 @@ func TestAccountModel_BatchGetByIDs(t *testing.T) { var result []string err = db.RunInPgxTransaction(ctx, dbConnectionPool, func(tx pgx.Tx) error { - result, err = accountModel.BatchGetByIDs(ctx, tx, []string{"nonexistent1", "nonexistent2"}) + result, err = accountModel.BatchGetByIDs(ctx, tx, []string{nonexistent1, nonexistent2}) return err }) require.NoError(t, err) @@ -110,12 +118,11 @@ func TestAccountModel_Insert(t *testing.T) { err = m.Insert(ctx, address) require.NoError(t, err) - var dbAddress sql.NullString - err = m.DB.GetContext(ctx, &dbAddress, "SELECT stellar_address FROM accounts WHERE stellar_address = $1", address) + var dbAddress types.AddressBytea + err = m.DB.GetContext(ctx, &dbAddress, "SELECT stellar_address FROM accounts WHERE stellar_address = $1", types.AddressBytea(address)) require.NoError(t, err) - assert.True(t, dbAddress.Valid) - assert.Equal(t, address, dbAddress.String) + assert.Equal(t, address, string(dbAddress)) }) t.Run("duplicate insert fails", func(t *testing.T) { @@ -164,7 +171,7 @@ func TestAccountModel_Delete(t *testing.T) { ctx := context.Background() address := keypair.MustRandom().Address() - result, insertErr := m.DB.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1)", address) + result, insertErr := m.DB.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1)", types.AddressBytea(address)) require.NoError(t, insertErr) rowAffected, err := result.RowsAffected() require.NoError(t, err) @@ -173,7 +180,7 @@ func TestAccountModel_Delete(t *testing.T) { err = m.Delete(ctx, address) require.NoError(t, err) - var dbAddress sql.NullString + var dbAddress types.AddressBytea err = m.DB.GetContext(ctx, &dbAddress, "SELECT stellar_address FROM accounts LIMIT 1") assert.ErrorIs(t, err, sql.ErrNoRows) }) @@ -218,7 +225,7 @@ func TestAccountModelGet(t *testing.T) { address := keypair.MustRandom().Address() // Insert test account - result, err := m.DB.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1)", address) + result, err := m.DB.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1)", types.AddressBytea(address)) require.NoError(t, err) rowAffected, err := result.RowsAffected() require.NoError(t, err) @@ -227,7 +234,7 @@ func TestAccountModelGet(t *testing.T) { // Test Get function account, err := m.Get(ctx, address) require.NoError(t, err) - assert.Equal(t, address, account.StellarAddress) + assert.Equal(t, address, string(account.StellarAddress)) } func TestAccountModelBatchGetByToIDs(t *testing.T) { @@ -255,7 +262,8 @@ func TestAccountModelBatchGetByToIDs(t *testing.T) { toID2 := int64(2) // Insert test accounts - _, err = m.DB.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1), ($2)", address1, address2) + _, err = m.DB.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1), ($2)", + types.AddressBytea(address1), types.AddressBytea(address2)) require.NoError(t, err) // Insert test transactions first @@ -263,7 +271,8 @@ func TestAccountModelBatchGetByToIDs(t *testing.T) { require.NoError(t, err) // Insert test transactions_accounts links - _, err = m.DB.ExecContext(ctx, "INSERT INTO transactions_accounts (tx_to_id, account_id) VALUES ($1, $2), ($3, $4)", toID1, address1, toID2, address2) + _, err = m.DB.ExecContext(ctx, "INSERT INTO transactions_accounts (tx_to_id, account_id) VALUES ($1, $2), ($3, $4)", + toID1, types.AddressBytea(address1), toID2, types.AddressBytea(address2)) require.NoError(t, err) // Test BatchGetByToIDs function @@ -274,7 +283,7 @@ func TestAccountModelBatchGetByToIDs(t *testing.T) { // Verify accounts are returned with correct to_id addressSet := make(map[string]int64) for _, acc := range accounts { - addressSet[acc.StellarAddress] = acc.ToID + addressSet[string(acc.StellarAddress)] = acc.ToID } assert.Equal(t, toID1, addressSet[address1]) assert.Equal(t, toID2, addressSet[address2]) @@ -304,8 +313,9 @@ func TestAccountModelBatchGetByOperationIDs(t *testing.T) { operationID1 := int64(123) operationID2 := int64(456) - // Insert test accounts - _, err = m.DB.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1), ($2)", address1, address2) + // Insert test accounts (stellar_address is BYTEA) + _, err = m.DB.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1), ($2)", + types.AddressBytea(address1), types.AddressBytea(address2)) require.NoError(t, err) // Insert test transactions first @@ -316,8 +326,9 @@ func TestAccountModelBatchGetByOperationIDs(t *testing.T) { _, err = m.DB.ExecContext(ctx, "INSERT INTO operations (id, operation_type, operation_xdr, result_code, successful, ledger_number, ledger_created_at) VALUES ($1, 'PAYMENT', 'xdr1', 'op_success', true, 1, NOW()), ($2, 'PAYMENT', 'xdr2', 'op_success', true, 2, NOW())", operationID1, operationID2) require.NoError(t, err) - // Insert test operations_accounts links - _, err = m.DB.ExecContext(ctx, "INSERT INTO operations_accounts (operation_id, account_id) VALUES ($1, $2), ($3, $4)", operationID1, address1, operationID2, address2) + // Insert test operations_accounts links (account_id is BYTEA) + _, err = m.DB.ExecContext(ctx, "INSERT INTO operations_accounts (operation_id, account_id) VALUES ($1, $2), ($3, $4)", + operationID1, types.AddressBytea(address1), operationID2, types.AddressBytea(address2)) require.NoError(t, err) // Test BatchGetByOperationID function @@ -328,7 +339,7 @@ func TestAccountModelBatchGetByOperationIDs(t *testing.T) { // Verify accounts are returned with correct operation_id addressSet := make(map[string]int64) for _, acc := range accounts { - addressSet[acc.StellarAddress] = acc.OperationID + addressSet[string(acc.StellarAddress)] = acc.OperationID } assert.Equal(t, operationID1, addressSet[address1]) assert.Equal(t, operationID2, addressSet[address2]) @@ -358,7 +369,7 @@ func TestAccountModel_IsAccountFeeBumpEligible(t *testing.T) { require.NoError(t, err) assert.False(t, isFeeBumpEligible) - result, err := m.DB.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1)", address) + result, err := m.DB.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1)", types.AddressBytea(address)) require.NoError(t, err) rowAffected, err := result.RowsAffected() require.NoError(t, err) @@ -395,8 +406,9 @@ func TestAccountModelBatchGetByStateChangeIDs(t *testing.T) { stateChangeOrder1 := int64(1) stateChangeOrder2 := int64(1) - // Insert test accounts - _, err = m.DB.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1), ($2)", address1, address2) + // Insert test accounts (stellar_address is BYTEA) + _, err = m.DB.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1), ($2)", + types.AddressBytea(address1), types.AddressBytea(address2)) require.NoError(t, err) // Insert test transactions first @@ -407,7 +419,7 @@ func TestAccountModelBatchGetByStateChangeIDs(t *testing.T) { _, err = m.DB.ExecContext(ctx, "INSERT INTO operations (id, operation_type, operation_xdr, result_code, successful, ledger_number, ledger_created_at) VALUES (4097, 'PAYMENT', 'xdr1', 'op_success', true, 1, NOW()), (8193, 'PAYMENT', 'xdr2', 'op_success', true, 2, NOW())") require.NoError(t, err) - // Insert test state changes that reference the accounts + // Insert test state changes that reference the accounts (state_changes.account_id is TEXT) _, err = m.DB.ExecContext(ctx, ` INSERT INTO state_changes ( to_id, state_change_order, state_change_category, ledger_created_at, @@ -415,7 +427,7 @@ func TestAccountModelBatchGetByStateChangeIDs(t *testing.T) { ) VALUES ($1, $2, 'BALANCE', NOW(), 1, $3, 4097), ($4, $5, 'BALANCE', NOW(), 2, $6, 8193) - `, toID1, stateChangeOrder1, address1, toID2, stateChangeOrder2, address2) + `, toID1, stateChangeOrder1, types.AddressBytea(address1), toID2, stateChangeOrder2, types.AddressBytea(address2)) require.NoError(t, err) // Test BatchGetByStateChangeIDs function @@ -429,7 +441,7 @@ func TestAccountModelBatchGetByStateChangeIDs(t *testing.T) { // Verify accounts are returned with correct state_change_id (format: to_id-operation_id-state_change_order) addressSet := make(map[string]string) for _, acc := range accounts { - addressSet[acc.StellarAddress] = acc.StateChangeID + addressSet[string(acc.StellarAddress)] = acc.StateChangeID } assert.Equal(t, "4096-4097-1", addressSet[address1]) assert.Equal(t, "8192-8193-1", addressSet[address2]) diff --git a/internal/data/operations.go b/internal/data/operations.go index fbdcb9fae..d8f92e8ee 100644 --- a/internal/data/operations.go +++ b/internal/data/operations.go @@ -300,13 +300,17 @@ func (m *OperationModel) BatchInsert( ledgerCreatedAts[i] = op.LedgerCreatedAt } - // 2. Flatten the stellarAddressesByOpID into parallel slices + // 2. Flatten the stellarAddressesByOpID into parallel slices, converting to BYTEA var opIDs []int64 - var stellarAddresses []string + var stellarAddressBytes [][]byte for opID, addresses := range stellarAddressesByOpID { for address := range addresses.Iter() { opIDs = append(opIDs, opID) - stellarAddresses = append(stellarAddresses, address) + addrBytes, err := types.AddressBytea(address).Value() + if err != nil { + return nil, fmt.Errorf("converting address %s to bytes: %w", address, err) + } + stellarAddressBytes = append(stellarAddressBytes, addrBytes.([]byte)) } } @@ -342,7 +346,7 @@ func (m *OperationModel) BatchInsert( FROM ( SELECT UNNEST($8::bigint[]) AS op_id, - UNNEST($9::text[]) AS account_id + UNNEST($9::bytea[]) AS account_id ) oa ON CONFLICT DO NOTHING ) @@ -362,7 +366,7 @@ func (m *OperationModel) BatchInsert( pq.Array(ledgerNumbers), pq.Array(ledgerCreatedAts), pq.Array(opIDs), - pq.Array(stellarAddresses), + pq.Array(stellarAddressBytes), ) duration := time.Since(start).Seconds() for _, dbTableName := range []string{"operations", "operations_accounts"} { @@ -436,7 +440,12 @@ func (m *OperationModel) BatchCopy( for opID, addresses := range stellarAddressesByOpID { opIDPgtype := pgtype.Int8{Int64: opID, Valid: true} for _, addr := range addresses.ToSlice() { - oaRows = append(oaRows, []any{opIDPgtype, pgtype.Text{String: addr, Valid: true}}) + var addrBytes any + addrBytes, err = types.AddressBytea(addr).Value() + if err != nil { + return 0, fmt.Errorf("converting address %s to bytes: %w", addr, err) + } + oaRows = append(oaRows, []any{opIDPgtype, addrBytes}) } } diff --git a/internal/data/operations_test.go b/internal/data/operations_test.go index cbf4dde82..fb8f64a34 100644 --- a/internal/data/operations_test.go +++ b/internal/data/operations_test.go @@ -56,8 +56,8 @@ func Test_OperationModel_BatchInsert(t *testing.T) { // Create test data kp1 := keypair.MustRandom() kp2 := keypair.MustRandom() - const q = "INSERT INTO accounts (stellar_address) SELECT UNNEST(ARRAY[$1, $2])" - _, err = dbConnectionPool.ExecContext(ctx, q, kp1.Address(), kp2.Address()) + const q = "INSERT INTO accounts (stellar_address) VALUES ($1), ($2)" + _, err = dbConnectionPool.ExecContext(ctx, q, types.AddressBytea(kp1.Address()), types.AddressBytea(kp2.Address())) require.NoError(t, err) // Create referenced transactions first with specific ToIDs @@ -206,8 +206,8 @@ func Test_OperationModel_BatchInsert(t *testing.T) { // Verify the account links if len(tc.wantAccountLinks) > 0 { var accountLinks []struct { - OperationID int64 `db:"operation_id"` - AccountID string `db:"account_id"` + OperationID int64 `db:"operation_id"` + AccountID types.AddressBytea `db:"account_id"` } err = sqlExecuter.SelectContext(ctx, &accountLinks, "SELECT operation_id, account_id FROM operations_accounts ORDER BY operation_id, account_id") require.NoError(t, err) @@ -215,7 +215,7 @@ func Test_OperationModel_BatchInsert(t *testing.T) { // Create a map of operation_id -> set of account_ids for O(1) lookups accountLinksMap := make(map[int64][]string) for _, link := range accountLinks { - accountLinksMap[link.OperationID] = append(accountLinksMap[link.OperationID], link.AccountID) + accountLinksMap[link.OperationID] = append(accountLinksMap[link.OperationID], string(link.AccountID)) } // Verify each operation has its expected account links @@ -243,8 +243,8 @@ func Test_OperationModel_BatchCopy(t *testing.T) { // Create test accounts kp1 := keypair.MustRandom() kp2 := keypair.MustRandom() - const q = "INSERT INTO accounts (stellar_address) SELECT UNNEST(ARRAY[$1, $2])" - _, err = dbConnectionPool.ExecContext(ctx, q, kp1.Address(), kp2.Address()) + const q = "INSERT INTO accounts (stellar_address) VALUES ($1), ($2)" + _, err = dbConnectionPool.ExecContext(ctx, q, types.AddressBytea(kp1.Address()), types.AddressBytea(kp2.Address())) require.NoError(t, err) // Create referenced transactions first with specific ToIDs @@ -384,8 +384,8 @@ func Test_OperationModel_BatchCopy(t *testing.T) { // Verify account links if expected if len(tc.stellarAddressesByOpID) > 0 && tc.wantCount > 0 { var accountLinks []struct { - OperationID int64 `db:"operation_id"` - AccountID string `db:"account_id"` + OperationID int64 `db:"operation_id"` + AccountID types.AddressBytea `db:"account_id"` } err = dbConnectionPool.SelectContext(ctx, &accountLinks, "SELECT operation_id, account_id FROM operations_accounts ORDER BY operation_id, account_id") require.NoError(t, err) @@ -393,7 +393,7 @@ func Test_OperationModel_BatchCopy(t *testing.T) { // Create a map of operation_id -> set of account_ids accountLinksMap := make(map[int64][]string) for _, link := range accountLinks { - accountLinksMap[link.OperationID] = append(accountLinksMap[link.OperationID], link.AccountID) + accountLinksMap[link.OperationID] = append(accountLinksMap[link.OperationID], string(link.AccountID)) } // Verify each expected operation has its account links @@ -419,7 +419,7 @@ func Test_OperationModel_BatchCopy_DuplicateFails(t *testing.T) { // Create test accounts kp1 := keypair.MustRandom() const q = "INSERT INTO accounts (stellar_address) VALUES ($1)" - _, err = dbConnectionPool.ExecContext(ctx, q, kp1.Address()) + _, err = dbConnectionPool.ExecContext(ctx, q, types.AddressBytea(kp1.Address())) require.NoError(t, err) // Create a parent transaction that the operation will reference @@ -824,7 +824,7 @@ func TestOperationModel_BatchGetByAccountAddresses(t *testing.T) { (4097, $1), (8193, $1), (12289, $2) - `, address1, address2) + `, types.AddressBytea(address1), types.AddressBytea(address2)) require.NoError(t, err) // Test BatchGetByAccount diff --git a/internal/data/query_utils.go b/internal/data/query_utils.go index c458ba13d..d2304136a 100644 --- a/internal/data/query_utils.go +++ b/internal/data/query_utils.go @@ -65,6 +65,21 @@ func pgtypeInt2FromNullInt16(ni sql.NullInt16) pgtype.Int2 { return pgtype.Int2{Int16: ni.Int16, Valid: ni.Valid} } +// pgtypeBytesFromNullAddressBytea converts NullAddressBytea to bytes for BYTEA insert. +func pgtypeBytesFromNullAddressBytea(na types.NullAddressBytea) ([]byte, error) { + if !na.Valid { + return nil, nil + } + val, err := na.Value() + if err != nil { + return nil, fmt.Errorf("converting address to bytes: %w", err) + } + if val == nil { + return nil, nil + } + return val.([]byte), nil +} + // BuildPaginatedQuery constructs a paginated SQL query with cursor-based pagination func buildGetByAccountAddressQuery(config paginatedQueryConfig) (string, []any) { var queryBuilder strings.Builder @@ -75,8 +90,8 @@ func buildGetByAccountAddressQuery(config paginatedQueryConfig) (string, []any) queryBuilder.WriteString(fmt.Sprintf(` SELECT %s, %s.%s as cursor FROM %s - INNER JOIN %s - ON %s + INNER JOIN %s + ON %s WHERE %s.account_id = $%d`, config.Columns, config.TableName, @@ -86,7 +101,7 @@ func buildGetByAccountAddressQuery(config paginatedQueryConfig) (string, []any) config.JoinCondition, config.JoinTable, argIndex)) - args = append(args, config.AccountAddress) + args = append(args, types.AddressBytea(config.AccountAddress)) argIndex++ // Add cursor condition if provided diff --git a/internal/data/statechanges.go b/internal/data/statechanges.go index 442d55447..c9e957982 100644 --- a/internal/data/statechanges.go +++ b/internal/data/statechanges.go @@ -26,7 +26,7 @@ type StateChangeModel struct { func (m *StateChangeModel) BatchGetByAccountAddress(ctx context.Context, accountAddress string, txHash *string, operationID *int64, category *string, reason *string, columns string, limit *int32, cursor *types.StateChangeCursor, sortOrder SortOrder) ([]*types.StateChangeWithCursor, error) { columns = prepareColumnsWithID(columns, types.StateChange{}, "", "to_id", "operation_id", "state_change_order") var queryBuilder strings.Builder - args := []interface{}{accountAddress} + args := []interface{}{types.AddressBytea(accountAddress)} argIndex := 2 queryBuilder.WriteString(fmt.Sprintf(` @@ -184,16 +184,16 @@ func (m *StateChangeModel) BatchInsert( reasons := make([]*string, len(stateChanges)) ledgerCreatedAts := make([]time.Time, len(stateChanges)) ledgerNumbers := make([]int, len(stateChanges)) - accountIDs := make([]string, len(stateChanges)) + accountIDBytes := make([][]byte, len(stateChanges)) operationIDs := make([]int64, len(stateChanges)) tokenIDs := make([]*string, len(stateChanges)) amounts := make([]*string, len(stateChanges)) - signerAccountIDs := make([]*string, len(stateChanges)) - spenderAccountIDs := make([]*string, len(stateChanges)) - sponsoredAccountIDs := make([]*string, len(stateChanges)) - sponsorAccountIDs := make([]*string, len(stateChanges)) - deployerAccountIDs := make([]*string, len(stateChanges)) - funderAccountIDs := make([]*string, len(stateChanges)) + signerAccountIDBytes := make([][]byte, len(stateChanges)) + spenderAccountIDBytes := make([][]byte, len(stateChanges)) + sponsoredAccountIDBytes := make([][]byte, len(stateChanges)) + sponsorAccountIDBytes := make([][]byte, len(stateChanges)) + deployerAccountIDBytes := make([][]byte, len(stateChanges)) + funderAccountIDBytes := make([][]byte, len(stateChanges)) claimableBalanceIDs := make([]*string, len(stateChanges)) liquidityPoolIDs := make([]*string, len(stateChanges)) sponsoredDataValues := make([]*string, len(stateChanges)) @@ -212,9 +212,15 @@ func (m *StateChangeModel) BatchInsert( categories[i] = string(sc.StateChangeCategory) ledgerCreatedAts[i] = sc.LedgerCreatedAt ledgerNumbers[i] = int(sc.LedgerNumber) - accountIDs[i] = sc.AccountID operationIDs[i] = sc.OperationID + // Convert account_id to BYTEA (required field) + addrBytes, err := sc.AccountID.Value() + if err != nil { + return nil, fmt.Errorf("converting account_id: %w", err) + } + accountIDBytes[i] = addrBytes.([]byte) + // Nullable fields if sc.StateChangeReason != nil { reason := string(*sc.StateChangeReason) @@ -226,23 +232,31 @@ func (m *StateChangeModel) BatchInsert( if sc.Amount.Valid { amounts[i] = &sc.Amount.String } - if sc.SignerAccountID.Valid { - signerAccountIDs[i] = &sc.SignerAccountID.String + + // Convert nullable account_id fields to BYTEA + signerAccountIDBytes[i], err = pgtypeBytesFromNullAddressBytea(sc.SignerAccountID) + if err != nil { + return nil, fmt.Errorf("converting signer_account_id: %w", err) } - if sc.SpenderAccountID.Valid { - spenderAccountIDs[i] = &sc.SpenderAccountID.String + spenderAccountIDBytes[i], err = pgtypeBytesFromNullAddressBytea(sc.SpenderAccountID) + if err != nil { + return nil, fmt.Errorf("converting spender_account_id: %w", err) } - if sc.SponsoredAccountID.Valid { - sponsoredAccountIDs[i] = &sc.SponsoredAccountID.String + sponsoredAccountIDBytes[i], err = pgtypeBytesFromNullAddressBytea(sc.SponsoredAccountID) + if err != nil { + return nil, fmt.Errorf("converting sponsored_account_id: %w", err) } - if sc.SponsorAccountID.Valid { - sponsorAccountIDs[i] = &sc.SponsorAccountID.String + sponsorAccountIDBytes[i], err = pgtypeBytesFromNullAddressBytea(sc.SponsorAccountID) + if err != nil { + return nil, fmt.Errorf("converting sponsor_account_id: %w", err) } - if sc.DeployerAccountID.Valid { - deployerAccountIDs[i] = &sc.DeployerAccountID.String + deployerAccountIDBytes[i], err = pgtypeBytesFromNullAddressBytea(sc.DeployerAccountID) + if err != nil { + return nil, fmt.Errorf("converting deployer_account_id: %w", err) } - if sc.FunderAccountID.Valid { - funderAccountIDs[i] = &sc.FunderAccountID.String + funderAccountIDBytes[i], err = pgtypeBytesFromNullAddressBytea(sc.FunderAccountID) + if err != nil { + return nil, fmt.Errorf("converting funder_account_id: %w", err) } if sc.ClaimableBalanceID.Valid { claimableBalanceIDs[i] = &sc.ClaimableBalanceID.String @@ -289,16 +303,16 @@ func (m *StateChangeModel) BatchInsert( UNNEST($4::text[]) AS state_change_reason, UNNEST($5::timestamptz[]) AS ledger_created_at, UNNEST($6::integer[]) AS ledger_number, - UNNEST($7::text[]) AS account_id, + UNNEST($7::bytea[]) AS account_id, UNNEST($8::bigint[]) AS operation_id, UNNEST($9::text[]) AS token_id, UNNEST($10::text[]) AS amount, - UNNEST($11::text[]) AS signer_account_id, - UNNEST($12::text[]) AS spender_account_id, - UNNEST($13::text[]) AS sponsored_account_id, - UNNEST($14::text[]) AS sponsor_account_id, - UNNEST($15::text[]) AS deployer_account_id, - UNNEST($16::text[]) AS funder_account_id, + UNNEST($11::bytea[]) AS signer_account_id, + UNNEST($12::bytea[]) AS spender_account_id, + UNNEST($13::bytea[]) AS sponsored_account_id, + UNNEST($14::bytea[]) AS sponsor_account_id, + UNNEST($15::bytea[]) AS deployer_account_id, + UNNEST($16::bytea[]) AS funder_account_id, UNNEST($17::text[]) AS claimable_balance_id, UNNEST($18::text[]) AS liquidity_pool_id, UNNEST($19::text[]) AS sponsored_data, @@ -342,16 +356,16 @@ func (m *StateChangeModel) BatchInsert( pq.Array(reasons), pq.Array(ledgerCreatedAts), pq.Array(ledgerNumbers), - pq.Array(accountIDs), + pq.Array(accountIDBytes), pq.Array(operationIDs), pq.Array(tokenIDs), pq.Array(amounts), - pq.Array(signerAccountIDs), - pq.Array(spenderAccountIDs), - pq.Array(sponsoredAccountIDs), - pq.Array(sponsorAccountIDs), - pq.Array(deployerAccountIDs), - pq.Array(funderAccountIDs), + pq.Array(signerAccountIDBytes), + pq.Array(spenderAccountIDBytes), + pq.Array(sponsoredAccountIDBytes), + pq.Array(sponsorAccountIDBytes), + pq.Array(deployerAccountIDBytes), + pq.Array(funderAccountIDBytes), pq.Array(claimableBalanceIDs), pq.Array(liquidityPoolIDs), pq.Array(sponsoredDataValues), @@ -410,6 +424,39 @@ func (m *StateChangeModel) BatchCopy( }, pgx.CopyFromSlice(len(stateChanges), func(i int) ([]any, error) { sc := stateChanges[i] + + // Convert account_id to BYTEA (required field) + accountBytes, err := sc.AccountID.Value() + if err != nil { + return nil, fmt.Errorf("converting account_id: %w", err) + } + + // Convert nullable account_id fields to BYTEA + signerBytes, err := pgtypeBytesFromNullAddressBytea(sc.SignerAccountID) + if err != nil { + return nil, fmt.Errorf("converting signer_account_id: %w", err) + } + spenderBytes, err := pgtypeBytesFromNullAddressBytea(sc.SpenderAccountID) + if err != nil { + return nil, fmt.Errorf("converting spender_account_id: %w", err) + } + sponsoredBytes, err := pgtypeBytesFromNullAddressBytea(sc.SponsoredAccountID) + if err != nil { + return nil, fmt.Errorf("converting sponsored_account_id: %w", err) + } + sponsorBytes, err := pgtypeBytesFromNullAddressBytea(sc.SponsorAccountID) + if err != nil { + return nil, fmt.Errorf("converting sponsor_account_id: %w", err) + } + deployerBytes, err := pgtypeBytesFromNullAddressBytea(sc.DeployerAccountID) + if err != nil { + return nil, fmt.Errorf("converting deployer_account_id: %w", err) + } + funderBytes, err := pgtypeBytesFromNullAddressBytea(sc.FunderAccountID) + if err != nil { + return nil, fmt.Errorf("converting funder_account_id: %w", err) + } + return []any{ pgtype.Int8{Int64: sc.ToID, Valid: true}, pgtype.Int8{Int64: sc.StateChangeOrder, Valid: true}, @@ -417,16 +464,16 @@ func (m *StateChangeModel) BatchCopy( pgtypeTextFromReasonPtr(sc.StateChangeReason), pgtype.Timestamptz{Time: sc.LedgerCreatedAt, Valid: true}, pgtype.Int4{Int32: int32(sc.LedgerNumber), Valid: true}, - pgtype.Text{String: sc.AccountID, Valid: true}, + accountBytes, pgtype.Int8{Int64: sc.OperationID, Valid: true}, pgtypeTextFromNullString(sc.TokenID), pgtypeTextFromNullString(sc.Amount), - pgtypeTextFromNullString(sc.SignerAccountID), - pgtypeTextFromNullString(sc.SpenderAccountID), - pgtypeTextFromNullString(sc.SponsoredAccountID), - pgtypeTextFromNullString(sc.SponsorAccountID), - pgtypeTextFromNullString(sc.DeployerAccountID), - pgtypeTextFromNullString(sc.FunderAccountID), + signerBytes, + spenderBytes, + sponsoredBytes, + sponsorBytes, + deployerBytes, + funderBytes, pgtypeTextFromNullString(sc.ClaimableBalanceID), pgtypeTextFromNullString(sc.LiquidityPoolID), pgtypeTextFromNullString(sc.SponsoredData), diff --git a/internal/data/statechanges_test.go b/internal/data/statechanges_test.go index c4c99b06f..59c10ab7e 100644 --- a/internal/data/statechanges_test.go +++ b/internal/data/statechanges_test.go @@ -22,12 +22,15 @@ import ( // generateTestStateChanges creates n test state changes for benchmarking. // Populates all fields to provide an upper-bound benchmark. -func generateTestStateChanges(n int, accountID string, startToID int64) []types.StateChange { +// The auxAddresses parameter provides pre-generated valid Stellar addresses for nullable account_id fields. +func generateTestStateChanges(n int, accountID string, startToID int64, auxAddresses []string) []types.StateChange { scs := make([]types.StateChange, n) now := time.Now() reason := types.StateChangeReasonCredit for i := 0; i < n; i++ { + // Use modulo to cycle through auxiliary addresses for nullable account_id fields + auxIdx := i % len(auxAddresses) scs[i] = types.StateChange{ ToID: startToID + int64(i), StateChangeOrder: 1, @@ -35,20 +38,21 @@ func generateTestStateChanges(n int, accountID string, startToID int64) []types. StateChangeReason: &reason, LedgerCreatedAt: now, LedgerNumber: uint32(i + 1), - AccountID: accountID, + AccountID: types.AddressBytea(accountID), OperationID: int64(i + 1), // sql.NullString fields - TokenID: sql.NullString{String: fmt.Sprintf("token_%d", i), Valid: true}, - Amount: sql.NullString{String: fmt.Sprintf("%d", (i+1)*100), Valid: true}, - SignerAccountID: sql.NullString{String: fmt.Sprintf("GSIGNER%032d", i), Valid: true}, - SpenderAccountID: sql.NullString{String: fmt.Sprintf("GSPENDER%031d", i), Valid: true}, - SponsoredAccountID: sql.NullString{String: fmt.Sprintf("GSPONSORED%028d", i), Valid: true}, - SponsorAccountID: sql.NullString{String: fmt.Sprintf("GSPONSOR%030d", i), Valid: true}, - DeployerAccountID: sql.NullString{String: fmt.Sprintf("GDEPLOYER%029d", i), Valid: true}, - FunderAccountID: sql.NullString{String: fmt.Sprintf("GFUNDER%031d", i), Valid: true}, + TokenID: sql.NullString{String: fmt.Sprintf("token_%d", i), Valid: true}, + Amount: sql.NullString{String: fmt.Sprintf("%d", (i+1)*100), Valid: true}, + // NullAddressBytea fields + SignerAccountID: types.NullAddressBytea{AddressBytea: types.AddressBytea(auxAddresses[auxIdx]), Valid: true}, + SpenderAccountID: types.NullAddressBytea{AddressBytea: types.AddressBytea(auxAddresses[(auxIdx+1)%len(auxAddresses)]), Valid: true}, + SponsoredAccountID: types.NullAddressBytea{AddressBytea: types.AddressBytea(auxAddresses[(auxIdx+2)%len(auxAddresses)]), Valid: true}, + SponsorAccountID: types.NullAddressBytea{AddressBytea: types.AddressBytea(auxAddresses[(auxIdx+3)%len(auxAddresses)]), Valid: true}, + DeployerAccountID: types.NullAddressBytea{AddressBytea: types.AddressBytea(auxAddresses[(auxIdx+4)%len(auxAddresses)]), Valid: true}, + FunderAccountID: types.NullAddressBytea{AddressBytea: types.AddressBytea(auxAddresses[(auxIdx+5)%len(auxAddresses)]), Valid: true}, // Typed fields (previously JSONB) - SignerWeightOld: sql.NullInt16{Int16: int16(i), Valid: true}, - SignerWeightNew: sql.NullInt16{Int16: int16(i + 1), Valid: true}, + SignerWeightOld: sql.NullInt16{Int16: int16(i % 256), Valid: true}, + SignerWeightNew: sql.NullInt16{Int16: int16((i + 1) % 256), Valid: true}, ThresholdOld: sql.NullInt16{Int16: 1, Valid: true}, ThresholdNew: sql.NullInt16{Int16: 2, Valid: true}, TrustlineLimitOld: sql.NullString{String: fmt.Sprintf("%d", i*1000), Valid: true}, @@ -74,8 +78,8 @@ func TestStateChangeModel_BatchInsert(t *testing.T) { // Create test data kp1 := keypair.MustRandom() kp2 := keypair.MustRandom() - const q = "INSERT INTO accounts (stellar_address) SELECT UNNEST(ARRAY[$1, $2])" - _, err = dbConnectionPool.ExecContext(ctx, q, kp1.Address(), kp2.Address()) + const q = "INSERT INTO accounts (stellar_address) VALUES ($1), ($2)" + _, err = dbConnectionPool.ExecContext(ctx, q, types.AddressBytea(kp1.Address()), types.AddressBytea(kp2.Address())) require.NoError(t, err) // Create referenced transactions first @@ -120,7 +124,7 @@ func TestStateChangeModel_BatchInsert(t *testing.T) { StateChangeReason: &reason, LedgerCreatedAt: now, LedgerNumber: 1, - AccountID: kp1.Address(), + AccountID: types.AddressBytea(kp1.Address()), OperationID: 123, TokenID: sql.NullString{String: "token1", Valid: true}, Amount: sql.NullString{String: "100", Valid: true}, @@ -132,7 +136,7 @@ func TestStateChangeModel_BatchInsert(t *testing.T) { StateChangeReason: &reason, LedgerCreatedAt: now, LedgerNumber: 2, - AccountID: kp2.Address(), + AccountID: types.AddressBytea(kp2.Address()), OperationID: 456, } @@ -230,8 +234,8 @@ func TestStateChangeModel_BatchCopy(t *testing.T) { // Create test accounts kp1 := keypair.MustRandom() kp2 := keypair.MustRandom() - const q = "INSERT INTO accounts (stellar_address) SELECT UNNEST(ARRAY[$1, $2])" - _, err = dbConnectionPool.ExecContext(ctx, q, kp1.Address(), kp2.Address()) + const q = "INSERT INTO accounts (stellar_address) VALUES ($1), ($2)" + _, err = dbConnectionPool.ExecContext(ctx, q, types.AddressBytea(kp1.Address()), types.AddressBytea(kp2.Address())) require.NoError(t, err) // Create referenced transactions first @@ -276,7 +280,7 @@ func TestStateChangeModel_BatchCopy(t *testing.T) { StateChangeReason: &reason, LedgerCreatedAt: now, LedgerNumber: 1, - AccountID: kp1.Address(), + AccountID: types.AddressBytea(kp1.Address()), OperationID: 123, TokenID: sql.NullString{String: "token1", Valid: true}, Amount: sql.NullString{String: "100", Valid: true}, @@ -288,7 +292,7 @@ func TestStateChangeModel_BatchCopy(t *testing.T) { StateChangeReason: &reason, LedgerCreatedAt: now, LedgerNumber: 2, - AccountID: kp2.Address(), + AccountID: types.AddressBytea(kp2.Address()), OperationID: 456, } // State change with typed signer/threshold fields (uses to_id=1 to reference tx1) @@ -299,7 +303,7 @@ func TestStateChangeModel_BatchCopy(t *testing.T) { StateChangeReason: nil, LedgerCreatedAt: now, LedgerNumber: 3, - AccountID: kp1.Address(), + AccountID: types.AddressBytea(kp1.Address()), OperationID: 789, SignerWeightOld: sql.NullInt16{Int16: 0, Valid: true}, SignerWeightNew: sql.NullInt16{Int16: 10, Valid: true}, @@ -403,7 +407,7 @@ func TestStateChangeModel_BatchCopy_DuplicateFails(t *testing.T) { // Create test account kp1 := keypair.MustRandom() const q = "INSERT INTO accounts (stellar_address) VALUES ($1)" - _, err = dbConnectionPool.ExecContext(ctx, q, kp1.Address()) + _, err = dbConnectionPool.ExecContext(ctx, q, types.AddressBytea(kp1.Address())) require.NoError(t, err) // Create parent transaction @@ -421,7 +425,7 @@ func TestStateChangeModel_BatchCopy_DuplicateFails(t *testing.T) { StateChangeReason: &reason, LedgerCreatedAt: now, LedgerNumber: 1, - AccountID: kp1.Address(), + AccountID: types.AddressBytea(kp1.Address()), OperationID: 123, } @@ -479,7 +483,8 @@ func TestStateChangeModel_BatchGetByAccountAddress(t *testing.T) { // Create test accounts address1 := keypair.MustRandom().Address() address2 := keypair.MustRandom().Address() - _, err = dbConnectionPool.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1), ($2)", address1, address2) + _, err = dbConnectionPool.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1), ($2)", + types.AddressBytea(address1), types.AddressBytea(address2)) require.NoError(t, err) // Create test transactions first @@ -499,7 +504,7 @@ func TestStateChangeModel_BatchGetByAccountAddress(t *testing.T) { (1, 1, 'BALANCE', $1, 1, $2, 123), (2, 1, 'BALANCE', $1, 2, $2, 456), (3, 1, 'BALANCE', $1, 3, $3, 789) - `, now, address1, address2) + `, now, types.AddressBytea(address1), types.AddressBytea(address2)) require.NoError(t, err) mockMetricsService := metrics.NewMockMetricsService() @@ -517,7 +522,7 @@ func TestStateChangeModel_BatchGetByAccountAddress(t *testing.T) { require.NoError(t, err) assert.Len(t, stateChanges, 2) for _, sc := range stateChanges { - assert.Equal(t, address1, sc.AccountID) + assert.Equal(t, address1, sc.AccountID.String()) } // Test BatchGetByAccount for address2 @@ -525,7 +530,7 @@ func TestStateChangeModel_BatchGetByAccountAddress(t *testing.T) { require.NoError(t, err) assert.Len(t, stateChanges, 1) for _, sc := range stateChanges { - assert.Equal(t, address2, sc.AccountID) + assert.Equal(t, address2, sc.AccountID.String()) } } @@ -541,7 +546,7 @@ func TestStateChangeModel_BatchGetByAccountAddress_WithFilters(t *testing.T) { // Create test account address := keypair.MustRandom().Address() - _, err = dbConnectionPool.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1)", address) + _, err = dbConnectionPool.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1)", types.AddressBytea(address)) require.NoError(t, err) // Create test transactions @@ -564,7 +569,7 @@ func TestStateChangeModel_BatchGetByAccountAddress_WithFilters(t *testing.T) { (3, 1, 'SIGNER', 'ADD', $1, 3, $2, 789), (1, 2, 'BALANCE', 'DEBIT', $1, 4, $2, 124), (2, 2, 'SIGNER', 'ADD', $1, 5, $2, 999) - `, now, address) + `, now, types.AddressBytea(address)) require.NoError(t, err) t.Run("filter by transaction hash only", func(t *testing.T) { @@ -585,7 +590,7 @@ func TestStateChangeModel_BatchGetByAccountAddress_WithFilters(t *testing.T) { assert.Len(t, stateChanges, 2) for _, sc := range stateChanges { assert.Equal(t, int64(1), sc.ToID) - assert.Equal(t, address, sc.AccountID) + assert.Equal(t, address, sc.AccountID.String()) } }) @@ -607,7 +612,7 @@ func TestStateChangeModel_BatchGetByAccountAddress_WithFilters(t *testing.T) { assert.Len(t, stateChanges, 1) for _, sc := range stateChanges { assert.Equal(t, int64(123), sc.OperationID) - assert.Equal(t, address, sc.AccountID) + assert.Equal(t, address, sc.AccountID.String()) } }) @@ -631,7 +636,7 @@ func TestStateChangeModel_BatchGetByAccountAddress_WithFilters(t *testing.T) { for _, sc := range stateChanges { assert.Equal(t, int64(1), sc.ToID) assert.Equal(t, int64(123), sc.OperationID) - assert.Equal(t, address, sc.AccountID) + assert.Equal(t, address, sc.AccountID.String()) } }) @@ -652,7 +657,7 @@ func TestStateChangeModel_BatchGetByAccountAddress_WithFilters(t *testing.T) { assert.Len(t, stateChanges, 3) for _, sc := range stateChanges { assert.Equal(t, types.StateChangeCategoryBalance, sc.StateChangeCategory) - assert.Equal(t, address, sc.AccountID) + assert.Equal(t, address, sc.AccountID.String()) } }) @@ -673,7 +678,7 @@ func TestStateChangeModel_BatchGetByAccountAddress_WithFilters(t *testing.T) { assert.Len(t, stateChanges, 2) for _, sc := range stateChanges { assert.Equal(t, types.StateChangeReasonAdd, *sc.StateChangeReason) - assert.Equal(t, address, sc.AccountID) + assert.Equal(t, address, sc.AccountID.String()) } }) @@ -696,7 +701,7 @@ func TestStateChangeModel_BatchGetByAccountAddress_WithFilters(t *testing.T) { for _, sc := range stateChanges { assert.Equal(t, types.StateChangeCategorySigner, sc.StateChangeCategory) assert.Equal(t, types.StateChangeReasonAdd, *sc.StateChangeReason) - assert.Equal(t, address, sc.AccountID) + assert.Equal(t, address, sc.AccountID.String()) } }) @@ -723,7 +728,7 @@ func TestStateChangeModel_BatchGetByAccountAddress_WithFilters(t *testing.T) { assert.Equal(t, int64(123), sc.OperationID) assert.Equal(t, types.StateChangeCategoryBalance, sc.StateChangeCategory) assert.Equal(t, types.StateChangeReasonCredit, *sc.StateChangeReason) - assert.Equal(t, address, sc.AccountID) + assert.Equal(t, address, sc.AccountID.String()) } }) @@ -786,7 +791,7 @@ func TestStateChangeModel_GetAll(t *testing.T) { // Create test account address := keypair.MustRandom().Address() - _, err = dbConnectionPool.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1)", address) + _, err = dbConnectionPool.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1)", types.AddressBytea(address)) require.NoError(t, err) // Create test transactions first @@ -806,7 +811,7 @@ func TestStateChangeModel_GetAll(t *testing.T) { (1, 1, 'BALANCE', $1, 1, $2, 123), (2, 1, 'BALANCE', $1, 2, $2, 456), (3, 1, 'BALANCE', $1, 3, $2, 789) - `, now, address) + `, now, types.AddressBytea(address)) require.NoError(t, err) // Test GetAll without limit @@ -833,7 +838,7 @@ func TestStateChangeModel_BatchGetByToIDs(t *testing.T) { // Create test account address := keypair.MustRandom().Address() - _, err = dbConnectionPool.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1)", address) + _, err = dbConnectionPool.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1)", types.AddressBytea(address)) require.NoError(t, err) // Create test transactions first @@ -856,7 +861,7 @@ func TestStateChangeModel_BatchGetByToIDs(t *testing.T) { (2, 1, 'BALANCE', $1, 4, $2, 456), (2, 2, 'BALANCE', $1, 5, $2, 457), (3, 1, 'BALANCE', $1, 6, $2, 789) - `, now, address) + `, now, types.AddressBytea(address)) require.NoError(t, err) testCases := []struct { @@ -989,7 +994,7 @@ func TestStateChangeModel_BatchGetByOperationIDs(t *testing.T) { // Create test account address := keypair.MustRandom().Address() - _, err = dbConnectionPool.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1)", address) + _, err = dbConnectionPool.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1)", types.AddressBytea(address)) require.NoError(t, err) // Create test transactions first @@ -1009,7 +1014,7 @@ func TestStateChangeModel_BatchGetByOperationIDs(t *testing.T) { (1, 1, 'BALANCE', $1, 1, $2, 123), (2, 1, 'BALANCE', $1, 2, $2, 456), (3, 1, 'BALANCE', $1, 3, $2, 123) - `, now, address) + `, now, types.AddressBytea(address)) require.NoError(t, err) // Test BatchGetByOperationID @@ -1049,7 +1054,7 @@ func TestStateChangeModel_BatchGetByToID(t *testing.T) { // Create test account address := keypair.MustRandom().Address() - _, err = dbConnectionPool.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1)", address) + _, err = dbConnectionPool.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1)", types.AddressBytea(address)) require.NoError(t, err) // Create test transactions first @@ -1069,7 +1074,7 @@ func TestStateChangeModel_BatchGetByToID(t *testing.T) { (1, 2, 'BALANCE', $1, 2, $2, 124), (1, 3, 'BALANCE', $1, 3, $2, 125), (2, 1, 'BALANCE', $1, 4, $2, 456) - `, now, address) + `, now, types.AddressBytea(address)) require.NoError(t, err) t.Run("get all state changes for single to_id", func(t *testing.T) { @@ -1161,6 +1166,12 @@ func BenchmarkStateChangeModel_BatchInsert(b *testing.B) { b.Fatalf("failed to create parent transaction: %v", err) } + // Pre-generate auxiliary addresses for nullable account_id fields + auxAddresses := make([]string, 10) + for i := range auxAddresses { + auxAddresses[i] = keypair.MustRandom().Address() + } + batchSizes := []int{1000, 5000, 10000, 50000, 100000} for _, size := range batchSizes { @@ -1173,7 +1184,7 @@ func BenchmarkStateChangeModel_BatchInsert(b *testing.B) { //nolint:errcheck // truncate is best-effort cleanup in benchmarks dbConnectionPool.ExecContext(ctx, "TRUNCATE state_changes CASCADE") // Generate fresh test data for each iteration - scs := generateTestStateChanges(size, accountID, int64(i*size)) + scs := generateTestStateChanges(size, accountID, int64(i*size), auxAddresses) b.StartTimer() _, err := m.BatchInsert(ctx, nil, scs) @@ -1226,6 +1237,12 @@ func BenchmarkStateChangeModel_BatchCopy(b *testing.B) { b.Fatalf("failed to create parent transaction: %v", err) } + // Pre-generate auxiliary addresses for nullable account_id fields + auxAddresses := make([]string, 10) + for i := range auxAddresses { + auxAddresses[i] = keypair.MustRandom().Address() + } + batchSizes := []int{1000, 5000, 10000, 50000, 100000} for _, size := range batchSizes { @@ -1241,7 +1258,7 @@ func BenchmarkStateChangeModel_BatchCopy(b *testing.B) { } // Generate fresh test data for each iteration - scs := generateTestStateChanges(size, accountID, int64(i*size)) + scs := generateTestStateChanges(size, accountID, int64(i*size), auxAddresses) // Start a pgx transaction pgxTx, err := conn.Begin(ctx) diff --git a/internal/data/transactions.go b/internal/data/transactions.go index e33b07cb1..f25e13ebd 100644 --- a/internal/data/transactions.go +++ b/internal/data/transactions.go @@ -212,13 +212,17 @@ func (m *TransactionModel) BatchInsert( isFeeBumps[i] = t.IsFeeBump } - // 2. Flatten the stellarAddressesByToID into parallel slices + // 2. Flatten the stellarAddressesByToID into parallel slices, converting to BYTEA var txToIDs []int64 - var stellarAddresses []string + var stellarAddressBytes [][]byte for toID, addresses := range stellarAddressesByToID { for address := range addresses.Iter() { txToIDs = append(txToIDs, toID) - stellarAddresses = append(stellarAddresses, address) + addrBytes, err := types.AddressBytea(address).Value() + if err != nil { + return nil, fmt.Errorf("converting address %s to bytes: %w", address, err) + } + stellarAddressBytes = append(stellarAddressBytes, addrBytes.([]byte)) } } @@ -256,7 +260,7 @@ func (m *TransactionModel) BatchInsert( FROM ( SELECT UNNEST($10::bigint[]) AS tx_to_id, - UNNEST($11::text[]) AS account_id + UNNEST($11::bytea[]) AS account_id ) ta ON CONFLICT DO NOTHING ) @@ -278,7 +282,7 @@ func (m *TransactionModel) BatchInsert( pq.Array(ledgerCreatedAts), pq.Array(isFeeBumps), pq.Array(txToIDs), - pq.Array(stellarAddresses), + pq.Array(stellarAddressBytes), ) duration := time.Since(start).Seconds() for _, dbTableName := range []string{"transactions", "transactions_accounts"} { @@ -354,7 +358,11 @@ func (m *TransactionModel) BatchCopy( for toID, addresses := range stellarAddressesByToID { toIDPgtype := pgtype.Int8{Int64: toID, Valid: true} for _, addr := range addresses.ToSlice() { - taRows = append(taRows, []any{toIDPgtype, pgtype.Text{String: addr, Valid: true}}) + addrBytes, err := types.AddressBytea(addr).Value() + if err != nil { + return 0, fmt.Errorf("converting address %s to bytes: %w", addr, err) + } + taRows = append(taRows, []any{toIDPgtype, addrBytes}) } } diff --git a/internal/data/transactions_test.go b/internal/data/transactions_test.go index 8a978ac9b..34bd6a405 100644 --- a/internal/data/transactions_test.go +++ b/internal/data/transactions_test.go @@ -66,8 +66,8 @@ func Test_TransactionModel_BatchInsert(t *testing.T) { // Create test data kp1 := keypair.MustRandom() kp2 := keypair.MustRandom() - const q = "INSERT INTO accounts (stellar_address) SELECT UNNEST(ARRAY[$1, $2])" - _, err = dbConnectionPool.ExecContext(ctx, q, kp1.Address(), kp2.Address()) + const q = "INSERT INTO accounts (stellar_address) VALUES ($1), ($2)" + _, err = dbConnectionPool.ExecContext(ctx, q, types.AddressBytea(kp1.Address()), types.AddressBytea(kp2.Address())) require.NoError(t, err) meta1, meta2 := "meta1", "meta2" @@ -194,8 +194,8 @@ func Test_TransactionModel_BatchInsert(t *testing.T) { // Verify the account links if len(tc.wantAccountLinks) > 0 { var accountLinks []struct { - TxToID int64 `db:"tx_to_id"` - AccountID string `db:"account_id"` + TxToID int64 `db:"tx_to_id"` + AccountID types.AddressBytea `db:"account_id"` } err = sqlExecuter.SelectContext(ctx, &accountLinks, "SELECT tx_to_id, account_id FROM transactions_accounts ORDER BY tx_to_id, account_id") require.NoError(t, err) @@ -203,7 +203,7 @@ func Test_TransactionModel_BatchInsert(t *testing.T) { // Create a map of tx_to_id -> set of account_ids for O(1) lookups accountLinksMap := make(map[int64][]string) for _, link := range accountLinks { - accountLinksMap[link.TxToID] = append(accountLinksMap[link.TxToID], link.AccountID) + accountLinksMap[link.TxToID] = append(accountLinksMap[link.TxToID], string(link.AccountID)) } // Verify each transaction has its expected account links @@ -231,8 +231,8 @@ func Test_TransactionModel_BatchCopy(t *testing.T) { // Create test accounts kp1 := keypair.MustRandom() kp2 := keypair.MustRandom() - const q = "INSERT INTO accounts (stellar_address) SELECT UNNEST(ARRAY[$1, $2])" - _, err = dbConnectionPool.ExecContext(ctx, q, kp1.Address(), kp2.Address()) + const q = "INSERT INTO accounts (stellar_address) VALUES ($1), ($2)" + _, err = dbConnectionPool.ExecContext(ctx, q, types.AddressBytea(kp1.Address()), types.AddressBytea(kp2.Address())) require.NoError(t, err) meta1, meta2 := "meta1", "meta2" @@ -364,8 +364,8 @@ func Test_TransactionModel_BatchCopy(t *testing.T) { // Verify account links if expected if len(tc.stellarAddressesByToID) > 0 && tc.wantCount > 0 { var accountLinks []struct { - TxToID int64 `db:"tx_to_id"` - AccountID string `db:"account_id"` + TxToID int64 `db:"tx_to_id"` + AccountID types.AddressBytea `db:"account_id"` } err = dbConnectionPool.SelectContext(ctx, &accountLinks, "SELECT tx_to_id, account_id FROM transactions_accounts ORDER BY tx_to_id, account_id") require.NoError(t, err) @@ -373,7 +373,7 @@ func Test_TransactionModel_BatchCopy(t *testing.T) { // Create a map of tx_to_id -> set of account_ids accountLinksMap := make(map[int64][]string) for _, link := range accountLinks { - accountLinksMap[link.TxToID] = append(accountLinksMap[link.TxToID], link.AccountID) + accountLinksMap[link.TxToID] = append(accountLinksMap[link.TxToID], string(link.AccountID)) } // Verify each expected transaction has its account links @@ -399,7 +399,7 @@ func Test_TransactionModel_BatchCopy_DuplicateFails(t *testing.T) { // Create test account kp1 := keypair.MustRandom() const q = "INSERT INTO accounts (stellar_address) VALUES ($1)" - _, err = dbConnectionPool.ExecContext(ctx, q, kp1.Address()) + _, err = dbConnectionPool.ExecContext(ctx, q, types.AddressBytea(kp1.Address())) require.NoError(t, err) meta := "meta1" @@ -568,7 +568,8 @@ func TestTransactionModel_BatchGetByAccountAddress(t *testing.T) { // Create test accounts address1 := keypair.MustRandom().Address() address2 := keypair.MustRandom().Address() - _, err = dbConnectionPool.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1), ($2)", address1, address2) + _, err = dbConnectionPool.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1), ($2)", + types.AddressBytea(address1), types.AddressBytea(address2)) require.NoError(t, err) // Create test transactions @@ -581,14 +582,14 @@ func TestTransactionModel_BatchGetByAccountAddress(t *testing.T) { `, now) require.NoError(t, err) - // Create test transactions_accounts links + // Create test transactions_accounts links (account_id is BYTEA) _, err = dbConnectionPool.ExecContext(ctx, ` INSERT INTO transactions_accounts (tx_to_id, account_id) VALUES (1, $1), (2, $1), (3, $2) - `, address1, address2) + `, types.AddressBytea(address1), types.AddressBytea(address2)) require.NoError(t, err) // Test BatchGetByAccount @@ -682,7 +683,7 @@ func TestTransactionModel_BatchGetByStateChangeIDs(t *testing.T) { // Create test account address := keypair.MustRandom().Address() - _, err = dbConnectionPool.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1)", address) + _, err = dbConnectionPool.ExecContext(ctx, "INSERT INTO accounts (stellar_address) VALUES ($1)", types.AddressBytea(address)) require.NoError(t, err) // Create test transactions diff --git a/internal/db/migrations/2024-04-29.1-accounts.sql b/internal/db/migrations/2024-04-29.1-accounts.sql index 5d2f81e11..7c861b598 100644 --- a/internal/db/migrations/2024-04-29.1-accounts.sql +++ b/internal/db/migrations/2024-04-29.1-accounts.sql @@ -1,7 +1,7 @@ -- +migrate Up CREATE TABLE accounts ( - stellar_address text NOT NULL, + stellar_address BYTEA NOT NULL, created_at timestamp with time zone NOT NULL DEFAULT NOW(), PRIMARY KEY (stellar_address) ); diff --git a/internal/db/migrations/2025-06-10.2-transactions.sql b/internal/db/migrations/2025-06-10.2-transactions.sql index 16cac0572..4ec3abf16 100644 --- a/internal/db/migrations/2025-06-10.2-transactions.sql +++ b/internal/db/migrations/2025-06-10.2-transactions.sql @@ -19,7 +19,7 @@ CREATE INDEX idx_transactions_ledger_created_at ON transactions(ledger_created_a -- Table: transactions_accounts CREATE TABLE transactions_accounts ( tx_to_id BIGINT NOT NULL REFERENCES transactions(to_id) ON DELETE CASCADE, - account_id TEXT NOT NULL, + account_id BYTEA NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), PRIMARY KEY (account_id, tx_to_id) ); diff --git a/internal/db/migrations/2025-06-10.3-operations.sql b/internal/db/migrations/2025-06-10.3-operations.sql index 22ad0a854..a7cb91616 100644 --- a/internal/db/migrations/2025-06-10.3-operations.sql +++ b/internal/db/migrations/2025-06-10.3-operations.sql @@ -30,7 +30,7 @@ CREATE INDEX idx_operations_ledger_created_at ON operations(ledger_created_at); -- Table: operations_accounts CREATE TABLE operations_accounts ( operation_id BIGINT NOT NULL REFERENCES operations(id) ON DELETE CASCADE, - account_id TEXT NOT NULL, + account_id BYTEA NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), PRIMARY KEY (account_id, operation_id) ); diff --git a/internal/db/migrations/2025-06-10.4-statechanges.sql b/internal/db/migrations/2025-06-10.4-statechanges.sql index 6dfcfacc0..eb41b37ac 100644 --- a/internal/db/migrations/2025-06-10.4-statechanges.sql +++ b/internal/db/migrations/2025-06-10.4-statechanges.sql @@ -21,16 +21,16 @@ CREATE TABLE state_changes ( ingested_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), ledger_created_at TIMESTAMPTZ NOT NULL, ledger_number INTEGER NOT NULL, - account_id TEXT NOT NULL, + account_id BYTEA NOT NULL, operation_id BIGINT NOT NULL, token_id TEXT, amount TEXT, - signer_account_id TEXT, - spender_account_id TEXT, - sponsored_account_id TEXT, - sponsor_account_id TEXT, - deployer_account_id TEXT, - funder_account_id TEXT, + signer_account_id BYTEA, + spender_account_id BYTEA, + sponsored_account_id BYTEA, + sponsor_account_id BYTEA, + deployer_account_id BYTEA, + funder_account_id BYTEA, claimable_balance_id TEXT, liquidity_pool_id TEXT, sponsored_data TEXT, diff --git a/internal/indexer/indexer.go b/internal/indexer/indexer.go index c435e3c81..4adad122e 100644 --- a/internal/indexer/indexer.go +++ b/internal/indexer/indexer.go @@ -184,7 +184,7 @@ func (i *Indexer) processTransaction(ctx context.Context, tx ingest.LedgerTransa allParticipants = allParticipants.Union(opParticipants.Participants) } for _, stateChange := range stateChanges { - allParticipants.Add(stateChange.AccountID) + allParticipants.Add(string(stateChange.AccountID)) } // Insert transaction participants @@ -251,7 +251,7 @@ func (i *Indexer) processTransaction(ctx context.Context, tx ingest.LedgerTransa // Only store contract changes when contract token is SEP41 if stateChange.ContractType == types.ContractTypeSEP41 { contractChange := types.ContractChange{ - AccountID: stateChange.AccountID, + AccountID: string(stateChange.AccountID), OperationID: stateChange.OperationID, ContractID: stateChange.TokenID.String, LedgerNumber: tx.Ledger.LedgerSequence(), diff --git a/internal/indexer/indexer_buffer.go b/internal/indexer/indexer_buffer.go index 6594e8fa7..3f5714c38 100644 --- a/internal/indexer/indexer_buffer.go +++ b/internal/indexer/indexer_buffer.go @@ -382,10 +382,10 @@ func (b *IndexerBuffer) PushStateChange(transaction types.Transaction, operation defer b.mu.Unlock() b.stateChanges = append(b.stateChanges, stateChange) - b.pushTransactionUnsafe(stateChange.AccountID, &transaction) + b.pushTransactionUnsafe(string(stateChange.AccountID), &transaction) // Fee changes dont have an operation ID associated with them if stateChange.OperationID != 0 { - b.pushOperationUnsafe(stateChange.AccountID, &operation) + b.pushOperationUnsafe(string(stateChange.AccountID), &operation) } } diff --git a/internal/indexer/indexer_buffer_test.go b/internal/indexer/indexer_buffer_test.go index f2088480d..93818cffc 100644 --- a/internal/indexer/indexer_buffer_test.go +++ b/internal/indexer/indexer_buffer_test.go @@ -17,7 +17,7 @@ func buildStateChange(toID int64, reason types.StateChangeReason, accountID stri ToID: toID, StateChangeCategory: types.StateChangeCategoryBalance, StateChangeReason: &reason, - AccountID: accountID, + AccountID: types.AddressBytea(accountID), OperationID: operationID, SortKey: fmt.Sprintf("%d:%s:%s", toID, types.StateChangeCategoryBalance, accountID), } diff --git a/internal/indexer/indexer_test.go b/internal/indexer/indexer_test.go index e54c312de..7725f5b8e 100644 --- a/internal/indexer/indexer_test.go +++ b/internal/indexer/indexer_test.go @@ -647,19 +647,19 @@ func TestIndexer_ProcessLedgerTransactions(t *testing.T) { require.Len(t, stateChanges, 3, "should have 3 state changes") // Verify first state change - assert.Equal(t, "alice", stateChanges[0].AccountID) + assert.Equal(t, "alice", stateChanges[0].AccountID.String()) assert.Equal(t, int64(1), stateChanges[0].ToID) assert.Equal(t, int64(1), stateChanges[0].OperationID) assert.Equal(t, int64(1), stateChanges[0].StateChangeOrder, "first state change should have order 1") // Verify second state change - assert.Equal(t, "alice", stateChanges[1].AccountID) + assert.Equal(t, "alice", stateChanges[1].AccountID.String()) assert.Equal(t, int64(2), stateChanges[1].ToID) assert.Equal(t, int64(1), stateChanges[1].OperationID) assert.Equal(t, int64(2), stateChanges[1].StateChangeOrder, "second state change should have order 2") // Verify third state change - assert.Equal(t, "alice", stateChanges[2].AccountID) + assert.Equal(t, "alice", stateChanges[2].AccountID.String()) assert.Equal(t, int64(3), stateChanges[2].ToID) assert.Equal(t, int64(1), stateChanges[2].OperationID) assert.Equal(t, int64(3), stateChanges[2].StateChangeOrder, "third state change should have order 3") @@ -728,7 +728,7 @@ func TestIndexer_getTransactionStateChanges(t *testing.T) { foundBob := false foundCharlie := false for _, sc := range stateChanges { - switch sc.AccountID { + switch sc.AccountID.String() { case "alice": assert.Equal(t, int64(1), sc.ToID) assert.Equal(t, int64(1), sc.OperationID) @@ -914,7 +914,7 @@ func TestIndexer_getTransactionStateChanges(t *testing.T) { // Verify it's the correct state change sc := stateChanges[0] - assert.Equal(t, "alice", sc.AccountID) + assert.Equal(t, "alice", sc.AccountID.String()) assert.Equal(t, int64(1), sc.ToID) assert.Equal(t, int64(1), sc.OperationID) diff --git a/internal/indexer/processors/contracts/test_utils.go b/internal/indexer/processors/contracts/test_utils.go index 66e68d0e7..d6ca9c03e 100644 --- a/internal/indexer/processors/contracts/test_utils.go +++ b/internal/indexer/processors/contracts/test_utils.go @@ -625,7 +625,7 @@ func createInvalidBalanceMapTx(contractAccount, admin string, asset xdr.Asset, i func assertContractEvent(t *testing.T, change types.StateChange, reason types.StateChangeReason, expectedAccount string, expectedContractID string) { t.Helper() require.Equal(t, types.StateChangeCategoryBalanceAuthorization, change.StateChangeCategory) - require.Equal(t, expectedAccount, change.AccountID) + require.Equal(t, expectedAccount, change.AccountID.String()) if expectedContractID != "" { require.NotNil(t, change.TokenID) require.Equal(t, expectedContractID, change.TokenID.String) diff --git a/internal/indexer/processors/contracts_test_utils.go b/internal/indexer/processors/contracts_test_utils.go index d544c3ef7..db4f284f7 100644 --- a/internal/indexer/processors/contracts_test_utils.go +++ b/internal/indexer/processors/contracts_test_utils.go @@ -170,11 +170,11 @@ func assertStateChangesElementsMatch(t *testing.T, want []types.StateChange, got wantMap := make(map[string]types.StateChange) for _, w := range want { - wantMap[fmt.Sprintf("%d-%s-%s", w.ToID, w.AccountID, w.DeployerAccountID.String)] = w + wantMap[fmt.Sprintf("%d-%s-%s", w.ToID, w.AccountID, w.DeployerAccountID.String())] = w } for _, g := range got { - key := fmt.Sprintf("%d-%s-%s", g.ToID, g.AccountID, g.DeployerAccountID.String) + key := fmt.Sprintf("%d-%s-%s", g.ToID, g.AccountID, g.DeployerAccountID.String()) if _, ok := wantMap[key]; !ok { assert.Fail(t, "state change not found", "state change id: %s", key) } diff --git a/internal/indexer/processors/effects_test.go b/internal/indexer/processors/effects_test.go index 623ee254e..335b79033 100644 --- a/internal/indexer/processors/effects_test.go +++ b/internal/indexer/processors/effects_test.go @@ -85,12 +85,12 @@ func TestEffects_ProcessTransaction(t *testing.T) { switch *change.StateChangeReason { case types.StateChangeReasonUpdate: assert.True(t, change.SignerAccountID.Valid) - assert.Equal(t, "GC4XF7RE3R4P77GY5XNGICM56IOKUURWAAANPXHFC7G5H6FCNQVVH3OH", change.SignerAccountID.String) + assert.Equal(t, "GC4XF7RE3R4P77GY5XNGICM56IOKUURWAAANPXHFC7G5H6FCNQVVH3OH", change.SignerAccountID.String()) assert.Equal(t, int16(1), change.SignerWeightOld.Int16) assert.Equal(t, int16(3), change.SignerWeightNew.Int16) case types.StateChangeReasonAdd: assert.True(t, change.SignerAccountID.Valid) - assert.Equal(t, "GAQHWQYBBW272OOXNQMMLCA5WY2XAZPODGB7Q3S5OKKIXVESKO55ZQ7C", change.SignerAccountID.String) + assert.Equal(t, "GAQHWQYBBW272OOXNQMMLCA5WY2XAZPODGB7Q3S5OKKIXVESKO55ZQ7C", change.SignerAccountID.String()) assert.False(t, change.SignerWeightOld.Valid) // New signer has no old weight assert.Equal(t, int16(2), change.SignerWeightNew.Int16) } @@ -295,45 +295,45 @@ func TestEffects_ProcessTransaction(t *testing.T) { // TxHash removed - lookup via to_id instead assert.Equal(t, types.StateChangeCategoryReserves, changes[1].StateChangeCategory) assert.Equal(t, types.StateChangeReasonUnsponsor, *changes[1].StateChangeReason) - assert.Equal(t, "GACMZD5VJXTRLKVET72CETCYKELPNCOTTBDC6DHFEUPLG5DHEK534JQX", changes[1].AccountID) - assert.Equal(t, "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H", changes[1].SponsoredAccountID.String) + assert.Equal(t, "GACMZD5VJXTRLKVET72CETCYKELPNCOTTBDC6DHFEUPLG5DHEK534JQX", changes[1].AccountID.String()) + assert.Equal(t, "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H", changes[1].SponsoredAccountID.String()) assert.Equal(t, types.StateChangeCategoryReserves, changes[2].StateChangeCategory) assert.Equal(t, types.StateChangeReasonUnsponsor, *changes[2].StateChangeReason) - assert.Equal(t, "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H", changes[2].AccountID) - assert.Equal(t, "GACMZD5VJXTRLKVET72CETCYKELPNCOTTBDC6DHFEUPLG5DHEK534JQX", changes[2].SponsorAccountID.String) + assert.Equal(t, "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H", changes[2].AccountID.String()) + assert.Equal(t, "GACMZD5VJXTRLKVET72CETCYKELPNCOTTBDC6DHFEUPLG5DHEK534JQX", changes[2].SponsorAccountID.String()) // Updating sponsorship creates 4 state changes - one for the new sponsor, one for the former sponsor, and two for the target account assert.Equal(t, types.StateChangeCategoryReserves, changes[3].StateChangeCategory) assert.Equal(t, types.StateChangeReasonSponsor, *changes[3].StateChangeReason) - assert.Equal(t, "GACMZD5VJXTRLKVET72CETCYKELPNCOTTBDC6DHFEUPLG5DHEK534JQX", changes[3].AccountID) - assert.Equal(t, "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H", changes[3].SponsoredAccountID.String) + assert.Equal(t, "GACMZD5VJXTRLKVET72CETCYKELPNCOTTBDC6DHFEUPLG5DHEK534JQX", changes[3].AccountID.String()) + assert.Equal(t, "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H", changes[3].SponsoredAccountID.String()) assert.Equal(t, types.StateChangeCategoryReserves, changes[4].StateChangeCategory) assert.Equal(t, types.StateChangeReasonUnsponsor, *changes[4].StateChangeReason) - assert.Equal(t, "GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A", changes[4].AccountID) - assert.Equal(t, "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H", changes[4].SponsoredAccountID.String) + assert.Equal(t, "GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A", changes[4].AccountID.String()) + assert.Equal(t, "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H", changes[4].SponsoredAccountID.String()) assert.Equal(t, types.StateChangeCategoryReserves, changes[5].StateChangeCategory) assert.Equal(t, types.StateChangeReasonSponsor, *changes[5].StateChangeReason) - assert.Equal(t, "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H", changes[5].AccountID) - assert.Equal(t, "GACMZD5VJXTRLKVET72CETCYKELPNCOTTBDC6DHFEUPLG5DHEK534JQX", changes[5].SponsorAccountID.String) + assert.Equal(t, "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H", changes[5].AccountID.String()) + assert.Equal(t, "GACMZD5VJXTRLKVET72CETCYKELPNCOTTBDC6DHFEUPLG5DHEK534JQX", changes[5].SponsorAccountID.String()) assert.Equal(t, types.StateChangeCategoryReserves, changes[6].StateChangeCategory) assert.Equal(t, types.StateChangeReasonUnsponsor, *changes[6].StateChangeReason) - assert.Equal(t, "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H", changes[6].AccountID) - assert.Equal(t, "GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A", changes[6].SponsorAccountID.String) + assert.Equal(t, "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H", changes[6].AccountID.String()) + assert.Equal(t, "GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A", changes[6].SponsorAccountID.String()) // Sponsorship created creates two state changes - one for the sponsor and one for the target account assert.Equal(t, types.StateChangeCategoryReserves, changes[7].StateChangeCategory) assert.Equal(t, types.StateChangeReasonSponsor, *changes[7].StateChangeReason) - assert.Equal(t, "GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A", changes[7].AccountID) - assert.Equal(t, "GCQZP3IU7XU6EJ63JZXKCQOYT2RNXN3HB5CNHENNUEUHSMA4VUJJJSEN", changes[7].SponsoredAccountID.String) + assert.Equal(t, "GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A", changes[7].AccountID.String()) + assert.Equal(t, "GCQZP3IU7XU6EJ63JZXKCQOYT2RNXN3HB5CNHENNUEUHSMA4VUJJJSEN", changes[7].SponsoredAccountID.String()) assert.Equal(t, types.StateChangeCategoryReserves, changes[8].StateChangeCategory) assert.Equal(t, types.StateChangeReasonSponsor, *changes[8].StateChangeReason) - assert.Equal(t, "GCQZP3IU7XU6EJ63JZXKCQOYT2RNXN3HB5CNHENNUEUHSMA4VUJJJSEN", changes[8].AccountID) - assert.Equal(t, "GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A", changes[8].SponsorAccountID.String) + assert.Equal(t, "GCQZP3IU7XU6EJ63JZXKCQOYT2RNXN3HB5CNHENNUEUHSMA4VUJJJSEN", changes[8].AccountID.String()) + assert.Equal(t, "GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A", changes[8].SponsorAccountID.String()) }) t.Run("ChangeTrust - trustline created", func(t *testing.T) { envelopeXDR := "AAAAAgAAAAAf1miSBZ7jc0TxIHULMUqdj+dibtkh1JEEwITVtQ05ZgAAAGQAB1eLAAAAAwAAAAEAAAAAAAAAAAAAAABowwQqAAAAAAAAAAEAAAAAAAAABgAAAAFURVNUAAAAAFrnJwiWP46hSSjcYc6wY93h556Qpe47SA8bIQGXMJTlf/////////8AAAAAAAAAAbUNOWYAAABAzWelNCrF4Q+iSKX30xHrBm76FMa2h89pPauijrWAVlcj/swEyYZqjU94SYU+8XEWUuvg2rpjCIHGPHHyzSXlAw==" diff --git a/internal/indexer/processors/processors_test_utils.go b/internal/indexer/processors/processors_test_utils.go index 9f98f7a37..a7e7c014b 100644 --- a/internal/indexer/processors/processors_test_utils.go +++ b/internal/indexer/processors/processors_test_utils.go @@ -806,7 +806,7 @@ func requireEventCount(t *testing.T, changes []types.StateChange, expectedCount func assertStateChangeBase(t *testing.T, change types.StateChange, category types.StateChangeCategory, expectedAccount string, expectedAmount string, expectedToken string) { t.Helper() require.Equal(t, category, change.StateChangeCategory) - require.Equal(t, expectedAccount, change.AccountID) + require.Equal(t, expectedAccount, change.AccountID.String()) if expectedAmount != "" { require.Equal(t, utils.SQLNullString(expectedAmount), change.Amount) } diff --git a/internal/indexer/processors/state_change_builder.go b/internal/indexer/processors/state_change_builder.go index b50436e71..a6b6e26d1 100644 --- a/internal/indexer/processors/state_change_builder.go +++ b/internal/indexer/processors/state_change_builder.go @@ -76,13 +76,13 @@ func (b *StateChangeBuilder) WithFlags(flags []string) *StateChangeBuilder { // WithAccount sets the account ID func (b *StateChangeBuilder) WithAccount(accountID string) *StateChangeBuilder { - b.base.AccountID = accountID + b.base.AccountID = types.AddressBytea(accountID) return b } // WithSigner sets the signer account ID and the weights directly func (b *StateChangeBuilder) WithSigner(signer string, oldWeight, newWeight *int16) *StateChangeBuilder { - b.base.SignerAccountID = utils.SQLNullString(signer) + b.base.SignerAccountID = utils.NullAddressBytea(signer) if oldWeight != nil { b.base.SignerWeightOld = sql.NullInt16{Int16: *oldWeight, Valid: true} } @@ -94,19 +94,19 @@ func (b *StateChangeBuilder) WithSigner(signer string, oldWeight, newWeight *int // WithDeployer sets the deployer account ID, usually associated with a contract deployment. func (b *StateChangeBuilder) WithDeployer(deployer string) *StateChangeBuilder { - b.base.DeployerAccountID = utils.SQLNullString(deployer) + b.base.DeployerAccountID = utils.NullAddressBytea(deployer) return b } // WithFunder sets the funder account ID func (b *StateChangeBuilder) WithFunder(funder string) *StateChangeBuilder { - b.base.FunderAccountID = utils.SQLNullString(funder) + b.base.FunderAccountID = utils.NullAddressBytea(funder) return b } // WithSponsor sets the sponsor func (b *StateChangeBuilder) WithSponsor(sponsor string) *StateChangeBuilder { - b.base.SponsorAccountID = utils.SQLNullString(sponsor) + b.base.SponsorAccountID = utils.NullAddressBytea(sponsor) return b } @@ -136,7 +136,7 @@ func (b *StateChangeBuilder) WithTokenType(tokenType types.ContractType) *StateC // WithSponsoredAccountID sets the sponsored account ID for a sponsorship state change func (b *StateChangeBuilder) WithSponsoredAccountID(sponsoredAccountID string) *StateChangeBuilder { - b.base.SponsoredAccountID = utils.SQLNullString(sponsoredAccountID) + b.base.SponsoredAccountID = utils.NullAddressBytea(sponsoredAccountID) return b } @@ -192,10 +192,10 @@ func (b *StateChangeBuilder) generateSortKey() string { b.base.AccountID, b.base.TokenID.String, b.base.Amount.String, - b.base.SignerAccountID.String, - b.base.SpenderAccountID.String, - b.base.SponsoredAccountID.String, - b.base.SponsorAccountID.String, + b.base.SignerAccountID.String(), + b.base.SpenderAccountID.String(), + b.base.SponsoredAccountID.String(), + b.base.SponsorAccountID.String(), b.base.SignerWeightOld.Int16, b.base.SignerWeightNew.Int16, b.base.ThresholdOld.Int16, diff --git a/internal/indexer/processors/token_transfer_test.go b/internal/indexer/processors/token_transfer_test.go index c1e85cac7..35cb46b88 100644 --- a/internal/indexer/processors/token_transfer_test.go +++ b/internal/indexer/processors/token_transfer_test.go @@ -75,7 +75,7 @@ func TestTokenTransferProcessor_Process(t *testing.T) { assertFeeEvent(t, changes[0], "100") assertStateChangeBase(t, changes[1], types.StateChangeCategoryAccount, accountB.ToAccountId().Address(), "", "") require.Equal(t, types.StateChangeReasonCreate, *changes[1].StateChangeReason) - require.Equal(t, accountA.ToAccountId().Address(), changes[1].FunderAccountID.String) + require.Equal(t, accountA.ToAccountId().Address(), changes[1].FunderAccountID.String()) assertDebitEvent(t, changes[2], accountA.ToAccountId().Address(), "1000000000", nativeContractAddress) assertCreditEvent(t, changes[3], accountB.ToAccountId().Address(), "1000000000", nativeContractAddress) }) diff --git a/internal/indexer/types/types.go b/internal/indexer/types/types.go index d1458c486..06c024a4a 100644 --- a/internal/indexer/types/types.go +++ b/internal/indexer/types/types.go @@ -38,9 +38,91 @@ import ( "fmt" "time" + "github.com/stellar/go-stellar-sdk/strkey" "github.com/stellar/go-stellar-sdk/xdr" ) +// AddressBytea represents a Stellar address stored as BYTEA in the database. +// Storage format: 33 bytes (1 version byte + 32 raw key bytes) +// Go representation: StrKey string (G.../C...) +type AddressBytea string + +// Scan implements sql.Scanner - converts BYTEA (33 bytes) to StrKey string +func (a *AddressBytea) Scan(value any) error { + if value == nil { + *a = "" + return nil + } + bytes, ok := value.([]byte) + if !ok { + return fmt.Errorf("expected []byte, got %T", value) + } + if len(bytes) != 33 { + return fmt.Errorf("expected 33 bytes, got %d", len(bytes)) + } + versionByte := strkey.VersionByte(bytes[0]) + rawKey := bytes[1:33] + encoded, err := strkey.Encode(versionByte, rawKey) + if err != nil { + return fmt.Errorf("encoding stellar address: %w", err) + } + *a = AddressBytea(encoded) + return nil +} + +// Value implements driver.Valuer - converts StrKey string to 33-byte []byte +func (a AddressBytea) Value() (driver.Value, error) { + if a == "" { + return nil, nil + } + versionByte, rawBytes, err := strkey.DecodeAny(string(a)) + if err != nil { + return nil, fmt.Errorf("decoding stellar address %s: %w", a, err) + } + result := make([]byte, 33) + result[0] = byte(versionByte) + copy(result[1:], rawBytes) + return result, nil +} + +// String returns the Stellar address as a string. +func (a AddressBytea) String() string { + return string(a) +} + +// NullAddressBytea represents a nullable Stellar address stored as BYTEA in the database. +// Similar to sql.NullString but handles BYTEA encoding/decoding for Stellar addresses. +type NullAddressBytea struct { + AddressBytea AddressBytea // The Stellar address (G.../C...) + Valid bool // Valid is true if AddressBytea is not NULL +} + +// Scan implements sql.Scanner - converts nullable BYTEA (33 bytes) to StrKey string +func (n *NullAddressBytea) Scan(value any) error { + if value == nil { + n.AddressBytea, n.Valid = "", false + return nil + } + if err := n.AddressBytea.Scan(value); err != nil { + return err + } + n.Valid = true + return nil +} + +// Value implements driver.Valuer - converts StrKey string to 33-byte []byte or nil +func (n NullAddressBytea) Value() (driver.Value, error) { + if !n.Valid { + return nil, nil + } + return n.AddressBytea.Value() +} + +// String returns the Stellar address as a string (convenience accessor). +func (n NullAddressBytea) String() string { + return string(n.AddressBytea) +} + type ContractType string const ( @@ -118,8 +200,8 @@ const ( ) type Account struct { - StellarAddress string `json:"address,omitempty" db:"stellar_address"` - CreatedAt time.Time `json:"createdAt,omitempty" db:"created_at"` + StellarAddress AddressBytea `json:"address,omitempty" db:"stellar_address"` + CreatedAt time.Time `json:"createdAt,omitempty" db:"created_at"` } type AccountWithToID struct { @@ -405,14 +487,16 @@ type StateChange struct { LedgerNumber uint32 `json:"ledgerNumber,omitempty" db:"ledger_number"` // Nullable string fields: - TokenID sql.NullString `json:"tokenId,omitempty" db:"token_id"` - Amount sql.NullString `json:"amount,omitempty" db:"amount"` - SignerAccountID sql.NullString `json:"signerAccountId,omitempty" db:"signer_account_id"` - SpenderAccountID sql.NullString `json:"spenderAccountId,omitempty" db:"spender_account_id"` - SponsoredAccountID sql.NullString `json:"sponsoredAccountId,omitempty" db:"sponsored_account_id"` - SponsorAccountID sql.NullString `json:"sponsorAccountId,omitempty" db:"sponsor_account_id"` - DeployerAccountID sql.NullString `json:"deployerAccountId,omitempty" db:"deployer_account_id"` - FunderAccountID sql.NullString `json:"funderAccountId,omitempty" db:"funder_account_id"` + TokenID sql.NullString `json:"tokenId,omitempty" db:"token_id"` + Amount sql.NullString `json:"amount,omitempty" db:"amount"` + + // Nullable address fields (stored as BYTEA in database): + SignerAccountID NullAddressBytea `json:"signerAccountId,omitempty" db:"signer_account_id"` + SpenderAccountID NullAddressBytea `json:"spenderAccountId,omitempty" db:"spender_account_id"` + SponsoredAccountID NullAddressBytea `json:"sponsoredAccountId,omitempty" db:"sponsored_account_id"` + SponsorAccountID NullAddressBytea `json:"sponsorAccountId,omitempty" db:"sponsor_account_id"` + DeployerAccountID NullAddressBytea `json:"deployerAccountId,omitempty" db:"deployer_account_id"` + FunderAccountID NullAddressBytea `json:"funderAccountId,omitempty" db:"funder_account_id"` // Entity identifiers (moved from key_value JSONB): ClaimableBalanceID sql.NullString `json:"claimableBalanceId,omitempty" db:"claimable_balance_id"` @@ -438,7 +522,7 @@ type StateChange struct { KeyValue NullableJSONB `json:"keyValue,omitempty" db:"key_value"` // Relationships: - AccountID string `json:"accountId,omitempty" db:"account_id"` + AccountID AddressBytea `json:"accountId,omitempty" db:"account_id"` Account *Account `json:"account,omitempty"` OperationID int64 `json:"operationId,omitempty" db:"operation_id"` Operation *Operation `json:"operation,omitempty"` diff --git a/internal/indexer/types/types_test.go b/internal/indexer/types/types_test.go index 78a71a93c..3c5076560 100644 --- a/internal/indexer/types/types_test.go +++ b/internal/indexer/types/types_test.go @@ -4,7 +4,10 @@ import ( "database/sql/driver" "testing" + "github.com/stellar/go-stellar-sdk/keypair" + "github.com/stellar/go-stellar-sdk/strkey" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestNullableJSONB_Scan(t *testing.T) { @@ -51,6 +54,156 @@ func TestNullableJSONB_Scan(t *testing.T) { } } +func TestAddressBytea_Scan(t *testing.T) { + // Generate a valid G... address for testing + kp := keypair.MustRandom() + validAddress := kp.Address() + + // Build expected 33-byte representation + _, rawBytes, err := strkey.DecodeAny(validAddress) + require.NoError(t, err) + validBytes := make([]byte, 33) + validBytes[0] = byte(strkey.VersionByteAccountID) + copy(validBytes[1:], rawBytes) + + testCases := []struct { + name string + input any + want AddressBytea + wantErrContains string + }{ + { + name: "🟢nil value", + input: nil, + want: "", + }, + { + name: "🟢valid 33-byte address", + input: validBytes, + want: AddressBytea(validAddress), + }, + { + name: "🔴wrong type", + input: 12345, + wantErrContains: "expected []byte", + }, + { + name: "🔴wrong length", + input: []byte{1, 2, 3}, + wantErrContains: "expected 33 bytes", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var s AddressBytea + err := s.Scan(tc.input) + if tc.wantErrContains != "" { + assert.ErrorContains(t, err, tc.wantErrContains) + } else { + assert.NoError(t, err) + assert.Equal(t, tc.want, s) + } + }) + } +} + +func TestAddressBytea_Value(t *testing.T) { + // Generate a valid G... address for testing + kp := keypair.MustRandom() + validAddress := kp.Address() + + // Build expected 33-byte representation + _, rawBytes, err := strkey.DecodeAny(validAddress) + require.NoError(t, err) + expectedBytes := make([]byte, 33) + expectedBytes[0] = byte(strkey.VersionByteAccountID) + copy(expectedBytes[1:], rawBytes) + + testCases := []struct { + name string + input AddressBytea + want driver.Value + wantErrContains string + }{ + { + name: "🟢empty string", + input: "", + want: nil, + }, + { + name: "🟢valid address", + input: AddressBytea(validAddress), + want: expectedBytes, + }, + { + name: "🔴invalid address", + input: "not-a-valid-address", + wantErrContains: "decoding stellar address", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + got, err := tc.input.Value() + if tc.wantErrContains != "" { + assert.ErrorContains(t, err, tc.wantErrContains) + } else { + assert.NoError(t, err) + assert.Equal(t, tc.want, got) + } + }) + } +} + +func TestAddressBytea_Roundtrip(t *testing.T) { + // Test that Value -> Scan produces the original address + kp := keypair.MustRandom() + original := AddressBytea(kp.Address()) + + // Convert to bytes + bytes, err := original.Value() + require.NoError(t, err) + + // Convert back to address + var restored AddressBytea + err = restored.Scan(bytes) + require.NoError(t, err) + + assert.Equal(t, original, restored) +} + +func TestAddressBytea_String(t *testing.T) { + testCases := []struct { + name string + input AddressBytea + want string + }{ + { + name: "🟢empty string", + input: "", + want: "", + }, + { + name: "🟢valid G address", + input: AddressBytea("GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H"), + want: "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H", + }, + { + name: "🟢valid C address", + input: AddressBytea("CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC"), + want: "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + got := tc.input.String() + assert.Equal(t, tc.want, got) + }) + } +} + func TestNullableJSONB_Value(t *testing.T) { testCases := []struct { name string diff --git a/internal/integrationtests/infrastructure/backfill_helpers.go b/internal/integrationtests/infrastructure/backfill_helpers.go index 9448175ad..526e584f6 100644 --- a/internal/integrationtests/infrastructure/backfill_helpers.go +++ b/internal/integrationtests/infrastructure/backfill_helpers.go @@ -10,6 +10,8 @@ import ( "time" "github.com/stellar/go-stellar-sdk/support/log" + + "github.com/stellar/wallet-backend/internal/indexer/types" ) // GetIngestCursor retrieves a cursor value from the ingest_store table. @@ -59,7 +61,7 @@ func (s *SharedContainers) GetTransactionCountForAccount(ctx context.Context, ac WHERE ta.account_id = $1 AND t.ledger_number BETWEEN $2 AND $3 ` - err = db.QueryRowContext(ctx, query, accountAddr, startLedger, endLedger).Scan(&count) + err = db.QueryRowContext(ctx, query, types.AddressBytea(accountAddr), startLedger, endLedger).Scan(&count) if err != nil { return 0, fmt.Errorf("counting transactions for account %s: %w", accountAddr, err) } @@ -89,7 +91,7 @@ func (s *SharedContainers) HasOperationForAccount(ctx context.Context, accountAd AND o.ledger_number BETWEEN $3 AND $4 ) ` - err = db.QueryRowContext(ctx, query, accountAddr, opType, startLedger, endLedger).Scan(&exists) + err = db.QueryRowContext(ctx, query, types.AddressBytea(accountAddr), opType, startLedger, endLedger).Scan(&exists) if err != nil { return false, fmt.Errorf("checking operation for account %s: %w", accountAddr, err) } @@ -117,7 +119,7 @@ func (s *SharedContainers) GetTransactionAccountLinkCount(ctx context.Context, a WHERE ta.account_id = $1 AND t.ledger_number BETWEEN $2 AND $3 ` - err = db.QueryRowContext(ctx, query, accountAddr, startLedger, endLedger).Scan(&count) + err = db.QueryRowContext(ctx, query, types.AddressBytea(accountAddr), startLedger, endLedger).Scan(&count) if err != nil { return 0, fmt.Errorf("counting transaction-account links for %s: %w", accountAddr, err) } diff --git a/internal/serve/graphql/resolvers/account.resolvers.go b/internal/serve/graphql/resolvers/account.resolvers.go index 85e78a2c7..42cce7eb3 100644 --- a/internal/serve/graphql/resolvers/account.resolvers.go +++ b/internal/serve/graphql/resolvers/account.resolvers.go @@ -15,7 +15,7 @@ import ( // Address is the resolver for the address field. func (r *accountResolver) Address(ctx context.Context, obj *types.Account) (string, error) { - return obj.StellarAddress, nil + return string(obj.StellarAddress), nil } // Transactions is the resolver for the transactions field. @@ -30,7 +30,7 @@ func (r *accountResolver) Transactions(ctx context.Context, obj *types.Account, queryLimit := *params.Limit + 1 // +1 to check if there is a next page dbColumns := GetDBColumnsForFields(ctx, types.Transaction{}) - transactions, err := r.models.Transactions.BatchGetByAccountAddress(ctx, obj.StellarAddress, strings.Join(dbColumns, ", "), &queryLimit, params.Cursor, params.SortOrder) + transactions, err := r.models.Transactions.BatchGetByAccountAddress(ctx, string(obj.StellarAddress), strings.Join(dbColumns, ", "), &queryLimit, params.Cursor, params.SortOrder) if err != nil { return nil, fmt.Errorf("getting transactions from db for account %s: %w", obj.StellarAddress, err) } @@ -63,7 +63,7 @@ func (r *accountResolver) Operations(ctx context.Context, obj *types.Account, fi queryLimit := *params.Limit + 1 // +1 to check if there is a next page dbColumns := GetDBColumnsForFields(ctx, types.Operation{}) - operations, err := r.models.Operations.BatchGetByAccountAddress(ctx, obj.StellarAddress, strings.Join(dbColumns, ", "), &queryLimit, params.Cursor, params.SortOrder) + operations, err := r.models.Operations.BatchGetByAccountAddress(ctx, string(obj.StellarAddress), strings.Join(dbColumns, ", "), &queryLimit, params.Cursor, params.SortOrder) if err != nil { return nil, fmt.Errorf("getting operations from db for account %s: %w", obj.StellarAddress, err) } @@ -115,7 +115,7 @@ func (r *accountResolver) StateChanges(ctx context.Context, obj *types.Account, } dbColumns := GetDBColumnsForFields(ctx, types.StateChange{}) - stateChanges, err := r.models.StateChanges.BatchGetByAccountAddress(ctx, obj.StellarAddress, txHash, operationID, category, reason, strings.Join(dbColumns, ", "), &queryLimit, params.StateChangeCursor, params.SortOrder) + stateChanges, err := r.models.StateChanges.BatchGetByAccountAddress(ctx, string(obj.StellarAddress), txHash, operationID, category, reason, strings.Join(dbColumns, ", "), &queryLimit, params.StateChangeCursor, params.SortOrder) if err != nil { return nil, fmt.Errorf("getting state changes from db for account %s: %w", obj.StellarAddress, err) } diff --git a/internal/serve/graphql/resolvers/account_resolvers_test.go b/internal/serve/graphql/resolvers/account_resolvers_test.go index a70e67977..9107b53ab 100644 --- a/internal/serve/graphql/resolvers/account_resolvers_test.go +++ b/internal/serve/graphql/resolvers/account_resolvers_test.go @@ -16,7 +16,7 @@ import ( ) func TestAccountResolver_Transactions(t *testing.T) { - parentAccount := &types.Account{StellarAddress: "test-account"} + parentAccount := &types.Account{StellarAddress: types.AddressBytea(sharedTestAccountAddress)} mockMetricsService := &metrics.MockMetricsService{} mockMetricsService.On("IncDBQuery", "BatchGetByAccountAddress", "transactions").Return() @@ -105,7 +105,7 @@ func TestAccountResolver_Transactions(t *testing.T) { }) t.Run("account with no transactions", func(t *testing.T) { - nonExistentAccount := &types.Account{StellarAddress: "non-existent-account"} + nonExistentAccount := &types.Account{StellarAddress: types.AddressBytea(sharedNonExistentAccountAddress)} ctx := getTestCtx("transactions", []string{"hash"}) transactions, err := resolver.Transactions(ctx, nonExistentAccount, nil, nil, nil, nil) @@ -144,7 +144,7 @@ func TestAccountResolver_Transactions(t *testing.T) { } func TestAccountResolver_Operations(t *testing.T) { - parentAccount := &types.Account{StellarAddress: "test-account"} + parentAccount := &types.Account{StellarAddress: types.AddressBytea(sharedTestAccountAddress)} mockMetricsService := &metrics.MockMetricsService{} mockMetricsService.On("IncDBQuery", "BatchGetByAccountAddress", "operations").Return() @@ -245,7 +245,7 @@ func TestAccountResolver_Operations(t *testing.T) { }) t.Run("account with no operations", func(t *testing.T) { - nonExistentAccount := &types.Account{StellarAddress: "non-existent-account"} + nonExistentAccount := &types.Account{StellarAddress: types.AddressBytea(sharedNonExistentAccountAddress)} ctx := getTestCtx("operations", []string{"id"}) operations, err := resolver.Operations(ctx, nonExistentAccount, nil, nil, nil, nil) @@ -255,7 +255,7 @@ func TestAccountResolver_Operations(t *testing.T) { } func TestAccountResolver_StateChanges(t *testing.T) { - parentAccount := &types.Account{StellarAddress: "test-account"} + parentAccount := &types.Account{StellarAddress: types.AddressBytea(sharedTestAccountAddress)} mockMetricsService := &metrics.MockMetricsService{} mockMetricsService.On("IncDBQuery", "BatchGetByAccountAddress", "state_changes").Return() @@ -490,7 +490,7 @@ func TestAccountResolver_StateChanges(t *testing.T) { }) t.Run("account with no state changes", func(t *testing.T) { - nonExistentAccount := &types.Account{StellarAddress: "non-existent-account"} + nonExistentAccount := &types.Account{StellarAddress: types.AddressBytea(sharedNonExistentAccountAddress)} ctx := getTestCtx("state_changes", []string{"to_id", "state_change_order"}) stateChanges, err := resolver.StateChanges(ctx, nonExistentAccount, nil, nil, nil, nil, nil) @@ -500,7 +500,7 @@ func TestAccountResolver_StateChanges(t *testing.T) { } func TestAccountResolver_StateChanges_WithFilters(t *testing.T) { - parentAccount := &types.Account{StellarAddress: "test-account"} + parentAccount := &types.Account{StellarAddress: types.AddressBytea(sharedTestAccountAddress)} mockMetricsService := &metrics.MockMetricsService{} mockMetricsService.On("IncDBQuery", "BatchGetByAccountAddress", "state_changes").Return() @@ -701,7 +701,7 @@ func TestAccountResolver_StateChanges_WithFilters(t *testing.T) { } func TestAccountResolver_StateChanges_WithCategoryReasonFilters(t *testing.T) { - parentAccount := &types.Account{StellarAddress: "test-account"} + parentAccount := &types.Account{StellarAddress: types.AddressBytea(sharedTestAccountAddress)} mockMetricsService := &metrics.MockMetricsService{} mockMetricsService.On("IncDBQuery", "BatchGetByAccountAddress", "state_changes").Return() diff --git a/internal/serve/graphql/resolvers/mutations.resolvers.go b/internal/serve/graphql/resolvers/mutations.resolvers.go index c2e8a2851..64396a7e7 100644 --- a/internal/serve/graphql/resolvers/mutations.resolvers.go +++ b/internal/serve/graphql/resolvers/mutations.resolvers.go @@ -53,7 +53,7 @@ func (r *mutationResolver) RegisterAccount(ctx context.Context, input graphql1.R // Return the account data directly since we know the address account := &types.Account{ - StellarAddress: input.Address, + StellarAddress: types.AddressBytea(input.Address), CreatedAt: time.Now(), } diff --git a/internal/serve/graphql/resolvers/mutations_resolvers_test.go b/internal/serve/graphql/resolvers/mutations_resolvers_test.go index 924aa3afe..48c76bcef 100644 --- a/internal/serve/graphql/resolvers/mutations_resolvers_test.go +++ b/internal/serve/graphql/resolvers/mutations_resolvers_test.go @@ -86,7 +86,7 @@ func TestMutationResolver_RegisterAccount(t *testing.T) { assert.NotNil(t, result) assert.True(t, result.Success) assert.NotNil(t, result.Account) - assert.Equal(t, input.Address, result.Account.StellarAddress) + assert.Equal(t, input.Address, string(result.Account.StellarAddress)) mockService.AssertExpectations(t) }) diff --git a/internal/serve/graphql/resolvers/operation_resolvers_test.go b/internal/serve/graphql/resolvers/operation_resolvers_test.go index 0c1ebc4e1..241f02b7e 100644 --- a/internal/serve/graphql/resolvers/operation_resolvers_test.go +++ b/internal/serve/graphql/resolvers/operation_resolvers_test.go @@ -92,7 +92,7 @@ func TestOperationResolver_Accounts(t *testing.T) { require.NoError(t, err) require.Len(t, accounts, 1) - assert.Equal(t, "test-account", accounts[0].StellarAddress) + assert.Equal(t, sharedTestAccountAddress, string(accounts[0].StellarAddress)) }) t.Run("nil operation panics", func(t *testing.T) { diff --git a/internal/serve/graphql/resolvers/queries.resolvers.go b/internal/serve/graphql/resolvers/queries.resolvers.go index 1fdb97d3c..3677303ab 100644 --- a/internal/serve/graphql/resolvers/queries.resolvers.go +++ b/internal/serve/graphql/resolvers/queries.resolvers.go @@ -78,7 +78,7 @@ func (r *queryResolver) AccountByAddress(ctx context.Context, address string) (* } // When participant filtering is disabled, we return the account object so that the resolver can return a valid object. - return &types.Account{StellarAddress: address}, nil + return &types.Account{StellarAddress: types.AddressBytea(address)}, nil } return nil, err } diff --git a/internal/serve/graphql/resolvers/queries_resolvers_test.go b/internal/serve/graphql/resolvers/queries_resolvers_test.go index 7a3b7b104..2696b88b6 100644 --- a/internal/serve/graphql/resolvers/queries_resolvers_test.go +++ b/internal/serve/graphql/resolvers/queries_resolvers_test.go @@ -235,16 +235,16 @@ func TestQueryResolver_Account(t *testing.T) { } t.Run("success", func(t *testing.T) { - acc, err := resolver.AccountByAddress(testCtx, "test-account") + acc, err := resolver.AccountByAddress(testCtx, sharedTestAccountAddress) require.NoError(t, err) - assert.Equal(t, "test-account", acc.StellarAddress) + assert.Equal(t, sharedTestAccountAddress, string(acc.StellarAddress)) }) t.Run("non-existent account", func(t *testing.T) { - acc, err := resolver.AccountByAddress(testCtx, "non-existent-account") + acc, err := resolver.AccountByAddress(testCtx, sharedNonExistentAccountAddress) require.NoError(t, err) assert.NotNil(t, acc) - assert.Equal(t, "non-existent-account", acc.StellarAddress) + assert.Equal(t, sharedNonExistentAccountAddress, string(acc.StellarAddress)) }) t.Run("empty address", func(t *testing.T) { diff --git a/internal/serve/graphql/resolvers/resolver.go b/internal/serve/graphql/resolvers/resolver.go index 15a4d4beb..b853bc102 100644 --- a/internal/serve/graphql/resolvers/resolver.go +++ b/internal/serve/graphql/resolvers/resolver.go @@ -109,6 +109,16 @@ func (r *Resolver) resolveNullableString(field sql.NullString) *string { return nil } +// resolveNullableAddress resolves nullable address fields from the database +// Returns pointer to string if valid, nil if null +func (r *Resolver) resolveNullableAddress(field types.NullAddressBytea) *string { + if field.Valid { + s := field.String() + return &s + } + return nil +} + // resolveRequiredString resolves required string fields from the database // Returns empty string if null to satisfy non-nullable GraphQL fields func (r *Resolver) resolveRequiredString(field sql.NullString) string { diff --git a/internal/serve/graphql/resolvers/statechange.resolvers.go b/internal/serve/graphql/resolvers/statechange.resolvers.go index f70a83050..9583c2b93 100644 --- a/internal/serve/graphql/resolvers/statechange.resolvers.go +++ b/internal/serve/graphql/resolvers/statechange.resolvers.go @@ -39,7 +39,7 @@ func (r *accountChangeResolver) Transaction(ctx context.Context, obj *types.Acco // FunderAddress is the resolver for the funderAddress field. func (r *accountChangeResolver) FunderAddress(ctx context.Context, obj *types.AccountStateChangeModel) (*string, error) { - return r.resolveNullableString(obj.FunderAccountID), nil + return r.resolveNullableAddress(obj.FunderAccountID), nil } // Type is the resolver for the type field. @@ -177,12 +177,12 @@ func (r *reservesChangeResolver) Transaction(ctx context.Context, obj *types.Res // SponsoredAddress is the resolver for the sponsoredAddress field. func (r *reservesChangeResolver) SponsoredAddress(ctx context.Context, obj *types.ReservesStateChangeModel) (*string, error) { - return r.resolveNullableString(obj.SponsoredAccountID), nil + return r.resolveNullableAddress(obj.SponsoredAccountID), nil } // SponsorAddress is the resolver for the sponsorAddress field. func (r *reservesChangeResolver) SponsorAddress(ctx context.Context, obj *types.ReservesStateChangeModel) (*string, error) { - return r.resolveNullableString(obj.SponsorAccountID), nil + return r.resolveNullableAddress(obj.SponsorAccountID), nil } // LiquidityPoolID is the resolver for the liquidityPoolID field. @@ -232,7 +232,7 @@ func (r *signerChangeResolver) Transaction(ctx context.Context, obj *types.Signe // SignerAddress is the resolver for the signerAddress field. func (r *signerChangeResolver) SignerAddress(ctx context.Context, obj *types.SignerStateChangeModel) (*string, error) { - return r.resolveNullableString(obj.SignerAccountID), nil + return r.resolveNullableAddress(obj.SignerAccountID), nil } // SignerWeights is the resolver for the signerWeights field. diff --git a/internal/serve/graphql/resolvers/statechange_resolvers_test.go b/internal/serve/graphql/resolvers/statechange_resolvers_test.go index 8330b767c..39a04468a 100644 --- a/internal/serve/graphql/resolvers/statechange_resolvers_test.go +++ b/internal/serve/graphql/resolvers/statechange_resolvers_test.go @@ -255,7 +255,7 @@ func TestStateChangeResolver_Account(t *testing.T) { account, err := resolver.Account(ctx, &parentSC) require.NoError(t, err) - assert.Equal(t, "test-account", account.StellarAddress) + assert.Equal(t, sharedTestAccountAddress, string(account.StellarAddress)) }) t.Run("nil state change panics", func(t *testing.T) { diff --git a/internal/serve/graphql/resolvers/test_utils.go b/internal/serve/graphql/resolvers/test_utils.go index 0da392cec..e18cedafa 100644 --- a/internal/serve/graphql/resolvers/test_utils.go +++ b/internal/serve/graphql/resolvers/test_utils.go @@ -7,6 +7,7 @@ import ( "time" "github.com/99designs/gqlgen/graphql" + "github.com/stellar/go-stellar-sdk/keypair" "github.com/stellar/go-stellar-sdk/toid" "github.com/stretchr/testify/require" @@ -46,9 +47,16 @@ func ptr[T any](v T) *T { return &v } +// sharedTestAccountAddress is a fixed test address used by tests that rely on setupDB. +// It's generated once and reused to ensure test data consistency. +var sharedTestAccountAddress = keypair.MustRandom().Address() + +// sharedNonExistentAccountAddress is a valid Stellar address that doesn't exist in the test DB. +var sharedNonExistentAccountAddress = keypair.MustRandom().Address() + func setupDB(ctx context.Context, t *testing.T, dbConnectionPool db.ConnectionPool) { testLedger := int32(1000) - parentAccount := &types.Account{StellarAddress: "test-account"} + parentAccount := &types.Account{StellarAddress: types.AddressBytea(sharedTestAccountAddress)} txns := make([]*types.Transaction, 0, 4) ops := make([]*types.Operation, 0, 8) opIdx := 1 diff --git a/internal/serve/graphql/resolvers/transaction_resolvers_test.go b/internal/serve/graphql/resolvers/transaction_resolvers_test.go index cac40c729..17df19e4c 100644 --- a/internal/serve/graphql/resolvers/transaction_resolvers_test.go +++ b/internal/serve/graphql/resolvers/transaction_resolvers_test.go @@ -177,7 +177,7 @@ func TestTransactionResolver_Accounts(t *testing.T) { require.NoError(t, err) require.Len(t, accounts, 1) - assert.Equal(t, "test-account", accounts[0].StellarAddress) + assert.Equal(t, sharedTestAccountAddress, string(accounts[0].StellarAddress)) }) t.Run("nil transaction panics", func(t *testing.T) { diff --git a/internal/services/account_service_test.go b/internal/services/account_service_test.go index 5bbebc576..252ac6209 100644 --- a/internal/services/account_service_test.go +++ b/internal/services/account_service_test.go @@ -14,6 +14,7 @@ import ( "github.com/stellar/wallet-backend/internal/data" "github.com/stellar/wallet-backend/internal/db" "github.com/stellar/wallet-backend/internal/db/dbtest" + "github.com/stellar/wallet-backend/internal/indexer/types" "github.com/stellar/wallet-backend/internal/metrics" ) @@ -42,12 +43,12 @@ func TestAccountRegister(t *testing.T) { err = accountService.RegisterAccount(ctx, address) require.NoError(t, err) - var dbAddress sql.NullString - err = dbConnectionPool.GetContext(ctx, &dbAddress, "SELECT stellar_address FROM accounts WHERE stellar_address = $1", address) + var dbAddress types.NullAddressBytea + err = dbConnectionPool.GetContext(ctx, &dbAddress, "SELECT stellar_address FROM accounts WHERE stellar_address = $1", types.AddressBytea(address)) require.NoError(t, err) assert.True(t, dbAddress.Valid) - assert.Equal(t, address, dbAddress.String) + assert.Equal(t, address, dbAddress.String()) }) t.Run("duplicate registration fails", func(t *testing.T) { @@ -122,7 +123,7 @@ func TestAccountDeregister(t *testing.T) { ctx := context.Background() address := keypair.MustRandom().Address() - result, err := dbConnectionPool.ExecContext(ctx, "Insert INTO accounts (stellar_address) VALUES ($1)", address) + result, err := dbConnectionPool.ExecContext(ctx, "Insert INTO accounts (stellar_address) VALUES ($1)", types.AddressBytea(address)) require.NoError(t, err) rowAffected, err := result.RowsAffected() require.NoError(t, err) diff --git a/internal/services/ingest.go b/internal/services/ingest.go index 1dcf4c412..1b6bb4f83 100644 --- a/internal/services/ingest.go +++ b/internal/services/ingest.go @@ -300,7 +300,7 @@ func (m *ingestService) filterByRegisteredAccounts( // Filter state changes: include if account is registered filteredSC := make([]types.StateChange, 0) for _, sc := range stateChanges { - if registeredAccounts.Contains(sc.AccountID) { + if registeredAccounts.Contains(string(sc.AccountID)) { filteredSC = append(filteredSC, sc) } } diff --git a/internal/services/ingest_test.go b/internal/services/ingest_test.go index 2d5b12c32..65fc4ebdd 100644 --- a/internal/services/ingest_test.go +++ b/internal/services/ingest_test.go @@ -11,6 +11,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/lib/pq" "github.com/stellar/go-stellar-sdk/ingest/ledgerbackend" + "github.com/stellar/go-stellar-sdk/keypair" "github.com/stellar/go-stellar-sdk/network" "github.com/stellar/go-stellar-sdk/toid" "github.com/stellar/go-stellar-sdk/xdr" @@ -28,6 +29,17 @@ import ( "github.com/stellar/wallet-backend/internal/signing/store" ) +// Test addresses generated from valid keypairs for use in tests. +// These are deterministic seeds to ensure consistent test addresses. +var ( + testKP1 = keypair.MustRandom() + testKP2 = keypair.MustRandom() + testKP3 = keypair.MustRandom() + testAddr1 = testKP1.Address() + testAddr2 = testKP2.Address() + testAddrUnreg = testKP3.Address() +) + const ( defaultGetLedgersLimit = 50 @@ -530,7 +542,7 @@ func createTestStateChange(toID int64, accountID string, opID int64) types.State StateChangeOrder: 1, StateChangeCategory: types.StateChangeCategoryBalance, StateChangeReason: &reason, - AccountID: accountID, + AccountID: types.AddressBytea(accountID), OperationID: opID, LedgerNumber: 1000, LedgerCreatedAt: now, @@ -1144,13 +1156,13 @@ func Test_ingestService_flushBatchBufferWithRetry(t *testing.T) { tx2 := createTestTransaction("flush_tx_2", 2) op1 := createTestOperation(200) op2 := createTestOperation(201) - sc1 := createTestStateChange(1, "GABC1111111111111111111111111111111111111111111111111", 200) - sc2 := createTestStateChange(2, "GDEF2222222222222222222222222222222222222222222222222", 201) + sc1 := createTestStateChange(1, testAddr1, 200) + sc2 := createTestStateChange(2, testAddr2, 201) - buf.PushTransaction("GABC1111111111111111111111111111111111111111111111111", tx1) - buf.PushTransaction("GDEF2222222222222222222222222222222222222222222222222", tx2) - buf.PushOperation("GABC1111111111111111111111111111111111111111111111111", op1, tx1) - buf.PushOperation("GDEF2222222222222222222222222222222222222222222222222", op2, tx2) + buf.PushTransaction(testAddr1, tx1) + buf.PushTransaction(testAddr2, tx2) + buf.PushOperation(testAddr1, op1, tx1) + buf.PushOperation(testAddr2, op2, tx2) buf.PushStateChange(tx1, op1, sc1) buf.PushStateChange(tx2, op2, sc2) return buf @@ -1168,7 +1180,7 @@ func Test_ingestService_flushBatchBufferWithRetry(t *testing.T) { setupBuffer: func() *indexer.IndexerBuffer { buf := indexer.NewIndexerBuffer() tx1 := createTestTransaction("flush_tx_3", 3) - buf.PushTransaction("GABC1111111111111111111111111111111111111111111111111", tx1) + buf.PushTransaction(testAddr1, tx1) return buf }, updateCursorTo: ptrUint32(50), @@ -1184,7 +1196,7 @@ func Test_ingestService_flushBatchBufferWithRetry(t *testing.T) { setupBuffer: func() *indexer.IndexerBuffer { buf := indexer.NewIndexerBuffer() tx1 := createTestTransaction("flush_tx_4", 4) - buf.PushTransaction("GABC1111111111111111111111111111111111111111111111111", tx1) + buf.PushTransaction(testAddr1, tx1) return buf }, updateCursorTo: ptrUint32(150), @@ -1202,12 +1214,12 @@ func Test_ingestService_flushBatchBufferWithRetry(t *testing.T) { tx1 := createTestTransaction("flush_tx_5", 5) // Registered participant tx2 := createTestTransaction("flush_tx_6", 6) // No registered participant - buf.PushTransaction("GREGISTERED111111111111111111111111111111111111111", tx1) - buf.PushTransaction("GUNREGISTERED11111111111111111111111111111111111111", tx2) + buf.PushTransaction(testAddr1, tx1) + buf.PushTransaction(testAddrUnreg, tx2) return buf }, enableParticipantFiltering: true, - registeredAccounts: []string{"GREGISTERED111111111111111111111111111111111111111"}, + registeredAccounts: []string{testAddr1}, updateCursorTo: nil, initialCursor: 100, wantCursor: 100, @@ -1239,7 +1251,7 @@ func Test_ingestService_flushBatchBufferWithRetry(t *testing.T) { // Add registered accounts if any for _, acc := range tc.registeredAccounts { _, insertErr := dbConnectionPool.ExecContext(ctx, - `INSERT INTO accounts (stellar_address) VALUES ($1) ON CONFLICT DO NOTHING`, acc) + `INSERT INTO accounts (stellar_address) VALUES ($1) ON CONFLICT DO NOTHING`, types.AddressBytea(acc)) require.NoError(t, insertErr) } @@ -1356,13 +1368,13 @@ func Test_ingestService_filterParticipantData(t *testing.T) { tx2 := createTestTransaction("tx_hash_2", 2) op1 := createTestOperation(100) op2 := createTestOperation(101) - sc1 := createTestStateChange(1, "GABC1111111111111111111111111111111111111111111111111", 100) - sc2 := createTestStateChange(2, "GDEF2222222222222222222222222222222222222222222222222", 101) + sc1 := createTestStateChange(1, testAddr1, 100) + sc2 := createTestStateChange(2, testAddr2, 101) - buf.PushTransaction("GABC1111111111111111111111111111111111111111111111111", tx1) - buf.PushTransaction("GDEF2222222222222222222222222222222222222222222222222", tx2) - buf.PushOperation("GABC1111111111111111111111111111111111111111111111111", op1, tx1) - buf.PushOperation("GDEF2222222222222222222222222222222222222222222222222", op2, tx2) + buf.PushTransaction(testAddr1, tx1) + buf.PushTransaction(testAddr2, tx2) + buf.PushOperation(testAddr1, op1, tx1) + buf.PushOperation(testAddr2, op2, tx2) buf.PushStateChange(tx1, op1, sc1) buf.PushStateChange(tx2, op2, sc2) return buf @@ -1374,17 +1386,17 @@ func Test_ingestService_filterParticipantData(t *testing.T) { { name: "filtering_enabled_includes_tx_with_registered_participant", enableParticipantFiltering: true, - registeredAccounts: []string{"GABC1111111111111111111111111111111111111111111111111"}, + registeredAccounts: []string{testAddr1}, setupBuffer: func() *indexer.IndexerBuffer { buf := indexer.NewIndexerBuffer() tx1 := createTestTransaction("tx_hash_1", 1) op1 := createTestOperation(100) - sc1 := createTestStateChange(1, "GABC1111111111111111111111111111111111111111111111111", 100) + sc1 := createTestStateChange(1, testAddr1, 100) // Tx has 2 participants but only 1 is registered - buf.PushTransaction("GABC1111111111111111111111111111111111111111111111111", tx1) - buf.PushTransaction("GXYZ9999999999999999999999999999999999999999999999999", tx1) // Unregistered participant on same tx - buf.PushOperation("GABC1111111111111111111111111111111111111111111111111", op1, tx1) + buf.PushTransaction(testAddr1, tx1) + buf.PushTransaction(testAddrUnreg, tx1) // Unregistered participant on same tx + buf.PushOperation(testAddr1, op1, tx1) buf.PushStateChange(tx1, op1, sc1) return buf }, @@ -1395,14 +1407,14 @@ func Test_ingestService_filterParticipantData(t *testing.T) { // Verify ALL participants are preserved (not just registered ones) participants := filtered.txParticipants[int64(1)] assert.Equal(t, 2, participants.Cardinality()) - assert.True(t, participants.Contains("GABC1111111111111111111111111111111111111111111111111")) - assert.True(t, participants.Contains("GXYZ9999999999999999999999999999999999999999999999999")) + assert.True(t, participants.Contains(testAddr1)) + assert.True(t, participants.Contains(testAddrUnreg)) }, }, { name: "filtering_enabled_excludes_tx_without_registered", enableParticipantFiltering: true, - registeredAccounts: []string{"GABC1111111111111111111111111111111111111111111111111"}, + registeredAccounts: []string{testAddr1}, setupBuffer: func() *indexer.IndexerBuffer { buf := indexer.NewIndexerBuffer() tx1 := createTestTransaction("tx_hash_1", 1) // Has registered @@ -1410,10 +1422,10 @@ func Test_ingestService_filterParticipantData(t *testing.T) { op1 := createTestOperation(100) op2 := createTestOperation(101) - buf.PushTransaction("GABC1111111111111111111111111111111111111111111111111", tx1) - buf.PushTransaction("GUNREGISTERED11111111111111111111111111111111111111", tx2) - buf.PushOperation("GABC1111111111111111111111111111111111111111111111111", op1, tx1) - buf.PushOperation("GUNREGISTERED11111111111111111111111111111111111111", op2, tx2) + buf.PushTransaction(testAddr1, tx1) + buf.PushTransaction(testAddrUnreg, tx2) + buf.PushOperation(testAddr1, op1, tx1) + buf.PushOperation(testAddrUnreg, op2, tx2) return buf }, wantTxCount: 1, // Only tx1 @@ -1427,7 +1439,7 @@ func Test_ingestService_filterParticipantData(t *testing.T) { setupBuffer: func() *indexer.IndexerBuffer { buf := indexer.NewIndexerBuffer() tx1 := createTestTransaction("tx_hash_1", 1) - buf.PushTransaction("GUNREGISTERED11111111111111111111111111111111111111", tx1) + buf.PushTransaction(testAddrUnreg, tx1) return buf }, wantTxCount: 0, @@ -1437,19 +1449,19 @@ func Test_ingestService_filterParticipantData(t *testing.T) { { name: "filtering_state_changes_only_for_registered_accounts", enableParticipantFiltering: true, - registeredAccounts: []string{"GABC1111111111111111111111111111111111111111111111111", "GDEF2222222222222222222222222222222222222222222222222"}, + registeredAccounts: []string{testAddr1, testAddr2}, setupBuffer: func() *indexer.IndexerBuffer { buf := indexer.NewIndexerBuffer() tx1 := createTestTransaction("tx_hash_1", 1) op1 := createTestOperation(100) // 3 state changes: 2 for registered accounts, 1 for unregistered - sc1 := createTestStateChange(1, "GABC1111111111111111111111111111111111111111111111111", 100) - sc2 := createTestStateChange(2, "GDEF2222222222222222222222222222222222222222222222222", 100) - sc3 := createTestStateChange(3, "GUNREGISTERED11111111111111111111111111111111111111", 100) + sc1 := createTestStateChange(1, testAddr1, 100) + sc2 := createTestStateChange(2, testAddr2, 100) + sc3 := createTestStateChange(3, testAddrUnreg, 100) - buf.PushTransaction("GABC1111111111111111111111111111111111111111111111111", tx1) - buf.PushOperation("GABC1111111111111111111111111111111111111111111111111", op1, tx1) + buf.PushTransaction(testAddr1, tx1) + buf.PushOperation(testAddr1, op1, tx1) buf.PushStateChange(tx1, op1, sc1) buf.PushStateChange(tx1, op1, sc2) buf.PushStateChange(tx1, op1, sc3) @@ -1470,7 +1482,7 @@ func Test_ingestService_filterParticipantData(t *testing.T) { // Add registered accounts if any for _, acc := range tc.registeredAccounts { _, insertErr := dbConnectionPool.ExecContext(ctx, - `INSERT INTO accounts (stellar_address) VALUES ($1) ON CONFLICT DO NOTHING`, acc) + `INSERT INTO accounts (stellar_address) VALUES ($1) ON CONFLICT DO NOTHING`, types.AddressBytea(acc)) require.NoError(t, insertErr) } @@ -2639,9 +2651,9 @@ func Test_ingestService_flushBatchBuffer_batchChanges(t *testing.T) { setupBuffer: func() *indexer.IndexerBuffer { buf := indexer.NewIndexerBuffer() tx1 := createTestTransaction("catchup_tx_1", 1) - buf.PushTransaction("GTEST111111111111111111111111111111111111111111111111", tx1) + buf.PushTransaction(testAddr1, tx1) buf.PushTrustlineChange(types.TrustlineChange{ - AccountID: "GTEST111111111111111111111111111111111111111111111111", + AccountID: testAddr1, Asset: "USDC:GA5ZSEJYB37JRC5AVCIA5MOP4RHTM335X2KGX3IHOJAPP5RE34K4KZVN", OperationID: 100, LedgerNumber: 1000, @@ -2658,9 +2670,9 @@ func Test_ingestService_flushBatchBuffer_batchChanges(t *testing.T) { setupBuffer: func() *indexer.IndexerBuffer { buf := indexer.NewIndexerBuffer() tx1 := createTestTransaction("catchup_tx_2", 2) - buf.PushTransaction("GTEST222222222222222222222222222222222222222222222222", tx1) + buf.PushTransaction(testAddr2, tx1) buf.PushContractChange(types.ContractChange{ - AccountID: "GTEST222222222222222222222222222222222222222222222222", + AccountID: testAddr2, ContractID: "CCONTRACTID", OperationID: 101, LedgerNumber: 1001, @@ -2672,7 +2684,7 @@ func Test_ingestService_flushBatchBuffer_batchChanges(t *testing.T) { wantTrustlineChangesCount: 0, wantContractChanges: []types.ContractChange{ { - AccountID: "GTEST222222222222222222222222222222222222222222222222", + AccountID: testAddr2, ContractID: "CCONTRACTID", OperationID: 101, LedgerNumber: 1001, @@ -2685,9 +2697,9 @@ func Test_ingestService_flushBatchBuffer_batchChanges(t *testing.T) { setupBuffer: func() *indexer.IndexerBuffer { buf := indexer.NewIndexerBuffer() tx1 := createTestTransaction("catchup_tx_5", 5) - buf.PushTransaction("GTEST555555555555555555555555555555555555555555555555", tx1) + buf.PushTransaction(testAddr1, tx1) buf.PushTrustlineChange(types.TrustlineChange{ - AccountID: "GTEST555555555555555555555555555555555555555555555555", + AccountID: testAddr1, Asset: "EUR:GA5ZSEJYB37JRC5AVCIA5MOP4RHTM335X2KGX3IHOJAPP5RE34K4KZVN", OperationID: 102, LedgerNumber: 1002, @@ -2704,9 +2716,9 @@ func Test_ingestService_flushBatchBuffer_batchChanges(t *testing.T) { setupBuffer: func() *indexer.IndexerBuffer { buf := indexer.NewIndexerBuffer() tx1 := createTestTransaction("catchup_tx_6", 6) - buf.PushTransaction("GTEST666666666666666666666666666666666666666666666666", tx1) + buf.PushTransaction(testAddr1, tx1) buf.PushTrustlineChange(types.TrustlineChange{ - AccountID: "GTEST666666666666666666666666666666666666666666666666", + AccountID: testAddr1, Asset: "GBP:GA5ZSEJYB37JRC5AVCIA5MOP4RHTM335X2KGX3IHOJAPP5RE34K4KZVN", OperationID: 103, LedgerNumber: 1003, diff --git a/internal/utils/sql.go b/internal/utils/sql.go index a965b2e03..824dc7d61 100644 --- a/internal/utils/sql.go +++ b/internal/utils/sql.go @@ -3,6 +3,8 @@ package utils import ( "database/sql" "time" + + "github.com/stellar/wallet-backend/internal/indexer/types" ) func SQLNullString(s string) sql.NullString { @@ -18,3 +20,10 @@ func SQLNullTime(t time.Time) sql.NullTime { Valid: !t.IsZero(), } } + +func NullAddressBytea(s string) types.NullAddressBytea { + return types.NullAddressBytea{ + AddressBytea: types.AddressBytea(s), + Valid: s != "", + } +}