Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
35b0ed1
Switch to grpc instead of connectweb
billettc Feb 3, 2026
a17a4d2
_ "google.golang.org/grpc/experimental"
billettc Feb 3, 2026
9aa939c
Setting stream response compressor
billettc Feb 3, 2026
371bf75
Refactor compressor selection to use a map-based lookup and remove `s…
billettc Feb 3, 2026
948d746
Initialize `out` map in `compressorsFromHeader` function to prevent n…
billettc Feb 3, 2026
21ae6b4
Move compression management to grpc
billettc Feb 4, 2026
e9661c3
Pass `false` parameter to `ListenTier2` call to disable default behav…
billettc Feb 4, 2026
9dd1d4e
Log error when Substreams client initialization fails and update hand…
billettc Feb 4, 2026
e49b276
Parallel support of grpc and connect servers
billettc Feb 10, 2026
dfc96aa
SAdd missing file bump dgrpc
billettc Feb 10, 2026
6eacb86
Refactor `ListenTier1` to unify gRPC and Connect server handling and …
billettc Feb 10, 2026
9e93fbf
Add logging for request content type in HTTP handler
billettc Feb 10, 2026
b0eb994
Remove unused imports and commented-out VT Proto registration code
billettc Feb 10, 2026
6196f15
Add vtprotobuf gRPC codec import to server initialization
billettc Feb 10, 2026
ed958e9
Remove vtprotobuf gRPC codec import and associated registration code
billettc Feb 10, 2026
95c099e
Fix dead lock in message buffer
billettc Feb 10, 2026
0c19a1b
Add Connect handler for `InfoServer` and integrate it into Tier1 serv…
billettc Feb 11, 2026
4a95c2a
Update `dgrpc` dependency, add buffer size configuration, and refacto…
billettc Feb 11, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 53 additions & 13 deletions app/tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,28 @@ import (
"fmt"
"net/url"
"os"
"strings"
"time"

"connectrpc.com/connect"
connectrpc "connectrpc.com/connect"
"github.com/streamingfast/bstream"
"github.com/streamingfast/bstream/blockstream"
"github.com/streamingfast/bstream/hub"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
dauth "github.com/streamingfast/dauth"
"github.com/streamingfast/dauth"
"github.com/streamingfast/dmetrics"
"github.com/streamingfast/dsession"
"github.com/streamingfast/dstore"
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"
"github.com/streamingfast/shutter"
"github.com/streamingfast/substreams/client"
"github.com/streamingfast/substreams/metrics"
pbsubstreamsrpcv2connect "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2/pbsubstreamsrpcv2connect"
pbsubstreamsrpcv2 "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
"github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2/pbsubstreamsrpcv2connect"
"github.com/streamingfast/substreams/reqctx"
"github.com/streamingfast/substreams/service"
"github.com/streamingfast/substreams/service/connect"
"github.com/streamingfast/substreams/wasm"

_ "github.com/streamingfast/substreams/wasm/wasmtime"
"github.com/streamingfast/substreams/wasm/wazero"
"go.uber.org/atomic"
Expand Down Expand Up @@ -86,7 +88,8 @@ type Tier1Config struct {
SubrequestsInsecure bool
SubrequestsPlaintext bool

SharedCacheSize uint64
SharedCacheSize uint64
ExecOutMessageBufferSize uint64

WASMExtensions wasm.WASMExtensioner
Tracing bool
Expand Down Expand Up @@ -248,7 +251,7 @@ func (a *Tier1App) Run() error {
FoundationalStoreEndpoints: foundationalStoreEndpoints,
}

svc, err := service.NewTier1(
tier1Service, err := service.NewTier1(
a.logger,
mergedBlocksStore,
forkedBlocksStore,
Expand All @@ -266,6 +269,7 @@ func (a *Tier1App) Run() error {
a.config.ActiveRequestsSoftLimit,
a.config.ActiveRequestsHardLimit,
a.config.SharedCacheSize,
a.config.ExecOutMessageBufferSize,
a.modules.SessionPool,
foundationalStoreEndpoints,
opts...,
Expand All @@ -274,14 +278,16 @@ func (a *Tier1App) Run() error {
return err
}

tier1ServiceConnect := connect.NewService(tier1Service)

a.OnTerminating(func(err error) {
metrics.AppReadinessTier1.SetNotReady()

svc.Shutdown(err)
tier1Service.Shutdown(err)
})

go func() {
var infoServer pbsubstreamsrpcv2connect.EndpointInfoHandler
var infoServer pbsubstreamsrpcv2.EndpointInfoServer
if a.modules.InfoServer != nil {
a.logger.Info("waiting until info server is ready")
infoServer = &InfoServerWrapper{a.modules.InfoServer}
Expand All @@ -291,6 +297,12 @@ func (a *Tier1App) Run() error {
}
}

var infoServerConnect pbsubstreamsrpcv2connect.EndpointInfoHandler
if a.modules.InfoServer != nil {
infoServerConnect = &InfoServerConnectWrapper{a.modules.InfoServer}
a.logger.Info("info server ready")
}

if withLive {
a.logger.Info("waiting until hub is real-time synced")
select {
Expand All @@ -304,7 +316,23 @@ func (a *Tier1App) Run() error {
a.logger.Info("launching gRPC server", zap.Bool("live_support", withLive))
a.setIsReady(true)

err := service.ListenTier1(a.config.GRPCListenAddr, svc, infoServer, a.modules.Authenticator, a.logger, a.HealthCheck)
secureGrpcProxyAddr := ":9000"
plaintextGrpcProxyAddr := ":8080"

addresses := strings.Split(a.config.GRPCListenAddr, ",")
addressCount := len(addresses)
if addressCount == 0 {
a.logger.Error("no gRPC listen addresses provided")
return
}
if addressCount > 0 {
secureGrpcProxyAddr = addresses[0]
}
if addressCount > 1 {
plaintextGrpcProxyAddr = addresses[1]
}

err := service.ListenTier1(secureGrpcProxyAddr, plaintextGrpcProxyAddr, tier1Service, tier1ServiceConnect, infoServer, infoServerConnect, a.modules.Authenticator, a.logger, a.HealthCheck, a.config.EnforceCompression)
a.Shutdown(err)
}()

Expand Down Expand Up @@ -348,17 +376,29 @@ func (config *Tier1Config) Validate() error {
return nil
}

var _ pbsubstreamsrpcv2connect.EndpointInfoHandler = (*InfoServerWrapper)(nil)
var _ pbsubstreamsrpcv2.EndpointInfoServer = (*InfoServerWrapper)(nil)

type InfoServerWrapper struct {
rpcInfoServer InfoServer
rpcInfoServer pbsubstreamsrpcv2.EndpointInfoServer
}

// Info implements pbsubstreamsrpcconnect.EndpointInfoHandler.
func (i *InfoServerWrapper) Info(ctx context.Context, req *connect.Request[pbfirehose.InfoRequest]) (*connect.Response[pbfirehose.InfoResponse], error) {
func (i *InfoServerWrapper) Info(ctx context.Context, req *pbfirehose.InfoRequest) (*pbfirehose.InfoResponse, error) {
resp, err := i.rpcInfoServer.Info(ctx, req)
if err != nil {
return nil, err
}
return resp, nil
}

type InfoServerConnectWrapper struct {
rpcInfoServer pbsubstreamsrpcv2.EndpointInfoServer
}

func (i *InfoServerConnectWrapper) Info(ctx context.Context, req *connectrpc.Request[pbfirehose.InfoRequest]) (*connectrpc.Response[pbfirehose.InfoResponse], error) {
resp, err := i.rpcInfoServer.Info(ctx, req.Msg)
if err != nil {
return nil, err
}
return connect.NewResponse(resp), nil
return connectrpc.NewResponse(resp), nil
}
2 changes: 1 addition & 1 deletion app/tier2.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (a *Tier2App) Run() error {
a.logger.Info("launching gRPC server")
a.setIsReady(true)

err := service.ListenTier2(a.config.GRPCListenAddr, a.config.ServiceDiscoveryURL, svc, trustAuth, a.logger, a.HealthCheck)
err := service.ListenTier2(a.config.GRPCListenAddr, a.config.ServiceDiscoveryURL, svc, trustAuth, a.logger, a.HealthCheck, false)
a.Shutdown(err)
}()

Expand Down
9 changes: 9 additions & 0 deletions buf.gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@ plugins:
- Msf/substreams/foundational-store/model/v2/model.proto=github.com/streamingfast/substreams/pb/sf/substreams/foundational-store/model/v2;pbmodel
- Msf/substreams/foundational-store/service/v2/service.proto=github.com/streamingfast/substreams/pb/sf/substreams/foundational-store/service/v2;pbservice
- Msf/substreams/foundational-store/service/v1/service.proto=github.com/streamingfast/substreams/pb/sf/substreams/foundational-store/service/v1;pbstore
- remote: buf.build/community/planetscale-vtprotobuf:v0.6.0
out: pb
opt:
- paths=source_relative
- features=marshal+unmarshal+size+equal+clone
- Msf/codegen/conversation/v1/conversation.proto=github.com/streamingfast/substreams/pb/sf/codegen/conversation/v1;pbconvo
- Msf/substreams/foundational-store/model/v2/model.proto=github.com/streamingfast/substreams/pb/sf/substreams/foundational-store/model/v2;pbmodel
- Msf/substreams/foundational-store/service/v2/service.proto=github.com/streamingfast/substreams/pb/sf/substreams/foundational-store/service/v2;pbservice
- Msf/substreams/foundational-store/service/v1/service.proto=github.com/streamingfast/substreams/pb/sf/substreams/foundational-store/service/v1;pbstore
- remote: buf.build/connectrpc/go:v1.15.0
out: pb
opt:
Expand Down
98 changes: 93 additions & 5 deletions client/client.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
package client

import (
"context"
"crypto/tls"
"fmt"
"log"
"net/url"
"os"
"regexp"
"time"

"github.com/dustin/go-humanize"
"github.com/mostynb/go-grpc-compression/experimental/s2"
"github.com/streamingfast/dgrpc"
networks "github.com/streamingfast/firehose-networks"
"github.com/streamingfast/logging/zapx"
pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/oauth2"
Expand All @@ -20,10 +24,16 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/credentials/oauth"
xdscreds "google.golang.org/grpc/credentials/xds"
"google.golang.org/grpc/encoding/gzip"
_ "google.golang.org/grpc/encoding/gzip"
stats "google.golang.org/grpc/stats"
_ "google.golang.org/grpc/xds"
)

//func init() {
// fmt.Println("------------------ VT Proto registered ------------------------")
// encoding.RegisterCodec(vt.Codec{})
//}

type AuthType int

const (
Expand Down Expand Up @@ -162,6 +172,70 @@ func (c *SubstreamsClientConfig) MarshalLogObject(encoder zapcore.ObjectEncoder)
return nil
}

type sizeLoggingHandler struct {
messageCount int
timeStart time.Time

uncompressedBytes int
compressedBytes int
wireBytes int
lastReceivedTime time.Time
waitToReceive time.Duration
}

func (h *sizeLoggingHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
zlog.Info("gRPC client RPC started", zap.String("method", info.FullMethodName))
return ctx
}

func (h *sizeLoggingHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
//fmt.Println("handle rpc:", h.messageCount)
if inPayload, ok := rs.(*stats.InPayload); ok && inPayload != nil {
if h.messageCount == 0 {
h.timeStart = time.Now()
}
h.waitToReceive += time.Since(h.lastReceivedTime)
h.lastReceivedTime = time.Now()
h.messageCount++
h.uncompressedBytes += inPayload.Length
h.compressedBytes += inPayload.CompressedLength
h.wireBytes += inPayload.WireLength

if h.messageCount == 10000 {
secs := time.Since(h.timeStart).Seconds()
messagesPerSecond := float64(h.messageCount) / secs
compressedPercentage := 100.0 - (float64(h.compressedBytes) / float64(h.uncompressedBytes) * 100.0)

zlog.Info(
"grpc io stats",
zap.Float64("msg_sec", messagesPerSecond),
zapx.HumanDuration("duration", time.Since(h.timeStart)),
zapx.HumanDuration("wait_to_receive", h.waitToReceive),
zap.String("compression_ratio", fmt.Sprintf("%.2f%%", compressedPercentage)),
zap.String("uncompressed", humanize.Bytes(uint64(h.uncompressedBytes))),
zap.String("compressed", humanize.Bytes(uint64(h.compressedBytes))),
zap.Int("uncompressed_bytes", h.uncompressedBytes),
zap.Int("compressed_bytes", h.compressedBytes),
zap.Bool("keep", false),
)

h.timeStart = time.Now()
h.messageCount = 0
h.uncompressedBytes = 0
h.compressedBytes = 0
h.wireBytes = 0
h.waitToReceive = 0
}
}
}

func (h *sizeLoggingHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) context.Context {
return ctx
}

func (h *sizeLoggingHandler) HandleConn(ctx context.Context, cs stats.ConnStats) {
}

type InternalClientFactory = func() (cli pbssinternal.SubstreamsClient, closeFunc func() error, callOpts []grpc.CallOption, headers Headers, err error)

// NewSubstreamsClientConfig creates a new SubstreamsClientConfig using the provided options.
Expand Down Expand Up @@ -243,6 +317,9 @@ func NewInternalClientFactory(config *SubstreamsClientConfig) InternalClientFact

noop := func() error { return nil }
cli, _, callOpts, headers, err := NewSubstreamsInternalClient(config)
if err != nil {
zlog.Error("failed to create substreams client", zap.Error(err))
}
return func() (pbssinternal.SubstreamsClient, func() error, []grpc.CallOption, Headers, error) {
return cli, noop, callOpts, headers, err
}
Expand Down Expand Up @@ -285,7 +362,8 @@ func NewSubstreamsInternalClient(config *SubstreamsClientConfig) (cli pbssintern
}
}

dialOptions = append(dialOptions, grpc.WithStatsHandler(otelgrpc.NewClientHandler()))
sizeHandler := &sizeLoggingHandler{}
dialOptions = append(dialOptions, grpc.WithStatsHandler(sizeHandler))

zlog.Debug("getting connection", zap.String("endpoint", endpoint))
conn, err := dgrpc.NewExternalClientConn(endpoint, dialOptions...)
Expand Down Expand Up @@ -354,12 +432,22 @@ func newConnection(config *SubstreamsClientConfig) (conn *grpc.ClientConn, close
}
}

dialOptions = append(dialOptions, grpc.WithStatsHandler(otelgrpc.NewClientHandler()))
dialOptions = append(dialOptions, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name)))
sizeHandler := &sizeLoggingHandler{}
dialOptions = append(dialOptions, grpc.WithStatsHandler(sizeHandler))

//compressor := os.Getenv("GRPC_COMPRESSOR")
//switch compressor {
//case "gzip":
//dialOptions = append(dialOptions, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name)))
//case "s2":
dialOptions = append(dialOptions, grpc.WithDefaultCallOptions(grpc.UseCompressor(s2.Name)))
//}

dialOptions = append(dialOptions, grpc.WithUserAgent(config.agent))

zlog.Debug("getting connection", zap.String("endpoint", endpoint))
conn, err = dgrpc.NewExternalClient(endpoint, dialOptions...)

if err != nil {
return nil, nil, nil, nil, fmt.Errorf("unable to create external gRPC client: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ go 1.25.0

toolchain go1.25.4

replace github.com/streamingfast/dgrpc => ../dgrpc

require (
github.com/golang/protobuf v1.5.4
github.com/jhump/protoreflect v1.14.0
Expand All @@ -14,7 +16,7 @@ require (
github.com/streamingfast/dauth v0.0.0-20251218134044-fb716c7172b4
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c
github.com/streamingfast/derr v0.0.0-20250814163534-bd7407bd89d7
github.com/streamingfast/dgrpc v0.0.0-20251218142640-027692a12722
github.com/streamingfast/dgrpc v0.0.0-20260211152336-d4e7023003dd
github.com/streamingfast/dhttp v0.1.3-0.20251218140957-6d46b8f12eb1
github.com/streamingfast/dstore v0.1.3-0.20260113210117-94d66eda2027
github.com/streamingfast/logging v0.0.0-20260108192805-38f96de0a641
Expand All @@ -29,7 +31,6 @@ require (
require (
buf.build/gen/go/bufbuild/reflect/connectrpc/go v1.16.1-20240117202343-bf8f65e8876c.1
buf.build/gen/go/bufbuild/reflect/protocolbuffers/go v1.33.0-20240117202343-bf8f65e8876c.1
buf.build/go/hyperpb v0.1.3
connectrpc.com/connect v1.19.1
github.com/KimMachineGun/automemlimit v0.7.5
github.com/RoaringBitmap/roaring v1.9.1
Expand Down Expand Up @@ -162,7 +163,6 @@ require (
github.com/streamingfast/validator v0.0.0-20231124184318-71ec8080e4ae // indirect
github.com/thedevsaddam/govalidator v1.9.6 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/timandy/routine v1.1.5 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 // indirect
Expand Down
Loading
Loading