Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
3428613
(feat): new approach for private tx relay and preconfs
manav2401 Jan 21, 2026
ab2d12c
(feat): refactor relay service
manav2401 Jan 22, 2026
dbe5ae0
small nit
manav2401 Jan 22, 2026
6b24f92
eth/relay: fix lint
manav2401 Jan 22, 2026
5ac4830
eth: add tests for disable tx propagation
manav2401 Jan 26, 2026
0cebc99
eth: add tests for private transactions
manav2401 Jan 26, 2026
fe59340
eth/relay: small fixes
manav2401 Jan 26, 2026
c7860d4
eth/relay: small fixes
manav2401 Jan 26, 2026
6f84bb4
eth/relay: add tests for private tx store
manav2401 Jan 26, 2026
1a65d24
eth/relay: add tests for multiclient
manav2401 Jan 27, 2026
034c76f
eth/relay: fix call to send private tx
manav2401 Jan 27, 2026
9de8b0b
eth/relay: add a sub test for multiclient
manav2401 Jan 27, 2026
0944b9a
eth/relay: add tests for relay service
manav2401 Jan 27, 2026
abd8b52
Merge branch 'raneet10/new-upstream-v1.16.7' into preconf-private-tx-…
manav2401 Jan 27, 2026
204e962
address claude comments, fix non-deterministic tests
manav2401 Jan 27, 2026
b90f4e3
eth/relay: update relay to server preconf for any tx hash, update tests
manav2401 Jan 28, 2026
550c61c
Merge branch 'raneet10/new-upstream-v1.16.7' into preconf-private-tx-…
manav2401 Jan 28, 2026
927bce6
eth: close relay service in backend
manav2401 Jan 28, 2026
68842ca
eth: disable tx propagation for rebroadcast
manav2401 Jan 28, 2026
3073f7a
eth/relay: handle 'already known' errors
manav2401 Jan 28, 2026
eb0baa8
eth/relay: small nits
manav2401 Jan 28, 2026
db91992
internal/ethapi: update txpool_txStatus api comment
manav2401 Jan 28, 2026
4abd039
eth: check tx status in local db
manav2401 Jan 28, 2026
fc9e4a1
eth/relay: fix tests
manav2401 Jan 29, 2026
07c2618
address claude comments
manav2401 Jan 29, 2026
052d0c3
eth/relay: handle race condition for updating task in cache
manav2401 Feb 3, 2026
e0713eb
Merge branch 'develop' into preconf-private-tx-relay
manav2401 Feb 3, 2026
170ca18
internal/ethapi: ignore already known error for preconfs
manav2401 Feb 3, 2026
8d73cb6
eth/relay: add CLAUDE.md with some context
manav2401 Feb 3, 2026
1b8ebb5
internal/ethapi: update hash when tx is already known
manav2401 Feb 3, 2026
c8422dc
eth/relay: fix lint
manav2401 Feb 3, 2026
d17c585
eth/relay: small delay before task being consumed in test
manav2401 Feb 3, 2026
f153d32
internal/ethapi: lint fix
manav2401 Feb 3, 2026
a33980b
internal/ethapi: use tx subscription for offering preconf
manav2401 Feb 4, 2026
344ea8f
Merge branch 'develop' into preconf-private-tx-relay
manav2401 Feb 4, 2026
6e07e52
Revert "internal/ethapi: use tx subscription for offering preconf"
manav2401 Feb 4, 2026
8a4e7b1
eth: move log to debug
manav2401 Feb 4, 2026
fc7bdca
eth/relay: add private tx store metrics
manav2401 Feb 4, 2026
b95941d
eth/relay: add more metrics
manav2401 Feb 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/gasprice"
"github.com/ethereum/go-ethereum/eth/relay"
"github.com/ethereum/go-ethereum/eth/tracers"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
Expand All @@ -54,6 +55,8 @@ type EthAPIBackend struct {
allowUnprotectedTxs bool
eth *Ethereum
gpo *gasprice.Oracle

relay *relay.RelayService
}

// ChainConfig returns the active chain configuration.
Expand Down Expand Up @@ -481,6 +484,10 @@ func (b *EthAPIBackend) TxPoolContentFrom(addr common.Address) ([]*types.Transac
return b.eth.txPool.ContentFrom(addr)
}

