Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 50 additions & 5 deletions eth/catalyst/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package catalyst

import (
"context"
"errors"
"fmt"
"reflect"
Expand All @@ -43,6 +44,10 @@ import (
"github.com/ethereum/go-ethereum/params/forks"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

// Register adds the engine API to the full node.
Expand Down Expand Up @@ -609,16 +614,33 @@ func (api *ConsensusAPI) GetBlobsV2(hashes []common.Hash) ([]*engine.BlobAndProo
// Helper for NewPayload* methods.
var invalidStatus = engine.PayloadStatusV1{Status: engine.INVALID}

func startNewPayloadSpan(ctx context.Context, name string, params engine.ExecutableData) (context.Context, trace.Span) {
tracer := getEngineTracer()
ctx, span := tracer.Start(ctx, name)
span.SetAttributes(
attribute.Int64("block.number", int64(params.Number)),
attribute.String("block.hash", params.BlockHash.Hex()),
attribute.Int("tx.count", len(params.Transactions)),
)
return ctx, span
}

// NewPayloadV1 creates an Eth1 block, inserts it in the chain, and returns the status of the chain.
func (api *ConsensusAPI) NewPayloadV1(params engine.ExecutableData) (engine.PayloadStatusV1, error) {
// TODO(jrhea): Decide whether validation errors should be recorded as span errors.
ctx, span := startNewPayloadSpan(context.Background(), "engine.newPayloadV1", params)
defer span.End()
if params.Withdrawals != nil {
return invalidStatus, paramsErr("withdrawals not supported in V1")
}
return api.newPayload(params, nil, nil, nil, false)
return api.newPayload(ctx, params, nil, nil, nil, false)
}

// NewPayloadV2 creates an Eth1 block, inserts it in the chain, and returns the status of the chain.
func (api *ConsensusAPI) NewPayloadV2(params engine.ExecutableData) (engine.PayloadStatusV1, error) {
// TODO(jrhea): Decide whether validation errors should be recorded as span errors.
ctx, span := startNewPayloadSpan(context.Background(), "engine.newPayloadV2", params)
defer span.End()
var (
cancun = api.config().IsCancun(api.config().LondonBlock, params.Timestamp)
shanghai = api.config().IsShanghai(api.config().LondonBlock, params.Timestamp)
Expand All @@ -635,11 +657,14 @@ func (api *ConsensusAPI) NewPayloadV2(params engine.ExecutableData) (engine.Payl
case params.BlobGasUsed != nil:
return invalidStatus, paramsErr("non-nil blobGasUsed pre-cancun")
}
return api.newPayload(params, nil, nil, nil, false)
return api.newPayload(ctx, params, nil, nil, nil, false)
}

// NewPayloadV3 creates an Eth1 block, inserts it in the chain, and returns the status of the chain.
func (api *ConsensusAPI) NewPayloadV3(params engine.ExecutableData, versionedHashes []common.Hash, beaconRoot *common.Hash) (engine.PayloadStatusV1, error) {
// TODO(jrhea): Decide whether validation errors should be recorded as span errors.
ctx, span := startNewPayloadSpan(context.Background(), "engine.newPayloadV3", params)
defer span.End()
switch {
case params.Withdrawals == nil:
return invalidStatus, paramsErr("nil withdrawals post-shanghai")
Expand All @@ -654,11 +679,14 @@ func (api *ConsensusAPI) NewPayloadV3(params engine.ExecutableData, versionedHas
case !api.checkFork(params.Timestamp, forks.Cancun):
return invalidStatus, unsupportedForkErr("newPayloadV3 must only be called for cancun payloads")
}
return api.newPayload(params, versionedHashes, beaconRoot, nil, false)
return api.newPayload(ctx, params, versionedHashes, beaconRoot, nil, false)
}

// NewPayloadV4 creates an Eth1 block, inserts it in the chain, and returns the status of the chain.
func (api *ConsensusAPI) NewPayloadV4(params engine.ExecutableData, versionedHashes []common.Hash, beaconRoot *common.Hash, executionRequests []hexutil.Bytes) (engine.PayloadStatusV1, error) {
// TODO(jrhea): Decide whether validation errors should be recorded as span errors.
ctx, span := startNewPayloadSpan(context.Background(), "engine.newPayloadV4", params)
defer span.End()
switch {
case params.Withdrawals == nil:
return invalidStatus, paramsErr("nil withdrawals post-shanghai")
Expand All @@ -679,10 +707,10 @@ func (api *ConsensusAPI) NewPayloadV4(params engine.ExecutableData, versionedHas
if err := validateRequests(requests); err != nil {
return engine.PayloadStatusV1{Status: engine.INVALID}, engine.InvalidParams.With(err)
}
return api.newPayload(params, versionedHashes, beaconRoot, requests, false)
return api.newPayload(ctx, params, versionedHashes, beaconRoot, requests, false)
}

func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashes []common.Hash, beaconRoot *common.Hash, requests [][]byte, witness bool) (engine.PayloadStatusV1, error) {
func (api *ConsensusAPI) newPayload(ctx context.Context, params engine.ExecutableData, versionedHashes []common.Hash, beaconRoot *common.Hash, requests [][]byte, witness bool) (engine.PayloadStatusV1, error) {
// The locking here is, strictly, not required. Without these locks, this can happen:
//
// 1. NewPayload( execdata-N ) is invoked from the CL. It goes all the way down to
Expand All @@ -700,7 +728,17 @@ func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashe
defer api.newPayloadLock.Unlock()

log.Trace("Engine API request received", "method", "NewPayload", "number", params.Number, "hash", params.BlockHash)

tracer := getEngineTracer()
rootSpan := trace.SpanFromContext(ctx)
_, decodeSpan := tracer.Start(ctx, "payload.decode")
block, err := engine.ExecutableDataToBlock(params, versionedHashes, beaconRoot, requests)
if err != nil {
decodeSpan.RecordError(err)
decodeSpan.SetStatus(codes.Error, err.Error())
rootSpan.SetStatus(codes.Error, err.Error())
}
decodeSpan.End()
if err != nil {
bgu := "nil"
if params.BlobGasUsed != nil {
Expand Down Expand Up @@ -773,7 +811,14 @@ func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashe
return engine.PayloadStatusV1{Status: engine.ACCEPTED}, nil
}
log.Trace("Inserting block without sethead", "hash", block.Hash(), "number", block.Number())
_, importSpan := tracer.Start(ctx, "block.import")
proofs, err := api.eth.BlockChain().InsertBlockWithoutSetHead(block, witness)
if err != nil {
importSpan.RecordError(err)
importSpan.SetStatus(codes.Error, err.Error())
rootSpan.SetStatus(codes.Error, err.Error())
}
importSpan.End()
if err != nil {
log.Warn("NewPayload: inserting block failed", "error", err)

Expand Down
7 changes: 6 additions & 1 deletion eth/catalyst/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,12 @@ func setupBlocks(t *testing.T, ethservice *eth.Ethereum, n int, parent *types.He
}

envelope := getNewEnvelope(t, api, parent, w, h)
execResp, err := api.newPayload(*envelope.ExecutionPayload, []common.Hash{}, h, envelope.Requests, false)

// NOTE: This span is for the test harness only. Engine RPC root spans are created
// in NewPayloadV* entrypoints; this test calls newPayload directly.
ctx, span := startNewPayloadSpan(context.Background(), "engine.api_test.setupBlocks", *envelope.ExecutionPayload)
defer span.End()
execResp, err := api.newPayload(ctx, *envelope.ExecutionPayload, []common.Hash{}, h, envelope.Requests, false)
if err != nil {
t.Fatalf("can't execute payload: %v", err)
}
Expand Down
78 changes: 78 additions & 0 deletions eth/catalyst/otel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package catalyst

import (
"context"
"sync"
"time"

"github.com/ethereum/go-ethereum/log"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
)

var (
tracerInitOnce sync.Once

engineTracer trace.TracerProvider = noop.NewTracerProvider()
engineSDKTracer *sdktrace.TracerProvider

Check failure on line 24 in eth/catalyst/otel.go

View workflow job for this annotation

GitHub Actions / Lint

var engineSDKTracer is unused (unused)
)

// initEngineTelemetry initializes the OpenTelemetry tracing for the engine API.
func initEngineTelemetry() {
// TODO(jrhea): allow caller to provide init context once lifecycle plumbing exists.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

// TODO(jrhea): make endpoint configurable via flags.
exporter, err := otlptracegrpc.New(ctx,
otlptracegrpc.WithEndpoint("localhost:4317"),
otlptracegrpc.WithInsecure(),
)
if err != nil {
log.Warn("OpenTelemetry exporter init failed, using no-op tracer", "err", err)
return
}
res, err := resource.New(ctx,
resource.WithAttributes(
semconv.ServiceName("geth"),
semconv.ServiceNamespace("engine"),
),
)
if err != nil {
log.Warn("OpenTelemetry resource init failed", "err", err)
res = resource.Empty()
}

// TODO(jrhea): Intentionally use a synchronous exporter + AlwaysSample
// for initial testing. This will be replaced with a batched exporter
// and production-ready sampling.
tracer := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithSyncer(exporter),
sdktrace.WithResource(res),
)
otel.SetTracerProvider(tracer)
otel.SetTextMapPropagator(propagation.TraceContext{})
engineTracer = tracer
engineSDKTracer = tracer
}

// TODO(jrhea): wire into geth/node lifecycle.
func shutdownEngineTelemetry(ctx context.Context) error {

Check failure on line 68 in eth/catalyst/otel.go

View workflow job for this annotation

GitHub Actions / Lint

func shutdownEngineTelemetry is unused (unused)
if engineSDKTracer != nil {
return engineSDKTracer.Shutdown(ctx)
}
return nil
}

func getEngineTracer() trace.Tracer {
tracerInitOnce.Do(initEngineTelemetry)
return engineTracer.Tracer("geth/engine")
}
10 changes: 8 additions & 2 deletions eth/catalyst/simulated_beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package catalyst

import (
"context"
"crypto/rand"
"crypto/sha256"
"errors"
Expand Down Expand Up @@ -255,8 +256,13 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u
requests = envelope.Requests
}

// Mark the payload as canon
_, err = c.engineAPI.newPayload(*payload, blobHashes, beaconRoot, requests, false)
// NOTE: This span is for the simulated beacon harness only. It is intentionally
// named differently from engine.newPayload* to avoid double-rooting or
// mislabeling Engine RPC spans. Real tracing of engine_newPayload* is
// performed at the Engine API entrypoints.
ctx, span := startNewPayloadSpan(context.Background(), "engine.simulatedBeacon.sealBlock", *payload)
defer span.End()
_, err = c.engineAPI.newPayload(ctx, *payload, blobHashes, beaconRoot, requests, false)
if err != nil {
return err
}
Expand Down
17 changes: 13 additions & 4 deletions eth/catalyst/witness.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package catalyst

import (
"context"
"errors"
"strconv"
"time"
Expand Down Expand Up @@ -90,7 +91,9 @@ func (api *ConsensusAPI) NewPayloadWithWitnessV1(params engine.ExecutableData) (
if params.Withdrawals != nil {
return engine.PayloadStatusV1{Status: engine.INVALID}, engine.InvalidParams.With(errors.New("withdrawals not supported in V1"))
}
return api.newPayload(params, nil, nil, nil, true)
ctx, span := startNewPayloadSpan(context.Background(), "engine.newPayloadWithWitnessV1", params)
defer span.End()
return api.newPayload(ctx, params, nil, nil, nil, true)
}

// NewPayloadWithWitnessV2 is analogous to NewPayloadV2, only it also generates
Expand All @@ -112,7 +115,9 @@ func (api *ConsensusAPI) NewPayloadWithWitnessV2(params engine.ExecutableData) (
case params.BlobGasUsed != nil:
return invalidStatus, paramsErr("non-nil blobGasUsed pre-cancun")
}
return api.newPayload(params, nil, nil, nil, true)
ctx, span := startNewPayloadSpan(context.Background(), "engine.newPayloadWithWitnessV2", params)
defer span.End()
return api.newPayload(ctx, params, nil, nil, nil, true)
}

// NewPayloadWithWitnessV3 is analogous to NewPayloadV3, only it also generates
Expand All @@ -132,7 +137,9 @@ func (api *ConsensusAPI) NewPayloadWithWitnessV3(params engine.ExecutableData, v
case !api.checkFork(params.Timestamp, forks.Cancun):
return invalidStatus, unsupportedForkErr("newPayloadV3 must only be called for cancun payloads")
}
return api.newPayload(params, versionedHashes, beaconRoot, nil, true)
ctx, span := startNewPayloadSpan(context.Background(), "engine.newPayloadWithWitnessV3", params)
defer span.End()
return api.newPayload(ctx, params, versionedHashes, beaconRoot, nil, true)
}

// NewPayloadWithWitnessV4 is analogous to NewPayloadV4, only it also generates
Expand All @@ -158,7 +165,9 @@ func (api *ConsensusAPI) NewPayloadWithWitnessV4(params engine.ExecutableData, v
if err := validateRequests(requests); err != nil {
return engine.PayloadStatusV1{Status: engine.INVALID}, engine.InvalidParams.With(err)
}
return api.newPayload(params, versionedHashes, beaconRoot, requests, true)
ctx, span := startNewPayloadSpan(context.Background(), "engine.newPayloadWithWitnessV4", params)
defer span.End()
return api.newPayload(ctx, params, versionedHashes, beaconRoot, requests, true)
}

// ExecuteStatelessPayloadV1 is analogous to NewPayloadV1, only it operates in
Expand Down
27 changes: 23 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
github.com/golang-jwt/jwt/v4 v4.5.2
github.com/golang/snappy v1.0.0
github.com/google/gofuzz v1.2.0
github.com/google/uuid v1.3.0
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.4.2
github.com/graph-gophers/graphql-go v1.3.0
github.com/hashicorp/go-bexpr v0.1.10
Expand All @@ -58,16 +58,21 @@ require (
github.com/rs/cors v1.7.0
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible
github.com/status-im/keycard-go v0.2.0
github.com/stretchr/testify v1.10.0
github.com/stretchr/testify v1.11.1
github.com/supranational/blst v0.3.16-0.20250831170142-f48500c1fdbe
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/urfave/cli/v2 v2.27.5
go.opentelemetry.io/otel v1.39.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.26.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.26.0
go.opentelemetry.io/otel/sdk v1.39.0
go.opentelemetry.io/otel/trace v1.39.0
go.uber.org/automaxprocs v1.5.2
go.uber.org/goleak v1.3.0
golang.org/x/crypto v0.36.0
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df
golang.org/x/sync v0.12.0
golang.org/x/sys v0.36.0
golang.org/x/sys v0.39.0
golang.org/x/text v0.23.0
golang.org/x/time v0.9.0
golang.org/x/tools v0.29.0
Expand All @@ -76,6 +81,20 @@ require (
gopkg.in/yaml.v3 v3.0.1
)

require (
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.39.0 // indirect
go.opentelemetry.io/otel/metric v1.39.0 // indirect
go.opentelemetry.io/proto/otlp v1.2.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect
google.golang.org/grpc v1.63.2 // indirect
)

require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.7.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 // indirect
Expand Down Expand Up @@ -138,7 +157,7 @@ require (
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
Expand Down
Loading
Loading