diff --git a/build/devenv/dashboards/heartbeat_agg.json b/build/devenv/dashboards/heartbeat_agg.json new file mode 100644 index 000000000..fcb5e2c12 --- /dev/null +++ b/build/devenv/dashboards/heartbeat_agg.json @@ -0,0 +1,1594 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + }, + { + "datasource": { + "type": "loki", + "uid": "P8E80F9AEF21F6940" + }, + "enable": true, + "iconColor": "red", + "name": "New annotation" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": 15, + "links": [], + "liveNow": false, + "panels": [ + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 7, + "panels": [], + "title": "Heartbeat uptime overview", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 4, + "x": 0, + "y": 1 + }, + "id": 13, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "exemplar": false, + "expr": "count(\n last_over_time(aggregator_heartbeat_verifier_heartbeats_total[30s])\n)", + "instant": true, + "legendFormat": "__auto", + "range": false, + "refId": "A" + } + ], + "title": "Verifiers Up", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 6, + "x": 4, + "y": 1 + }, + "id": 14, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "exemplar": false, + "expr": "count (aggregator_heartbeat_verifier_heartbeats_total{})", + "instant": false, + "legendFormat": "Up", + "range": true, + "refId": "A" + } + ], + "title": "Verifiers Up", + "type": "timeseries" + }, + { + "datasource": { + "type": "loki", + "uid": "P8E80F9AEF21F6940" + }, + "gridPos": { + "h": 5, + "w": 14, + "x": 10, + "y": 1 + }, + "id": 19, + "options": { + "code": { + "language": "plaintext", + "showLineNumbers": false, + "showMiniMap": false + }, + "content": " ", + "mode": "markdown" + }, + "pluginVersion": "10.1.0", + "transparent": true, + "type": "text" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "orange", + "value": 98 + }, + { + "color": "green", + "value": 100 + } + ] + }, + "unit": "percent" + }, + "overrides": [] + }, + "gridPos": { + "h": 3, + "w": 4, + "x": 0, + "y": 6 + }, + "id": 4, + "maxPerRow": 8, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "10.1.0", + "repeat": "verifier", + "repeatDirection": "h", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "exemplar": false, + "expr": "max(round(\n clamp_max(\n (rate(aggregator_heartbeat_verifier_heartbeats_total{caller_id=\"$verifier\"}[$__range]) / 0.1) * 100,\n 100\n )\n))", + "instant": true, + "legendFormat": "{{caller_id}}", + "range": false, + "refId": "A" + } + ], + "title": "Heartbeat uptime for $verifier", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "orange", + "value": 98 + }, + { + "color": "green", + "value": 100 + } + ] + }, + "unit": "percent" + }, + "overrides": [] + }, + "gridPos": { + "h": 10, + "w": 24, + "x": 0, + "y": 9 + }, + "id": 6, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "round(\n clamp_max(\n (rate(aggregator_heartbeat_verifier_heartbeats_total[$__range]) / 0.1) * 100,\n 100\n )\n)", + "hide": false, + "legendFormat": "{{caller_id}}", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "aggregator_heartbeat_verifier_heartbeats_total", + "hide": true, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "B" + } + ], + "title": "Heartbeat uptime per Verifier", + "type": "timeseries" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 19 + }, + "id": 3, + "panels": [], + "title": "Heartbeat - Time since last heartbeat", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 3, + "w": 4, + "x": 0, + "y": 20 + }, + "id": 2, + "maxPerRow": 8, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "10.1.0", + "repeat": "verifier", + "repeatDirection": "h", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "time() - max by(caller_id) (aggregator_heartbeat_verifier_heartbeat_timestamp{caller_id=~\"$verifier\"})", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "$verifier", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 11, + "w": 24, + "x": 0, + "y": 23 + }, + "id": 1, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "time() - max by(caller_id) (aggregator_heartbeat_verifier_heartbeat_timestamp{})", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Time since last Heartbeat", + "type": "timeseries" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 34 + }, + "id": 8, + "panels": [], + "title": "Heartbeat - head report & benchmark score overview", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": { + "align": "auto", + "cellOptions": { + "type": "color-background" + }, + "inspect": false + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "#EAB839", + "value": 2 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 24, + "x": 0, + "y": 35 + }, + "id": 10, + "options": { + "cellHeight": "sm", + "footer": { + "countRows": false, + "fields": "", + "reducer": [ + "sum" + ], + "show": false + }, + "showHeader": true + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "exemplar": false, + "expr": "quantile(0.90, aggregator_heartbeat_verifier_score{caller_id=~\"$verifier\"}) by (caller_id)", + "instant": true, + "legendFormat": "{{caller_id}}", + "range": false, + "refId": "A" + } + ], + "title": "P90 Score $verifier", + "transformations": [ + { + "id": "seriesToRows", + "options": {} + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Time": true + }, + "indexByName": {}, + "renameByName": {} + } + }, + { + "id": "rowsToFields", + "options": {} + } + ], + "type": "table" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": { + "align": "auto", + "cellOptions": { + "type": "color-background" + }, + "inspect": false + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "#EAB839", + "value": 2 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 24, + "x": 0, + "y": 40 + }, + "id": 26, + "options": { + "cellHeight": "sm", + "footer": { + "countRows": false, + "fields": "", + "reducer": [ + "sum" + ], + "show": false + }, + "showHeader": true + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "exemplar": false, + "expr": "quantile(0.95, aggregator_heartbeat_verifier_score{caller_id=~\"$verifier\"}) by (caller_id)", + "instant": true, + "legendFormat": "{{caller_id}}", + "range": false, + "refId": "A" + } + ], + "title": "P95 Score $verifier", + "transformations": [ + { + "id": "seriesToRows", + "options": {} + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Time": true + }, + "indexByName": {}, + "renameByName": {} + } + }, + { + "id": "rowsToFields", + "options": {} + } + ], + "type": "table" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": { + "align": "auto", + "cellOptions": { + "type": "color-background" + }, + "inspect": false + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "#EAB839", + "value": 2 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 24, + "x": 0, + "y": 45 + }, + "id": 27, + "options": { + "cellHeight": "sm", + "footer": { + "countRows": false, + "fields": "", + "reducer": [ + "sum" + ], + "show": false + }, + "showHeader": true + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "exemplar": false, + "expr": "quantile(0.99, aggregator_heartbeat_verifier_score{caller_id=~\"$verifier\"}) by (caller_id)", + "instant": true, + "legendFormat": "{{caller_id}}", + "range": false, + "refId": "A" + } + ], + "title": "P99 Score $verifier", + "transformations": [ + { + "id": "seriesToRows", + "options": {} + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Time": true + }, + "indexByName": {}, + "renameByName": {} + } + }, + { + "id": "rowsToFields", + "options": {} + } + ], + "type": "table" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 16, + "x": 0, + "y": 50 + }, + "id": 9, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "aggregator_heartbeat_verifier_score{}", + "legendFormat": "{{caller_id}}: {{chain_selector}}", + "range": true, + "refId": "A" + } + ], + "title": "Verifier Score over time", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": { + "align": "auto", + "cellOptions": { + "type": "auto" + }, + "inspect": false + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 8, + "x": 16, + "y": 50 + }, + "id": 40, + "options": { + "cellHeight": "sm", + "footer": { + "countRows": false, + "fields": "", + "reducer": [ + "sum" + ], + "show": false + }, + "showHeader": true + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "aggregator_heartbeat_verifier_score{}", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Verifier Score over time", + "transformations": [ + { + "id": "seriesToRows", + "options": {} + }, + { + "id": "extractFields", + "options": { + "source": "Metric" + } + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Metric": true, + "Time": true, + "__name__": true, + "chain_selector": false, + "csa_public_key": true, + "exported_job": true, + "host_name": true, + "instance": true, + "job": true, + "os_description": true, + "os_type": true, + "service_name": true, + "telemetry_sdk_language": true, + "telemetry_sdk_name": true, + "telemetry_sdk_version": true + }, + "indexByName": { + "Metric": 1, + "Time": 0, + "Value": 8, + "__name__": 2, + "caller_id": 3, + "chain_selector": 4, + "csa_public_key": 5, + "exported_job": 6, + "host_name": 7, + "instance": 9, + "job": 10, + "os_description": 11, + "os_type": 12, + "service_name": 13, + "telemetry_sdk_language": 14, + "telemetry_sdk_name": 15, + "telemetry_sdk_version": 16 + }, + "renameByName": { + "Value": "Score", + "caller_id": "Verifier", + "chain_selector": "ChainSelector" + } + } + } + ], + "type": "table" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 16, + "x": 0, + "y": 59 + }, + "id": 11, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "aggregator_heartbeat_verifier_reported_chain_heads", + "legendFormat": "{{caller_id}}: {{chain_selector}}", + "range": true, + "refId": "A" + } + ], + "title": "Verifier reported chain head over time", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": { + "align": "auto", + "cellOptions": { + "type": "auto" + }, + "filterable": true, + "inspect": false + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 8, + "x": 16, + "y": 59 + }, + "id": 12, + "options": { + "cellHeight": "sm", + "footer": { + "countRows": false, + "fields": "", + "reducer": [ + "sum" + ], + "show": false + }, + "showHeader": true + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "exemplar": false, + "expr": "aggregator_heartbeat_verifier_reported_chain_heads", + "instant": true, + "legendFormat": "__auto", + "range": false, + "refId": "A" + } + ], + "title": "Verifiers reported head details", + "transformations": [ + { + "id": "seriesToRows", + "options": {} + }, + { + "id": "extractFields", + "options": { + "source": "Metric" + } + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Metric": true, + "Time": true, + "__name__": true, + "csa_public_key": true, + "exported_job": true, + "host_name": true, + "instance": true, + "job": true, + "os_description": true, + "os_type": true, + "service_name": true, + "telemetry_sdk_language": true, + "telemetry_sdk_name": true, + "telemetry_sdk_version": true + }, + "indexByName": { + "Metric": 1, + "Time": 0, + "Value": 4, + "__name__": 5, + "caller_id": 2, + "chain_selector": 3, + "csa_public_key": 6, + "exported_job": 7, + "host_name": 8, + "instance": 9, + "job": 10, + "os_description": 11, + "os_type": 12, + "service_name": 13, + "telemetry_sdk_language": 14, + "telemetry_sdk_name": 15, + "telemetry_sdk_version": 16 + }, + "renameByName": { + "Value": "Reported chain head" + } + } + } + ], + "type": "table" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 16, + "x": 0, + "y": 68 + }, + "id": 38, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "max by (chain_selector, caller_id) (aggregator_heartbeat_verifier_reported_chain_heads)\n- on(chain_selector) group_left()\nmax by (chain_selector) (aggregator_heartbeat_verifier_current_max_chain_head)", + "legendFormat": "", + "range": true, + "refId": "A" + } + ], + "title": "Verifier reported chain gap from max ", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": { + "align": "auto", + "cellOptions": { + "type": "auto" + }, + "filterable": true, + "inspect": false + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 8, + "x": 16, + "y": 68 + }, + "id": 39, + "options": { + "cellHeight": "sm", + "footer": { + "countRows": false, + "fields": "", + "reducer": [ + "sum" + ], + "show": false + }, + "showHeader": true + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "exemplar": false, + "expr": "max by (chain_selector, caller_id) (aggregator_heartbeat_verifier_reported_chain_heads)\n- on(chain_selector) group_left()\nmax by (chain_selector) (aggregator_heartbeat_verifier_current_max_chain_head)", + "instant": true, + "legendFormat": "__auto", + "range": false, + "refId": "A" + } + ], + "title": "Verifiers reported head details", + "transformations": [ + { + "id": "seriesToRows", + "options": {} + }, + { + "id": "extractFields", + "options": { + "source": "Metric" + } + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Metric": true, + "Time": true, + "__name__": true, + "csa_public_key": true, + "exported_job": true, + "host_name": true, + "instance": true, + "job": true, + "os_description": true, + "os_type": true, + "service_name": true, + "telemetry_sdk_language": true, + "telemetry_sdk_name": true, + "telemetry_sdk_version": true + }, + "indexByName": { + "Metric": 1, + "Time": 0, + "Value": 4, + "__name__": 5, + "caller_id": 2, + "chain_selector": 3, + "csa_public_key": 6, + "exported_job": 7, + "host_name": 8, + "instance": 9, + "job": 10, + "os_description": 11, + "os_type": 12, + "service_name": 13, + "telemetry_sdk_language": 14, + "telemetry_sdk_name": 15, + "telemetry_sdk_version": 16 + }, + "renameByName": { + "Value": "Chain head lag" + } + } + } + ], + "type": "table" + } + ], + "refresh": "", + "schemaVersion": 38, + "style": "dark", + "tags": [], + "templating": { + "list": [ + { + "current": { + "selected": false, + "text": [ + "All" + ], + "value": [ + "$__all" + ] + }, + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "definition": "label_values(aggregator_heartbeat_verifier_heartbeats_total,caller_id)", + "hide": 0, + "includeAll": true, + "multi": true, + "name": "verifier", + "options": [], + "query": { + "query": "label_values(aggregator_heartbeat_verifier_heartbeats_total,caller_id)", + "refId": "PrometheusVariableQueryEditor-VariableQuery" + }, + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "sort": 0, + "type": "query" + } + ] + }, + "time": { + "from": "now-5m", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "Health overview (Aggregator View)", + "uid": "c0968deb-5dbc-4635-b8d0-3937ce801c71", + "version": 13, + "weekStart": "" +} diff --git a/build/devenv/dashboards/heartbeat_verifier.json b/build/devenv/dashboards/heartbeat_verifier.json new file mode 100644 index 000000000..3b8722e9e --- /dev/null +++ b/build/devenv/dashboards/heartbeat_verifier.json @@ -0,0 +1,593 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "links": [], + "liveNow": false, + "panels": [ + { + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 5, + "title": "Heartbeat chain score & benchmark", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 10, + "x": 0, + "y": 1 + }, + "id": 7, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "exemplar": false, + "expr": " sum by (chain_selector) (verifier_heartbeat_chain_heads{host_name=\"$hostname\"}) - sum by (chain_selector) (verifier_heartbeat_sent_chain_heads{host_name=\"$hostname\"})", + "instant": true, + "legendFormat": "__auto", + "range": false, + "refId": "A" + } + ], + "title": "Blocks behind vs benchmark", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 14, + "x": 10, + "y": 1 + }, + "id": 6, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": " sum by (chain_selector) (verifier_heartbeat_chain_heads{host_name=\"$hostname\"}) - sum by (chain_selector) (verifier_heartbeat_sent_chain_heads{host_name=\"$hostname\"})", + "legendFormat": "__auto", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "sum by (chain_selector) (verifier_heartbeat_chain_heads{host_name=\"$hostame\"})", + "hide": true, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "B" + } + ], + "title": "Blocks behind vs benchmark", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 3 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 10, + "x": 0, + "y": 7 + }, + "id": 4, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "exemplar": false, + "expr": "verifier_heartbeat_score{host_name=\"$hostname\"}", + "instant": true, + "legendFormat": "{{chain_selector}}", + "range": false, + "refId": "A" + } + ], + "title": "Heartbeat benchmark score", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 14, + "x": 10, + "y": 7 + }, + "id": 8, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "verifier_heartbeat_score{host_name=\"$hostname\"}", + "legendFormat": "{{chain_selector}}", + "range": true, + "refId": "A" + } + ], + "title": "Heartbeat benchmark score", + "type": "timeseries" + }, + { + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 14 + }, + "id": 3, + "title": "Heartbeat uptime overview", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "orange", + "value": 95 + }, + { + "color": "green", + "value": 99 + } + ] + }, + "unit": "percent" + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 15 + }, + "id": 2, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "round(\n clamp_max(\n (rate(verifier_heartbeats_sent_total{host_name=\"$hostname\"}[$__range]) / 0.1) * 100,\n 100\n )\n)", + "legendFormat": "{{verifier_id}}", + "range": true, + "refId": "A" + } + ], + "title": "Heartbeat uptime", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "percent" + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 15 + }, + "id": 1, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "round(\n clamp_max(\n (rate(verifier_heartbeats_sent_total[$__range]) / 0.1) * 100,\n 100\n )\n)", + "hide": false, + "legendFormat": "{{verifier_id}}", + "range": true, + "refId": "A" + } + ], + "title": "Heartbeat uptime over time", + "type": "timeseries" + } + ], + "refresh": "", + "schemaVersion": 38, + "style": "dark", + "tags": [], + "templating": { + "list": [ + { + "current": { + "selected": false, + "text": "124d04d2d422", + "value": "124d04d2d422" + }, + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "definition": "label_values(verifier_heartbeat_score,host_name)", + "hide": 0, + "includeAll": false, + "multi": false, + "name": "hostname", + "options": [], + "query": { + "query": "label_values(verifier_heartbeat_score,host_name)", + "refId": "PrometheusVariableQueryEditor-VariableQuery" + }, + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "sort": 1, + "type": "query" + } + ] + }, + "time": { + "from": "now-5m", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "Heartbeat overview (Verifier View)", + "uid": "b62c6293-c01b-4b43-8705-bb3f918e679c", + "version": 1, + "weekStart": "" +} diff --git a/build/devenv/fakes/go.mod b/build/devenv/fakes/go.mod index d0e7fae32..556376cd2 100644 --- a/build/devenv/fakes/go.mod +++ b/build/devenv/fakes/go.mod @@ -77,6 +77,7 @@ require ( github.com/sirupsen/logrus v1.9.3 // indirect github.com/smartcontractkit/chain-selectors v1.0.90 // indirect github.com/smartcontractkit/chainlink-common v0.9.6-0.20260114190811-74301cd99dc3 // indirect + github.com/smartcontractkit/chainlink-protos/chainlink-ccv/heartbeat v0.0.0-20260115142640-f6b99095c12e // indirect github.com/smartcontractkit/chainlink-protos/chainlink-ccv/verifier v0.0.0-20251211142334-5c3421fe2c8d // indirect github.com/smartcontractkit/chainlink-testing-framework/framework v0.13.6 // indirect github.com/smartcontractkit/libocr v0.0.0-20250912173940-f3ab0246e23d // indirect diff --git a/build/devenv/fakes/go.sum b/build/devenv/fakes/go.sum index 4405f9e4a..bbbeebdc4 100644 --- a/build/devenv/fakes/go.sum +++ b/build/devenv/fakes/go.sum @@ -250,6 +250,8 @@ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9 github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY= github.com/smartcontractkit/chainlink-protos/chainlink-ccv/committee-verifier v0.0.0-20251211142334-5c3421fe2c8d h1:VYoBBNnQpZ5p+enPTl8SkKBRaubqyGpO0ul3B1np++I= github.com/smartcontractkit/chainlink-protos/chainlink-ccv/committee-verifier v0.0.0-20251211142334-5c3421fe2c8d/go.mod h1:oNFoKHRIerxuaANa8ASNejtHrdsG26LqGtQ2XhSac2g= +github.com/smartcontractkit/chainlink-protos/chainlink-ccv/heartbeat v0.0.0-20260115142640-f6b99095c12e h1:c7vgdeidC0LMtV1a01B/rPL4fEC/cnPanRDflRijXCM= +github.com/smartcontractkit/chainlink-protos/chainlink-ccv/heartbeat v0.0.0-20260115142640-f6b99095c12e/go.mod h1:rZV/gLc1wlSp2r5oXN09iOrlyZPFX4iK+cqoSW2k5dc= github.com/smartcontractkit/chainlink-protos/chainlink-ccv/verifier v0.0.0-20251211142334-5c3421fe2c8d h1:AJy55QJ/pBhXkZjc7N+ATnWfxrcjq9BI9DmdtdjwDUQ= github.com/smartcontractkit/chainlink-protos/chainlink-ccv/verifier v0.0.0-20251211142334-5c3421fe2c8d/go.mod h1:5JdppgngCOUS76p61zCinSCgOhPeYQ+OcDUuome5THQ= github.com/smartcontractkit/chainlink-testing-framework/framework v0.13.6 h1:JqMRimMu05jFs2iz4rduvodaVKe+E9VSJhEJ7dfgXwo= diff --git a/cmd/verifier/committee/main.go b/cmd/verifier/committee/main.go index a615b19f0..9d55370e3 100644 --- a/cmd/verifier/committee/main.go +++ b/cmd/verifier/committee/main.go @@ -19,6 +19,7 @@ import ( cmd "github.com/smartcontractkit/chainlink-ccv/cmd/verifier" "github.com/smartcontractkit/chainlink-ccv/integration/pkg/accessors" "github.com/smartcontractkit/chainlink-ccv/integration/pkg/blockchain" + "github.com/smartcontractkit/chainlink-ccv/integration/pkg/heartbeatclient" "github.com/smartcontractkit/chainlink-ccv/integration/storageaccess" "github.com/smartcontractkit/chainlink-ccv/protocol" "github.com/smartcontractkit/chainlink-ccv/protocol/common/hmac" @@ -179,7 +180,8 @@ func main() { StorageBatchSize: 50, StorageBatchTimeout: 100 * time.Millisecond, StorageRetryDelay: 2 * time.Second, - CursePollInterval: 2 * time.Second, // Poll RMN Remotes for curse status every 2s + CursePollInterval: 2 * time.Second, // Poll RMN Remotes for curse status every 2s + HeartbeatInterval: 10 * time.Second, // Send heartbeat to aggregator every 10s } pk := os.Getenv(PkEnvVar) @@ -218,6 +220,29 @@ func main() { verifierMonitoring, ) + heartbeatClient, err := heartbeatclient.NewHeartbeatClient( + config.AggregatorAddress, + lggr, + hmacConfig, + config.InsecureAggregatorConnection, + ) + if err != nil { + lggr.Errorw("Failed to create heartbeat client", "error", err) + os.Exit(1) + } + defer func() { + if heartbeatClient != nil { + _ = heartbeatClient.Close() + } + }() + + observedHeartbeatClient := heartbeatclient.NewObservedHeartbeatClient( + heartbeatClient, + config.VerifierID, + lggr, + verifier.NewHeartbeatMonitoringAdapter(verifierMonitoring), + ) + messageTracker := monitoring.NewMessageLatencyTracker( lggr, config.VerifierID, @@ -235,6 +260,7 @@ func main() { messageTracker, verifierMonitoring, chainStatusManager, + observedHeartbeatClient, ) if err != nil { lggr.Errorw("Failed to create verification coordinator", "error", err) diff --git a/cmd/verifier/token/main.go b/cmd/verifier/token/main.go index a32c384c5..2c73d624c 100644 --- a/cmd/verifier/token/main.go +++ b/cmd/verifier/token/main.go @@ -16,6 +16,7 @@ import ( cmd "github.com/smartcontractkit/chainlink-ccv/cmd/verifier" "github.com/smartcontractkit/chainlink-ccv/integration/pkg/accessors" "github.com/smartcontractkit/chainlink-ccv/integration/pkg/blockchain" + "github.com/smartcontractkit/chainlink-ccv/integration/pkg/heartbeatclient" "github.com/smartcontractkit/chainlink-ccv/pkg/chainaccess" "github.com/smartcontractkit/chainlink-ccv/protocol" "github.com/smartcontractkit/chainlink-ccv/protocol/common/logging" @@ -235,6 +236,7 @@ func createCCTPCoordinator( messageTracker, verifierMonitoring, chainStatusManager, + heartbeatclient.NewNoopHeartbeatClient(), ) if err != nil { lggr.Errorw("Failed to create verification coordinator for cctp", "error", err) @@ -282,6 +284,7 @@ func createLBTCCoordinator( messageTracker, verifierMonitoring, chainStatusManager, + heartbeatclient.NewNoopHeartbeatClient(), ) if err != nil { lggr.Errorw("Failed to create verification coordinator for lbtc", "error", err) diff --git a/integration/pkg/constructors/committee_verifier.go b/integration/pkg/constructors/committee_verifier.go index 30f4da90f..b525a93bb 100644 --- a/integration/pkg/constructors/committee_verifier.go +++ b/integration/pkg/constructors/committee_verifier.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/smartcontractkit/chainlink-ccip/ccv/chains/evm/gobindings/generated/latest/onramp" + "github.com/smartcontractkit/chainlink-ccv/integration/pkg/heartbeatclient" "github.com/smartcontractkit/chainlink-ccv/integration/pkg/sourcereader" "github.com/smartcontractkit/chainlink-ccv/integration/storageaccess" "github.com/smartcontractkit/chainlink-ccv/pkg/chainaccess" @@ -134,6 +135,7 @@ func NewVerificationCoordinator( StorageBatchSize: 50, StorageBatchTimeout: 100 * time.Millisecond, StorageRetryDelay: 2 * time.Second, + HeartbeatInterval: 0, // Disabled by default } // Create commit verifier (with ECDSA signer) @@ -161,6 +163,7 @@ func NewVerificationCoordinator( messageTracker, verifierMonitoring, chainStatusManager, + heartbeatclient.NewNoopHeartbeatClient(), ) if err != nil { lggr.Errorw("Failed to create verification coordinator", "error", err) diff --git a/integration/pkg/heartbeatclient/heartbeatclient.go b/integration/pkg/heartbeatclient/heartbeatclient.go new file mode 100644 index 000000000..5f439261a --- /dev/null +++ b/integration/pkg/heartbeatclient/heartbeatclient.go @@ -0,0 +1,76 @@ +package heartbeatclient + +import ( + "context" + "crypto/tls" + "fmt" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + insecuregrpc "google.golang.org/grpc/credentials/insecure" + + "github.com/smartcontractkit/chainlink-ccv/protocol/common/hmac" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + heartbeatpb "github.com/smartcontractkit/chainlink-protos/chainlink-ccv/heartbeat/v1" +) + +const ( + MinTLSVersion = tls.VersionTLS13 +) + +// HeartbeatClient provides methods to send heartbeats to the aggregator service. +type HeartbeatClient struct { + client heartbeatpb.HeartbeatServiceClient + conn *grpc.ClientConn + lggr logger.Logger +} + +// NewHeartbeatClient creates a new heartbeat client that communicates with the aggregator. +// If insecure is true, TLS verification is disabled (only for testing). +func NewHeartbeatClient(address string, lggr logger.Logger, hmacConfig *hmac.ClientConfig, insecure bool) (*HeartbeatClient, error) { + var dialOptions []grpc.DialOption + if insecure { + dialOptions = append(dialOptions, grpc.WithTransportCredentials(insecuregrpc.NewCredentials())) + } else { + dialOptions = append(dialOptions, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{MinVersion: MinTLSVersion}))) + } + + if hmacConfig != nil { + dialOptions = append(dialOptions, grpc.WithUnaryInterceptor(hmac.NewClientInterceptor(hmacConfig))) + } + + conn, err := grpc.NewClient( + address, + dialOptions..., + ) + if err != nil { + return nil, err + } + + lggr.Infof("Created HeartbeatClient connecting to %s", address) + + return &HeartbeatClient{ + client: heartbeatpb.NewHeartbeatServiceClient(conn), + conn: conn, + lggr: logger.With(lggr, "service", "heartbeat_client", "aggregatorAddress", address), + }, nil +} + +// SendHeartbeat sends a heartbeat request to the aggregator. +func (hc *HeartbeatClient) SendHeartbeat(ctx context.Context, req *heartbeatpb.HeartbeatRequest, opts ...grpc.CallOption) (*heartbeatpb.HeartbeatResponse, error) { + resp, err := hc.client.SendHeartbeat(ctx, req, opts...) + if err != nil { + hc.lggr.Errorw("Failed to send heartbeat", "error", err) + return nil, fmt.Errorf("failed to send heartbeat: %w", err) + } + hc.lggr.Debugw("Heartbeat sent successfully", "timestamp", req.SendTimestamp) + return resp, nil +} + +// Close closes the gRPC connection to the aggregator server. +func (hc *HeartbeatClient) Close() error { + if hc.conn != nil { + return hc.conn.Close() + } + return nil +} diff --git a/integration/pkg/heartbeatclient/heartbeatclient_test.go b/integration/pkg/heartbeatclient/heartbeatclient_test.go new file mode 100644 index 000000000..0c5acbb21 --- /dev/null +++ b/integration/pkg/heartbeatclient/heartbeatclient_test.go @@ -0,0 +1,139 @@ +package heartbeatclient_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + + "github.com/smartcontractkit/chainlink-ccv/integration/pkg/heartbeatclient" + "github.com/smartcontractkit/chainlink-ccv/protocol/common/hmac" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + heartbeatpb "github.com/smartcontractkit/chainlink-protos/chainlink-ccv/heartbeat/v1" +) + +func TestNewHeartbeatClient_InvalidAddress(t *testing.T) { + lggr := logger.Test(t) + + // Test with invalid address that can't be reached + client, err := heartbeatclient.NewHeartbeatClient("invalid://address", lggr, nil, true) + // Connection succeeds but will fail on actual send + require.NoError(t, err) + require.NotNil(t, client) + defer client.Close() +} + +func TestHeartbeatClient_SendHeartbeat_Success(t *testing.T) { + lggr := logger.Test(t) + + // Test basic client construction + client, err := heartbeatclient.NewHeartbeatClient("localhost:50051", lggr, nil, true) + require.NoError(t, err) + require.NotNil(t, client) + defer client.Close() +} + +func TestHeartbeatClient_SendHeartbeat_WithHMAC(t *testing.T) { + lggr := logger.Test(t) + + // Create HMAC config + hmacConfig := &hmac.ClientConfig{ + APIKey: "test-verifier", + Secret: "test-secret-key-1234567890ab", + } + + // Client should be created successfully with HMAC config + client, err := heartbeatclient.NewHeartbeatClient("localhost:50051", lggr, hmacConfig, true) + require.NoError(t, err) + require.NotNil(t, client) + defer client.Close() +} + +func TestHeartbeatClient_Close(t *testing.T) { + lggr := logger.Test(t) + + client, err := heartbeatclient.NewHeartbeatClient("localhost:50051", lggr, nil, true) + require.NoError(t, err) + require.NotNil(t, client) + + // Close should not error + err = client.Close() + // Note: Close() may return an error if there are pending operations + if err != nil { + t.Logf("First close returned error (expected): %v", err) + } + + // Closing again - gRPC connections may error on second close + err = client.Close() + if err != nil { + t.Logf("Second close returned error (expected): %v", err) + } +} + +func TestHeartbeatClient_SendHeartbeat_Timeout(t *testing.T) { + lggr := logger.Test(t) + + client, err := heartbeatclient.NewHeartbeatClient("localhost:50051", lggr, nil, true) + require.NoError(t, err) + require.NotNil(t, client) + defer client.Close() + + // Create a context that times out immediately + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond) + defer cancel() + + // This should fail due to timeout (since the server isn't actually running) + req := &heartbeatpb.HeartbeatRequest{ + SendTimestamp: time.Now().Unix(), + ChainDetails: &heartbeatpb.ChainHealthDetails{ + BlockHeightsByChain: map[uint64]uint64{42: 100}, + }, + } + + // We expect an error (either deadline exceeded or connection refused) + _, err = client.SendHeartbeat(ctx, req) + assert.Error(t, err) +} + +func TestHeartbeatClient_SendHeartbeat_NilRequest(t *testing.T) { + lggr := logger.Test(t) + + client, err := heartbeatclient.NewHeartbeatClient("localhost:50051", lggr, nil, true) + require.NoError(t, err) + require.NotNil(t, client) + defer client.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + // Sending nil request should fail + _, err = client.SendHeartbeat(ctx, nil) + assert.Error(t, err) +} + +// TestHeartbeatClient_WithCallOptions tests that call options are properly passed through. +func TestHeartbeatClient_WithCallOptions(t *testing.T) { + lggr := logger.Test(t) + + client, err := heartbeatclient.NewHeartbeatClient("localhost:50051", lggr, nil, true) + require.NoError(t, err) + require.NotNil(t, client) + defer client.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + req := &heartbeatpb.HeartbeatRequest{ + SendTimestamp: time.Now().Unix(), + ChainDetails: &heartbeatpb.ChainHealthDetails{ + BlockHeightsByChain: map[uint64]uint64{42: 100}, + }, + } + + // Pass call options (will fail to connect but options should be accepted) + _, err = client.SendHeartbeat(ctx, req, grpc.WaitForReady(false)) + assert.Error(t, err) +} diff --git a/integration/pkg/heartbeatclient/noop.go b/integration/pkg/heartbeatclient/noop.go new file mode 100644 index 000000000..c3ee7b075 --- /dev/null +++ b/integration/pkg/heartbeatclient/noop.go @@ -0,0 +1,29 @@ +package heartbeatclient + +import ( + "context" +) + +// NoopHeartbeatClient is a no-op implementation of HeartbeatSender. +type NoopHeartbeatClient struct{} + +// NewNoopHeartbeatClient creates a new no-op heartbeat client. +func NewNoopHeartbeatClient() *NoopHeartbeatClient { + return &NoopHeartbeatClient{} +} + +// SendHeartbeat is a no-op implementation that returns a dummy response. +func (n *NoopHeartbeatClient) SendHeartbeat(ctx context.Context, blockHeightsByChain map[uint64]uint64) (HeartbeatResponse, error) { + return HeartbeatResponse{ + AggregatorID: "noop", + Timestamp: 0, + ChainBenchmarks: make(map[uint64]ChainBenchmark), + }, nil +} + +// Close is a no-op implementation. +func (n *NoopHeartbeatClient) Close() error { + return nil +} + +var _ HeartbeatSender = (*NoopHeartbeatClient)(nil) diff --git a/integration/pkg/heartbeatclient/observed_heartbeat_client.go b/integration/pkg/heartbeatclient/observed_heartbeat_client.go new file mode 100644 index 000000000..e1359d97f --- /dev/null +++ b/integration/pkg/heartbeatclient/observed_heartbeat_client.go @@ -0,0 +1,159 @@ +package heartbeatclient + +import ( + "context" + "fmt" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + heartbeatpb "github.com/smartcontractkit/chainlink-protos/chainlink-ccv/heartbeat/v1" +) + +// HeartbeatSender defines the interface for sending heartbeats to the aggregator. +type HeartbeatSender interface { + // SendHeartbeat sends chain status information to the aggregator. + // Returns the aggregator's response containing benchmarks and timestamp. + SendHeartbeat(ctx context.Context, blockHeightsByChain map[uint64]uint64) (HeartbeatResponse, error) + // Close closes the heartbeat client connection. + Close() error +} + +// HeartbeatResponse contains the aggregator's response to a heartbeat. +type HeartbeatResponse struct { + AggregatorID string + Timestamp int64 + ChainBenchmarks map[uint64]ChainBenchmark +} + +// ChainBenchmark contains benchmark information for a specific chain. +type ChainBenchmark struct { + BlockHeight uint64 + Score float32 +} + +// Monitoring provides monitoring functionality for heartbeat clients. +// Services using the heartbeat client should provide an adapter implementing this interface. +type Monitoring interface { + // Metrics returns the metrics labeler. + Metrics() MetricLabeler +} + +// MetricLabeler provides metric recording functionality for heartbeat operations. +type MetricLabeler interface { + // With returns a new metrics labeler with the given key-value pairs. + With(keyValues ...string) MetricLabeler + + // RecordHeartbeatDuration records the duration of a heartbeat operation. + RecordHeartbeatDuration(ctx context.Context, duration time.Duration) + + // IncrementHeartbeatsSent increments the counter for successfully sent heartbeats. + IncrementHeartbeatsSent(ctx context.Context) + + // IncrementHeartbeatsFailed increments the counter for failed heartbeat attempts. + IncrementHeartbeatsFailed(ctx context.Context) + + // SetVerifierHeartbeatTimestamp sets the timestamp from the heartbeat response. + SetVerifierHeartbeatTimestamp(ctx context.Context, timestamp int64) + + // SetVerifierHeartbeatSentChainHeads sets the block height sent in the heartbeat request for a chain. + SetVerifierHeartbeatSentChainHeads(ctx context.Context, blockHeight uint64) + + // SetVerifierHeartbeatChainHeads sets the block height for a chain from the heartbeat response. + SetVerifierHeartbeatChainHeads(ctx context.Context, blockHeight uint64) + + // SetVerifierHeartbeatScore sets the score for a chain from the heartbeat response. + SetVerifierHeartbeatScore(ctx context.Context, score float64) +} + +// ObservedHeartbeatClient wraps a HeartbeatClient with observability. +type ObservedHeartbeatClient struct { + delegate *HeartbeatClient + verifierID string + lggr logger.Logger + monitoring Monitoring +} + +// NewObservedHeartbeatClient creates a new observed heartbeat client. +func NewObservedHeartbeatClient( + delegate *HeartbeatClient, + verifierID string, + lggr logger.Logger, + monitoring Monitoring, +) *ObservedHeartbeatClient { + return &ObservedHeartbeatClient{ + delegate: delegate, + verifierID: verifierID, + lggr: lggr, + monitoring: monitoring, + } +} + +// SendHeartbeat sends a heartbeat request with observability. +func (o *ObservedHeartbeatClient) SendHeartbeat(ctx context.Context, blockHeightsByChain map[uint64]uint64) (HeartbeatResponse, error) { + start := time.Now() + + // Build proto request + req := &heartbeatpb.HeartbeatRequest{ + SendTimestamp: time.Now().Unix(), + ChainDetails: &heartbeatpb.ChainHealthDetails{ + BlockHeightsByChain: blockHeightsByChain, + }, + } + + resp, err := o.delegate.SendHeartbeat(ctx, req) + + duration := time.Since(start) + + metrics := o.monitoring.Metrics().With("verifier_id", o.verifierID) + metrics.RecordHeartbeatDuration(ctx, duration) + + // Record what we're sending in the request. It will be used for monitoring of the lag. + for chainSelector, blockHeight := range blockHeightsByChain { + chainMetrics := metrics.With("chain_selector", fmt.Sprintf("%d", chainSelector)) + chainMetrics.SetVerifierHeartbeatSentChainHeads(ctx, blockHeight) + } + + if err != nil { + metrics.IncrementHeartbeatsFailed(ctx) + o.lggr.Errorw("Heartbeat failed", + "error", err, + "duration", duration, + ) + return HeartbeatResponse{}, err + } + + metrics.IncrementHeartbeatsSent(ctx) + + metrics.SetVerifierHeartbeatTimestamp(ctx, resp.Timestamp) + + // Convert proto response to domain response + chainBenchmarks := make(map[uint64]ChainBenchmark, len(resp.ChainBenchmarks)) + for chainSelector, benchmark := range resp.ChainBenchmarks { + chainBenchmarks[chainSelector] = ChainBenchmark{ + BlockHeight: benchmark.BlockHeight, + Score: benchmark.Score, + } + + // Record metrics + chainMetrics := metrics.With("chain_selector", fmt.Sprintf("%d", chainSelector)) + chainMetrics.SetVerifierHeartbeatChainHeads(ctx, benchmark.BlockHeight) + chainMetrics.SetVerifierHeartbeatScore(ctx, float64(benchmark.Score)) + } + + o.lggr.Debugw("Heartbeat succeeded", + "duration", duration, + "chainCount", len(blockHeightsByChain), + "chainBenchmarkCount", len(chainBenchmarks), + ) + + return HeartbeatResponse{ + AggregatorID: resp.AggregatorId, + Timestamp: resp.Timestamp, + ChainBenchmarks: chainBenchmarks, + }, nil +} + +// Close closes the underlying heartbeat client. +func (o *ObservedHeartbeatClient) Close() error { + return o.delegate.Close() +} diff --git a/integration/pkg/heartbeatclient/observed_heartbeat_client_test.go b/integration/pkg/heartbeatclient/observed_heartbeat_client_test.go new file mode 100644 index 000000000..5aff55bcf --- /dev/null +++ b/integration/pkg/heartbeatclient/observed_heartbeat_client_test.go @@ -0,0 +1,55 @@ +package heartbeatclient_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-ccv/integration/pkg/heartbeatclient" + "github.com/smartcontractkit/chainlink-ccv/verifier" + "github.com/smartcontractkit/chainlink-ccv/verifier/pkg/monitoring" + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +func TestObservedHeartbeatClient_Close(t *testing.T) { + lggr := logger.Test(t) + fakeMonitoring := monitoring.NewFakeVerifierMonitoring() + + delegateClient := &heartbeatclient.HeartbeatClient{} + + observedClient := heartbeatclient.NewObservedHeartbeatClient( + delegateClient, + "test-verifier", + lggr, + verifier.NewHeartbeatMonitoringAdapter(fakeMonitoring), + ) + + // Close should not error + err := observedClient.Close() + assert.NoError(t, err) +} + +func TestObservedHeartbeatClient_WithChainSelector(t *testing.T) { + lggr := logger.Test(t) + fakeMonitoring := monitoring.NewFakeVerifierMonitoring() + + delegateClient := &heartbeatclient.HeartbeatClient{} + + observedClient := heartbeatclient.NewObservedHeartbeatClient( + delegateClient, + "test-verifier", + lggr, + verifier.NewHeartbeatMonitoringAdapter(fakeMonitoring), + ) + require.NotNil(t, observedClient) + + metrics := fakeMonitoring.Metrics() + + // Verify that With() returns a metric labeler that can be used for chain-specific metrics + chainMetrics := metrics.With("chain_selector", "42") + assert.NotNil(t, chainMetrics) + + chainMetrics = metrics.With("chain_selector", "100") + assert.NotNil(t, chainMetrics) +} diff --git a/verifier/heartbeat_monitoring_adapter.go b/verifier/heartbeat_monitoring_adapter.go new file mode 100644 index 000000000..6a801b6a9 --- /dev/null +++ b/verifier/heartbeat_monitoring_adapter.go @@ -0,0 +1,61 @@ +package verifier + +import ( + "context" + "time" + + "github.com/smartcontractkit/chainlink-ccv/integration/pkg/heartbeatclient" +) + +// heartbeatMonitoringAdapter adapts verifier.Monitoring to heartbeatclient.Monitoring. +// This allows the reusable heartbeat client to work with verifier-specific monitoring. +type heartbeatMonitoringAdapter struct { + monitoring Monitoring +} + +// NewHeartbeatMonitoringAdapter creates an adapter that allows verifier.Monitoring +// to be used with the heartbeat client's observability layer. +func NewHeartbeatMonitoringAdapter(monitoring Monitoring) heartbeatclient.Monitoring { + return &heartbeatMonitoringAdapter{monitoring: monitoring} +} + +func (a *heartbeatMonitoringAdapter) Metrics() heartbeatclient.MetricLabeler { + return &heartbeatMetricLabelerAdapter{labeler: a.monitoring.Metrics()} +} + +// heartbeatMetricLabelerAdapter adapts verifier.MetricLabeler to heartbeatclient.MetricLabeler. +type heartbeatMetricLabelerAdapter struct { + labeler MetricLabeler +} + +func (a *heartbeatMetricLabelerAdapter) With(keyValues ...string) heartbeatclient.MetricLabeler { + return &heartbeatMetricLabelerAdapter{labeler: a.labeler.With(keyValues...)} +} + +func (a *heartbeatMetricLabelerAdapter) RecordHeartbeatDuration(ctx context.Context, duration time.Duration) { + a.labeler.RecordHeartbeatDuration(ctx, duration) +} + +func (a *heartbeatMetricLabelerAdapter) IncrementHeartbeatsSent(ctx context.Context) { + a.labeler.IncrementHeartbeatsSent(ctx) +} + +func (a *heartbeatMetricLabelerAdapter) IncrementHeartbeatsFailed(ctx context.Context) { + a.labeler.IncrementHeartbeatsFailed(ctx) +} + +func (a *heartbeatMetricLabelerAdapter) SetVerifierHeartbeatTimestamp(ctx context.Context, timestamp int64) { + a.labeler.SetVerifierHeartbeatTimestamp(ctx, timestamp) +} + +func (a *heartbeatMetricLabelerAdapter) SetVerifierHeartbeatSentChainHeads(ctx context.Context, blockHeight uint64) { + a.labeler.SetVerifierHeartbeatSentChainHeads(ctx, blockHeight) +} + +func (a *heartbeatMetricLabelerAdapter) SetVerifierHeartbeatChainHeads(ctx context.Context, blockHeight uint64) { + a.labeler.SetVerifierHeartbeatChainHeads(ctx, blockHeight) +} + +func (a *heartbeatMetricLabelerAdapter) SetVerifierHeartbeatScore(ctx context.Context, score float64) { + a.labeler.SetVerifierHeartbeatScore(ctx, score) +} diff --git a/verifier/heartbeat_reporter.go b/verifier/heartbeat_reporter.go new file mode 100644 index 000000000..6dc52a486 --- /dev/null +++ b/verifier/heartbeat_reporter.go @@ -0,0 +1,177 @@ +package verifier + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/smartcontractkit/chainlink-ccv/integration/pkg/heartbeatclient" + "github.com/smartcontractkit/chainlink-ccv/protocol" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" +) + +const ( + // DefaultHeartbeatInterval is how often to send heartbeat with chain statuses to aggregator. + DefaultHeartbeatInterval = 10 * time.Second +) + +// HeartbeatReporter periodically reads chain statuses and sends them to the aggregator via heartbeat. +type HeartbeatReporter struct { + services.StateMachine + stopCh services.StopChan + wg sync.WaitGroup + + logger logger.Logger + chainStatusManager protocol.ChainStatusManager + heartbeatClient heartbeatclient.HeartbeatSender + allSelectors []protocol.ChainSelector + verifierID string + interval time.Duration +} + +// NewHeartbeatReporter creates a new heartbeat reporter service. +func NewHeartbeatReporter( + lggr logger.Logger, + chainStatusManager protocol.ChainStatusManager, + heartbeatClient heartbeatclient.HeartbeatSender, + allSelectors []protocol.ChainSelector, + verifierID string, + interval time.Duration, +) (*HeartbeatReporter, error) { + if lggr == nil { + return nil, fmt.Errorf("logger cannot be nil") + } + if chainStatusManager == nil { + return nil, fmt.Errorf("chainStatusManager cannot be nil") + } + if heartbeatClient == nil { + return nil, fmt.Errorf("heartbeatClient cannot be nil") + } + if len(allSelectors) == 0 { + return nil, fmt.Errorf("allSelectors cannot be empty") + } + if verifierID == "" { + return nil, fmt.Errorf("verifierID cannot be empty") + } + + if interval == 0 { + interval = DefaultHeartbeatInterval + } + + return &HeartbeatReporter{ + stopCh: make(chan struct{}), + logger: lggr, + chainStatusManager: chainStatusManager, + heartbeatClient: heartbeatClient, + allSelectors: allSelectors, + verifierID: verifierID, + interval: interval, + }, nil +} + +// Start begins the heartbeat reporter service. +func (hr *HeartbeatReporter) Start(ctx context.Context) error { + return hr.StartOnce(hr.Name(), func() error { + hr.logger.Infow("Starting heartbeat reporter", "interval", hr.interval) + hr.wg.Add(1) + go hr.reportLoop(ctx) + return nil + }) +} + +// Close stops the heartbeat reporter service. +func (hr *HeartbeatReporter) Close() error { + return hr.StopOnce(hr.Name(), func() error { + hr.logger.Infow("Stopping heartbeat reporter") + close(hr.stopCh) + hr.wg.Wait() + hr.logger.Infow("Heartbeat reporter stopped") + return nil + }) +} + +// Name returns the name of the service. +func (hr *HeartbeatReporter) Name() string { + return fmt.Sprintf("verifier.HeartbeatReporter[%s]", hr.verifierID) +} + +// HealthReport returns a health report for the heartbeat reporter. +func (hr *HeartbeatReporter) HealthReport() map[string]error { + report := make(map[string]error) + report[hr.Name()] = hr.Ready() + return report +} + +// reportLoop is the main loop that periodically sends heartbeats with chain statuses. +func (hr *HeartbeatReporter) reportLoop(ctx context.Context) { + defer hr.wg.Done() + + ticker := time.NewTicker(hr.interval) + defer ticker.Stop() + + // Send initial heartbeat immediately. + hr.sendHeartbeat(ctx) + + for { + select { + case <-hr.stopCh: + hr.logger.Infow("Heartbeat reporter loop stopped") + return + case <-ctx.Done(): + hr.logger.Infow("Heartbeat reporter context cancelled") + return + case <-ticker.C: + hr.sendHeartbeat(ctx) + } + } +} + +// sendHeartbeat reads chain statuses and sends them to the aggregator. +func (hr *HeartbeatReporter) sendHeartbeat(ctx context.Context) { + // Read chain statuses for all selectors. + statusMap, err := hr.chainStatusManager.ReadChainStatuses(ctx, hr.allSelectors) + if err != nil { + hr.logger.Errorw("Failed to read chain statuses", "error", err) + return + } + + // Build block heights map for heartbeat. + blockHeightsByChain := make(map[uint64]uint64) + for _, selector := range hr.allSelectors { + status, ok := statusMap[selector] + if !ok { + hr.logger.Debugw("Chain status not found", "chainSelector", selector) + continue + } + + // Add block height for this chain if available. + // TODO: change to use latest seen block height instead of finalized when available. + if status.FinalizedBlockHeight != nil { + blockHeightsByChain[uint64(selector)] = status.FinalizedBlockHeight.Uint64() + } + } + + // Send heartbeat request. + resp, err := hr.heartbeatClient.SendHeartbeat(ctx, blockHeightsByChain) + if err != nil { + hr.logger.Errorw("Failed to send heartbeat", "error", err) + return + } + + hr.logger.Infow("Heartbeat sent successfully", + "verifierId", hr.verifierID, + "aggregatorId", resp.AggregatorID, + "chainCount", len(blockHeightsByChain), + ) + hr.logger.Debugw("Heartbeat details", + "verifierId", hr.verifierID, + "blockHeightsByChain", blockHeightsByChain, + "chainBenchmarks", resp.ChainBenchmarks, + "aggregatorId", resp.AggregatorID, + "respTimestamp", resp.Timestamp, + ) +} + +var _ services.Service = (*HeartbeatReporter)(nil) diff --git a/verifier/heartbeat_reporter_test.go b/verifier/heartbeat_reporter_test.go new file mode 100644 index 000000000..8a11c44ee --- /dev/null +++ b/verifier/heartbeat_reporter_test.go @@ -0,0 +1,506 @@ +package verifier_test + +import ( + "context" + "errors" + "math/big" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-ccv/integration/pkg/heartbeatclient" + "github.com/smartcontractkit/chainlink-ccv/internal/mocks" + "github.com/smartcontractkit/chainlink-ccv/protocol" + "github.com/smartcontractkit/chainlink-ccv/verifier" + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +// mockHeartbeatClient is a mock implementation of heartbeatclient.HeartbeatSender for testing. +type mockHeartbeatClient struct { + mock.Mock +} + +func (m *mockHeartbeatClient) SendHeartbeat(ctx context.Context, blockHeightsByChain map[uint64]uint64) (heartbeatclient.HeartbeatResponse, error) { + args := m.Called(ctx, blockHeightsByChain) + return args.Get(0).(heartbeatclient.HeartbeatResponse), args.Error(1) +} + +func (m *mockHeartbeatClient) Close() error { + args := m.Called() + return args.Error(0) +} + +func TestNewHeartbeatReporter_Success(t *testing.T) { + lggr := logger.Test(t) + mockClient := new(mockHeartbeatClient) + mockStatusMgr := mocks.NewMockChainStatusManager(t) + + selectors := []protocol.ChainSelector{1, 10, 100} + + reporter, err := verifier.NewHeartbeatReporter( + lggr, + mockStatusMgr, + mockClient, + selectors, + "test-verifier", + 10*time.Second, + ) + require.NoError(t, err) + require.NotNil(t, reporter) +} + +func TestNewHeartbeatReporter_NilLogger(t *testing.T) { + mockClient := new(mockHeartbeatClient) + mockStatusMgr := mocks.NewMockChainStatusManager(t) + selectors := []protocol.ChainSelector{1} + + _, err := verifier.NewHeartbeatReporter( + nil, + mockStatusMgr, + mockClient, + selectors, + "test-verifier", + 10*time.Second, + ) + require.Error(t, err) + assert.Contains(t, err.Error(), "logger cannot be nil") +} + +func TestNewHeartbeatReporter_NilChainStatusManager(t *testing.T) { + lggr := logger.Test(t) + mockClient := new(mockHeartbeatClient) + selectors := []protocol.ChainSelector{1} + + _, err := verifier.NewHeartbeatReporter( + lggr, + nil, + mockClient, + selectors, + "test-verifier", + 10*time.Second, + ) + require.Error(t, err) + assert.Contains(t, err.Error(), "chainStatusManager cannot be nil") +} + +func TestNewHeartbeatReporter_NilHeartbeatClient(t *testing.T) { + lggr := logger.Test(t) + mockStatusMgr := mocks.NewMockChainStatusManager(t) + selectors := []protocol.ChainSelector{1} + + _, err := verifier.NewHeartbeatReporter( + lggr, + mockStatusMgr, + nil, + selectors, + "test-verifier", + 10*time.Second, + ) + require.Error(t, err) + assert.Contains(t, err.Error(), "heartbeatClient cannot be nil") +} + +func TestNewHeartbeatReporter_EmptySelectors(t *testing.T) { + lggr := logger.Test(t) + mockClient := new(mockHeartbeatClient) + mockStatusMgr := mocks.NewMockChainStatusManager(t) + + _, err := verifier.NewHeartbeatReporter( + lggr, + mockStatusMgr, + mockClient, + []protocol.ChainSelector{}, + "test-verifier", + 10*time.Second, + ) + require.Error(t, err) + assert.Contains(t, err.Error(), "allSelectors cannot be empty") +} + +func TestNewHeartbeatReporter_EmptyVerifierID(t *testing.T) { + lggr := logger.Test(t) + mockClient := new(mockHeartbeatClient) + mockStatusMgr := mocks.NewMockChainStatusManager(t) + selectors := []protocol.ChainSelector{1} + + _, err := verifier.NewHeartbeatReporter( + lggr, + mockStatusMgr, + mockClient, + selectors, + "", + 10*time.Second, + ) + require.Error(t, err) + assert.Contains(t, err.Error(), "verifierID cannot be empty") +} + +func TestNewHeartbeatReporter_DefaultInterval(t *testing.T) { + lggr := logger.Test(t) + mockClient := new(mockHeartbeatClient) + mockStatusMgr := mocks.NewMockChainStatusManager(t) + selectors := []protocol.ChainSelector{1} + + // Create with 0 interval - should use default + reporter, err := verifier.NewHeartbeatReporter( + lggr, + mockStatusMgr, + mockClient, + selectors, + "test-verifier", + 0, + ) + require.NoError(t, err) + require.NotNil(t, reporter) +} + +func TestHeartbeatReporter_StartAndStop(t *testing.T) { + lggr := logger.Test(t) + mockClient := new(mockHeartbeatClient) + mockStatusMgr := mocks.NewMockChainStatusManager(t) + + selectors := []protocol.ChainSelector{1, 10} + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Setup mock responses + chainStatusInfo := &protocol.ChainStatusInfo{ + ChainSelector: 1, + FinalizedBlockHeight: big.NewInt(100), + Disabled: false, + } + + mockStatusMgr.On("ReadChainStatuses", mock.MatchedBy(func(c context.Context) bool { + return c != nil + }), selectors).Return(map[protocol.ChainSelector]*protocol.ChainStatusInfo{ + 1: chainStatusInfo, + 10: chainStatusInfo, + }, nil) + + mockClient.On("SendHeartbeat", mock.MatchedBy(func(c context.Context) bool { + return c != nil + }), mock.MatchedBy(func(blockHeights map[uint64]uint64) bool { + return len(blockHeights) > 0 + })).Return(heartbeatclient.HeartbeatResponse{ + Timestamp: time.Now().Unix(), + AggregatorID: "test-aggregator", + ChainBenchmarks: map[uint64]heartbeatclient.ChainBenchmark{}, + }, nil) + + reporter, err := verifier.NewHeartbeatReporter( + lggr, + mockStatusMgr, + mockClient, + selectors, + "test-verifier", + 50*time.Millisecond, // Short interval for testing + ) + require.NoError(t, err) + + err = reporter.Start(ctx) + require.NoError(t, err) + + // Wait a bit for the reporter to send a heartbeat + time.Sleep(100 * time.Millisecond) + + // Stop the reporter + err = reporter.Close() + require.NoError(t, err) +} + +func TestHeartbeatReporter_SendHeartbeatFailure(t *testing.T) { + lggr := logger.Test(t) + mockClient := new(mockHeartbeatClient) + mockStatusMgr := mocks.NewMockChainStatusManager(t) + + selectors := []protocol.ChainSelector{1} + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + chainStatusInfo := &protocol.ChainStatusInfo{ + ChainSelector: 1, + FinalizedBlockHeight: big.NewInt(100), + Disabled: false, + } + + mockStatusMgr.On("ReadChainStatuses", mock.MatchedBy(func(c context.Context) bool { + return c != nil + }), selectors).Return(map[protocol.ChainSelector]*protocol.ChainStatusInfo{ + 1: chainStatusInfo, + }, nil) + + // Mock client returns error + mockClient.On("SendHeartbeat", mock.MatchedBy(func(c context.Context) bool { + return c != nil + }), mock.MatchedBy(func(blockHeights map[uint64]uint64) bool { + return true + })).Return(heartbeatclient.HeartbeatResponse{}, errors.New("connection refused")) + + reporter, err := verifier.NewHeartbeatReporter( + lggr, + mockStatusMgr, + mockClient, + selectors, + "test-verifier", + 50*time.Millisecond, + ) + require.NoError(t, err) + + err = reporter.Start(ctx) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + err = reporter.Close() + require.NoError(t, err) +} + +func TestHeartbeatReporter_ChainStatusReadError(t *testing.T) { + lggr := logger.Test(t) + mockClient := new(mockHeartbeatClient) + mockStatusMgr := mocks.NewMockChainStatusManager(t) + + selectors := []protocol.ChainSelector{1} + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + // Mock status manager returns error + mockStatusMgr.On("ReadChainStatuses", mock.MatchedBy(func(c context.Context) bool { + return c != nil + }), selectors).Return(nil, errors.New("database error")) + + reporter, err := verifier.NewHeartbeatReporter( + lggr, + mockStatusMgr, + mockClient, + selectors, + "test-verifier", + 50*time.Millisecond, + ) + require.NoError(t, err) + + err = reporter.Start(ctx) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + err = reporter.Close() + require.NoError(t, err) + + // Verify that ReadChainStatuses was called at least once + mockStatusMgr.AssertCalled(t, "ReadChainStatuses", mock.MatchedBy(func(c context.Context) bool { + return c != nil + }), selectors) +} + +func TestHeartbeatReporter_MultipleChains(t *testing.T) { + lggr := logger.Test(t) + mockClient := new(mockHeartbeatClient) + mockStatusMgr := mocks.NewMockChainStatusManager(t) + + selectors := []protocol.ChainSelector{1, 10, 100, 1000} + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + // Setup chain statuses for all selectors + statusMap := make(map[protocol.ChainSelector]*protocol.ChainStatusInfo) + for i, selector := range selectors { + statusMap[selector] = &protocol.ChainStatusInfo{ + ChainSelector: selector, + FinalizedBlockHeight: big.NewInt(int64(100 + i*100)), + Disabled: false, + } + } + + mockStatusMgr.On("ReadChainStatuses", mock.MatchedBy(func(c context.Context) bool { + return c != nil + }), selectors).Return(statusMap, nil) + + // Verify the request has all chain heights + mockClient.On("SendHeartbeat", mock.MatchedBy(func(c context.Context) bool { + return c != nil + }), mock.MatchedBy(func(blockHeights map[uint64]uint64) bool { + return len(blockHeights) == len(selectors) + })).Return(heartbeatclient.HeartbeatResponse{ + Timestamp: time.Now().Unix(), + AggregatorID: "test-aggregator", + ChainBenchmarks: map[uint64]heartbeatclient.ChainBenchmark{}, + }, nil) + + reporter, err := verifier.NewHeartbeatReporter( + lggr, + mockStatusMgr, + mockClient, + selectors, + "test-verifier", + 50*time.Millisecond, + ) + require.NoError(t, err) + + err = reporter.Start(ctx) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + err = reporter.Close() + require.NoError(t, err) + + // Verify SendHeartbeat was called with all chains + mockClient.AssertCalled(t, "SendHeartbeat", mock.MatchedBy(func(c context.Context) bool { + return c != nil + }), mock.MatchedBy(func(blockHeights map[uint64]uint64) bool { + return len(blockHeights) == len(selectors) + })) +} + +func TestHeartbeatReporter_Name(t *testing.T) { + lggr := logger.Test(t) + mockClient := new(mockHeartbeatClient) + mockStatusMgr := mocks.NewMockChainStatusManager(t) + selectors := []protocol.ChainSelector{1} + + reporter, err := verifier.NewHeartbeatReporter( + lggr, + mockStatusMgr, + mockClient, + selectors, + "my-verifier", + 10*time.Second, + ) + require.NoError(t, err) + + name := reporter.Name() + assert.Contains(t, name, "my-verifier") + assert.Contains(t, name, "HeartbeatReporter") +} + +func TestHeartbeatReporter_HealthReport(t *testing.T) { + lggr := logger.Test(t) + mockClient := new(mockHeartbeatClient) + mockStatusMgr := mocks.NewMockChainStatusManager(t) + selectors := []protocol.ChainSelector{1} + + reporter, err := verifier.NewHeartbeatReporter( + lggr, + mockStatusMgr, + mockClient, + selectors, + "test-verifier", + 10*time.Second, + ) + require.NoError(t, err) + + report := reporter.HealthReport() + assert.NotNil(t, report) + assert.Greater(t, len(report), 0) +} + +func TestHeartbeatReporter_ContextCancellation(t *testing.T) { + lggr := logger.Test(t) + mockClient := new(mockHeartbeatClient) + mockStatusMgr := mocks.NewMockChainStatusManager(t) + + selectors := []protocol.ChainSelector{1} + ctx, cancel := context.WithCancel(context.Background()) + + chainStatusInfo := &protocol.ChainStatusInfo{ + ChainSelector: 1, + FinalizedBlockHeight: big.NewInt(100), + Disabled: false, + } + + mockStatusMgr.On("ReadChainStatuses", mock.MatchedBy(func(c context.Context) bool { + return c != nil + }), selectors).Return(map[protocol.ChainSelector]*protocol.ChainStatusInfo{ + 1: chainStatusInfo, + }, nil) + + mockClient.On("SendHeartbeat", mock.MatchedBy(func(c context.Context) bool { + return c != nil + }), mock.MatchedBy(func(blockHeights map[uint64]uint64) bool { + return true + })).Return(heartbeatclient.HeartbeatResponse{ + Timestamp: time.Now().Unix(), + AggregatorID: "test-aggregator", + ChainBenchmarks: map[uint64]heartbeatclient.ChainBenchmark{}, + }, nil) + + reporter, err := verifier.NewHeartbeatReporter( + lggr, + mockStatusMgr, + mockClient, + selectors, + "test-verifier", + 50*time.Millisecond, + ) + require.NoError(t, err) + + err = reporter.Start(ctx) + require.NoError(t, err) + + time.Sleep(50 * time.Millisecond) + + // Cancel context - should stop the reporter + cancel() + + time.Sleep(100 * time.Millisecond) + + err = reporter.Close() + require.NoError(t, err) +} + +func TestHeartbeatReporter_MissingChainStatus(t *testing.T) { + lggr := logger.Test(t) + mockClient := new(mockHeartbeatClient) + mockStatusMgr := mocks.NewMockChainStatusManager(t) + + selectors := []protocol.ChainSelector{1, 10} + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + // Only return status for one chain (not the other) + statusMap := map[protocol.ChainSelector]*protocol.ChainStatusInfo{ + 1: { + ChainSelector: 1, + FinalizedBlockHeight: big.NewInt(100), + Disabled: false, + }, + } + + mockStatusMgr.On("ReadChainStatuses", mock.MatchedBy(func(c context.Context) bool { + return c != nil + }), selectors).Return(statusMap, nil) + + // Should send heartbeat with only the available chain + mockClient.On("SendHeartbeat", mock.MatchedBy(func(c context.Context) bool { + return c != nil + }), mock.MatchedBy(func(blockHeights map[uint64]uint64) bool { + // Should only have 1 chain since the other one is missing + return len(blockHeights) == 1 + })).Return(heartbeatclient.HeartbeatResponse{ + Timestamp: time.Now().Unix(), + AggregatorID: "test-aggregator", + ChainBenchmarks: map[uint64]heartbeatclient.ChainBenchmark{}, + }, nil) + + reporter, err := verifier.NewHeartbeatReporter( + lggr, + mockStatusMgr, + mockClient, + selectors, + "test-verifier", + 50*time.Millisecond, + ) + require.NoError(t, err) + + err = reporter.Start(ctx) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + err = reporter.Close() + require.NoError(t, err) +} diff --git a/verifier/helpers_test.go b/verifier/helpers_test.go index 515380e96..481662eef 100644 --- a/verifier/helpers_test.go +++ b/verifier/helpers_test.go @@ -158,6 +158,13 @@ func (m *noopMetricLabeler) IncrementStorageWriteErrors(ctx context.Context) func (m *noopMetricLabeler) RecordSourceChainLatestBlock(ctx context.Context, blockNum int64) {} func (m *noopMetricLabeler) RecordSourceChainFinalizedBlock(ctx context.Context, blockNum int64) {} func (m *noopMetricLabeler) RecordReorgTrackedSeqNums(ctx context.Context, count int64) {} +func (m *noopMetricLabeler) IncrementHeartbeatsSent(ctx context.Context) {} +func (m *noopMetricLabeler) IncrementHeartbeatsFailed(ctx context.Context) {} +func (m *noopMetricLabeler) RecordHeartbeatDuration(ctx context.Context, duration time.Duration) {} +func (m *noopMetricLabeler) SetVerifierHeartbeatTimestamp(ctx context.Context, timestamp int64) {} +func (m *noopMetricLabeler) SetVerifierHeartbeatSentChainHeads(ctx context.Context, height uint64) {} +func (m *noopMetricLabeler) SetVerifierHeartbeatChainHeads(ctx context.Context, height uint64) {} +func (m *noopMetricLabeler) SetVerifierHeartbeatScore(ctx context.Context, score float64) {} type NoopLatencyTracker struct{} diff --git a/verifier/interfaces.go b/verifier/interfaces.go index b2780417e..b79055ed8 100644 --- a/verifier/interfaces.go +++ b/verifier/interfaces.go @@ -75,6 +75,23 @@ type MetricLabeler interface { // IncrementStorageWriteErrors increments the counter for storage write errors. IncrementStorageWriteErrors(ctx context.Context) + // Heartbeat tracking + + // IncrementHeartbeatsSent increments the counter for successfully sent heartbeats. + IncrementHeartbeatsSent(ctx context.Context) + // IncrementHeartbeatsFailed increments the counter for failed heartbeat attempts. + IncrementHeartbeatsFailed(ctx context.Context) + // RecordHeartbeatDuration records the duration of a heartbeat request. + RecordHeartbeatDuration(ctx context.Context, duration time.Duration) + // SetVerifierHeartbeatTimestamp sets the timestamp from the heartbeat response. + SetVerifierHeartbeatTimestamp(ctx context.Context, timestamp int64) + // SetVerifierHeartbeatSentChainHeads sets the block height sent in the heartbeat request for a chain. + SetVerifierHeartbeatSentChainHeads(ctx context.Context, blockHeight uint64) + // SetVerifierHeartbeatChainHeads sets the block height for a chain from the heartbeat response. + SetVerifierHeartbeatChainHeads(ctx context.Context, blockHeight uint64) + // SetVerifierHeartbeatScore sets the score for a chain from the heartbeat response. + SetVerifierHeartbeatScore(ctx context.Context, score float64) + // Chain state tracking (for multi-chain monitoring) // RecordSourceChainLatestBlock records the latest block number for a source chain. diff --git a/verifier/pkg/monitoring/metrics.go b/verifier/pkg/monitoring/metrics.go index ef35f1f6d..65d14741e 100644 --- a/verifier/pkg/monitoring/metrics.go +++ b/verifier/pkg/monitoring/metrics.go @@ -35,6 +35,15 @@ type VerifierMetrics struct { // Error Tracking storageWriteErrorsCounter metric.Int64Counter + // Heartbeat Tracking + heartbeatsSentCounter metric.Int64Counter + heartbeatsFailedCounter metric.Int64Counter + heartbeatDurationSeconds metric.Float64Histogram + verifierHeartbeatTimestamp metric.Float64Gauge + verifierHeartbeatSentChainHeads metric.Int64Gauge + verifierHeartbeatChainHeads metric.Int64Gauge + verifierHeartbeatScore metric.Float64Gauge + // Chain State sourceChainLatestBlockGauge metric.Int64Gauge sourceChainFinalizedBlockGauge metric.Int64Gauge @@ -129,6 +138,64 @@ func InitMetrics() (*VerifierMetrics, error) { return nil, fmt.Errorf("failed to register storage write errors counter: %w", err) } + // Heartbeat Tracking + vm.heartbeatsSentCounter, err = beholder.GetMeter().Int64Counter( + "verifier_heartbeats_sent_total", + metric.WithDescription("Total number of successfully sent heartbeats"), + ) + if err != nil { + return nil, fmt.Errorf("failed to register heartbeats sent counter: %w", err) + } + + vm.heartbeatsFailedCounter, err = beholder.GetMeter().Int64Counter( + "verifier_heartbeats_failed_total", + metric.WithDescription("Total number of failed heartbeat attempts"), + ) + if err != nil { + return nil, fmt.Errorf("failed to register heartbeats failed counter: %w", err) + } + + vm.heartbeatDurationSeconds, err = beholder.GetMeter().Float64Histogram( + "verifier_heartbeat_duration_seconds", + metric.WithDescription("Duration of heartbeat requests"), + metric.WithUnit("seconds"), + ) + if err != nil { + return nil, fmt.Errorf("failed to register heartbeat duration histogram: %w", err) + } + + vm.verifierHeartbeatTimestamp, err = beholder.GetMeter().Float64Gauge( + "verifier_heartbeat_timestamp", + metric.WithDescription("Timestamp from the heartbeat response"), + ) + if err != nil { + return nil, fmt.Errorf("failed to register verifier heartbeat timestamp gauge: %w", err) + } + + vm.verifierHeartbeatSentChainHeads, err = beholder.GetMeter().Int64Gauge( + "verifier_heartbeat_sent_chain_heads", + metric.WithDescription("Block height sent in the heartbeat request for a chain"), + ) + if err != nil { + return nil, fmt.Errorf("failed to register verifier heartbeat sent chain heads gauge: %w", err) + } + + vm.verifierHeartbeatChainHeads, err = beholder.GetMeter().Int64Gauge( + "verifier_heartbeat_chain_heads", + metric.WithDescription("Block height for a chain from the heartbeat response"), + ) + if err != nil { + return nil, fmt.Errorf("failed to register verifier heartbeat chain heads gauge: %w", err) + } + + vm.verifierHeartbeatScore, err = beholder.GetMeter().Float64Gauge( + "verifier_heartbeat_score", + metric.WithDescription("Score for a chain from the heartbeat response"), + ) + if err != nil { + return nil, fmt.Errorf("failed to register verifier heartbeat score gauge: %w", err) + } + // Chain State vm.sourceChainLatestBlockGauge, err = beholder.GetMeter().Int64Gauge( "verifier_source_chain_latest_block", @@ -256,6 +323,41 @@ func (v *VerifierMetricLabeler) IncrementStorageWriteErrors(ctx context.Context) v.vm.storageWriteErrorsCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } +func (v *VerifierMetricLabeler) IncrementHeartbeatsSent(ctx context.Context) { + otelLabels := beholder.OtelAttributes(v.Labels).AsStringAttributes() + v.vm.heartbeatsSentCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} + +func (v *VerifierMetricLabeler) IncrementHeartbeatsFailed(ctx context.Context) { + otelLabels := beholder.OtelAttributes(v.Labels).AsStringAttributes() + v.vm.heartbeatsFailedCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} + +func (v *VerifierMetricLabeler) RecordHeartbeatDuration(ctx context.Context, duration time.Duration) { + otelLabels := beholder.OtelAttributes(v.Labels).AsStringAttributes() + v.vm.heartbeatDurationSeconds.Record(ctx, duration.Seconds(), metric.WithAttributes(otelLabels...)) +} + +func (v *VerifierMetricLabeler) SetVerifierHeartbeatTimestamp(ctx context.Context, timestamp int64) { + otelLabels := beholder.OtelAttributes(v.Labels).AsStringAttributes() + v.vm.verifierHeartbeatTimestamp.Record(ctx, float64(timestamp), metric.WithAttributes(otelLabels...)) +} + +func (v *VerifierMetricLabeler) SetVerifierHeartbeatSentChainHeads(ctx context.Context, blockHeight uint64) { + otelLabels := beholder.OtelAttributes(v.Labels).AsStringAttributes() + v.vm.verifierHeartbeatSentChainHeads.Record(ctx, int64(blockHeight), metric.WithAttributes(otelLabels...)) // #nosec G115 -- block heights are within int64 range +} + +func (v *VerifierMetricLabeler) SetVerifierHeartbeatChainHeads(ctx context.Context, blockHeight uint64) { + otelLabels := beholder.OtelAttributes(v.Labels).AsStringAttributes() + v.vm.verifierHeartbeatChainHeads.Record(ctx, int64(blockHeight), metric.WithAttributes(otelLabels...)) // #nosec G115 -- block heights are within int64 range +} + +func (v *VerifierMetricLabeler) SetVerifierHeartbeatScore(ctx context.Context, score float64) { + otelLabels := beholder.OtelAttributes(v.Labels).AsStringAttributes() + v.vm.verifierHeartbeatScore.Record(ctx, score, metric.WithAttributes(otelLabels...)) +} + func (v *VerifierMetricLabeler) RecordSourceChainLatestBlock(ctx context.Context, blockNum int64) { otelLabels := beholder.OtelAttributes(v.Labels).AsStringAttributes() v.vm.sourceChainLatestBlockGauge.Record(ctx, blockNum, metric.WithAttributes(otelLabels...)) diff --git a/verifier/pkg/monitoring/monitoring.go b/verifier/pkg/monitoring/monitoring.go index 25915e0c5..dc3fb6729 100644 --- a/verifier/pkg/monitoring/monitoring.go +++ b/verifier/pkg/monitoring/monitoring.go @@ -98,6 +98,20 @@ func (f *FakeVerifierMetricLabeler) IncrementMessagesProcessed(context.Context) func (f *FakeVerifierMetricLabeler) IncrementMessagesVerificationFailed(context.Context) {} +func (f *FakeVerifierMetricLabeler) IncrementHeartbeatsSent(context.Context) {} + +func (f *FakeVerifierMetricLabeler) IncrementHeartbeatsFailed(context.Context) {} + +func (f *FakeVerifierMetricLabeler) RecordHeartbeatDuration(context.Context, time.Duration) {} + +func (f *FakeVerifierMetricLabeler) SetVerifierHeartbeatTimestamp(context.Context, int64) {} + +func (f *FakeVerifierMetricLabeler) SetVerifierHeartbeatSentChainHeads(context.Context, uint64) {} + +func (f *FakeVerifierMetricLabeler) SetVerifierHeartbeatChainHeads(context.Context, uint64) {} + +func (f *FakeVerifierMetricLabeler) SetVerifierHeartbeatScore(context.Context, float64) {} + func (f *FakeVerifierMetricLabeler) RecordFinalityWaitDuration(context.Context, time.Duration) {} func (f *FakeVerifierMetricLabeler) RecordMessageVerificationDuration(context.Context, time.Duration) { diff --git a/verifier/types.go b/verifier/types.go index a31b733ac..1d6e8439d 100644 --- a/verifier/types.go +++ b/verifier/types.go @@ -41,6 +41,7 @@ type CoordinatorConfig struct { StorageBatchTimeout time.Duration `json:"storage_batch_timeout"` // Maximum duration to wait before flushing incomplete storage batch (default: 100ms) StorageRetryDelay time.Duration `json:"storage_retry_delay"` // Delay before retrying failed storage writes (default: 2s) CursePollInterval time.Duration `json:"curse_poll_interval"` // How often to poll RMN Remote contracts for curse status (default: 2s) + HeartbeatInterval time.Duration `json:"heartbeat_interval"` // How often to send heartbeat to aggregator (default: 10s, 0 disables heartbeat) } // VerificationError represents an error that occurred during message verification. diff --git a/verifier/verification_coordinator.go b/verifier/verification_coordinator.go index 929af58e8..fd6e0121c 100644 --- a/verifier/verification_coordinator.go +++ b/verifier/verification_coordinator.go @@ -8,6 +8,7 @@ import ( "github.com/smartcontractkit/chainlink-ccv/common" cursecheckerimpl "github.com/smartcontractkit/chainlink-ccv/integration/pkg/cursechecker" + "github.com/smartcontractkit/chainlink-ccv/integration/pkg/heartbeatclient" "github.com/smartcontractkit/chainlink-ccv/pkg/chainaccess" "github.com/smartcontractkit/chainlink-ccv/protocol" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -30,6 +31,8 @@ type Coordinator struct { taskVerifierProcessor *TaskVerifierProcessor // 3rd step processor: storage writer storageWriterProcessor *StorageWriterProcessor + // Heartbeat reporter: periodically sends chain statuses to aggregator + heartbeatReporter *HeartbeatReporter } func NewCoordinator( @@ -42,6 +45,7 @@ func NewCoordinator( messageTracker MessageLatencyTracker, monitoring Monitoring, chainStatusManager protocol.ChainStatusManager, + heartbeatClient heartbeatclient.HeartbeatSender, ) (*Coordinator, error) { return NewCoordinatorWithDetector( ctx, @@ -54,6 +58,7 @@ func NewCoordinator( monitoring, chainStatusManager, nil, + heartbeatClient, ) } @@ -68,6 +73,7 @@ func NewCoordinatorWithDetector( monitoring Monitoring, chainStatusManager protocol.ChainStatusManager, detector common.CurseCheckerService, + heartbeatClient heartbeatclient.HeartbeatSender, ) (*Coordinator, error) { enabledSourceReaders, err := filterOnlyEnabledSourceReaders(ctx, lggr, config, sourceReaders, chainStatusManager) if err != nil { @@ -103,6 +109,28 @@ func NewCoordinatorWithDetector( return nil, fmt.Errorf("failed to create or/and start task verifier service: %w", err) } + var heartbeatReporter *HeartbeatReporter + + if heartbeatClient != nil && config.HeartbeatInterval > 0 { + // Collect all chain selectors from source readers. + allSelectors := make([]protocol.ChainSelector, 0, len(sourceReaders)) + for selector := range sourceReaders { + allSelectors = append(allSelectors, selector) + } + + heartbeatReporter, err = NewHeartbeatReporter( + logger.With(lggr, "component", "HeartbeatReporter"), + chainStatusManager, + heartbeatClient, + allSelectors, + config.VerifierID, + config.HeartbeatInterval, + ) + if err != nil { + return nil, fmt.Errorf("failed to create heartbeat reporter: %w", err) + } + } + return &Coordinator{ lggr: lggr, verifierID: config.VerifierID, @@ -110,6 +138,7 @@ func NewCoordinatorWithDetector( curseDetector: curseDetector, storageWriterProcessor: storageWriterProcessor, taskVerifierProcessor: taskVerifierProcessor, + heartbeatReporter: heartbeatReporter, }, nil } @@ -150,6 +179,13 @@ func (vc *Coordinator) Start(_ context.Context) error { } } + if vc.heartbeatReporter != nil { + if err := vc.heartbeatReporter.Start(ctx); err != nil { + vc.lggr.Errorw("Failed to start heartbeat reporter", "error", err) + return fmt.Errorf("failed to start heartbeat reporter: %w", err) + } + } + vc.lggr.Infow("Coordinator started successfully") return nil }) @@ -243,6 +279,14 @@ func (vc *Coordinator) Close() error { vc.cancel() errs := make([]error, 0) + + if vc.heartbeatReporter != nil { + if err := vc.heartbeatReporter.Close(); err != nil { + vc.lggr.Errorw("Failed to stop heartbeat reporter", "error", err) + errs = append(errs, fmt.Errorf("failed to stop heartbeat reporter: %w", err)) + } + } + if vc.curseDetector != nil { if err := vc.curseDetector.Close(); err != nil { vc.lggr.Errorw("Failed to stop curse detector", "error", err) diff --git a/verifier/verification_coordinator_cctp_test.go b/verifier/verification_coordinator_cctp_test.go index 46aeac85e..84ba2a15d 100644 --- a/verifier/verification_coordinator_cctp_test.go +++ b/verifier/verification_coordinator_cctp_test.go @@ -9,6 +9,8 @@ import ( "testing" "time" + "github.com/smartcontractkit/chainlink-ccv/integration/pkg/heartbeatclient" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -490,6 +492,7 @@ func createCCTPCoordinator( noopLatencyTracker, noopMonitoring, ts.chainStatusManager, + heartbeatclient.NewNoopHeartbeatClient(), ) } diff --git a/verifier/verification_coordinator_curse_test.go b/verifier/verification_coordinator_curse_test.go index 4d3d3e5ee..386bc1a38 100644 --- a/verifier/verification_coordinator_curse_test.go +++ b/verifier/verification_coordinator_curse_test.go @@ -9,6 +9,8 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink-ccv/integration/pkg/heartbeatclient" + "github.com/smartcontractkit/chainlink-ccv/internal/mocks" "github.com/smartcontractkit/chainlink-ccv/pkg/chainaccess" "github.com/smartcontractkit/chainlink-ccv/protocol" @@ -127,6 +129,7 @@ func setupCurseTest(t *testing.T, sourceChain, destChain protocol.ChainSelector, &noopMonitoring{}, setup.chainStatusManager, setup.mockCurseChecker, + heartbeatclient.NewNoopHeartbeatClient(), ) require.NoError(t, err) setup.coordinator = coordinator diff --git a/verifier/verification_coordinator_finality_test.go b/verifier/verification_coordinator_finality_test.go index 1968dbd2d..d19e8e4e0 100644 --- a/verifier/verification_coordinator_finality_test.go +++ b/verifier/verification_coordinator_finality_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" + "github.com/smartcontractkit/chainlink-ccv/integration/pkg/heartbeatclient" "github.com/smartcontractkit/chainlink-ccv/internal/mocks" "github.com/smartcontractkit/chainlink-ccv/pkg/chainaccess" "github.com/smartcontractkit/chainlink-ccv/protocol" @@ -355,6 +356,7 @@ func initializeCoordinator(t *testing.T, verifierID string) *coordinatorTestSetu &NoopLatencyTracker{}, &noopMonitoring{}, mockChainStatusManager, + heartbeatclient.NewNoopHeartbeatClient(), ) require.NoError(t, err) diff --git a/verifier/verification_coordinator_lbtc_test.go b/verifier/verification_coordinator_lbtc_test.go index c4af5a650..77242737e 100644 --- a/verifier/verification_coordinator_lbtc_test.go +++ b/verifier/verification_coordinator_lbtc_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + "github.com/smartcontractkit/chainlink-ccv/integration/pkg/heartbeatclient" + "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink-ccv/pkg/chainaccess" @@ -276,6 +278,7 @@ func createLBTCCoordinator( noopLatencyTracker, noopMonitoring, ts.chainStatusManager, + heartbeatclient.NewNoopHeartbeatClient(), ) } diff --git a/verifier/verification_coordinator_test.go b/verifier/verification_coordinator_test.go index 12bc704ff..b6d3d1e36 100644 --- a/verifier/verification_coordinator_test.go +++ b/verifier/verification_coordinator_test.go @@ -9,6 +9,8 @@ import ( "testing" "time" + "github.com/smartcontractkit/chainlink-ccv/integration/pkg/heartbeatclient" + "github.com/ethereum/go-ethereum/crypto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -150,6 +152,7 @@ func createVerificationCoordinator( noopLatencyTracker, noopMonitoring, ts.chainStatusManager, + heartbeatclient.NewNoopHeartbeatClient(), ) }