Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 53 additions & 29 deletions chains/ethereum/runner/persistent.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"fmt"
"math"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
gethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
inttypes "github.com/skip-mev/catalyst/chains/ethereum/types"
"github.com/skip-mev/catalyst/chains/ethereum/wallet"
loadtesttypes "github.com/skip-mev/catalyst/chains/types"
orderedmap "github.com/wk8/go-ordered-map/v2"
"go.uber.org/zap"
Expand Down Expand Up @@ -142,36 +144,45 @@ func (r *Runner) recordBlockTxs(blk *gethtypes.Header, tracker *orderedmap.Order
func (r *Runner) submitLoadPersistent(ctx context.Context, maxLoadSize int, tracker *orderedmap.OrderedMap[common.Hash, time.Time]) int {
// first we build the tx load. this constructs all the ethereum txs based in the spec.
r.logger.Info("building loads", zap.Int("num_msg_specs", len(r.spec.Msgs)))
var txs []*gethtypes.Transaction
batchTxs := make(map[*wallet.InteractingWallet]gethtypes.Transactions)
for _, msgSpec := range r.spec.Msgs {
load := r.buildLoadPersistent(msgSpec, maxLoadSize, false)
if len(load) == 0 {
continue
}
txs = append(txs, load...)
for k, v := range load {
r.logger.Info("built load of txs", zap.Int("load_size", len(v)))
batchTxs[k] = append(batchTxs[k], v...)
}
}

// submit each tx in a go routine
// submit each tx batch in a go routine
wg := sync.WaitGroup{}
sentTxs := make([]*inttypes.SentTx, len(txs))
for i, tx := range txs {
sentTxs := make(map[*wallet.InteractingWallet][]*inttypes.SentTx, len(batchTxs))
for fromWallet, txs := range batchTxs {
sentTxs[fromWallet] = make([]*inttypes.SentTx, len(txs))
wg.Go(func() {
// send the tx from the wallet assigned to this transaction's sender
fromWallet := r.getWalletForTx(tx)
err := fromWallet.SendTransaction(ctx, tx)
// send the txs from the wallet assigned to this transaction's sender
r.logger.Info("sending batch of txs", zap.Int("batch_size", len(txs)))
elems, err := fromWallet.BatchSendTransactions(ctx, txs)
if err != nil {
r.logger.Info("failed to send transaction", zap.String("tx_hash", tx.Hash().String()), zap.Error(err))
r.promMetrics.BroadcastFailure.Add(1)
} else {
r.promMetrics.BroadcastSuccess.Add(1)
r.logger.Info("failed to send batch transactions", zap.Int("len", len(txs)), zap.Error(err))
return
}
for i, elem := range elems {
if elem.Error != nil {
r.logger.Error("failed to send tx", zap.Error(elem.Error))
r.promMetrics.BroadcastFailure.Add(1)

txType := inttypes.ContractCall
sentTxs[i] = &inttypes.SentTx{
TxHash: tx.Hash(),
MsgType: txType,
Err: err,
Tx: tx,
} else {
r.promMetrics.BroadcastSuccess.Add(float64(len(txs)))
}
sentTxs[fromWallet][i] = &inttypes.SentTx{
TxHash: txs[i].Hash(),
MsgType: inttypes.ContractCall,
Err: elem.Error,
Tx: txs[i],
}
}
})
}
Expand All @@ -180,23 +191,31 @@ func (r *Runner) submitLoadPersistent(ctx context.Context, maxLoadSize int, trac

// Record broadcast time for all as now to get a rough sense--they should be pretty close anyways.
broadcastTime := time.Now()
for _, tx := range sentTxs {
if tx.Err != nil {
continue
var broadcastedTxs int
for _, txs := range sentTxs {
for _, tx := range txs {
if tx == nil {
continue
}
if tx.Err != nil {
continue
}
tracker.Set(tx.TxHash, broadcastTime)
}
tracker.Set(tx.TxHash, broadcastTime)
r.sentTxs = append(r.sentTxs, txs...)
broadcastedTxs += len(txs)
}

r.sentTxs = append(r.sentTxs, sentTxs...)
r.txFactory.ResetWalletAllocation()
return len(sentTxs)
return broadcastedTxs
}

func (r *Runner) buildLoadPersistent(msgSpec loadtesttypes.LoadTestMsg, maxLoadSize int, useBaseline bool) []*gethtypes.Transaction {
func (r *Runner) buildLoadPersistent(msgSpec loadtesttypes.LoadTestMsg, maxLoadSize int, useBaseline bool) map[*wallet.InteractingWallet]gethtypes.Transactions {
r.logger.Info("building load", zap.Int("maxLoadSize", maxLoadSize))
var txnLoad []*gethtypes.Transaction
txnLoad := make(map[*wallet.InteractingWallet]gethtypes.Transactions)
var wg sync.WaitGroup
txChan := make(chan *gethtypes.Transaction, maxLoadSize)
var builtTxs atomic.Int64
txChan := make(chan *gethtypes.Transaction)
for range maxLoadSize {
wg.Go(func() {
sender := r.txFactory.GetNextSender()
Expand All @@ -215,13 +234,17 @@ func (r *Runner) buildLoadPersistent(msgSpec loadtesttypes.LoadTestMsg, maxLoadS
r.logger.Error("failed to build txs", zap.Error(err))
return
}
if len(tx) > 1 {
r.logger.Error("generated more than 1 tx", zap.Int("generated_txs", len(tx)))
}
lastTx := tx[len(tx)-1]
if lastTx == nil {
return
}
r.nonces.Store(sender.Address(), lastTx.Nonce()+1)
// Only use single txn builders here
for _, txn := range tx {
builtTxs.Add(1)
txChan <- txn
}
})
Expand All @@ -234,9 +257,10 @@ func (r *Runner) buildLoadPersistent(msgSpec loadtesttypes.LoadTestMsg, maxLoadS
for {
select {
case txn := <-txChan:
txnLoad = append(txnLoad, txn)
batchWallet := r.walletGroups[r.getWalletForTx(txn)]
txnLoad[batchWallet] = append(txnLoad[batchWallet], txn)
case <-doneChan:
r.logger.Info("Generated load txs", zap.Int("num_txs", len(txnLoad)))
r.logger.Info("Generated load txs", zap.Int("num_batches", len(txnLoad)), zap.Int64("built_txs", builtTxs.Load()))
return txnLoad
}
}
Expand Down
24 changes: 17 additions & 7 deletions chains/ethereum/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type Runner struct {
// senderToWallet maps sender addresses to their assigned wallet
senderToWallet map[common.Address]*wallet.InteractingWallet
promMetrics *metrics.Metrics
// walletGroups maps each wallet to a canonical wallet which is connected to the same client.
walletGroups map[*wallet.InteractingWallet]*wallet.InteractingWallet
}

func NewRunner(ctx context.Context, logger *zap.Logger, spec loadtesttypes.LoadTestSpec) (*Runner, error) {
Expand Down Expand Up @@ -100,18 +102,25 @@ func NewRunner(ctx context.Context, logger *zap.Logger, spec loadtesttypes.LoadT
if i%10000 == 0 {
logger.Info("Initializing nonces for accounts", zap.Int("progress", i))
}
nonce, err := wallet.GetNonce(ctx)
if err != nil {
logger.Warn("Failed getting nonce for wallet setting to 0", zap.String("address",
wallet.Address().String()), zap.Error(err))
}
nonces.Store(wallet.Address(), nonce)
go func() {
nonce, err := wallet.GetNonce(ctx)
if err != nil {
logger.Warn("Failed getting nonce for wallet setting to 0", zap.String("address",
wallet.Address().String()), zap.Error(err))
}
nonces.Store(wallet.Address(), nonce)
}()

}
logger.Info("Finished initializing nonces")

// Create sender to wallet mapping for consistent endpoint usage
senderToWallet := make(map[common.Address]*wallet.InteractingWallet)
for _, w := range wallets {
// Create wallet grouping for batching
walletGroups := make(map[*wallet.InteractingWallet]*wallet.InteractingWallet, len(wallets))
for i, w := range wallets {
senderToWallet[w.Address()] = w
walletGroups[w] = wallets[i%len(clients)]
}

promMetrics := metrics.NewMetrics()
Expand All @@ -131,6 +140,7 @@ func NewRunner(ctx context.Context, logger *zap.Logger, spec loadtesttypes.LoadT
nonces: &nonces,
senderToWallet: senderToWallet,
promMetrics: promMetrics,
walletGroups: walletGroups,
}

return r, nil
Expand Down
6 changes: 5 additions & 1 deletion chains/ethereum/wallet/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package wallet

import "github.com/ethereum/go-ethereum"
import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/rpc"
)

// Client is the interface for a go-ethereum client.
type Client interface {
Expand All @@ -18,4 +21,5 @@ type Client interface {
ethereum.TransactionReader
ethereum.TransactionSender
ethereum.ChainIDReader
Client() *rpc.Client
}
18 changes: 18 additions & 0 deletions chains/ethereum/wallet/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
loadtesttypes "github.com/skip-mev/catalyst/chains/types"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -306,6 +308,22 @@ func (w *InteractingWallet) SendTransaction(ctx context.Context, signedTx *types
return w.client.SendTransaction(ctx, signedTx)
}

func (w *InteractingWallet) BatchSendTransactions(ctx context.Context, signedTxs types.Transactions) ([]rpc.BatchElem, error) {
elems := make([]rpc.BatchElem, len(signedTxs))
for i, tx := range signedTxs {
data, err := tx.MarshalBinary()
if err != nil {
return nil, err
}
elems[i] = rpc.BatchElem{
Method: "eth_sendRawTransaction",
Args: []interface{}{hexutil.Encode(data)},
Result: new(string),
}
}
return elems, w.client.Client().BatchCallContext(ctx, elems)
}

// CreateAndSendTransaction creates, signs, and sends a transaction in one call
func (w *InteractingWallet) CreateAndSendTransaction(ctx context.Context, to *common.Address, value *big.Int,
gasLimit uint64, gasPrice *big.Int, data []byte, nonce *uint64,
Expand Down
Loading
Loading