Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void snapshotStarted() {
this.snapshotCompleted.set(0);
this.snapshotAborted.set(0);
this.snapshotSkipped.set(false);
this.taskStateMetrics.setConnectTaskRebalanceExempt(1);
this.taskStateMetrics.setConnectTaskDnd(1);
this.startTime.set(clock.currentTimeInMillis());
this.stopTime.set(0L);
this.startPauseTime.set(0);
Expand All @@ -149,7 +149,7 @@ public void snapshotPaused() {
this.snapshotCompleted.set(0);
this.snapshotAborted.set(0);
this.snapshotSkipped.set(false);
this.taskStateMetrics.setConnectTaskRebalanceExempt(0);
this.taskStateMetrics.setConnectTaskDnd(0);
this.startPauseTime.set(clock.currentTimeInMillis());
this.stopPauseTime.set(0L);
}
Expand All @@ -160,7 +160,7 @@ public void snapshotResumed() {
this.snapshotCompleted.set(0);
this.snapshotAborted.set(0);
this.snapshotSkipped.set(false);
this.taskStateMetrics.setConnectTaskRebalanceExempt(1);
this.taskStateMetrics.setConnectTaskDnd(1);
final long currTime = clock.currentTimeInMillis();
this.stopPauseTime.set(currTime);

Expand All @@ -182,7 +182,7 @@ public void snapshotCompleted() {
this.snapshotRunning.set(0);
this.snapshotPaused.set(0);
this.snapshotSkipped.set(false);
this.taskStateMetrics.setConnectTaskRebalanceExempt(0);
this.taskStateMetrics.setConnectTaskDnd(0);
this.stopTime.set(clock.currentTimeInMillis());
}

Expand All @@ -203,7 +203,7 @@ public void snapshotSkipped() {
this.snapshotAborted.set(1);
this.snapshotRunning.set(0);
this.snapshotPaused.set(0);
this.taskStateMetrics.setConnectTaskRebalanceExempt(0);
this.taskStateMetrics.setConnectTaskDnd(0);
this.stopTime.set(clock.currentTimeInMillis());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,27 @@
@ThreadSafe
public class TaskStateMetrics extends Metrics implements TaskStateMetricsMXBean {

private final AtomicLong connectTaskRebalanceExempt = new AtomicLong();
private final AtomicLong connectTaskDnd = new AtomicLong();

public TaskStateMetrics(CdcSourceTaskContext taskContext) {
super(taskContext, "task");
}

@Override
public long getConnectTaskRebalanceExempt() {
return connectTaskRebalanceExempt.get();
public long getConnectTaskDnd() {
return connectTaskDnd.get();
}

/**
* Sets the rebalance exemption status.
* Sets the do-not-disturb status.
*
* @param exempt 1 if the task should be exempt from rebalancing, 0 otherwise
* @param dnd 1 if the task should not be disturbed, 0 otherwise
*/
public void setConnectTaskRebalanceExempt(long exempt) {
connectTaskRebalanceExempt.set(exempt);
public void setConnectTaskDnd(long dnd) {
connectTaskDnd.set(dnd);
}

public void reset() {
connectTaskRebalanceExempt.set(0);
connectTaskDnd.set(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
public interface TaskStateMetricsMXBean {

/**
* Gets the current rebalance exemption status.
* Gets the current do-not-disturb status.
*
* @return 1 if the task is exempt from rebalancing, 0 otherwise
* @return 1 if the task should not be disturbed, 0 otherwise
*/
long getConnectTaskRebalanceExempt();
long getConnectTaskDnd();
}
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ protected void assertStreamingWithCustomMetrics(Map<String, String> customMetric
}

@Test
public void testConnectTaskRebalanceExemptMetric() throws Exception {
public void testConnectTaskDndMetric() throws Exception {
executeInsertStatements();

final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
Expand Down Expand Up @@ -347,7 +347,7 @@ public void testConnectTaskRebalanceExemptMetric() throws Exception {

// Wait for MBeans to be registered and snapshot to start
// Poll frequently (5ms) to catch snapshot while running since it completes quickly
Awaitility.await("Waiting for snapshot to start and rebalance exempt to be set")
Awaitility.await("Waiting for snapshot to start and DND to be set")
.atMost(waitTimeForRecords() * 30L, TimeUnit.SECONDS)
.pollInterval(5, TimeUnit.MILLISECONDS)
.ignoreException(InstanceNotFoundException.class)
Expand All @@ -359,12 +359,12 @@ public void testConnectTaskRebalanceExemptMetric() throws Exception {

Object snapshotRunning = mBeanServer.getAttribute(snapshotMetricsObjectName,
"SnapshotRunning");
Object rebalanceExempt = mBeanServer.getAttribute(taskMetricsObjectName,
"ConnectTaskRebalanceExempt");
Object dnd = mBeanServer.getAttribute(taskMetricsObjectName,
"ConnectTaskDnd");

// During snapshot, both should be 1
return Long.valueOf(1).equals(snapshotRunning)
&& Long.valueOf(1).equals(rebalanceExempt);
&& Long.valueOf(1).equals(dnd);
}
catch (InstanceNotFoundException e) {
return false;
Expand All @@ -374,9 +374,9 @@ public void testConnectTaskRebalanceExemptMetric() throws Exception {
// Wait for snapshot to complete
waitForSnapshotToBeCompleted(connector(), server(), task(), database());

// After snapshot: ConnectTaskRebalanceExempt should be 0
assertThat(mBeanServer.getAttribute(taskMetricsObjectName, "ConnectTaskRebalanceExempt"))
.as("ConnectTaskRebalanceExempt should be 0 after snapshot completes")
// After snapshot: ConnectTaskDnd should be 0
assertThat(mBeanServer.getAttribute(taskMetricsObjectName, "ConnectTaskDnd"))
.as("ConnectTaskDnd should be 0 after snapshot completes")
.isEqualTo(0L);
}

Expand Down