Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ gomods: ## Install gomods
go install github.com/jmank88/gomods@v0.1.6

.PHONY: tidy
tidy: gomods
tidy: gomods ## Tidy go.mod and go.sum files
gomods tidy

.PHONY: mockery
mockery: ## Install mockery.
mockery: ## Install mockery
go install github.com/vektra/mockery/v2@v2.53.3

.PHONY: codecgen
Expand All @@ -20,15 +20,20 @@ protoc: ## Install protoc
go install google.golang.org/protobuf/cmd/protoc-gen-go@`go list -m -json google.golang.org/protobuf | jq -r .Version`

.PHONY: generate
generate: gomods codecgen mockery protoc modgraph
generate: gomods codecgen mockery protoc modgraph ## Generate code for all modules
export PATH="$(HOME)/.local/bin:$(PATH)"; gomods -s gethwrappers,contracts/cre/ -go generate ./...
find . -type f -name .mockery.yaml -not -path "./contracts/" -not -path "./gethwrappers/" -execdir mockery \; ## Execute mockery for all .mockery.yaml files

.PHONY: rm-mocked
rm-mocked:
rm-mocked: ## Remove mocked code
grep -rl "^// Code generated by mockery" | grep --exclude-dir ./contracts/ --exclude-dir ./gethwrappers/ .go$ | xargs -r rm

.PHONY: modgraph
modgraph: gomods
modgraph: gomods ## Generate module graph
go install github.com/jmank88/modgraph@v0.1.0
./modgraph > go.md

.PHONY: help
help: ## Show help for all targets
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | \
awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ require (
github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20251210101658-1c5c8e4c4f15
github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20251020150604-8ab84f7bad1a
github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20251021173435-e86785845942
github.com/smartcontractkit/chainlink-protos/svr v1.1.1-0.20260127180652-a6229ee26d64
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20251124151448-0448aefdaab9
github.com/smartcontractkit/chainlink-protos/svr v1.1.0
github.com/smartcontractkit/chainlink-tron/relayer v0.0.11-0.20250815105909-75499abc4335
github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e
github.com/smartcontractkit/libocr v0.0.0-20250912173940-f3ab0246e23d
Expand Down
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -640,8 +640,12 @@ github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20251124151448-0448ae
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20251124151448-0448aefdaab9/go.mod h1:jUC52kZzEnWF9tddHh85zolKybmLpbQ1oNA4FjOHt1Q=
github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b h1:QuI6SmQFK/zyUlVWEf0GMkiUYBPY4lssn26nKSd/bOM=
github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b/go.mod h1:qSTSwX3cBP3FKQwQacdjArqv0g6QnukjV4XuzO6UyoY=
github.com/smartcontractkit/chainlink-protos/svr v1.1.0 h1:79Z9N9dMbMVRGaLoDPAQ+vOwbM+Hnx8tIN2xCPG8H4o=
github.com/smartcontractkit/chainlink-protos/svr v1.1.0/go.mod h1:TcOliTQU6r59DwG4lo3U+mFM9WWyBHGuFkkxQpvSujo=
github.com/smartcontractkit/chainlink-protos/svr v0.0.0-20260126141633-6e2417998710 h1:yaQ5/rGhFTONw0fbofB0u3rGGCrzDgfqpFJaoR7n6e0=
github.com/smartcontractkit/chainlink-protos/svr v0.0.0-20260126141633-6e2417998710/go.mod h1:TcOliTQU6r59DwG4lo3U+mFM9WWyBHGuFkkxQpvSujo=
github.com/smartcontractkit/chainlink-protos/svr v1.1.1-0.20260126141927-b984c6f68455 h1:7iIIRgvejQ2dFHI7YhYI4X0supXvkZHrzo2ueJta494=
github.com/smartcontractkit/chainlink-protos/svr v1.1.1-0.20260126141927-b984c6f68455/go.mod h1:TcOliTQU6r59DwG4lo3U+mFM9WWyBHGuFkkxQpvSujo=
github.com/smartcontractkit/chainlink-protos/svr v1.1.1-0.20260127180652-a6229ee26d64 h1:tHdBvgxEB4ABZaEyYN3SBcmhFfFU5ML6WhA2s9dnncQ=
github.com/smartcontractkit/chainlink-protos/svr v1.1.1-0.20260127180652-a6229ee26d64/go.mod h1:TcOliTQU6r59DwG4lo3U+mFM9WWyBHGuFkkxQpvSujo=
github.com/smartcontractkit/chainlink-tron/relayer v0.0.11-0.20250815105909-75499abc4335 h1:7bxYNrPpygn8PUSBiEKn8riMd7CXMi/4bjTy0fHhcrY=
github.com/smartcontractkit/chainlink-tron/relayer v0.0.11-0.20250815105909-75499abc4335/go.mod h1:ccjEgNeqOO+bjPddnL4lUrNLzyCvGCxgBjJdhFX3wa8=
github.com/smartcontractkit/chainlink-tron/relayer/gotron-sdk v0.0.5-0.20250528121202-292529af39df h1:36e3ROIZyV/qE8SvFOACXtXfMOMd9vG4+zY2v2ScXkI=
Expand Down
141 changes: 119 additions & 22 deletions pkg/txm/clientwrappers/dualbroadcast/meta_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
evmtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/gogo/protobuf/proto"
"google.golang.org/protobuf/encoding/protojson"

"github.com/mitchellh/mapstructure"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/logger"

"github.com/smartcontractkit/chainlink-evm/pkg/txm"
"github.com/smartcontractkit/chainlink-evm/pkg/txm/types"
atlaspb "github.com/smartcontractkit/chainlink-protos/svr/v1"
)

