From 2f5216d336d61ebd0313524e4e8a293484fb0f15 Mon Sep 17 00:00:00 2001 From: Bhagyashree Date: Wed, 21 Jan 2026 18:24:18 +0530 Subject: [PATCH 1/2] Updates SnapshotSkipped metrics to use integer, changes extended header default to false --- .../SqlServerSnapshotPartitionMetrics.java | 2 +- .../config/CommonConnectorConfig.java | 2 +- .../pipeline/meters/SnapshotMeter.java | 19 +++++++++---------- ...faultSnapshotChangeEventSourceMetrics.java | 2 +- .../metrics/traits/SnapshotMetricsMXBean.java | 2 +- 5 files changed, 13 insertions(+), 14 deletions(-) diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/metrics/SqlServerSnapshotPartitionMetrics.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/metrics/SqlServerSnapshotPartitionMetrics.java index 2ebe98e2e39..7181abf1ca8 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/metrics/SqlServerSnapshotPartitionMetrics.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/metrics/SqlServerSnapshotPartitionMetrics.java @@ -56,7 +56,7 @@ public long getSnapshotAborted() { } @Override - public boolean getSnapshotSkipped() { + public long getSnapshotSkipped() { return snapshotMeter.getSnapshotSkipped(); } diff --git a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java index 431232cef94..24cb9f0dea0 100644 --- a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java @@ -1353,7 +1353,7 @@ public static SnapshotQueryMode parse(String value, String defaultValue) { .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 46)) .withType(Type.BOOLEAN) .withWidth(Width.SHORT) - .withDefault(true) + .withDefault(false) .withImportance(ConfigDef.Importance.LOW) .withValidation(Field::isBoolean) .withDescription( diff --git a/debezium-core/src/main/java/io/debezium/pipeline/meters/SnapshotMeter.java b/debezium-core/src/main/java/io/debezium/pipeline/meters/SnapshotMeter.java index fbd933b291a..1a82dabc0dd 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/meters/SnapshotMeter.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/meters/SnapshotMeter.java @@ -11,7 +11,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -35,7 +34,7 @@ public class SnapshotMeter implements SnapshotMetricsMXBean { private final AtomicLong snapshotPaused = new AtomicLong(); private final AtomicLong snapshotCompleted = new AtomicLong(); private final AtomicLong snapshotAborted = new AtomicLong(); - private final AtomicBoolean snapshotSkipped = new AtomicBoolean(); + private final AtomicLong snapshotSkipped = new AtomicLong(); private final AtomicLong startTime = new AtomicLong(); private final AtomicLong stopTime = new AtomicLong(); private final AtomicLong startPauseTime = new AtomicLong(); @@ -90,7 +89,7 @@ public long getSnapshotAborted() { } @Override - public boolean getSnapshotSkipped() { + public long getSnapshotSkipped() { return this.snapshotSkipped.get(); } @@ -135,7 +134,7 @@ public void snapshotStarted() { this.snapshotPaused.set(0); this.snapshotCompleted.set(0); this.snapshotAborted.set(0); - this.snapshotSkipped.set(false); + this.snapshotSkipped.set(0); this.startTime.set(clock.currentTimeInMillis()); this.stopTime.set(0L); this.startPauseTime.set(0); @@ -148,7 +147,7 @@ public void snapshotPaused() { this.snapshotPaused.set(1); this.snapshotCompleted.set(0); this.snapshotAborted.set(0); - this.snapshotSkipped.set(false); + this.snapshotSkipped.set(0); this.startPauseTime.set(clock.currentTimeInMillis()); this.stopPauseTime.set(0L); } @@ -158,7 +157,7 @@ public void snapshotResumed() { this.snapshotPaused.set(0); this.snapshotCompleted.set(0); this.snapshotAborted.set(0); - this.snapshotSkipped.set(false); + this.snapshotSkipped.set(0); final long currTime = clock.currentTimeInMillis(); this.stopPauseTime.set(currTime); @@ -179,7 +178,7 @@ public void snapshotCompleted() { this.snapshotAborted.set(0); this.snapshotRunning.set(0); this.snapshotPaused.set(0); - this.snapshotSkipped.set(false); + this.snapshotSkipped.set(0); this.stopTime.set(clock.currentTimeInMillis()); } @@ -188,14 +187,14 @@ public void snapshotAborted() { this.snapshotAborted.set(1); this.snapshotRunning.set(0); this.snapshotPaused.set(0); - this.snapshotSkipped.set(false); + this.snapshotSkipped.set(0); this.stopTime.set(clock.currentTimeInMillis()); } public void snapshotSkipped() { // snapshotSkipped is an additive metric on top of the other snapshot metrics, // it doesn't reset the state of the other metrics - this.snapshotSkipped.set(true); + this.snapshotSkipped.set(1); this.snapshotCompleted.set(0); this.snapshotAborted.set(1); this.snapshotRunning.set(0); @@ -258,7 +257,7 @@ public void reset() { snapshotPaused.set(0); snapshotCompleted.set(0); snapshotAborted.set(0); - snapshotSkipped.set(false); + snapshotSkipped.set(0); startTime.set(0); stopTime.set(0); startPauseTime.set(0); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/DefaultSnapshotChangeEventSourceMetrics.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/DefaultSnapshotChangeEventSourceMetrics.java index 5ae253100d1..4c1bce7c391 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/metrics/DefaultSnapshotChangeEventSourceMetrics.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/DefaultSnapshotChangeEventSourceMetrics.java @@ -71,7 +71,7 @@ public long getSnapshotAborted() { } @Override - public boolean getSnapshotSkipped() { + public long getSnapshotSkipped() { return snapshotMeter.getSnapshotSkipped(); } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/traits/SnapshotMetricsMXBean.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/traits/SnapshotMetricsMXBean.java index 2e9a964c5ca..1c82b7707b2 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/metrics/traits/SnapshotMetricsMXBean.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/traits/SnapshotMetricsMXBean.java @@ -24,7 +24,7 @@ public interface SnapshotMetricsMXBean extends SchemaMetricsMXBean { long getSnapshotAborted(); - boolean getSnapshotSkipped(); + long getSnapshotSkipped(); long getSnapshotDurationInSeconds(); From 1c3cca8213942a51a735d16d9a7de1b9101423b9 Mon Sep 17 00:00:00 2001 From: Bhagyashree Date: Thu, 22 Jan 2026 12:04:20 +0530 Subject: [PATCH 2/2] test fixes --- .../io/debezium/connector/postgresql/OpenLineageIT.java | 1 + .../transforms/outbox/AbstractEventRouterTest.java | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OpenLineageIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OpenLineageIT.java index b8ab60b0f85..7d22e338471 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OpenLineageIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OpenLineageIT.java @@ -263,6 +263,7 @@ public void shouldProduceOpenLineageOutputDataset() throws Exception { .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL.getValue()) .with("schema.history.internal.kafka.bootstrap.servers", "test-kafka:9092") .with("openlineage.integration.enabled", true) + .with("extended.headers.enabled", true) .with("openlineage.integration.config.file.path", getClass().getClassLoader().getResource("openlineage/openlineage.yml").getPath()) .with("openlineage.integration.job.description", "This connector does cdc for products") .with("openlineage.integration.job.tags", "env=prod,team=cdc") diff --git a/debezium-embedded/src/test/java/io/debezium/transforms/outbox/AbstractEventRouterTest.java b/debezium-embedded/src/test/java/io/debezium/transforms/outbox/AbstractEventRouterTest.java index 61333ba4f7f..34c5c1dc21e 100644 --- a/debezium-embedded/src/test/java/io/debezium/transforms/outbox/AbstractEventRouterTest.java +++ b/debezium-embedded/src/test/java/io/debezium/transforms/outbox/AbstractEventRouterTest.java @@ -260,7 +260,7 @@ public void shouldSupportAllFeatures() throws Exception { // Validate headers Headers headers = routedEvent.headers(); - assertThat(headers).hasSize(5); + assertThat(headers).hasSize(2); Header headerId = headers.lastWithName("id"); assertThat(headerId.schema()).isEqualTo(getIdSchema()); assertThat(headerId.value()).isEqualTo(getId("f9171eb6-19f3-4579-9206-0e179d2ebad7")); @@ -356,7 +356,7 @@ public void shouldNotProduceTombstoneEventForNullPayload() throws Exception { // Validate headers Headers headers = routedEvent.headers(); - assertThat(headers.size()).isEqualTo(5); + assertThat(headers.size()).isEqualTo(2); Header headerId = headers.lastWithName("id"); assertThat(headerId.schema()).isEqualTo(getIdSchema()); assertThat(headerId.value()).isEqualTo(getId("a9d76f78-bda6-48d3-97ed-13a146163218")); @@ -418,7 +418,7 @@ public void shouldProduceTombstoneEventForNullPayload() throws Exception { // Validate headers Headers headers = routedEvent.headers(); - assertThat(headers.size()).isEqualTo(5); + assertThat(headers.size()).isEqualTo(2); Header headerId = headers.lastWithName("id"); assertThat(headerId.schema()).isEqualTo(getIdSchema()); assertThat(headerId.value()).isEqualTo(getId("a9d76f78-bda6-48d3-97ed-13a146163218")); @@ -467,7 +467,7 @@ public void shouldProduceTombstoneEventForEmptyPayload() throws Exception { // Validate headers Headers headers = routedEvent.headers(); - assertThat(headers.size()).isEqualTo(4); + assertThat(headers.size()).isEqualTo(1); Header headerId = headers.lastWithName("id"); assertThat(headerId.schema()).isEqualTo(getIdSchema()); assertThat(headerId.value()).isEqualTo(getId("a9d76f78-bda6-48d3-97ed-13a146163218"));