From e3218805af3c7e7cb0918733b7d4fc913e41041c Mon Sep 17 00:00:00 2001 From: KGaneshDatta Date: Thu, 22 Jan 2026 13:17:42 +0530 Subject: [PATCH 1/2] Rephrase sensitive logs for testing redaction on CCloud --- .../src/main/java/io/debezium/config/CommonConnectorConfig.java | 1 + .../main/java/io/debezium/pipeline/signal/SignalProcessor.java | 2 ++ .../debezium/pipeline/signal/channels/SourceSignalChannel.java | 1 + 3 files changed, 4 insertions(+) diff --git a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java index cf6036b6133..f44f26b6039 100644 --- a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java @@ -1868,6 +1868,7 @@ public Optional parseSignallingMessage(Struct value, String fieldName) } List fields = event.schema().fields(); if (fields.size() != 3) { + LOGGER.warn("Field {} part of signal '{}' is missing", fieldName, value); LOGGER.warn("The signal event '{}' should have 3 fields but has {}", event, fields.size()); return Optional.empty(); } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/signal/SignalProcessor.java b/debezium-core/src/main/java/io/debezium/pipeline/signal/SignalProcessor.java index 13a79c5a3f9..68d51f32b25 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/signal/SignalProcessor.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/signal/SignalProcessor.java @@ -188,6 +188,8 @@ private void processSignal(SignalRecord signalRecord) { LOGGER.debug("Signal Processor offset context {}", previousOffsets.getOffsets()); LOGGER.debug("Received signal id = '{}', type = '{}', data = '{}'", signalRecord.getId(), signalRecord.getType(), signalRecord.getData()); + LOGGER.info("Action {} has been interrupted. The signal {} may not have been processed.", signalRecord.getType(), signalRecord); + LOGGER.info("Action {} failed. The signal {} may not have been processed.", signalRecord.getType(), signalRecord); final SignalAction

action = signalActions.get(signalRecord.getType()); if (action == null) { LOGGER.warn("Signal '{}' has been received but the type '{}' is not recognized", signalRecord.getId(), signalRecord.getType()); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/signal/channels/SourceSignalChannel.java b/debezium-core/src/main/java/io/debezium/pipeline/signal/channels/SourceSignalChannel.java index db190b172d1..08d8b4e401f 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/signal/channels/SourceSignalChannel.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/signal/channels/SourceSignalChannel.java @@ -81,6 +81,7 @@ public boolean process(Struct value) throws InterruptedException { LOGGER.trace("Received event from signaling table. Enqueue for process"); try { + LOGGER.info("Exception while preparing to process the signal '{}'", value); Optional result = SignalRecord.buildSignalRecordFromChangeEventSource(value, connectorConfig); if (result.isEmpty()) { return false; From 585aa7df0fc2f9b9ce17482233cad2054351dc4b Mon Sep 17 00:00:00 2001 From: KGaneshDatta Date: Thu, 22 Jan 2026 17:07:33 +0530 Subject: [PATCH 2/2] Add more redact logs --- .../main/java/io/debezium/pipeline/signal/SignalProcessor.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/debezium-core/src/main/java/io/debezium/pipeline/signal/SignalProcessor.java b/debezium-core/src/main/java/io/debezium/pipeline/signal/SignalProcessor.java index 68d51f32b25..c7732584c8c 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/signal/SignalProcessor.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/signal/SignalProcessor.java @@ -190,6 +190,8 @@ private void processSignal(SignalRecord signalRecord) { LOGGER.debug("Received signal id = '{}', type = '{}', data = '{}'", signalRecord.getId(), signalRecord.getType(), signalRecord.getData()); LOGGER.info("Action {} has been interrupted. The signal {} may not have been processed.", signalRecord.getType(), signalRecord); LOGGER.info("Action {} failed. The signal {} may not have been processed.", signalRecord.getType(), signalRecord); + LOGGER.info("Signal '{}' has been received but the type '{}' is not recognized", signalRecord.getId(), signalRecord.getType()); + LOGGER.info("Signal '{}' has been received but the data '{}' cannot be parsed", signalRecord.getId(), signalRecord.getData()); final SignalAction

action = signalActions.get(signalRecord.getType()); if (action == null) { LOGGER.warn("Signal '{}' has been received but the type '{}' is not recognized", signalRecord.getId(), signalRecord.getType());