func (b *EthAPIBackend) TxStatus(hash common.Hash) txpool.TxStatus {
return b.eth.txPool.Status(hash)
}

func (b *EthAPIBackend) TxPool() *txpool.TxPool {
return b.eth.txPool
}
Expand Down Expand Up @@ -720,3 +727,40 @@ func (b *EthAPIBackend) RPCTxSyncDefaultTimeout() time.Duration {
func (b *EthAPIBackend) RPCTxSyncMaxTimeout() time.Duration {
return b.eth.config.TxSyncMaxTimeout
}

// Preconf / Private tx related API for relay
func (b *EthAPIBackend) PreconfEnabled() bool {
return b.relay.PreconfEnabled()
}
func (b *EthAPIBackend) SubmitTxForPreconf(tx *types.Transaction) error {
return b.relay.SubmitPreconfTransaction(tx)
}

func (b *EthAPIBackend) CheckPreconfStatus(hash common.Hash) (bool, error) {
return b.relay.CheckPreconfStatus(hash)
}

func (b *EthAPIBackend) PrivateTxEnabled() bool {
return b.relay.PrivateTxEnabled()
}

func (b *EthAPIBackend) SubmitPrivateTx(tx *types.Transaction) error {
return b.relay.SubmitPrivateTransaction(tx)
}

// Preconf / Private tx related API for block producers
func (b *EthAPIBackend) AcceptPreconfTxs() bool {
return b.relay.AcceptPreconfTxs()
}

func (b *EthAPIBackend) AcceptPrivateTxs() bool {
return b.relay.AcceptPrivateTxs()
}

func (b *EthAPIBackend) RecordPrivateTx(hash common.Hash) {
b.relay.RecordPrivateTx(hash)
}

func (b *EthAPIBackend) PurgePrivateTx(hash common.Hash) {
b.relay.PurgePrivateTx(hash)
}
38 changes: 19 additions & 19 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
"github.com/ethereum/go-ethereum/eth/protocols/wit"
"github.com/ethereum/go-ethereum/eth/relay"
"github.com/ethereum/go-ethereum/eth/tracers"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
Expand Down Expand Up @@ -228,14 +229,18 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
closeCh: make(chan struct{}),
}

relayService := relay.Init(config.EnablePreconfs, config.EnablePrivateTx, config.AcceptPreconfTx, config.AcceptPrivateTx, config.BlockProducerRpcEndpoints)
privateTxGetter := relayService.GetPrivateTxGetter()

// START: Bor changes
eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, eth, nil}
eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, eth, nil, relayService}
if eth.APIBackend.allowUnprotectedTxs {
log.Info("------Unprotected transactions allowed-------")
log.Info("Unprotected transactions allowed")
config.TxPool.AllowUnprotectedTxs = true
}

gpoParams := config.GPO
// Set transaction getter for relay service to query local database
relayService.SetTxGetter(eth.APIBackend.GetCanonicalTransaction)

blockChainAPI := ethapi.NewBlockChainAPI(eth.APIBackend)
engine, err := ethconfig.CreateConsensusEngine(config.Genesis.Config, config, chainDb, blockChainAPI)
Expand Down Expand Up @@ -327,6 +332,9 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
eth.blockchain, err = core.NewBlockChain(chainDb, config.Genesis, eth.engine, options)
}

// Set the chain head event subscription function for private tx store
relayService.SetchainEventSubFn(eth.blockchain.SubscribeChainEvent)

// Set parallel stateless import toggle on blockchain
if err == nil && eth.blockchain != nil && config.EnableParallelStatelessImport {
eth.blockchain.ParallelStatelessImportEnable()
Expand All @@ -342,18 +350,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
// Set blockchain reference for fork detection in whitelist service
checker.SetBlockchain(eth.blockchain)

// 1.14.8: NewOracle function definition was changed to accept (startPrice *big.Int) param.
eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams, config.Miner.GasPrice)

// bor: this is nor present in geth
/*
_ = eth.engine.VerifyHeader(eth.blockchain, eth.blockchain.CurrentHeader()) // TODO think on it
*/

