Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1868,6 +1868,7 @@ public Optional<String[]> parseSignallingMessage(Struct value, String fieldName)
}
List<org.apache.kafka.connect.data.Field> fields = event.schema().fields();
if (fields.size() != 3) {
LOGGER.warn("Field {} part of signal '{}' is missing", fieldName, value);
LOGGER.warn("The signal event '{}' should have 3 fields but has {}", event, fields.size());
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ private void processSignal(SignalRecord signalRecord) {

LOGGER.debug("Signal Processor offset context {}", previousOffsets.getOffsets());
LOGGER.debug("Received signal id = '{}', type = '{}', data = '{}'", signalRecord.getId(), signalRecord.getType(), signalRecord.getData());
LOGGER.info("Action {} has been interrupted. The signal {} may not have been processed.", signalRecord.getType(), signalRecord);
LOGGER.info("Action {} failed. The signal {} may not have been processed.", signalRecord.getType(), signalRecord);
LOGGER.info("Signal '{}' has been received but the type '{}' is not recognized", signalRecord.getId(), signalRecord.getType());
LOGGER.info("Signal '{}' has been received but the data '{}' cannot be parsed", signalRecord.getId(), signalRecord.getData());
final SignalAction<P> action = signalActions.get(signalRecord.getType());
if (action == null) {
LOGGER.warn("Signal '{}' has been received but the type '{}' is not recognized", signalRecord.getId(), signalRecord.getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public boolean process(Struct value) throws InterruptedException {

LOGGER.trace("Received event from signaling table. Enqueue for process");
try {
LOGGER.info("Exception while preparing to process the signal '{}'", value);
Optional<SignalRecord> result = SignalRecord.buildSignalRecordFromChangeEventSource(value, connectorConfig);
if (result.isEmpty()) {
return false;
Expand Down