const (
Expand Down Expand Up @@ -131,27 +135,31 @@ type MetaClientRPC interface {
}

type MetaClient struct {
lggr logger.SugaredLogger
c MetaClientRPC
ks MetaClientKeystore
customURL *url.URL
chainID *big.Int
metrics *MetaMetrics
lggr logger.SugaredLogger
c MetaClientRPC
ks MetaClientKeystore
customURL *url.URL
chainID *big.Int
metrics *MetaMetrics
beholderEmitter beholder.Emitter
}

const schemaBasePath = "/oev-fastlane-atlas-error/versions/1"

func NewMetaClient(lggr logger.Logger, c MetaClientRPC, ks MetaClientKeystore, customURL *url.URL, chainID *big.Int) (*MetaClient, error) {
metrics, err := NewMetaMetrics(chainID.String())
if err != nil {
return nil, fmt.Errorf("failed to create Meta metrics: %w", err)
}

return &MetaClient{
lggr: logger.Sugared(logger.Named(lggr, "Txm.MetaClient")),
c: c,
ks: ks,
customURL: customURL,
chainID: chainID,
metrics: metrics,
lggr: logger.Sugared(logger.Named(lggr, "Txm.MetaClient")),
c: c,
ks: ks,
customURL: customURL,
chainID: chainID,
metrics: metrics,
beholderEmitter: beholder.GetEmitter(),
}, nil
}

Expand All @@ -163,6 +171,69 @@ func (a *MetaClient) PendingNonceAt(ctx context.Context, address common.Address)
return a.c.PendingNonceAt(ctx, address)
}

