Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions config/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type Settings struct {
Prewarm bool `json:"prewarm,omitempty"`
RampUp bool `json:"rampUp,omitempty"`
ReportPath string `json:"reportPath,omitempty"`
TxsDir string `json:"txsDir,omitempty"`
TargetGas uint64 `json:"targetGas,omitempty"`
NumBlocksToWrite int `json:"numBlocksToWrite,omitempty"`
}

// DefaultSettings returns the default configuration values
Expand All @@ -41,6 +44,9 @@ func DefaultSettings() Settings {
Prewarm: false,
RampUp: false,
ReportPath: "",
TxsDir: "",
TargetGas: 10_000_000,
NumBlocksToWrite: 100,
}
}

Expand All @@ -60,6 +66,9 @@ func InitializeViper(cmd *cobra.Command) error {
"workers": "workers",
"rampUp": "ramp-up",
"reportPath": "report-path",
"txsDir": "txs-dir",
"targetGas": "target-gas",
"numBlocksToWrite": "num-blocks-to-write",
}

for viperKey, flagName := range flagBindings {
Expand All @@ -82,6 +91,9 @@ func InitializeViper(cmd *cobra.Command) error {
viper.SetDefault("workers", defaults.Workers)
viper.SetDefault("rampUp", defaults.RampUp)
viper.SetDefault("reportPath", defaults.ReportPath)
viper.SetDefault("txsDir", defaults.TxsDir)
viper.SetDefault("targetGas", defaults.TargetGas)
viper.SetDefault("numBlocksToWrite", defaults.NumBlocksToWrite)
return nil
}

Expand Down Expand Up @@ -120,5 +132,8 @@ func ResolveSettings() Settings {
Prewarm: viper.GetBool("prewarm"),
RampUp: viper.GetBool("rampUp"),
ReportPath: viper.GetString("reportPath"),
TxsDir: viper.GetString("txsDir"),
TargetGas: viper.GetUint64("targetGas"),
NumBlocksToWrite: viper.GetInt("numBlocksToWrite"),
}
}
6 changes: 6 additions & 0 deletions config/settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ func TestArgumentPrecedence(t *testing.T) {
cmd.Flags().Int("buffer-size", 0, "Buffer size")
cmd.Flags().Bool("ramp-up", false, "Ramp up loadtest")
cmd.Flags().String("report-path", "", "Report path")
cmd.Flags().String("txs-dir", "", "Txs dir")
cmd.Flags().Uint64("target-gas", 0, "Target gas")
cmd.Flags().Int("num-blocks-to-write", 0, "Number of blocks to write")

// Parse CLI args
if len(tt.cliArgs) > 0 {
Expand Down Expand Up @@ -133,6 +136,9 @@ func TestDefaultSettings(t *testing.T) {
Prewarm: false,
RampUp: false,
ReportPath: "",
TxsDir: "",
TargetGas: 10_000_000,
NumBlocksToWrite: 100,
}

if defaults != expected {
Expand Down
45 changes: 34 additions & 11 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"syscall"
"time"

"github.com/ethereum/go-ethereum/ethclient"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -65,6 +66,9 @@ func init() {
rootCmd.Flags().String("metricsListenAddr", "0.0.0.0:9090", "The ip:port on which to export prometheus metrics.")
rootCmd.Flags().Bool("ramp-up", false, "Ramp up loadtest")
rootCmd.Flags().String("report-path", "", "Path to save the report")
rootCmd.Flags().String("txs-dir", "", "Path to save the transactions")
rootCmd.Flags().Uint64("target-gas", 10_000_000, "Target gas per block")
rootCmd.Flags().Int("num-blocks-to-write", 100, "Number of blocks to write")

// Initialize Viper with proper error handling
if err := config.InitializeViper(rootCmd); err != nil {
Expand Down Expand Up @@ -169,12 +173,6 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
sharedLimiter = rate.NewLimiter(rate.Inf, 1)
}

// Create the sender from the config struct
snd, err := sender.NewShardedSender(cfg, settings.BufferSize, settings.Workers, sharedLimiter)
if err != nil {
return fmt.Errorf("failed to create sender: %w", err)
}

// Create and start block collector if endpoints are available
var blockCollector *stats.BlockCollector
if len(cfg.Endpoints) > 0 && settings.TrackBlocks {
Expand Down Expand Up @@ -207,6 +205,12 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
})
}

// Create the sender from the config struct
snd, err := sender.NewShardedSender(cfg, settings.BufferSize, settings.Workers, sharedLimiter)
if err != nil {
return fmt.Errorf("failed to create sender: %w", err)
}

// Enable dry-run mode in sender if specified
if settings.DryRun {
snd.SetDryRun(true)
Expand All @@ -225,7 +229,25 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
snd.SetStatsCollector(collector, logger)

// Create dispatcher
dispatcher := sender.NewDispatcher(gen, snd)
var dispatcher *sender.Dispatcher
if settings.TxsDir != "" {
// get latest height
ethclient, err := ethclient.Dial(cfg.Endpoints[0])
if err != nil {
return fmt.Errorf("failed to create ethclient: %w", err)
}
latestHeight, err := ethclient.BlockNumber(ctx)
if err != nil {
return fmt.Errorf("failed to get latest height: %w", err)
}
numBlocksToWrite := settings.NumBlocksToWrite
writerHeight := latestHeight + 10 // some buffer
log.Printf("🔍 Latest height: %d, writer start height: %d", latestHeight, writerHeight)
writer := sender.NewTxsWriter(settings.TargetGas, settings.TxsDir, writerHeight, uint64(numBlocksToWrite))
dispatcher = sender.NewDispatcher(gen, writer)
} else {
dispatcher = sender.NewDispatcher(gen, snd)
}

// Set statistics collector for dispatcher
dispatcher.SetStatsCollector(collector)
Expand All @@ -239,10 +261,11 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
log.Printf("📝 Prewarm mode: Accounts will be prewarmed")
}

// Start the sender (starts all workers)
s.SpawnBgNamed("sender", func() error { return snd.Run(ctx) })
log.Printf("✅ Connected to %d endpoints", snd.GetNumShards())

if settings.TxsDir == "" {
// Start the sender (starts all workers)
s.SpawnBgNamed("sender", func() error { return snd.Run(ctx) })
log.Printf("✅ Connected to %d endpoints", snd.GetNumShards())
}
// Perform prewarming if enabled (before starting logger to avoid logging prewarm transactions)
if settings.Prewarm {
if err := dispatcher.Prewarm(ctx); err != nil {
Expand Down
30 changes: 30 additions & 0 deletions profiles/conflict.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"chainId": 713714,
"seiChainId": "sei-chain",
"endpoints": [
"http://127.0.0.1:8545"
],
"accounts": {
"count": 5000,
"newAccountRate": 0.0
},
"scenarios": [
{
"name": "ERC20Conflict",
"weight": 1
}
],
"settings": {
"workers": 1,
"tps": 0,
"statsInterval": "5s",
"bufferSize": 1000,
"dryRun": false,
"debug": false,
"trackReceipts": false,
"trackBlocks": false,
"trackUserLatency": false,
"prewarm": false,
"rampUp": false
}
}
33 changes: 33 additions & 0 deletions profiles/evm_transfer_write_file.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"chainId": 713714,
"seiChainId": "sei-chain",
"endpoints": [
"http://127.0.0.1:8545"
],
"accounts": {
"count": 5000,
"newAccountRate": 0.0
},
"scenarios": [
{
"name": "EVMTransfer",
"weight": 1
}
],
"settings": {
"workers": 1,
"tps": 0,
"statsInterval": "5s",
"bufferSize": 1000,
"dryRun": false,
"debug": false,
"trackReceipts": false,
"trackBlocks": false,
"trackUserLatency": false,
"prewarm": false,
"rampUp": false,
"txsDir": "/root/load_txs",
"targetGas": 30000000,
"numBlocksToWrite": 1000
}
}
98 changes: 98 additions & 0 deletions sender/writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package sender

import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"path/filepath"

"github.com/sei-protocol/sei-load/types"
)

// implements `Send`

type TxsWriter struct {
gasPerBlock uint64
nextHeight uint64
txsDir string
blocksGenerated uint64
numBlocks uint64

bufferGas uint64
txBuffer []*types.LoadTx
}

func NewTxsWriter(gasPerBlock uint64, txsDir string, startHeight uint64, numBlocks uint64) *TxsWriter {
// what height to start at?
return &TxsWriter{
gasPerBlock: gasPerBlock,
nextHeight: startHeight,
txsDir: txsDir,
blocksGenerated: 0,
numBlocks: numBlocks,

bufferGas: 0,
txBuffer: make([]*types.LoadTx, 0),
}
}

// Send writes the transaction to the writer
func (w *TxsWriter) Send(ctx context.Context, tx *types.LoadTx) error {
// if bwe would exceed gasPerBlock, flush
if w.bufferGas+tx.EthTx.Gas() > w.gasPerBlock {
if err := w.Flush(); err != nil {
return err
}
}

// add to buffer
w.txBuffer = append(w.txBuffer, tx)
w.bufferGas += tx.EthTx.Gas()
return nil
}

type TxWriteData struct {
TxPayloads [][]byte `json:"tx_payloads"`
}

func (w *TxsWriter) Flush() error {
defer func() {
// clear buffer and reset bufferGas and increment nextHeight
w.txBuffer = make([]*types.LoadTx, 0)
w.bufferGas = 0
w.nextHeight++
w.blocksGenerated++
}()
// write to dir `~/load_txs`
// make dir if it doesn't exist
err := os.MkdirAll(w.txsDir, 0755)
if err != nil {
return err
}
txsFile := filepath.Join(w.txsDir, fmt.Sprintf("%d_txs.json", w.nextHeight))
txData := TxWriteData{
TxPayloads: make([][]byte, 0),
}
for _, tx := range w.txBuffer {
txData.TxPayloads = append(txData.TxPayloads, tx.Payload)
}

txDataJSON, err := json.Marshal(txData)
if err != nil {
return err
}

if err := os.WriteFile(txsFile, txDataJSON, 0644); err != nil {
return err
}

log.Printf("Flushed %d transactions to %s", len(w.txBuffer), txsFile)

if w.blocksGenerated >= w.numBlocks {
return fmt.Errorf("reached max number of blocks: %d", w.numBlocks)
}

return nil
}
58 changes: 58 additions & 0 deletions sender/writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package sender

import (
"context"
"testing"

"github.com/sei-protocol/sei-load/config"
"github.com/sei-protocol/sei-load/generator"
"github.com/sei-protocol/sei-load/generator/scenarios"
"github.com/sei-protocol/sei-load/types"
"github.com/stretchr/testify/require"
)

func TestTxsWriter_Flush(t *testing.T) {
// two evm transfer txs
writer := NewTxsWriter(42000, "/tmp", 1, 1)

loadConfig := &config.LoadConfig{
ChainID: 7777,
}

sharedAccounts := types.NewAccountPool(&types.AccountConfig{
Accounts: types.GenerateAccounts(10),
NewAccountRate: 0.0,
})

evmScenario := scenarios.CreateScenario(config.Scenario{
Name: "EVMTransfer",
Weight: 1,
})
evmScenario.Deploy(loadConfig, sharedAccounts.NextAccount())

generator := generator.NewScenarioGenerator(sharedAccounts, evmScenario)

txs := generator.GenerateN(3)

err := writer.Send(context.Background(), txs[0])
require.NoError(t, err)
require.Equal(t, uint64(1), writer.nextHeight)
require.Equal(t, uint64(21000), writer.bufferGas)
require.Len(t, writer.txBuffer, 1)
require.Equal(t, txs[0], writer.txBuffer[0])

err = writer.Send(context.Background(), txs[1])
require.NoError(t, err)
require.Equal(t, uint64(1), writer.nextHeight)
require.Equal(t, uint64(42000), writer.bufferGas)
require.Len(t, writer.txBuffer, 2)
require.Equal(t, txs[1], writer.txBuffer[1])

err = writer.Send(context.Background(), txs[2])
require.NoError(t, err)
// now should be flushed and have the new tx
require.Equal(t, uint64(2), writer.nextHeight)
require.Equal(t, uint64(21000), writer.bufferGas)
require.Len(t, writer.txBuffer, 1)

}
Loading