// BOR changes
eth.APIBackend.gpo.ProcessCache()
// BOR changes

// Initialize filtermaps log index.
fmConfig := filtermaps.Config{
History: config.LogHistory,
Expand Down Expand Up @@ -423,11 +419,13 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
checker: checker,
enableBlockTracking: eth.config.EnableBlockTracking,
txAnnouncementOnly: eth.p2pServer.TxAnnouncementOnly,
disableTxPropagation: eth.p2pServer.DisableTxPropagation,
witnessProtocol: eth.config.WitnessProtocol,
syncWithWitnesses: eth.config.SyncWithWitnesses,
syncAndProduceWitnesses: eth.config.SyncAndProduceWitnesses,
fastForwardThreshold: config.FastForwardThreshold,
p2pServer: eth.p2pServer,
privateTxGetter: privateTxGetter,
}); err != nil {
return nil, err
}
Expand All @@ -440,12 +438,9 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
eth.miner.SetPrioAddresses(config.TxPool.Locals)
}

eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, eth, nil}
if eth.APIBackend.allowUnprotectedTxs {
log.Info("Unprotected transactions allowed")
}
// 1.14.8: NewOracle function definition was changed to accept (startPrice *big.Int) param.
eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, config.GPO, config.Miner.GasPrice)
eth.APIBackend.gpo.ProcessCache()

