diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OpenLineageIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OpenLineageIT.java index b8ab60b0f85..7d22e338471 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OpenLineageIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OpenLineageIT.java @@ -263,6 +263,7 @@ public void shouldProduceOpenLineageOutputDataset() throws Exception { .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL.getValue()) .with("schema.history.internal.kafka.bootstrap.servers", "test-kafka:9092") .with("openlineage.integration.enabled", true) + .with("extended.headers.enabled", true) .with("openlineage.integration.config.file.path", getClass().getClassLoader().getResource("openlineage/openlineage.yml").getPath()) .with("openlineage.integration.job.description", "This connector does cdc for products") .with("openlineage.integration.job.tags", "env=prod,team=cdc") diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/metrics/SqlServerSnapshotPartitionMetrics.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/metrics/SqlServerSnapshotPartitionMetrics.java index d005024f849..cd99d3e904a 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/metrics/SqlServerSnapshotPartitionMetrics.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/metrics/SqlServerSnapshotPartitionMetrics.java @@ -58,7 +58,7 @@ public long getSnapshotAborted() { } @Override - public boolean getSnapshotSkipped() { + public long getSnapshotSkipped() { return snapshotMeter.getSnapshotSkipped(); } diff --git a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java index 431232cef94..24cb9f0dea0 100644 --- a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java @@ -1353,7 +1353,7 @@ public static SnapshotQueryMode parse(String value, String defaultValue) { .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 46)) .withType(Type.BOOLEAN) .withWidth(Width.SHORT) - .withDefault(true) + .withDefault(false) .withImportance(ConfigDef.Importance.LOW) .withValidation(Field::isBoolean) .withDescription( diff --git a/debezium-core/src/main/java/io/debezium/pipeline/meters/SnapshotMeter.java b/debezium-core/src/main/java/io/debezium/pipeline/meters/SnapshotMeter.java index 740ca75f9bf..5111dac96ee 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/meters/SnapshotMeter.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/meters/SnapshotMeter.java @@ -11,7 +11,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -32,7 +31,7 @@ public class SnapshotMeter implements SnapshotMetricsMXBean { private final AtomicLong snapshotPaused = new AtomicLong(); private final AtomicLong snapshotCompleted = new AtomicLong(); private final AtomicLong snapshotAborted = new AtomicLong(); - private final AtomicBoolean snapshotSkipped = new AtomicBoolean(); + private final AtomicLong snapshotSkipped = new AtomicLong(); private final AtomicLong startTime = new AtomicLong(); private final AtomicLong stopTime = new AtomicLong(); private final AtomicLong startPauseTime = new AtomicLong(); @@ -89,7 +88,7 @@ public long getSnapshotAborted() { } @Override - public boolean getSnapshotSkipped() { + public long getSnapshotSkipped() { return this.snapshotSkipped.get(); } @@ -134,7 +133,7 @@ public void snapshotStarted() { this.snapshotPaused.set(0); this.snapshotCompleted.set(0); this.snapshotAborted.set(0); - this.snapshotSkipped.set(false); + this.snapshotSkipped.set(0); this.taskStateMetrics.setConnectTaskRebalanceExempt(1); this.startTime.set(clock.currentTimeInMillis()); this.stopTime.set(0L); @@ -148,7 +147,7 @@ public void snapshotPaused() { this.snapshotPaused.set(1); this.snapshotCompleted.set(0); this.snapshotAborted.set(0); - this.snapshotSkipped.set(false); + this.snapshotSkipped.set(0); this.taskStateMetrics.setConnectTaskRebalanceExempt(0); this.startPauseTime.set(clock.currentTimeInMillis()); this.stopPauseTime.set(0L); @@ -159,7 +158,7 @@ public void snapshotResumed() { this.snapshotPaused.set(0); this.snapshotCompleted.set(0); this.snapshotAborted.set(0); - this.snapshotSkipped.set(false); + this.snapshotSkipped.set(0); this.taskStateMetrics.setConnectTaskRebalanceExempt(1); final long currTime = clock.currentTimeInMillis(); this.stopPauseTime.set(currTime); @@ -181,7 +180,7 @@ public void snapshotCompleted() { this.snapshotAborted.set(0); this.snapshotRunning.set(0); this.snapshotPaused.set(0); - this.snapshotSkipped.set(false); + this.snapshotSkipped.set(0); this.taskStateMetrics.setConnectTaskRebalanceExempt(0); this.stopTime.set(clock.currentTimeInMillis()); } @@ -191,14 +190,14 @@ public void snapshotAborted() { this.snapshotAborted.set(1); this.snapshotRunning.set(0); this.snapshotPaused.set(0); - this.snapshotSkipped.set(false); + this.snapshotSkipped.set(0); this.stopTime.set(clock.currentTimeInMillis()); } public void snapshotSkipped() { // snapshotSkipped is an additive metric on top of the other snapshot metrics, // it doesn't reset the state of the other metrics - this.snapshotSkipped.set(true); + this.snapshotSkipped.set(1); this.snapshotCompleted.set(0); this.snapshotAborted.set(1); this.snapshotRunning.set(0); @@ -262,7 +261,7 @@ public void reset() { snapshotPaused.set(0); snapshotCompleted.set(0); snapshotAborted.set(0); - snapshotSkipped.set(false); + snapshotSkipped.set(0); startTime.set(0); stopTime.set(0); startPauseTime.set(0); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/DefaultSnapshotChangeEventSourceMetrics.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/DefaultSnapshotChangeEventSourceMetrics.java index ed850d87862..11f03c77e12 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/metrics/DefaultSnapshotChangeEventSourceMetrics.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/DefaultSnapshotChangeEventSourceMetrics.java @@ -76,7 +76,7 @@ public long getSnapshotAborted() { } @Override - public boolean getSnapshotSkipped() { + public long getSnapshotSkipped() { return snapshotMeter.getSnapshotSkipped(); } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/traits/SnapshotMetricsMXBean.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/traits/SnapshotMetricsMXBean.java index 2e9a964c5ca..1c82b7707b2 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/metrics/traits/SnapshotMetricsMXBean.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/traits/SnapshotMetricsMXBean.java @@ -24,7 +24,7 @@ public interface SnapshotMetricsMXBean extends SchemaMetricsMXBean { long getSnapshotAborted(); - boolean getSnapshotSkipped(); + long getSnapshotSkipped(); long getSnapshotDurationInSeconds(); diff --git a/debezium-embedded/src/test/java/io/debezium/transforms/outbox/AbstractEventRouterTest.java b/debezium-embedded/src/test/java/io/debezium/transforms/outbox/AbstractEventRouterTest.java index 61333ba4f7f..34c5c1dc21e 100644 --- a/debezium-embedded/src/test/java/io/debezium/transforms/outbox/AbstractEventRouterTest.java +++ b/debezium-embedded/src/test/java/io/debezium/transforms/outbox/AbstractEventRouterTest.java @@ -260,7 +260,7 @@ public void shouldSupportAllFeatures() throws Exception { // Validate headers Headers headers = routedEvent.headers(); - assertThat(headers).hasSize(5); + assertThat(headers).hasSize(2); Header headerId = headers.lastWithName("id"); assertThat(headerId.schema()).isEqualTo(getIdSchema()); assertThat(headerId.value()).isEqualTo(getId("f9171eb6-19f3-4579-9206-0e179d2ebad7")); @@ -356,7 +356,7 @@ public void shouldNotProduceTombstoneEventForNullPayload() throws Exception { // Validate headers Headers headers = routedEvent.headers(); - assertThat(headers.size()).isEqualTo(5); + assertThat(headers.size()).isEqualTo(2); Header headerId = headers.lastWithName("id"); assertThat(headerId.schema()).isEqualTo(getIdSchema()); assertThat(headerId.value()).isEqualTo(getId("a9d76f78-bda6-48d3-97ed-13a146163218")); @@ -418,7 +418,7 @@ public void shouldProduceTombstoneEventForNullPayload() throws Exception { // Validate headers Headers headers = routedEvent.headers(); - assertThat(headers.size()).isEqualTo(5); + assertThat(headers.size()).isEqualTo(2); Header headerId = headers.lastWithName("id"); assertThat(headerId.schema()).isEqualTo(getIdSchema()); assertThat(headerId.value()).isEqualTo(getId("a9d76f78-bda6-48d3-97ed-13a146163218")); @@ -467,7 +467,7 @@ public void shouldProduceTombstoneEventForEmptyPayload() throws Exception { // Validate headers Headers headers = routedEvent.headers(); - assertThat(headers.size()).isEqualTo(4); + assertThat(headers.size()).isEqualTo(1); Header headerId = headers.lastWithName("id"); assertThat(headerId.schema()).isEqualTo(getIdSchema()); assertThat(headerId.value()).isEqualTo(getId("a9d76f78-bda6-48d3-97ed-13a146163218"));