Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,7 @@ require (
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/verifier v0.0.0-20251211142334-5c3421fe2c8d // indirect
github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b // indirect
github.com/smartcontractkit/chainlink-protos/orchestrator v0.10.0 // indirect
github.com/smartcontractkit/chainlink-protos/ring/go v0.0.0-20260128151123-605e9540b706 // indirect
github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 // indirect
github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 // indirect
github.com/smartcontractkit/chainlink-protos/svr v1.1.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1673,6 +1673,8 @@ github.com/smartcontractkit/chainlink-protos/op-catalog v0.0.4 h1:AEnxv4HM3WD1Rb
github.com/smartcontractkit/chainlink-protos/op-catalog v0.0.4/go.mod h1:PjZD54vr6rIKEKQj6HNA4hllvYI/QpT+Zefj3tqkFAs=
github.com/smartcontractkit/chainlink-protos/orchestrator v0.10.0 h1:0eroOyBwmdoGUwUdvMI0/J7m5wuzNnJDMglSOK1sfNY=
github.com/smartcontractkit/chainlink-protos/orchestrator v0.10.0/go.mod h1:m/A3lqD7ms/RsQ9BT5P2uceYY0QX5mIt4KQxT2G6qEo=
github.com/smartcontractkit/chainlink-protos/ring/go v0.0.0-20260128151123-605e9540b706 h1:z3sQK3dyfl9Rbm8Inj8irwvX6yQihASp1UvMjrfz6/w=
github.com/smartcontractkit/chainlink-protos/ring/go v0.0.0-20260128151123-605e9540b706/go.mod h1:aifeP3SnsVrO1eSN5Smur3iHjAmi3poaLt6TAbgK0Hw=
github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 h1:L6KJ4kGv/yNNoCk8affk7Y1vAY0qglPMXC/hevV/IsA=
github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6/go.mod h1:FRwzI3hGj4CJclNS733gfcffmqQ62ONCkbGi49s658w=
github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 h1:B7itmjy+CMJ26elVw/cAJqqhBQ3Xa/mBYWK0/rQ5MuI=
Expand Down
2 changes: 1 addition & 1 deletion core/services/arbiter/arbiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"google.golang.org/grpc"

"github.com/smartcontractkit/chainlink-common/pkg/services"
ringpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/ring/pb"
ringpb "github.com/smartcontractkit/chainlink-protos/ring/go"

"github.com/smartcontractkit/chainlink/v2/core/logger"
)
Expand Down
14 changes: 7 additions & 7 deletions core/services/arbiter/arbiter_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"google.golang.org/protobuf/types/known/emptypb"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
pb "github.com/smartcontractkit/chainlink-common/pkg/workflows/ring/pb"
ringpb "github.com/smartcontractkit/chainlink-protos/ring/go"
)