// Start the RPC service
eth.netRPCService = ethapi.NewNetAPI(eth.p2pServer, config.NetworkId)
Expand Down Expand Up @@ -1010,6 +1005,11 @@ func (s *Ethereum) Stop() error {
// Stop all the peer-related stuff first.
s.discmix.Close()

// Close the tx relay service if enabled
if s.APIBackend.relay != nil {
s.APIBackend.relay.Close()
}

// Close the engine before handler else it may cause a deadlock where
// the heimdall is unresponsive and the syncing loop keeps waiting
// for a response and is unable to proceed to exit `Finalize` during
Expand Down
9 changes: 9 additions & 0 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,15 @@ type Config struct {
// EIP-7966: eth_sendRawTransactionSync timeouts
TxSyncDefaultTimeout time.Duration `toml:",omitempty"`
TxSyncMaxTimeout time.Duration `toml:",omitempty"`

// Preconf / Private transaction relay related settings
EnablePreconfs bool
EnablePrivateTx bool
BlockProducerRpcEndpoints []string

// Preconf / Private transaction related settings for block producers
AcceptPreconfTx bool
AcceptPrivateTx bool
}

// CreateConsensusEngine creates a consensus engine for the given chain configuration.
Expand Down
66 changes: 49 additions & 17 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
"github.com/ethereum/go-ethereum/eth/protocols/wit"
"github.com/ethereum/go-ethereum/eth/relay"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/ethapi"
Expand Down Expand Up @@ -120,12 +121,14 @@ type handlerConfig struct {
EthAPI *ethapi.BlockChainAPI // EthAPI to interact
enableBlockTracking bool // Whether to log information collected while tracking block lifecycle
txAnnouncementOnly bool // Whether to only announce txs to peers
disableTxPropagation bool // Whether to disable broadcasting and announcement of txs to peers
witnessProtocol bool // Whether to enable witness protocol
syncWithWitnesses bool // Whether to sync blocks with witnesses
syncAndProduceWitnesses bool // Whether to sync blocks and produce witnesses simultaneously
fastForwardThreshold uint64 // Minimum necessary distance between local header and peer to fast forward
gasCeil uint64 // Gas ceiling for dynamic witness page threshold calculation
p2pServer *p2p.Server // P2P server for jailing peers
privateTxGetter relay.PrivateTxGetter // privateTxGetter to check if a transaction needs to be treated as private or not
}

type handler struct {
Expand All @@ -150,6 +153,9 @@ type handler struct {

ethAPI *ethapi.BlockChainAPI // EthAPI to interact

// privateTxGetter to check if a transaction needs to be treated as private or not
privateTxGetter relay.PrivateTxGetter

eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
txsSub event.Subscription
Expand All @@ -160,9 +166,11 @@ type handler struct {

requiredBlocks map[uint64]common.Hash

enableBlockTracking bool
txAnnouncementOnly bool
p2pServer *p2p.Server // P2P server for jailing peers
enableBlockTracking bool
txAnnouncementOnly bool
disableTxPropagation bool

p2pServer *p2p.Server // P2P server for jailing peers

// Witness protocol related fields
syncWithWitnesses bool
Expand Down Expand Up @@ -199,12 +207,14 @@ func newHandler(config *handlerConfig) (*handler, error) {
requiredBlocks: config.RequiredBlocks,
enableBlockTracking: config.enableBlockTracking,
txAnnouncementOnly: config.txAnnouncementOnly,
disableTxPropagation: config.disableTxPropagation,
p2pServer: config.p2pServer,
quitSync: make(chan struct{}),
handlerDoneCh: make(chan struct{}),
handlerStartCh: make(chan struct{}),
syncWithWitnesses: config.syncWithWitnesses,
syncAndProduceWitnesses: config.syncAndProduceWitnesses,
privateTxGetter: config.privateTxGetter,
}

log.Info("Sync with witnesses", "enabled", config.syncWithWitnesses)
Expand Down Expand Up @@ -423,9 +433,12 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
}
h.chainSync.handlePeerEvent()

// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
h.syncTransactions(peer)
// Bor: skip propagating transactions if flag is set
if !h.disableTxPropagation {
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
h.syncTransactions(peer)
}

// Create a notification channel for pending requests if the peer goes down
dead := make(chan struct{})
Expand Down Expand Up @@ -579,17 +592,27 @@ func (h *handler) unregisterPeer(id string) {
func (h *handler) Start(maxPeers int) {
h.maxPeers = maxPeers

// broadcast and announce transactions (only new ones, not resurrected ones)
h.wg.Add(1)
h.txsCh = make(chan core.NewTxsEvent, txChanSize)
h.txsSub = h.txpool.SubscribeTransactions(h.txsCh, false)
go h.txBroadcastLoop()
if h.disableTxPropagation {
log.Info("Disabling transaction propagation completely")
}

// Bor: block producers can choose to not propagate transactions to save p2p overhead
// broadcast and announce transactions (only new ones, not resurrected ones) only
// if transaction propagation is enabled
if !h.disableTxPropagation {
h.wg.Add(1)
h.txsCh = make(chan core.NewTxsEvent, txChanSize)
h.txsSub = h.txpool.SubscribeTransactions(h.txsCh, false)
go h.txBroadcastLoop()
}

// rebroadcast stuck transactions
h.wg.Add(1)
h.stuckTxsCh = make(chan core.StuckTxsEvent, txChanSize)
h.stuckTxsSub = h.txpool.SubscribeRebroadcastTransactions(h.stuckTxsCh)
go h.stuckTxBroadcastLoop()
if !h.disableTxPropagation {
h.wg.Add(1)
h.stuckTxsCh = make(chan core.StuckTxsEvent, txChanSize)
h.stuckTxsSub = h.txpool.SubscribeRebroadcastTransactions(h.stuckTxsCh)
go h.stuckTxBroadcastLoop()
}

// broadcast mined blocks
h.wg.Add(1)
Expand All @@ -610,8 +633,12 @@ func (h *handler) Start(maxPeers int) {
}

func (h *handler) Stop() {
h.txsSub.Unsubscribe() // quits txBroadcastLoop
h.stuckTxsSub.Unsubscribe() // quits stuckTxBroadcastLoop
if h.txsSub != nil {
h.txsSub.Unsubscribe() // quits txBroadcastLoop
}
if h.stuckTxsSub != nil {
h.stuckTxsSub.Unsubscribe() // quits stuckTxBroadcastLoop
}
h.minedBlockSub.Unsubscribe()
h.blockRange.stop()

Expand Down Expand Up @@ -736,6 +763,11 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
)

for _, tx := range txs {
// Skip gossip if transaction is marked as private
if h.privateTxGetter != nil && h.privateTxGetter.IsTxPrivate(tx.Hash()) {
log.Debug("[tx-relay] skip tx broadcast for private tx", "hash", tx.Hash())
continue
}
var directSet map[*ethPeer]struct{}
switch {
case tx.Type() == types.BlobTxType:
Expand Down
Loading
Loading