Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ public void shouldProduceOpenLineageOutputDataset() throws Exception {
.with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL.getValue())
.with("schema.history.internal.kafka.bootstrap.servers", "test-kafka:9092")
.with("openlineage.integration.enabled", true)
.with("extended.headers.enabled", true)
.with("openlineage.integration.config.file.path", getClass().getClassLoader().getResource("openlineage/openlineage.yml").getPath())
.with("openlineage.integration.job.description", "This connector does cdc for products")
.with("openlineage.integration.job.tags", "env=prod,team=cdc")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public long getSnapshotAborted() {
}

@Override
public boolean getSnapshotSkipped() {
public long getSnapshotSkipped() {
return snapshotMeter.getSnapshotSkipped();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1353,7 +1353,7 @@ public static SnapshotQueryMode parse(String value, String defaultValue) {
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 46))
.withType(Type.BOOLEAN)
.withWidth(Width.SHORT)
.withDefault(true)
.withDefault(false)
.withImportance(ConfigDef.Importance.LOW)
.withValidation(Field::isBoolean)
.withDescription(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -32,7 +31,7 @@ public class SnapshotMeter implements SnapshotMetricsMXBean {
private final AtomicLong snapshotPaused = new AtomicLong();
private final AtomicLong snapshotCompleted = new AtomicLong();
private final AtomicLong snapshotAborted = new AtomicLong();
private final AtomicBoolean snapshotSkipped = new AtomicBoolean();
private final AtomicLong snapshotSkipped = new AtomicLong();
private final AtomicLong startTime = new AtomicLong();
private final AtomicLong stopTime = new AtomicLong();
private final AtomicLong startPauseTime = new AtomicLong();
Expand Down Expand Up @@ -89,7 +88,7 @@ public long getSnapshotAborted() {
}

@Override
public boolean getSnapshotSkipped() {
public long getSnapshotSkipped() {
return this.snapshotSkipped.get();
}

Expand Down Expand Up @@ -134,7 +133,7 @@ public void snapshotStarted() {
this.snapshotPaused.set(0);
this.snapshotCompleted.set(0);
this.snapshotAborted.set(0);
this.snapshotSkipped.set(false);
this.snapshotSkipped.set(0);
this.taskStateMetrics.setConnectTaskRebalanceExempt(1);
this.startTime.set(clock.currentTimeInMillis());
this.stopTime.set(0L);
Expand All @@ -148,7 +147,7 @@ public void snapshotPaused() {
this.snapshotPaused.set(1);
this.snapshotCompleted.set(0);
this.snapshotAborted.set(0);
this.snapshotSkipped.set(false);
this.snapshotSkipped.set(0);
this.taskStateMetrics.setConnectTaskRebalanceExempt(0);
this.startPauseTime.set(clock.currentTimeInMillis());
this.stopPauseTime.set(0L);
Expand All @@ -159,7 +158,7 @@ public void snapshotResumed() {
this.snapshotPaused.set(0);
this.snapshotCompleted.set(0);
this.snapshotAborted.set(0);
this.snapshotSkipped.set(false);
this.snapshotSkipped.set(0);
this.taskStateMetrics.setConnectTaskRebalanceExempt(1);
final long currTime = clock.currentTimeInMillis();
this.stopPauseTime.set(currTime);
Expand All @@ -181,7 +180,7 @@ public void snapshotCompleted() {
this.snapshotAborted.set(0);
this.snapshotRunning.set(0);
this.snapshotPaused.set(0);
this.snapshotSkipped.set(false);
this.snapshotSkipped.set(0);
this.taskStateMetrics.setConnectTaskRebalanceExempt(0);
this.stopTime.set(clock.currentTimeInMillis());
}
Expand All @@ -191,14 +190,14 @@ public void snapshotAborted() {
this.snapshotAborted.set(1);
this.snapshotRunning.set(0);
this.snapshotPaused.set(0);
this.snapshotSkipped.set(false);
this.snapshotSkipped.set(0);
this.stopTime.set(clock.currentTimeInMillis());
}

public void snapshotSkipped() {
// snapshotSkipped is an additive metric on top of the other snapshot metrics,
// it doesn't reset the state of the other metrics
this.snapshotSkipped.set(true);
this.snapshotSkipped.set(1);
this.snapshotCompleted.set(0);
this.snapshotAborted.set(1);
this.snapshotRunning.set(0);
Expand Down Expand Up @@ -262,7 +261,7 @@ public void reset() {
snapshotPaused.set(0);
snapshotCompleted.set(0);
snapshotAborted.set(0);
snapshotSkipped.set(false);
snapshotSkipped.set(0);
startTime.set(0);
stopTime.set(0);
startPauseTime.set(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public long getSnapshotAborted() {
}

@Override
public boolean getSnapshotSkipped() {
public long getSnapshotSkipped() {
return snapshotMeter.getSnapshotSkipped();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public interface SnapshotMetricsMXBean extends SchemaMetricsMXBean {

long getSnapshotAborted();

boolean getSnapshotSkipped();
long getSnapshotSkipped();

long getSnapshotDurationInSeconds();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public void shouldSupportAllFeatures() throws Exception {

// Validate headers
Headers headers = routedEvent.headers();
assertThat(headers).hasSize(5);
assertThat(headers).hasSize(2);
Header headerId = headers.lastWithName("id");
assertThat(headerId.schema()).isEqualTo(getIdSchema());
assertThat(headerId.value()).isEqualTo(getId("f9171eb6-19f3-4579-9206-0e179d2ebad7"));
Expand Down Expand Up @@ -356,7 +356,7 @@ public void shouldNotProduceTombstoneEventForNullPayload() throws Exception {

// Validate headers
Headers headers = routedEvent.headers();
assertThat(headers.size()).isEqualTo(5);
assertThat(headers.size()).isEqualTo(2);
Header headerId = headers.lastWithName("id");
assertThat(headerId.schema()).isEqualTo(getIdSchema());
assertThat(headerId.value()).isEqualTo(getId("a9d76f78-bda6-48d3-97ed-13a146163218"));
Expand Down Expand Up @@ -418,7 +418,7 @@ public void shouldProduceTombstoneEventForNullPayload() throws Exception {

// Validate headers
Headers headers = routedEvent.headers();
assertThat(headers.size()).isEqualTo(5);
assertThat(headers.size()).isEqualTo(2);
Header headerId = headers.lastWithName("id");
assertThat(headerId.schema()).isEqualTo(getIdSchema());
assertThat(headerId.value()).isEqualTo(getId("a9d76f78-bda6-48d3-97ed-13a146163218"));
Expand Down Expand Up @@ -467,7 +467,7 @@ public void shouldProduceTombstoneEventForEmptyPayload() throws Exception {

// Validate headers
Headers headers = routedEvent.headers();
assertThat(headers.size()).isEqualTo(4);
assertThat(headers.size()).isEqualTo(1);
Header headerId = headers.lastWithName("id");
assertThat(headerId.schema()).isEqualTo(getIdSchema());
assertThat(headerId.value()).isEqualTo(getId("a9d76f78-bda6-48d3-97ed-13a146163218"));
Expand Down