// RingArbiterHandler implements the ArbiterScalerServer interface from chainlink-common.
Expand All @@ -15,7 +15,7 @@ import (
// - Status(): Returns routable shard count and per-shard health for Ring OCR routing decisions
// - ConsensusWantShards(): Receives the Ring consensus decision about desired shard count
type RingArbiterHandler struct {
pb.UnimplementedArbiterScalerServer
ringpb.UnimplementedArbiterScalerServer
state *State
lggr logger.Logger
}
Expand All @@ -31,7 +31,7 @@ func NewRingArbiterHandler(state *State, lggr logger.Logger) *RingArbiterHandler
// Status returns the current replica status for Ring OCR routing.
// Returns only READY shards count and per-shard health status.
// This is called by the Ring plugin to determine which shards can receive traffic.
func (h *RingArbiterHandler) Status(ctx context.Context, _ *emptypb.Empty) (*pb.ReplicaStatus, error) {
func (h *RingArbiterHandler) Status(ctx context.Context, _ *emptypb.Empty) (*ringpb.ReplicaStatus, error) {
routable := h.state.GetRoutableShards()

h.lggr.Debugw("Status requested",
Expand All @@ -40,25 +40,25 @@ func (h *RingArbiterHandler) Status(ctx context.Context, _ *emptypb.Empty) (*pb.
)

// Convert internal shard health to protobuf ShardStatus
shardStatus := make(map[uint32]*pb.ShardStatus, len(routable.ShardInfo))
shardStatus := make(map[uint32]*ringpb.ShardStatus, len(routable.ShardInfo))
for shardID, health := range routable.ShardInfo {
shardStatus[shardID] = &pb.ShardStatus{
shardStatus[shardID] = &ringpb.ShardStatus{
IsHealthy: health.IsHealthy,
}
}

// TODO: Rename WantShards to ReadyShards in protobuf (breaking change)
// The field name "WantShards" is misleading - it actually represents
// the number of shards ready for routing, not what Ring "wants".
return &pb.ReplicaStatus{
return &ringpb.ReplicaStatus{
WantShards: uint32(routable.ReadyCount), //nolint:gosec // G115: replica count bounded
Status: shardStatus,
}, nil
}

// ConsensusWantShards is called by the Ring consensus to report the desired number of shards.
// The consensus has agreed on how many shards the system should have.
func (h *RingArbiterHandler) ConsensusWantShards(ctx context.Context, req *pb.ConsensusWantShardsRequest) (*emptypb.Empty, error) {
func (h *RingArbiterHandler) ConsensusWantShards(ctx context.Context, req *ringpb.ConsensusWantShardsRequest) (*emptypb.Empty, error) {
nShards := req.GetNShards()

if nShards == 0 {
Expand Down
14 changes: 7 additions & 7 deletions core/services/arbiter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,33 @@ import (
"google.golang.org/protobuf/types/known/emptypb"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
pb "github.com/smartcontractkit/chainlink-common/pkg/workflows/ring/pb"
ringpb "github.com/smartcontractkit/chainlink-protos/ring/go"
)

// RingArbiterClient implements pb.ArbiterScalerClient by calling
// RingArbiterClient implements ringpb.ArbiterScalerClient by calling
// the ArbiterScalerServer directly without going over gRPC.
// This is used by Ring OCR to communicate with the Arbiter in-process.
type RingArbiterClient struct {
server pb.ArbiterScalerServer
server ringpb.ArbiterScalerServer
lggr logger.Logger
}

var _ pb.ArbiterScalerClient = (*RingArbiterClient)(nil)
var _ ringpb.ArbiterScalerClient = (*RingArbiterClient)(nil)

// NewRingArbiterClient creates a new RingArbiterClient.
func NewRingArbiterClient(server pb.ArbiterScalerServer, lggr logger.Logger) *RingArbiterClient {
func NewRingArbiterClient(server ringpb.ArbiterScalerServer, lggr logger.Logger) *RingArbiterClient {
return &RingArbiterClient{
server: server,
lggr: logger.Named(lggr, "RingArbiterClient"),
}
}

// Status returns the current replica status by calling the server directly.
func (c *RingArbiterClient) Status(ctx context.Context, in *emptypb.Empty, _ ...grpc.CallOption) (*pb.ReplicaStatus, error) {
func (c *RingArbiterClient) Status(ctx context.Context, in *emptypb.Empty, _ ...grpc.CallOption) (*ringpb.ReplicaStatus, error) {
return c.server.Status(ctx, in)
}

// ConsensusWantShards notifies the Arbiter about the desired shard count by calling the server directly.
func (c *RingArbiterClient) ConsensusWantShards(ctx context.Context, in *pb.ConsensusWantShardsRequest, _ ...grpc.CallOption) (*emptypb.Empty, error) {
func (c *RingArbiterClient) ConsensusWantShards(ctx context.Context, in *ringpb.ConsensusWantShardsRequest, _ ...grpc.CallOption) (*emptypb.Empty, error) {
return c.server.ConsensusWantShards(ctx, in)
}
21 changes: 10 additions & 11 deletions core/services/arbiter/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,28 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/emptypb"

pb "github.com/smartcontractkit/chainlink-common/pkg/workflows/ring/pb"

ringpb "github.com/smartcontractkit/chainlink-protos/ring/go"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

// mockArbiterScalerServer implements pb.ArbiterScalerServer for testing.
// mockArbiterScalerServer implements ringpb.ArbiterScalerServer for testing.
type mockArbiterScalerServer struct {
pb.UnimplementedArbiterScalerServer
statusResp *pb.ReplicaStatus
ringpb.UnimplementedArbiterScalerServer
statusResp *ringpb.ReplicaStatus
statusErr error
consensusErr error
consensusCalled bool
lastNShards uint32
}

func (m *mockArbiterScalerServer) Status(ctx context.Context, _ *emptypb.Empty) (*pb.ReplicaStatus, error) {
func (m *mockArbiterScalerServer) Status(ctx context.Context, _ *emptypb.Empty) (*ringpb.ReplicaStatus, error) {
if m.statusErr != nil {
return nil, m.statusErr
}
return m.statusResp, nil
}

func (m *mockArbiterScalerServer) ConsensusWantShards(ctx context.Context, req *pb.ConsensusWantShardsRequest) (*emptypb.Empty, error) {
func (m *mockArbiterScalerServer) ConsensusWantShards(ctx context.Context, req *ringpb.ConsensusWantShardsRequest) (*emptypb.Empty, error) {
m.consensusCalled = true
m.lastNShards = req.GetNShards()
if m.consensusErr != nil {
Expand All @@ -45,9 +44,9 @@ func TestRingArbiterClient_Status(t *testing.T) {

t.Run("returns status from server", func(t *testing.T) {
mockServer := &mockArbiterScalerServer{
statusResp: &pb.ReplicaStatus{
statusResp: &ringpb.ReplicaStatus{
WantShards: 5,
Status: map[uint32]*pb.ShardStatus{
Status: map[uint32]*ringpb.ShardStatus{
0: {IsHealthy: true},
1: {IsHealthy: true},
2: {IsHealthy: false},
Expand Down Expand Up @@ -89,7 +88,7 @@ func TestRingArbiterClient_ConsensusWantShards(t *testing.T) {
mockServer := &mockArbiterScalerServer{}

client := NewRingArbiterClient(mockServer, lggr)
req := &pb.ConsensusWantShardsRequest{NShards: 10}
req := &ringpb.ConsensusWantShardsRequest{NShards: 10}
resp, err := client.ConsensusWantShards(context.Background(), req)

require.NoError(t, err)
Expand All @@ -105,7 +104,7 @@ func TestRingArbiterClient_ConsensusWantShards(t *testing.T) {
}

client := NewRingArbiterClient(mockServer, lggr)
req := &pb.ConsensusWantShardsRequest{NShards: 5}
req := &ringpb.ConsensusWantShardsRequest{NShards: 5}
resp, err := client.ConsensusWantShards(context.Background(), req)

require.Error(t, err)
Expand Down
2 changes: 1 addition & 1 deletion core/services/arbiter/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

ringpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/ring/pb"
ringpb "github.com/smartcontractkit/chainlink-protos/ring/go"

"github.com/smartcontractkit/chainlink/v2/core/logger"
)
Expand Down
2 changes: 1 addition & 1 deletion core/services/arbiter/grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"google.golang.org/grpc/status"

"github.com/smartcontractkit/chainlink-common/pkg/services"
ringpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/ring/pb"
ringpb "github.com/smartcontractkit/chainlink-protos/ring/go"

"github.com/smartcontractkit/chainlink/v2/core/logger"
)
Expand Down
2 changes: 1 addition & 1 deletion core/services/arbiter/shardconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (s *shardConfigSyncer) fetchAndCache(ctx context.Context) {
ctx,
bc.ReadIdentifier(GetDesiredShardCountMethod),
primitives.Unconfirmed,
nil, // No input params
nil, // No input params
&result,
)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/utils/jsonserializable"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/dontime"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/shardorchestrator"
"github.com/smartcontractkit/chainlink-evm/pkg/chains/legacyevm"
"github.com/smartcontractkit/chainlink-evm/pkg/keys"
"github.com/smartcontractkit/chainlink-evm/pkg/logpoller"
"github.com/smartcontractkit/chainlink-evm/pkg/txmgr"
evmutils "github.com/smartcontractkit/chainlink-evm/pkg/utils"
"github.com/smartcontractkit/chainlink/v2/core/services/shardorchestrator"

"github.com/smartcontractkit/chainlink/v2/core/services/ccv/ccvcommitteeverifier"
"github.com/smartcontractkit/chainlink/v2/core/services/ccv/ccvexecutor"
Expand Down
7 changes: 3 additions & 4 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ import (
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/dontime"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/ring"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/shardorchestrator"
datastreamsllo "github.com/smartcontractkit/chainlink-data-streams/llo"
"github.com/smartcontractkit/chainlink-evm/pkg/chains/legacyevm"
"github.com/smartcontractkit/chainlink-evm/pkg/keys"
"github.com/smartcontractkit/chainlink/v2/core/services/ring"
"github.com/smartcontractkit/chainlink/v2/core/services/shardorchestrator"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
gatewayconnector "github.com/smartcontractkit/chainlink/v2/core/capabilities/gateway_connector"
Expand Down Expand Up @@ -94,7 +94,6 @@ import (
functionsRelay "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/functions"
evmmercury "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury"
mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils"
localshardorch "github.com/smartcontractkit/chainlink/v2/core/services/shardorchestrator"
"github.com/smartcontractkit/chainlink/v2/core/services/streams"
"github.com/smartcontractkit/chainlink/v2/core/services/synchronization"
"github.com/smartcontractkit/chainlink/v2/core/services/telemetry"
Expand Down Expand Up @@ -1075,7 +1074,7 @@ func (d *Delegate) newServicesRing(
ringStore := ring.NewStore()
shardOrchestratorStore := shardorchestrator.NewStore(lggr)
// Start ShardOrchestrator
orchestratorSvc := localshardorch.New(
orchestratorSvc := shardorchestrator.New(
int(shardingCfg.ShardOrchestratorPort()),
shardOrchestratorStore,
lggr,
Expand Down
30 changes: 15 additions & 15 deletions core/services/ocr2/plugins/ring/ring_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,38 +10,38 @@ import (
"google.golang.org/protobuf/types/known/emptypb"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/ring"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/ring/pb"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/shardorchestrator"
ringpb "github.com/smartcontractkit/chainlink-protos/ring/go"
"github.com/smartcontractkit/chainlink/v2/core/services/ring"
"github.com/smartcontractkit/chainlink/v2/core/services/shardorchestrator"
)

// mockArbiterScalerClient implements pb.ArbiterScalerClient for testing.
// mockArbiterScalerClient implements ringpb.ArbiterScalerClient for testing.
// It allows configuring the return values for Status and ConsensusWantShards.
type mockArbiterScalerClient struct {
wantShards uint32
shardStatus map[uint32]*pb.ShardStatus
shardStatus map[uint32]*ringpb.ShardStatus
statusErr error
consensusErr error
}

func newMockArbiterScalerClient() *mockArbiterScalerClient {
return &mockArbiterScalerClient{
wantShards: 1,
shardStatus: map[uint32]*pb.ShardStatus{0: {IsHealthy: true}},
shardStatus: map[uint32]*ringpb.ShardStatus{0: {IsHealthy: true}},
}
}

func (m *mockArbiterScalerClient) Status(_ context.Context, _ *emptypb.Empty, _ ...grpc.CallOption) (*pb.ReplicaStatus, error) {
func (m *mockArbiterScalerClient) Status(_ context.Context, _ *emptypb.Empty, _ ...grpc.CallOption) (*ringpb.ReplicaStatus, error) {
if m.statusErr != nil {
return nil, m.statusErr
}
return &pb.ReplicaStatus{
return &ringpb.ReplicaStatus{
WantShards: m.wantShards,
Status: m.shardStatus,
}, nil
}

func (m *mockArbiterScalerClient) ConsensusWantShards(_ context.Context, _ *pb.ConsensusWantShardsRequest, _ ...grpc.CallOption) (*emptypb.Empty, error) {
func (m *mockArbiterScalerClient) ConsensusWantShards(_ context.Context, _ *ringpb.ConsensusWantShardsRequest, _ ...grpc.CallOption) (*emptypb.Empty, error) {
if m.consensusErr != nil {
return nil, m.consensusErr
}
Expand All @@ -59,9 +59,9 @@ func TestRingStoreIntegration(t *testing.T) {
require.True(t, health[1])

// Set steady state routing (required for GetShardForWorkflow without OCR)
store.SetRoutingState(&pb.RoutingState{
store.SetRoutingState(&ringpb.RoutingState{
Id: 1,
State: &pb.RoutingState_RoutableShards{RoutableShards: 1},
State: &ringpb.RoutingState_RoutableShards{RoutableShards: 1},
})

// Test workflow routing via consistent hashing
Expand All @@ -82,9 +82,9 @@ func TestRingStoreIntegration(t *testing.T) {
store.SetShardHealth(2, true)

// Set steady state with 3 shards
store.SetRoutingState(&pb.RoutingState{
store.SetRoutingState(&ringpb.RoutingState{
Id: 1,
State: &pb.RoutingState_RoutableShards{RoutableShards: 3},
State: &ringpb.RoutingState_RoutableShards{RoutableShards: 3},
})

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand All @@ -106,9 +106,9 @@ func TestRingStoreIntegration(t *testing.T) {
store := ring.NewStore()

store.SetShardHealth(0, true)
store.SetRoutingState(&pb.RoutingState{
store.SetRoutingState(&ringpb.RoutingState{
Id: 1,
State: &pb.RoutingState_RoutableShards{RoutableShards: 1},
State: &ringpb.RoutingState_RoutableShards{RoutableShards: 1},
})

// Manually set a workflow allocation
Expand Down
Loading
Loading