diff --git a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java index cf6036b6133..f44f26b6039 100644 --- a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java @@ -1868,6 +1868,7 @@ public Optional parseSignallingMessage(Struct value, String fieldName) } List fields = event.schema().fields(); if (fields.size() != 3) { + LOGGER.warn("Field {} part of signal '{}' is missing", fieldName, value); LOGGER.warn("The signal event '{}' should have 3 fields but has {}", event, fields.size()); return Optional.empty(); } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/signal/SignalProcessor.java b/debezium-core/src/main/java/io/debezium/pipeline/signal/SignalProcessor.java index 13a79c5a3f9..c7732584c8c 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/signal/SignalProcessor.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/signal/SignalProcessor.java @@ -188,6 +188,10 @@ private void processSignal(SignalRecord signalRecord) { LOGGER.debug("Signal Processor offset context {}", previousOffsets.getOffsets()); LOGGER.debug("Received signal id = '{}', type = '{}', data = '{}'", signalRecord.getId(), signalRecord.getType(), signalRecord.getData()); + LOGGER.info("Action {} has been interrupted. The signal {} may not have been processed.", signalRecord.getType(), signalRecord); + LOGGER.info("Action {} failed. The signal {} may not have been processed.", signalRecord.getType(), signalRecord); + LOGGER.info("Signal '{}' has been received but the type '{}' is not recognized", signalRecord.getId(), signalRecord.getType()); + LOGGER.info("Signal '{}' has been received but the data '{}' cannot be parsed", signalRecord.getId(), signalRecord.getData()); final SignalAction

action = signalActions.get(signalRecord.getType()); if (action == null) { LOGGER.warn("Signal '{}' has been received but the type '{}' is not recognized", signalRecord.getId(), signalRecord.getType()); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/signal/channels/SourceSignalChannel.java b/debezium-core/src/main/java/io/debezium/pipeline/signal/channels/SourceSignalChannel.java index db190b172d1..08d8b4e401f 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/signal/channels/SourceSignalChannel.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/signal/channels/SourceSignalChannel.java @@ -81,6 +81,7 @@ public boolean process(Struct value) throws InterruptedException { LOGGER.trace("Received event from signaling table. Enqueue for process"); try { + LOGGER.info("Exception while preparing to process the signal '{}'", value); Optional result = SignalRecord.buildSignalRecordFromChangeEventSource(value, connectorConfig); if (result.isEmpty()) { return false;