From e5522f2a364ddceb7a05285391a6364d0e9501a7 Mon Sep 17 00:00:00 2001 From: Sangeet Mishra Date: Fri, 16 Jan 2026 14:32:12 +0530 Subject: [PATCH] Rename metric from connect.task.rebalance.exempt to connect.task.dnd Updated the metric name and related class members from RebalanceExempt to Dnd (do-not-disturb) to better reflect the metric's purpose. This change includes: - Renamed connectTaskRebalanceExempt field to connectTaskDnd in TaskStateMetrics - Updated getter/setter methods and MXBean interface accordingly - Updated all usages in SnapshotMeter - Updated test method name and assertions in AbstractMetricsTest --- .../debezium/pipeline/meters/SnapshotMeter.java | 10 +++++----- .../pipeline/metrics/TaskStateMetrics.java | 16 ++++++++-------- .../pipeline/metrics/TaskStateMetricsMXBean.java | 6 +++--- .../debezium/pipeline/AbstractMetricsTest.java | 16 ++++++++-------- 4 files changed, 24 insertions(+), 24 deletions(-) diff --git a/debezium-core/src/main/java/io/debezium/pipeline/meters/SnapshotMeter.java b/debezium-core/src/main/java/io/debezium/pipeline/meters/SnapshotMeter.java index 740ca75f9bf..b02982261e1 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/meters/SnapshotMeter.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/meters/SnapshotMeter.java @@ -135,7 +135,7 @@ public void snapshotStarted() { this.snapshotCompleted.set(0); this.snapshotAborted.set(0); this.snapshotSkipped.set(false); - this.taskStateMetrics.setConnectTaskRebalanceExempt(1); + this.taskStateMetrics.setConnectTaskDnd(1); this.startTime.set(clock.currentTimeInMillis()); this.stopTime.set(0L); this.startPauseTime.set(0); @@ -149,7 +149,7 @@ public void snapshotPaused() { this.snapshotCompleted.set(0); this.snapshotAborted.set(0); this.snapshotSkipped.set(false); - this.taskStateMetrics.setConnectTaskRebalanceExempt(0); + this.taskStateMetrics.setConnectTaskDnd(0); this.startPauseTime.set(clock.currentTimeInMillis()); this.stopPauseTime.set(0L); } @@ -160,7 +160,7 @@ public void snapshotResumed() { this.snapshotCompleted.set(0); this.snapshotAborted.set(0); this.snapshotSkipped.set(false); - this.taskStateMetrics.setConnectTaskRebalanceExempt(1); + this.taskStateMetrics.setConnectTaskDnd(1); final long currTime = clock.currentTimeInMillis(); this.stopPauseTime.set(currTime); @@ -182,7 +182,7 @@ public void snapshotCompleted() { this.snapshotRunning.set(0); this.snapshotPaused.set(0); this.snapshotSkipped.set(false); - this.taskStateMetrics.setConnectTaskRebalanceExempt(0); + this.taskStateMetrics.setConnectTaskDnd(0); this.stopTime.set(clock.currentTimeInMillis()); } @@ -203,7 +203,7 @@ public void snapshotSkipped() { this.snapshotAborted.set(1); this.snapshotRunning.set(0); this.snapshotPaused.set(0); - this.taskStateMetrics.setConnectTaskRebalanceExempt(0); + this.taskStateMetrics.setConnectTaskDnd(0); this.stopTime.set(clock.currentTimeInMillis()); } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/TaskStateMetrics.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/TaskStateMetrics.java index 33ff539c64e..81d2fc0bc93 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/metrics/TaskStateMetrics.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/TaskStateMetrics.java @@ -19,27 +19,27 @@ @ThreadSafe public class TaskStateMetrics extends Metrics implements TaskStateMetricsMXBean { - private final AtomicLong connectTaskRebalanceExempt = new AtomicLong(); + private final AtomicLong connectTaskDnd = new AtomicLong(); public TaskStateMetrics(CdcSourceTaskContext taskContext) { super(taskContext, "task"); } @Override - public long getConnectTaskRebalanceExempt() { - return connectTaskRebalanceExempt.get(); + public long getConnectTaskDnd() { + return connectTaskDnd.get(); } /** - * Sets the rebalance exemption status. + * Sets the do-not-disturb status. * - * @param exempt 1 if the task should be exempt from rebalancing, 0 otherwise + * @param dnd 1 if the task should not be disturbed, 0 otherwise */ - public void setConnectTaskRebalanceExempt(long exempt) { - connectTaskRebalanceExempt.set(exempt); + public void setConnectTaskDnd(long dnd) { + connectTaskDnd.set(dnd); } public void reset() { - connectTaskRebalanceExempt.set(0); + connectTaskDnd.set(0); } } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/TaskStateMetricsMXBean.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/TaskStateMetricsMXBean.java index 30a5485d1ca..8544dcb6435 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/metrics/TaskStateMetricsMXBean.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/TaskStateMetricsMXBean.java @@ -13,9 +13,9 @@ public interface TaskStateMetricsMXBean { /** - * Gets the current rebalance exemption status. + * Gets the current do-not-disturb status. * - * @return 1 if the task is exempt from rebalancing, 0 otherwise + * @return 1 if the task should not be disturbed, 0 otherwise */ - long getConnectTaskRebalanceExempt(); + long getConnectTaskDnd(); } diff --git a/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractMetricsTest.java b/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractMetricsTest.java index a1d6c7b632f..c041a607a46 100644 --- a/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractMetricsTest.java +++ b/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractMetricsTest.java @@ -319,7 +319,7 @@ protected void assertStreamingWithCustomMetrics(Map customMetric } @Test - public void testConnectTaskRebalanceExemptMetric() throws Exception { + public void testConnectTaskDndMetric() throws Exception { executeInsertStatements(); final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); @@ -347,7 +347,7 @@ public void testConnectTaskRebalanceExemptMetric() throws Exception { // Wait for MBeans to be registered and snapshot to start // Poll frequently (5ms) to catch snapshot while running since it completes quickly - Awaitility.await("Waiting for snapshot to start and rebalance exempt to be set") + Awaitility.await("Waiting for snapshot to start and DND to be set") .atMost(waitTimeForRecords() * 30L, TimeUnit.SECONDS) .pollInterval(5, TimeUnit.MILLISECONDS) .ignoreException(InstanceNotFoundException.class) @@ -359,12 +359,12 @@ public void testConnectTaskRebalanceExemptMetric() throws Exception { Object snapshotRunning = mBeanServer.getAttribute(snapshotMetricsObjectName, "SnapshotRunning"); - Object rebalanceExempt = mBeanServer.getAttribute(taskMetricsObjectName, - "ConnectTaskRebalanceExempt"); + Object dnd = mBeanServer.getAttribute(taskMetricsObjectName, + "ConnectTaskDnd"); // During snapshot, both should be 1 return Long.valueOf(1).equals(snapshotRunning) - && Long.valueOf(1).equals(rebalanceExempt); + && Long.valueOf(1).equals(dnd); } catch (InstanceNotFoundException e) { return false; @@ -374,9 +374,9 @@ public void testConnectTaskRebalanceExemptMetric() throws Exception { // Wait for snapshot to complete waitForSnapshotToBeCompleted(connector(), server(), task(), database()); - // After snapshot: ConnectTaskRebalanceExempt should be 0 - assertThat(mBeanServer.getAttribute(taskMetricsObjectName, "ConnectTaskRebalanceExempt")) - .as("ConnectTaskRebalanceExempt should be 0 after snapshot completes") + // After snapshot: ConnectTaskDnd should be 0 + assertThat(mBeanServer.getAttribute(taskMetricsObjectName, "ConnectTaskDnd")) + .as("ConnectTaskDnd should be 0 after snapshot completes") .isEqualTo(0L); }