// TODO(gg): move this to meta_metrics.go
// emitAtlasErrorWithHttpStatusCode emits an OTel event to track FastLane Atlas errors with an HTTP status code
func (a *MetaClient) emitAtlasErrorWithHttpStatusCode(ctx context.Context, errType string, cause error, httpStatusCode int, tx *types.Transaction) {
var nonce string
if tx.Nonce != nil {
nonce = fmt.Sprintf("%d", *tx.Nonce)
}

meta, err := tx.GetMeta()
if err != nil {
a.lggr.Errorw(fmt.Sprintf("Failed to get meta for tx. Error to emit was: %v", cause), "txId", tx.ID, "err", err)
return
}

var destAddress string
if meta != nil && meta.FwdrDestAddress != nil {
destAddress = meta.FwdrDestAddress.String()
}

msg := &atlaspb.FastLaneAtlasError{
ChainId: a.chainID.String(),
FromAddress: tx.FromAddress.Hex(),
ToAddress: tx.ToAddress.Hex(),
FeedAddress: destAddress,
Nonce: nonce,
ErrorType: errType,
ErrorMessage: cause.Error(),
HttpStatusCode: int32(httpStatusCode),
TransactionId: int64(tx.ID), //nolint:gosec // overflow is acceptable for telemetry
AtlasUrl: a.customURL.String(),
CreatedAt: time.Now().UnixMicro(),
}

a.lggr.Infow("Emitting Atlas error event", "msg", msg)

messageBytes, err := proto.Marshal(msg)
if err != nil {
a.lggr.Errorw("Failed to marshal Atlas error event", "err", err)
return
}

attrKVs := []any{
"beholder_domain", "svr",
"beholder_entity", "svr.v1.FastLaneAtlasError",
"beholder_data_schema", "/fastlane-atlas-error/versions/1",
}

mStr := protojson.MarshalOptions{
UseProtoNames: true,
EmitUnpopulated: true,
}.Format(msg)
a.lggr.Infow("[Beholder.emit]", "message", mStr, "attributes", attrKVs)

if emitErr := a.beholderEmitter.Emit(ctx, messageBytes, attrKVs...); emitErr != nil {
a.lggr.Errorw("Failed to emit Atlas error event", "err", emitErr)
}
}

// emitAtlasError emits an OTel event to track FastLane Atlas errors
func (a *MetaClient) emitAtlasError(ctx context.Context, errType string, err error, tx *types.Transaction) {
a.emitAtlasErrorWithHttpStatusCode(ctx, errType, err, -1, tx)
}

