Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,594 changes: 1,594 additions & 0 deletions build/devenv/dashboards/heartbeat_agg.json

Large diffs are not rendered by default.

593 changes: 593 additions & 0 deletions build/devenv/dashboards/heartbeat_verifier.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions build/devenv/fakes/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ require (
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/smartcontractkit/chain-selectors v1.0.90 // indirect
github.com/smartcontractkit/chainlink-common v0.9.6-0.20260114190811-74301cd99dc3 // indirect
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/heartbeat v0.0.0-20260115142640-f6b99095c12e // indirect
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/verifier v0.0.0-20251211142334-5c3421fe2c8d // indirect
github.com/smartcontractkit/chainlink-testing-framework/framework v0.13.6 // indirect
github.com/smartcontractkit/libocr v0.0.0-20250912173940-f3ab0246e23d // indirect
Expand Down
2 changes: 2 additions & 0 deletions build/devenv/fakes/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY=
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/committee-verifier v0.0.0-20251211142334-5c3421fe2c8d h1:VYoBBNnQpZ5p+enPTl8SkKBRaubqyGpO0ul3B1np++I=
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/committee-verifier v0.0.0-20251211142334-5c3421fe2c8d/go.mod h1:oNFoKHRIerxuaANa8ASNejtHrdsG26LqGtQ2XhSac2g=
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/heartbeat v0.0.0-20260115142640-f6b99095c12e h1:c7vgdeidC0LMtV1a01B/rPL4fEC/cnPanRDflRijXCM=
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/heartbeat v0.0.0-20260115142640-f6b99095c12e/go.mod h1:rZV/gLc1wlSp2r5oXN09iOrlyZPFX4iK+cqoSW2k5dc=
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/verifier v0.0.0-20251211142334-5c3421fe2c8d h1:AJy55QJ/pBhXkZjc7N+ATnWfxrcjq9BI9DmdtdjwDUQ=
github.com/smartcontractkit/chainlink-protos/chainlink-ccv/verifier v0.0.0-20251211142334-5c3421fe2c8d/go.mod h1:5JdppgngCOUS76p61zCinSCgOhPeYQ+OcDUuome5THQ=
github.com/smartcontractkit/chainlink-testing-framework/framework v0.13.6 h1:JqMRimMu05jFs2iz4rduvodaVKe+E9VSJhEJ7dfgXwo=
Expand Down
28 changes: 27 additions & 1 deletion cmd/verifier/committee/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
cmd "github.com/smartcontractkit/chainlink-ccv/cmd/verifier"
"github.com/smartcontractkit/chainlink-ccv/integration/pkg/accessors"
"github.com/smartcontractkit/chainlink-ccv/integration/pkg/blockchain"
"github.com/smartcontractkit/chainlink-ccv/integration/pkg/heartbeatclient"
"github.com/smartcontractkit/chainlink-ccv/integration/storageaccess"
"github.com/smartcontractkit/chainlink-ccv/protocol"
"github.com/smartcontractkit/chainlink-ccv/protocol/common/hmac"
Expand Down Expand Up @@ -179,7 +180,8 @@ func main() {
StorageBatchSize: 50,
StorageBatchTimeout: 100 * time.Millisecond,
StorageRetryDelay: 2 * time.Second,
CursePollInterval: 2 * time.Second, // Poll RMN Remotes for curse status every 2s
CursePollInterval: 2 * time.Second, // Poll RMN Remotes for curse status every 2s
HeartbeatInterval: 10 * time.Second, // Send heartbeat to aggregator every 10s
}

pk := os.Getenv(PkEnvVar)
Expand Down Expand Up @@ -218,6 +220,29 @@ func main() {
verifierMonitoring,
)

heartbeatClient, err := heartbeatclient.NewHeartbeatClient(
config.AggregatorAddress,
lggr,
hmacConfig,
config.InsecureAggregatorConnection,
)
if err != nil {
lggr.Errorw("Failed to create heartbeat client", "error", err)
os.Exit(1)
}
defer func() {
if heartbeatClient != nil {
_ = heartbeatClient.Close()
}
}()

observedHeartbeatClient := heartbeatclient.NewObservedHeartbeatClient(
heartbeatClient,
config.VerifierID,
lggr,
verifier.NewHeartbeatMonitoringAdapter(verifierMonitoring),
)

messageTracker := monitoring.NewMessageLatencyTracker(
lggr,
config.VerifierID,
Expand All @@ -235,6 +260,7 @@ func main() {
messageTracker,
verifierMonitoring,
chainStatusManager,
observedHeartbeatClient,
)
if err != nil {
lggr.Errorw("Failed to create verification coordinator", "error", err)
Expand Down
3 changes: 3 additions & 0 deletions cmd/verifier/token/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
cmd "github.com/smartcontractkit/chainlink-ccv/cmd/verifier"
"github.com/smartcontractkit/chainlink-ccv/integration/pkg/accessors"
"github.com/smartcontractkit/chainlink-ccv/integration/pkg/blockchain"
"github.com/smartcontractkit/chainlink-ccv/integration/pkg/heartbeatclient"
"github.com/smartcontractkit/chainlink-ccv/pkg/chainaccess"
"github.com/smartcontractkit/chainlink-ccv/protocol"
"github.com/smartcontractkit/chainlink-ccv/protocol/common/logging"
Expand Down Expand Up @@ -235,6 +236,7 @@ func createCCTPCoordinator(
messageTracker,
verifierMonitoring,
chainStatusManager,
heartbeatclient.NewNoopHeartbeatClient(),
)
if err != nil {
lggr.Errorw("Failed to create verification coordinator for cctp", "error", err)
Expand Down Expand Up @@ -282,6 +284,7 @@ func createLBTCCoordinator(
messageTracker,
verifierMonitoring,
chainStatusManager,
heartbeatclient.NewNoopHeartbeatClient(),
)
if err != nil {
lggr.Errorw("Failed to create verification coordinator for lbtc", "error", err)
Expand Down
3 changes: 3 additions & 0 deletions integration/pkg/constructors/committee_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ethereum/go-ethereum/common"

"github.com/smartcontractkit/chainlink-ccip/ccv/chains/evm/gobindings/generated/latest/onramp"
"github.com/smartcontractkit/chainlink-ccv/integration/pkg/heartbeatclient"
"github.com/smartcontractkit/chainlink-ccv/integration/pkg/sourcereader"
"github.com/smartcontractkit/chainlink-ccv/integration/storageaccess"
"github.com/smartcontractkit/chainlink-ccv/pkg/chainaccess"
Expand Down Expand Up @@ -134,6 +135,7 @@ func NewVerificationCoordinator(
StorageBatchSize: 50,
StorageBatchTimeout: 100 * time.Millisecond,
StorageRetryDelay: 2 * time.Second,
HeartbeatInterval: 0, // Disabled by default
}

// Create commit verifier (with ECDSA signer)
Expand Down Expand Up @@ -161,6 +163,7 @@ func NewVerificationCoordinator(
messageTracker,
verifierMonitoring,
chainStatusManager,
heartbeatclient.NewNoopHeartbeatClient(),
)
if err != nil {
lggr.Errorw("Failed to create verification coordinator", "error", err)
Expand Down
76 changes: 76 additions & 0 deletions integration/pkg/heartbeatclient/heartbeatclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package heartbeatclient

import (
"context"
"crypto/tls"
"fmt"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
insecuregrpc "google.golang.org/grpc/credentials/insecure"

"github.com/smartcontractkit/chainlink-ccv/protocol/common/hmac"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
heartbeatpb "github.com/smartcontractkit/chainlink-protos/chainlink-ccv/heartbeat/v1"
)

const (
MinTLSVersion = tls.VersionTLS13
)

// HeartbeatClient provides methods to send heartbeats to the aggregator service.
type HeartbeatClient struct {
client heartbeatpb.HeartbeatServiceClient
conn *grpc.ClientConn
lggr logger.Logger
}

// NewHeartbeatClient creates a new heartbeat client that communicates with the aggregator.
// If insecure is true, TLS verification is disabled (only for testing).
func NewHeartbeatClient(address string, lggr logger.Logger, hmacConfig *hmac.ClientConfig, insecure bool) (*HeartbeatClient, error) {
var dialOptions []grpc.DialOption
if insecure {
dialOptions = append(dialOptions, grpc.WithTransportCredentials(insecuregrpc.NewCredentials()))
} else {
dialOptions = append(dialOptions, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{MinVersion: MinTLSVersion})))
}

if hmacConfig != nil {
dialOptions = append(dialOptions, grpc.WithUnaryInterceptor(hmac.NewClientInterceptor(hmacConfig)))
}

conn, err := grpc.NewClient(
address,
dialOptions...,
)
if err != nil {
return nil, err
}

lggr.Infof("Created HeartbeatClient connecting to %s", address)

return &HeartbeatClient{
client: heartbeatpb.NewHeartbeatServiceClient(conn),
conn: conn,
lggr: logger.With(lggr, "service", "heartbeat_client", "aggregatorAddress", address),
}, nil
}

// SendHeartbeat sends a heartbeat request to the aggregator.
func (hc *HeartbeatClient) SendHeartbeat(ctx context.Context, req *heartbeatpb.HeartbeatRequest, opts ...grpc.CallOption) (*heartbeatpb.HeartbeatResponse, error) {
resp, err := hc.client.SendHeartbeat(ctx, req, opts...)
if err != nil {
hc.lggr.Errorw("Failed to send heartbeat", "error", err)
return nil, fmt.Errorf("failed to send heartbeat: %w", err)
}
hc.lggr.Debugw("Heartbeat sent successfully", "timestamp", req.SendTimestamp)
return resp, nil
}

// Close closes the gRPC connection to the aggregator server.
func (hc *HeartbeatClient) Close() error {
if hc.conn != nil {
return hc.conn.Close()
}
return nil
}
139 changes: 139 additions & 0 deletions integration/pkg/heartbeatclient/heartbeatclient_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package heartbeatclient_test

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"github.com/smartcontractkit/chainlink-ccv/integration/pkg/heartbeatclient"
"github.com/smartcontractkit/chainlink-ccv/protocol/common/hmac"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
heartbeatpb "github.com/smartcontractkit/chainlink-protos/chainlink-ccv/heartbeat/v1"
)

func TestNewHeartbeatClient_InvalidAddress(t *testing.T) {
lggr := logger.Test(t)

// Test with invalid address that can't be reached
client, err := heartbeatclient.NewHeartbeatClient("invalid://address", lggr, nil, true)
// Connection succeeds but will fail on actual send
require.NoError(t, err)
require.NotNil(t, client)
defer client.Close()
}

func TestHeartbeatClient_SendHeartbeat_Success(t *testing.T) {
lggr := logger.Test(t)

// Test basic client construction
client, err := heartbeatclient.NewHeartbeatClient("localhost:50051", lggr, nil, true)
require.NoError(t, err)
require.NotNil(t, client)
defer client.Close()
}

func TestHeartbeatClient_SendHeartbeat_WithHMAC(t *testing.T) {
lggr := logger.Test(t)

// Create HMAC config
hmacConfig := &hmac.ClientConfig{
APIKey: "test-verifier",
Secret: "test-secret-key-1234567890ab",
}

// Client should be created successfully with HMAC config
client, err := heartbeatclient.NewHeartbeatClient("localhost:50051", lggr, hmacConfig, true)
require.NoError(t, err)
require.NotNil(t, client)
defer client.Close()
}

func TestHeartbeatClient_Close(t *testing.T) {
lggr := logger.Test(t)

client, err := heartbeatclient.NewHeartbeatClient("localhost:50051", lggr, nil, true)
require.NoError(t, err)
require.NotNil(t, client)

// Close should not error
err = client.Close()
// Note: Close() may return an error if there are pending operations
if err != nil {
t.Logf("First close returned error (expected): %v", err)
}

// Closing again - gRPC connections may error on second close
err = client.Close()
if err != nil {
t.Logf("Second close returned error (expected): %v", err)
}
}

func TestHeartbeatClient_SendHeartbeat_Timeout(t *testing.T) {
lggr := logger.Test(t)

client, err := heartbeatclient.NewHeartbeatClient("localhost:50051", lggr, nil, true)
require.NoError(t, err)
require.NotNil(t, client)
defer client.Close()

// Create a context that times out immediately
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
defer cancel()

// This should fail due to timeout (since the server isn't actually running)
req := &heartbeatpb.HeartbeatRequest{
SendTimestamp: time.Now().Unix(),
ChainDetails: &heartbeatpb.ChainHealthDetails{
BlockHeightsByChain: map[uint64]uint64{42: 100},
},
}

// We expect an error (either deadline exceeded or connection refused)
_, err = client.SendHeartbeat(ctx, req)
assert.Error(t, err)
}

func TestHeartbeatClient_SendHeartbeat_NilRequest(t *testing.T) {
lggr := logger.Test(t)

client, err := heartbeatclient.NewHeartbeatClient("localhost:50051", lggr, nil, true)
require.NoError(t, err)
require.NotNil(t, client)
defer client.Close()

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

// Sending nil request should fail
_, err = client.SendHeartbeat(ctx, nil)
assert.Error(t, err)
}

// TestHeartbeatClient_WithCallOptions tests that call options are properly passed through.
func TestHeartbeatClient_WithCallOptions(t *testing.T) {
lggr := logger.Test(t)

client, err := heartbeatclient.NewHeartbeatClient("localhost:50051", lggr, nil, true)
require.NoError(t, err)
require.NotNil(t, client)
defer client.Close()

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

req := &heartbeatpb.HeartbeatRequest{
SendTimestamp: time.Now().Unix(),
ChainDetails: &heartbeatpb.ChainHealthDetails{
BlockHeightsByChain: map[uint64]uint64{42: 100},
},
}

// Pass call options (will fail to connect but options should be accepted)
_, err = client.SendHeartbeat(ctx, req, grpc.WaitForReady(false))
assert.Error(t, err)
}
29 changes: 29 additions & 0 deletions integration/pkg/heartbeatclient/noop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package heartbeatclient

import (
"context"
)

// NoopHeartbeatClient is a no-op implementation of HeartbeatSender.
type NoopHeartbeatClient struct{}

// NewNoopHeartbeatClient creates a new no-op heartbeat client.
func NewNoopHeartbeatClient() *NoopHeartbeatClient {
return &NoopHeartbeatClient{}
}

// SendHeartbeat is a no-op implementation that returns a dummy response.
func (n *NoopHeartbeatClient) SendHeartbeat(ctx context.Context, blockHeightsByChain map[uint64]uint64) (HeartbeatResponse, error) {
return HeartbeatResponse{
AggregatorID: "noop",
Timestamp: 0,
ChainBenchmarks: make(map[uint64]ChainBenchmark),
}, nil
}

// Close is a no-op implementation.
func (n *NoopHeartbeatClient) Close() error {
return nil
}

var _ HeartbeatSender = (*NoopHeartbeatClient)(nil)
Loading
Loading