diff --git a/chains/ethereum/runner/persistent.go b/chains/ethereum/runner/persistent.go index 85b6259..b837ca7 100644 --- a/chains/ethereum/runner/persistent.go +++ b/chains/ethereum/runner/persistent.go @@ -5,12 +5,14 @@ import ( "fmt" "math" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" gethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rpc" inttypes "github.com/skip-mev/catalyst/chains/ethereum/types" + "github.com/skip-mev/catalyst/chains/ethereum/wallet" loadtesttypes "github.com/skip-mev/catalyst/chains/types" orderedmap "github.com/wk8/go-ordered-map/v2" "go.uber.org/zap" @@ -142,36 +144,45 @@ func (r *Runner) recordBlockTxs(blk *gethtypes.Header, tracker *orderedmap.Order func (r *Runner) submitLoadPersistent(ctx context.Context, maxLoadSize int, tracker *orderedmap.OrderedMap[common.Hash, time.Time]) int { // first we build the tx load. this constructs all the ethereum txs based in the spec. r.logger.Info("building loads", zap.Int("num_msg_specs", len(r.spec.Msgs))) - var txs []*gethtypes.Transaction + batchTxs := make(map[*wallet.InteractingWallet]gethtypes.Transactions) for _, msgSpec := range r.spec.Msgs { load := r.buildLoadPersistent(msgSpec, maxLoadSize, false) if len(load) == 0 { continue } - txs = append(txs, load...) + for k, v := range load { + r.logger.Info("built load of txs", zap.Int("load_size", len(v))) + batchTxs[k] = append(batchTxs[k], v...) + } } - // submit each tx in a go routine + // submit each tx batch in a go routine wg := sync.WaitGroup{} - sentTxs := make([]*inttypes.SentTx, len(txs)) - for i, tx := range txs { + sentTxs := make(map[*wallet.InteractingWallet][]*inttypes.SentTx, len(batchTxs)) + for fromWallet, txs := range batchTxs { + sentTxs[fromWallet] = make([]*inttypes.SentTx, len(txs)) wg.Go(func() { - // send the tx from the wallet assigned to this transaction's sender - fromWallet := r.getWalletForTx(tx) - err := fromWallet.SendTransaction(ctx, tx) + // send the txs from the wallet assigned to this transaction's sender + r.logger.Info("sending batch of txs", zap.Int("batch_size", len(txs))) + elems, err := fromWallet.BatchSendTransactions(ctx, txs) if err != nil { - r.logger.Info("failed to send transaction", zap.String("tx_hash", tx.Hash().String()), zap.Error(err)) - r.promMetrics.BroadcastFailure.Add(1) - } else { - r.promMetrics.BroadcastSuccess.Add(1) + r.logger.Info("failed to send batch transactions", zap.Int("len", len(txs)), zap.Error(err)) + return } + for i, elem := range elems { + if elem.Error != nil { + r.logger.Error("failed to send tx", zap.Error(elem.Error)) + r.promMetrics.BroadcastFailure.Add(1) - txType := inttypes.ContractCall - sentTxs[i] = &inttypes.SentTx{ - TxHash: tx.Hash(), - MsgType: txType, - Err: err, - Tx: tx, + } else { + r.promMetrics.BroadcastSuccess.Add(float64(len(txs))) + } + sentTxs[fromWallet][i] = &inttypes.SentTx{ + TxHash: txs[i].Hash(), + MsgType: inttypes.ContractCall, + Err: elem.Error, + Tx: txs[i], + } } }) } @@ -180,23 +191,31 @@ func (r *Runner) submitLoadPersistent(ctx context.Context, maxLoadSize int, trac // Record broadcast time for all as now to get a rough sense--they should be pretty close anyways. broadcastTime := time.Now() - for _, tx := range sentTxs { - if tx.Err != nil { - continue + var broadcastedTxs int + for _, txs := range sentTxs { + for _, tx := range txs { + if tx == nil { + continue + } + if tx.Err != nil { + continue + } + tracker.Set(tx.TxHash, broadcastTime) } - tracker.Set(tx.TxHash, broadcastTime) + r.sentTxs = append(r.sentTxs, txs...) + broadcastedTxs += len(txs) } - r.sentTxs = append(r.sentTxs, sentTxs...) r.txFactory.ResetWalletAllocation() - return len(sentTxs) + return broadcastedTxs } -func (r *Runner) buildLoadPersistent(msgSpec loadtesttypes.LoadTestMsg, maxLoadSize int, useBaseline bool) []*gethtypes.Transaction { +func (r *Runner) buildLoadPersistent(msgSpec loadtesttypes.LoadTestMsg, maxLoadSize int, useBaseline bool) map[*wallet.InteractingWallet]gethtypes.Transactions { r.logger.Info("building load", zap.Int("maxLoadSize", maxLoadSize)) - var txnLoad []*gethtypes.Transaction + txnLoad := make(map[*wallet.InteractingWallet]gethtypes.Transactions) var wg sync.WaitGroup - txChan := make(chan *gethtypes.Transaction, maxLoadSize) + var builtTxs atomic.Int64 + txChan := make(chan *gethtypes.Transaction) for range maxLoadSize { wg.Go(func() { sender := r.txFactory.GetNextSender() @@ -215,6 +234,9 @@ func (r *Runner) buildLoadPersistent(msgSpec loadtesttypes.LoadTestMsg, maxLoadS r.logger.Error("failed to build txs", zap.Error(err)) return } + if len(tx) > 1 { + r.logger.Error("generated more than 1 tx", zap.Int("generated_txs", len(tx))) + } lastTx := tx[len(tx)-1] if lastTx == nil { return @@ -222,6 +244,7 @@ func (r *Runner) buildLoadPersistent(msgSpec loadtesttypes.LoadTestMsg, maxLoadS r.nonces.Store(sender.Address(), lastTx.Nonce()+1) // Only use single txn builders here for _, txn := range tx { + builtTxs.Add(1) txChan <- txn } }) @@ -234,9 +257,10 @@ func (r *Runner) buildLoadPersistent(msgSpec loadtesttypes.LoadTestMsg, maxLoadS for { select { case txn := <-txChan: - txnLoad = append(txnLoad, txn) + batchWallet := r.walletGroups[r.getWalletForTx(txn)] + txnLoad[batchWallet] = append(txnLoad[batchWallet], txn) case <-doneChan: - r.logger.Info("Generated load txs", zap.Int("num_txs", len(txnLoad))) + r.logger.Info("Generated load txs", zap.Int("num_batches", len(txnLoad)), zap.Int64("built_txs", builtTxs.Load())) return txnLoad } } diff --git a/chains/ethereum/runner/runner.go b/chains/ethereum/runner/runner.go index 6232560..30cbb64 100644 --- a/chains/ethereum/runner/runner.go +++ b/chains/ethereum/runner/runner.go @@ -41,6 +41,8 @@ type Runner struct { // senderToWallet maps sender addresses to their assigned wallet senderToWallet map[common.Address]*wallet.InteractingWallet promMetrics *metrics.Metrics + // walletGroups maps each wallet to a canonical wallet which is connected to the same client. + walletGroups map[*wallet.InteractingWallet]*wallet.InteractingWallet } func NewRunner(ctx context.Context, logger *zap.Logger, spec loadtesttypes.LoadTestSpec) (*Runner, error) { @@ -100,18 +102,25 @@ func NewRunner(ctx context.Context, logger *zap.Logger, spec loadtesttypes.LoadT if i%10000 == 0 { logger.Info("Initializing nonces for accounts", zap.Int("progress", i)) } - nonce, err := wallet.GetNonce(ctx) - if err != nil { - logger.Warn("Failed getting nonce for wallet setting to 0", zap.String("address", - wallet.Address().String()), zap.Error(err)) - } - nonces.Store(wallet.Address(), nonce) + go func() { + nonce, err := wallet.GetNonce(ctx) + if err != nil { + logger.Warn("Failed getting nonce for wallet setting to 0", zap.String("address", + wallet.Address().String()), zap.Error(err)) + } + nonces.Store(wallet.Address(), nonce) + }() + } + logger.Info("Finished initializing nonces") // Create sender to wallet mapping for consistent endpoint usage senderToWallet := make(map[common.Address]*wallet.InteractingWallet) - for _, w := range wallets { + // Create wallet grouping for batching + walletGroups := make(map[*wallet.InteractingWallet]*wallet.InteractingWallet, len(wallets)) + for i, w := range wallets { senderToWallet[w.Address()] = w + walletGroups[w] = wallets[i%len(clients)] } promMetrics := metrics.NewMetrics() @@ -131,6 +140,7 @@ func NewRunner(ctx context.Context, logger *zap.Logger, spec loadtesttypes.LoadT nonces: &nonces, senderToWallet: senderToWallet, promMetrics: promMetrics, + walletGroups: walletGroups, } return r, nil diff --git a/chains/ethereum/wallet/client.go b/chains/ethereum/wallet/client.go index b9819f1..91bc0a4 100644 --- a/chains/ethereum/wallet/client.go +++ b/chains/ethereum/wallet/client.go @@ -1,6 +1,9 @@ package wallet -import "github.com/ethereum/go-ethereum" +import ( + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/rpc" +) // Client is the interface for a go-ethereum client. type Client interface { @@ -18,4 +21,5 @@ type Client interface { ethereum.TransactionReader ethereum.TransactionSender ethereum.ChainIDReader + Client() *rpc.Client } diff --git a/chains/ethereum/wallet/wallet.go b/chains/ethereum/wallet/wallet.go index d434a66..de6954c 100644 --- a/chains/ethereum/wallet/wallet.go +++ b/chains/ethereum/wallet/wallet.go @@ -16,9 +16,11 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/rpc" loadtesttypes "github.com/skip-mev/catalyst/chains/types" "go.uber.org/zap" ) @@ -306,6 +308,22 @@ func (w *InteractingWallet) SendTransaction(ctx context.Context, signedTx *types return w.client.SendTransaction(ctx, signedTx) } +func (w *InteractingWallet) BatchSendTransactions(ctx context.Context, signedTxs types.Transactions) ([]rpc.BatchElem, error) { + elems := make([]rpc.BatchElem, len(signedTxs)) + for i, tx := range signedTxs { + data, err := tx.MarshalBinary() + if err != nil { + return nil, err + } + elems[i] = rpc.BatchElem{ + Method: "eth_sendRawTransaction", + Args: []interface{}{hexutil.Encode(data)}, + Result: new(string), + } + } + return elems, w.client.Client().BatchCallContext(ctx, elems) +} + // CreateAndSendTransaction creates, signs, and sends a transaction in one call func (w *InteractingWallet) CreateAndSendTransaction(ctx context.Context, to *common.Address, value *big.Int, gasLimit uint64, gasPrice *big.Int, data []byte, nonce *uint64, diff --git a/chains/ethereum/wallet/wallet_test.go b/chains/ethereum/wallet/wallet_test.go index a74ca2c..a619dcc 100644 --- a/chains/ethereum/wallet/wallet_test.go +++ b/chains/ethereum/wallet/wallet_test.go @@ -5,16 +5,156 @@ import ( "math/big" "testing" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethclient/simulated" + "github.com/ethereum/go-ethereum/rpc" loadtesttypes "github.com/skip-mev/catalyst/chains/types" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" ) +// simulatedClientWrapper wraps a simulated.Client to add the Client() method +// We use a named field instead of embedding to avoid naming conflicts +type simulatedClientWrapper struct { + sc simulated.Client +} + +// Client implements the wallet.Client interface +func (w *simulatedClientWrapper) Client() *rpc.Client { + // The simulated backend doesn't expose the underlying RPC client + // For testing purposes, we can return nil since batch operations + // aren't used in these tests + return nil +} + +// Forward the necessary methods from simulated.Client +func (w *simulatedClientWrapper) BlockNumber(ctx context.Context) (uint64, error) { + return w.sc.BlockNumber(ctx) +} + +func (w *simulatedClientWrapper) ChainID(ctx context.Context) (*big.Int, error) { + return w.sc.ChainID(ctx) +} + +func (w *simulatedClientWrapper) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) { + return w.sc.EstimateGas(ctx, msg) +} + +func (w *simulatedClientWrapper) SuggestGasPrice(ctx context.Context) (*big.Int, error) { + return w.sc.SuggestGasPrice(ctx) +} + +func (w *simulatedClientWrapper) SuggestGasTipCap(ctx context.Context) (*big.Int, error) { + return w.sc.SuggestGasTipCap(ctx) +} + +func (w *simulatedClientWrapper) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) { + return w.sc.BalanceAt(ctx, account, blockNumber) +} + +func (w *simulatedClientWrapper) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) { + return w.sc.PendingNonceAt(ctx, account) +} + +func (w *simulatedClientWrapper) SendTransaction(ctx context.Context, tx *types.Transaction) error { + return w.sc.SendTransaction(ctx, tx) +} + +func (w *simulatedClientWrapper) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + return w.sc.HeaderByNumber(ctx, number) +} + +func (w *simulatedClientWrapper) TransactionByHash(ctx context.Context, hash common.Hash) (*types.Transaction, bool, error) { + return w.sc.TransactionByHash(ctx, hash) +} + +func (w *simulatedClientWrapper) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) { + return w.sc.TransactionReceipt(ctx, txHash) +} + +func (w *simulatedClientWrapper) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) { + return w.sc.BlockByHash(ctx, hash) +} + +func (w *simulatedClientWrapper) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { + return w.sc.BlockByNumber(ctx, number) +} + +func (w *simulatedClientWrapper) CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error) { + return w.sc.CodeAt(ctx, account, blockNumber) +} + +func (w *simulatedClientWrapper) CallContract(ctx context.Context, msg ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) { + return w.sc.CallContract(ctx, msg, blockNumber) +} + +func (w *simulatedClientWrapper) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { + return w.sc.HeaderByHash(ctx, hash) +} + +func (w *simulatedClientWrapper) TransactionSender(ctx context.Context, tx *types.Transaction, block common.Hash, index uint) (common.Address, error) { + // Simulated backend doesn't implement this directly, return empty address + return common.Address{}, ethereum.NotFound +} + +func (w *simulatedClientWrapper) TransactionCount(ctx context.Context, blockHash common.Hash) (uint, error) { + // Simulated backend doesn't implement this directly + return 0, ethereum.NotFound +} + +func (w *simulatedClientWrapper) TransactionInBlock(ctx context.Context, blockHash common.Hash, index uint) (*types.Transaction, error) { + // Simulated backend doesn't implement this directly + return nil, ethereum.NotFound +} + +func (w *simulatedClientWrapper) PendingBalanceAt(ctx context.Context, account common.Address) (*big.Int, error) { + return w.sc.PendingBalanceAt(ctx, account) +} + +func (w *simulatedClientWrapper) PendingStorageAt(ctx context.Context, account common.Address, key common.Hash) ([]byte, error) { + return w.sc.PendingStorageAt(ctx, account, key) +} + +func (w *simulatedClientWrapper) PendingCodeAt(ctx context.Context, account common.Address) ([]byte, error) { + return w.sc.PendingCodeAt(ctx, account) +} + +func (w *simulatedClientWrapper) PendingTransactionCount(ctx context.Context) (uint, error) { + return w.sc.PendingTransactionCount(ctx) +} + +func (w *simulatedClientWrapper) PendingCallContract(ctx context.Context, msg ethereum.CallMsg) ([]byte, error) { + return w.sc.PendingCallContract(ctx, msg) +} + +func (w *simulatedClientWrapper) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { + return w.sc.FilterLogs(ctx, q) +} + +func (w *simulatedClientWrapper) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) { + return w.sc.SubscribeFilterLogs(ctx, q, ch) +} + +func (w *simulatedClientWrapper) FeeHistory(ctx context.Context, blockCount uint64, lastBlock *big.Int, rewardPercentiles []float64) (*ethereum.FeeHistory, error) { + return w.sc.FeeHistory(ctx, blockCount, lastBlock, rewardPercentiles) +} + +func (w *simulatedClientWrapper) StorageAt(ctx context.Context, account common.Address, key common.Hash, blockNumber *big.Int) ([]byte, error) { + return w.sc.StorageAt(ctx, account, key, blockNumber) +} + +func (w *simulatedClientWrapper) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) { + return w.sc.NonceAt(ctx, account, blockNumber) +} + +func (w *simulatedClientWrapper) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) { + return w.sc.SubscribeNewHead(ctx, ch) +} + func setupSimulatedBackend(alloc types.GenesisAlloc) *simulated.Backend { backend := simulated.NewBackend(alloc) return backend @@ -58,10 +198,11 @@ func TestTransaction(t *testing.T) { sim := setupSimulatedBackend(alloc) ctx := context.Background() - id, err := sim.Client().ChainID(ctx) + client := &simulatedClientWrapper{sc: sim.Client()} + id, err := client.ChainID(ctx) require.NoError(t, err) - wallet := NewInteractingWallet(key, id, sim.Client()) + wallet := NewInteractingWallet(key, id, client) addr2 := getRandomAddr(t) nonce := uint64(0) @@ -83,7 +224,7 @@ func TestTransaction(t *testing.T) { require.NoError(t, err) require.Equal(t, receipt.Status, types.ReceiptStatusSuccessful) - gotTx, isPending, err := GetTxByHash(ctx, sim.Client(), receipt.TxHash) + gotTx, isPending, err := GetTxByHash(ctx, client, receipt.TxHash) require.NoError(t, err) require.False(t, isPending) require.Equal(t, gotTx.Hash(), tx.Hash()) @@ -101,10 +242,11 @@ func TestCreateSignedTransaction(t *testing.T) { defer sim.Close() ctx := context.Background() - id, err := sim.Client().ChainID(ctx) + client := &simulatedClientWrapper{sc: sim.Client()} + id, err := client.ChainID(ctx) require.NoError(t, err) - wallet := NewInteractingWallet(key, id, sim.Client()) + wallet := NewInteractingWallet(key, id, client) toAddr := getRandomAddr(t) value := big.NewInt(1000) data := []byte("test data") @@ -230,10 +372,11 @@ func TestCreateSignedDynamicFeeTx(t *testing.T) { defer sim.Close() ctx := context.Background() - id, err := sim.Client().ChainID(ctx) + client := &simulatedClientWrapper{sc: sim.Client()} + id, err := client.ChainID(ctx) require.NoError(t, err) - wallet := NewInteractingWallet(key, id, sim.Client()) + wallet := NewInteractingWallet(key, id, client) toAddr := getRandomAddr(t) value := big.NewInt(1000) data := []byte("test data")