func (a *MetaClient) SendTransaction(ctx context.Context, tx *types.Transaction, attempt *types.Attempt) error {
meta, err := tx.GetMeta()
if err != nil {
Expand All @@ -173,11 +244,13 @@ func (a *MetaClient) SendTransaction(ctx context.Context, tx *types.Transaction,
meta, err := a.SendRequest(ctx, tx, attempt, *meta.DualBroadcastParams, tx.ToAddress)
if err != nil {
a.metrics.RecordSendRequestError(ctx)
a.emitAtlasError(ctx, "send_request", err, tx)
return fmt.Errorf("error sending request for transactionID(%d): %w", tx.ID, err)
}
if meta != nil {
if err := a.SendOperation(ctx, tx, attempt, *meta); err != nil {
a.metrics.RecordSendOperationError(ctx)
a.emitAtlasError(ctx, "send_operation", err, tx)
return fmt.Errorf("failed to send operation for transactionID(%d): %w", tx.ID, err)
}
return nil
Expand Down Expand Up @@ -314,17 +387,23 @@ func (a *MetaClient) SendRequest(parentCtx context.Context, tx *types.Transactio

signature, err := a.ks.SignMessage(parentCtx, tx.FromAddress, []byte(payload))
if err != nil {
return nil, fmt.Errorf("failed to sign message: %w", err)
wrappedErr := fmt.Errorf("failed to sign message: %w", err)
a.emitAtlasError(ctx, "sign_message", wrappedErr, tx)
return nil, wrappedErr
}
params.Signature = signature
marshalledParamsExtended, err := json.Marshal(params)
if err != nil {
return nil, fmt.Errorf("failed to marshal signed params: %w", err)
wrappedErr := fmt.Errorf("failed to marshal signed params: %w", err)
a.emitAtlasError(ctx, "marshal_params", wrappedErr, tx)
return nil, wrappedErr
}
body := fmt.Appendf(nil, `{"jsonrpc":"2.0","method":"%s","params":[%s], "id":1}`, string(m), marshalledParamsExtended)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, a.customURL.String(), bytes.NewBuffer(body))
if err != nil {
return nil, fmt.Errorf("failed to create POST request: %w", err)
wrappedErr := fmt.Errorf("failed to create POST request: %w", err)
a.emitAtlasError(ctx, "create_request", wrappedErr, tx)
return nil, wrappedErr
}
req.Header.Add("Content-Type", "application/json")

Expand All @@ -336,25 +415,31 @@ func (a *MetaClient) SendRequest(parentCtx context.Context, tx *types.Transactio
// Record latency
a.metrics.RecordLatency(ctx, latency)
if err != nil {
return nil, fmt.Errorf("failed to send POST request: %w", err)
wrappedErr := fmt.Errorf("failed to send POST request: %w", err)
a.emitAtlasError(ctx, "http_request", wrappedErr, tx)
return nil, wrappedErr
}
defer resp.Body.Close()

// Record status code
a.metrics.RecordStatusCode(ctx, resp.StatusCode)

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("request %v failed with status: %d", req, resp.StatusCode)
httpErr := fmt.Errorf("request %v failed with status: %d", req, resp.StatusCode)
a.emitAtlasErrorWithHttpStatusCode(ctx, "http_status", httpErr, resp.StatusCode, tx)
return nil, httpErr
}

data, err := io.ReadAll(resp.Body)
if err != nil {
a.emitAtlasErrorWithHttpStatusCode(ctx, "read_response", err, resp.StatusCode, tx)
return nil, err
}

var response Response
err = json.Unmarshal(data, &response)
if err != nil {
a.emitAtlasErrorWithHttpStatusCode(ctx, "unmarshal_response", err, resp.StatusCode, tx)
return nil, err
}

Expand All @@ -363,7 +448,9 @@ func (a *MetaClient) SendRequest(parentCtx context.Context, tx *types.Transactio
a.metrics.RecordBidsReceived(ctx, 0)
return nil, nil
}
return nil, errors.New(response.Error.ErrorMessage)
atlasErr := errors.New(response.Error.ErrorMessage)
a.emitAtlasErrorWithHttpStatusCode(ctx, "atlas_error", atlasErr, resp.StatusCode, tx)
return nil, atlasErr
}

if response.Result == nil {
Expand Down Expand Up @@ -500,7 +587,9 @@ func VerifyMetadata(txData []byte, fromAddress common.Address, result Metacallda

func (a *MetaClient) SendOperation(ctx context.Context, tx *types.Transaction, attempt *types.Attempt, meta MetacalldataResponse) error {
if tx.Nonce == nil {
return fmt.Errorf("failed to create attempt for txID: %v: nonce empty", tx.ID)
nonceErr := fmt.Errorf("failed to create attempt for txID: %v: nonce empty", tx.ID)
a.emitAtlasError(ctx, "nonce_empty", nonceErr, tx)
return nonceErr
}

// TODO: fastest way to avoid overpaying, but might require additional checks.
Expand All @@ -510,7 +599,9 @@ func (a *MetaClient) SendOperation(ctx context.Context, tx *types.Transaction, a
}
gas := meta.GasLimit.ToInt()
if !gas.IsUint64() {
return fmt.Errorf("gas value does not fit in uint64: %s", gas)
gasErr := fmt.Errorf("gas value does not fit in uint64: %s", gas)
a.emitAtlasError(ctx, "gas_overflow", gasErr, tx)
return gasErr
}
dynamicTx := evmtypes.DynamicFeeTx{
Nonce: *tx.Nonce,
Expand All @@ -523,9 +614,15 @@ func (a *MetaClient) SendOperation(ctx context.Context, tx *types.Transaction, a

signedTx, err := a.ks.SignTx(ctx, tx.FromAddress, evmtypes.NewTx(&dynamicTx))
if err != nil {
return fmt.Errorf("failed to sign attempt for txID: %v, err: %w", tx.ID, err)
signErr := fmt.Errorf("failed to sign attempt for txID: %v, err: %w", tx.ID, err)
a.emitAtlasError(ctx, "sign_tx", signErr, tx)
return signErr
}
a.lggr.Infow("Intercepted attempt for tx", "txID", tx.ID, "hash", signedTx.Hash(), "toAddress", meta.ToAddress, "gasLimit", meta.GasLimit,
"TipCap", tip, "FeeCap", meta.MaxFeePerGas)
return a.c.SendTransaction(ctx, signedTx)
if err := a.c.SendTransaction(ctx, signedTx); err != nil {
a.emitAtlasError(ctx, "send_tx", err, tx)
return err
}
return nil
}
Loading
Loading