From 2d59488422343f4a12d59eb3e104a39bd2f2e2ae Mon Sep 17 00:00:00 2001 From: vitaxa Date: Tue, 18 Nov 2025 12:48:20 +0300 Subject: [PATCH 1/3] fix dominant event --- pom.xml | 1 + .../dev/vality/daway/config/KafkaConfig.java | 65 +++++++++++++++---- .../properties/KafkaConsumerProperties.java | 2 + .../daway/listener/DominantKafkaListener.java | 43 +++++++++--- src/main/resources/application.yml | 1 + 5 files changed, 88 insertions(+), 24 deletions(-) diff --git a/pom.xml b/pom.xml index d4738450..e6bce753 100644 --- a/pom.xml +++ b/pom.xml @@ -165,6 +165,7 @@ dev.vality damsel + 1.674-368f05b dev.vality diff --git a/src/main/java/dev/vality/daway/config/KafkaConfig.java b/src/main/java/dev/vality/daway/config/KafkaConfig.java index 82b7a8ed..e08169c6 100644 --- a/src/main/java/dev/vality/daway/config/KafkaConfig.java +++ b/src/main/java/dev/vality/daway/config/KafkaConfig.java @@ -1,15 +1,8 @@ package dev.vality.daway.config; -import dev.vality.damsel.domain_config_v2.HistoricalCommit; -import dev.vality.daway.config.properties.KafkaConsumerProperties; -import dev.vality.daway.serde.CurrencyExchangeRateEventDeserializer; -import dev.vality.daway.serde.HistoricalCommitDeserializer; -import dev.vality.daway.serde.SinkEventDeserializer; -import dev.vality.daway.service.FileService; -import dev.vality.exrates.events.CurrencyEvent; -import dev.vality.kafka.common.util.ExponentialBackOffDefaultErrorHandlerFactory; -import dev.vality.machinegun.eventsink.MachineEvent; -import lombok.RequiredArgsConstructor; +import java.util.Map; +import java.util.Objects; + import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; @@ -22,12 +15,25 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.util.backoff.FixedBackOff; -import java.util.Map; -import java.util.Objects; +import dev.vality.damsel.domain_config_v2.HistoricalCommit; +import dev.vality.daway.config.properties.KafkaConsumerProperties; +import dev.vality.daway.serde.CurrencyExchangeRateEventDeserializer; +import dev.vality.daway.serde.HistoricalCommitDeserializer; +import dev.vality.daway.serde.SinkEventDeserializer; +import dev.vality.daway.service.FileService; +import dev.vality.daway.util.JsonUtil; +import dev.vality.exrates.events.CurrencyEvent; +import dev.vality.kafka.common.util.ExponentialBackOffDefaultErrorHandlerFactory; +import dev.vality.machinegun.eventsink.MachineEvent; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; @Configuration @RequiredArgsConstructor +@Slf4j @SuppressWarnings("LineLength") public class KafkaConfig { @@ -117,8 +123,35 @@ public KafkaListenerContainerFactory> dominantContainerFactory( - ConsumerFactory consumerFactory) { - return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getDominantConcurrency()); + ConsumerFactory consumerFactory, + DefaultErrorHandler dominantErrorHandler) { + ConcurrentKafkaListenerContainerFactory factory = + createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getDominantConcurrency()); + factory.setCommonErrorHandler(dominantErrorHandler); + return factory; + } + + @Bean + public DefaultErrorHandler dominantErrorHandler() { + long interval = kafkaConsumerProperties.getDominantErrorBackoffIntervalMs(); + long maxAttempts = kafkaConsumerProperties.getDominantErrorMaxAttempts(); + FixedBackOff backOff = new FixedBackOff(interval, resolveDominantMaxAttempts(maxAttempts)); + DefaultErrorHandler errorHandler = new DefaultErrorHandler(backOff); + errorHandler.setRetryListeners((record, ex, deliveryAttempt) -> { + if (record != null) { + log.error("Failed to process HistoricalCommit, attempt={}, topic={}, partition={}, offset={}, key={}, payload={}", + deliveryAttempt, + record.topic(), + record.partition(), + record.offset(), + record.key(), + JsonUtil.thriftBaseToJsonString((HistoricalCommit) record.value()), + ex); + } else { + log.error("Failed to process HistoricalCommit, attempt={}", deliveryAttempt, ex); + } + }); + return errorHandler; } @Bean @@ -184,6 +217,10 @@ public KafkaListenerContainerFactory KafkaListenerContainerFactory> createConcurrentFactory( ConsumerFactory consumerFactory, int threadsNumber) { ConcurrentKafkaListenerContainerFactory factory = diff --git a/src/main/java/dev/vality/daway/config/properties/KafkaConsumerProperties.java b/src/main/java/dev/vality/daway/config/properties/KafkaConsumerProperties.java index 81024b85..caa24793 100644 --- a/src/main/java/dev/vality/daway/config/properties/KafkaConsumerProperties.java +++ b/src/main/java/dev/vality/daway/config/properties/KafkaConsumerProperties.java @@ -26,5 +26,7 @@ public class KafkaConsumerProperties { private int limitConfigConcurrency; private int exrateConcurrency; private int withdrawalAdjustmentConcurrency; + private long dominantErrorBackoffIntervalMs = 5000L; + private long dominantErrorMaxAttempts = -1L; } diff --git a/src/main/java/dev/vality/daway/listener/DominantKafkaListener.java b/src/main/java/dev/vality/daway/listener/DominantKafkaListener.java index 1529abfb..29dd9cb1 100644 --- a/src/main/java/dev/vality/daway/listener/DominantKafkaListener.java +++ b/src/main/java/dev/vality/daway/listener/DominantKafkaListener.java @@ -1,15 +1,18 @@ package dev.vality.daway.listener; -import dev.vality.damsel.domain_config_v2.HistoricalCommit; -import dev.vality.daway.service.DominantService; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; +import java.util.List; +import java.util.stream.Collectors; + import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Service; -import java.util.List; +import dev.vality.damsel.domain_config_v2.HistoricalCommit; +import dev.vality.daway.service.DominantService; +import dev.vality.daway.util.JsonUtil; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; @Slf4j @RequiredArgsConstructor @@ -25,10 +28,30 @@ public class DominantKafkaListener { public void handle(List> messages, Acknowledgment ack) { log.info("Got historicalCommit batch with size: {}", messages.size()); log.debug("HistoricalCommit messages: {}", messages); - dominantService.processCommit(messages.stream() - .map(ConsumerRecord::value) - .toList()); - ack.acknowledge(); - log.info("Batch has been committed, size={}", messages.size()); + try { + dominantService.processCommit(messages.stream() + .map(ConsumerRecord::value) + .toList()); + ack.acknowledge(); + log.info("Batch has been committed, size={}", messages.size()); + } catch (Exception ex) { + log.error("Failed to process dominant batch: {}", stringifyMessages(messages), ex); + throw ex; + } + } + + private String stringifyMessages(List> messages) { + return messages.stream() + .map(record -> String.format("topic=%s partition=%d offset=%d key=%s payload=%s", + record.topic(), + record.partition(), + record.offset(), + record.key(), + stringifyCommit(record.value()))) + .collect(Collectors.joining("; ")); + } + + private String stringifyCommit(HistoricalCommit historicalCommit) { + return historicalCommit == null ? "null" : JsonUtil.thriftBaseToJsonString(historicalCommit); } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index a38a6d75..a3c4a332 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -69,6 +69,7 @@ kafka: limit-config-concurrency: 7 exrate-concurrency: 7 withdrawal-adjustment-concurrency: 7 + dominant-error-backoff-interval-ms: 30000 topics: invoice: id: mg-invoice-100-2 From 321fb643f77fa1f848282fe0ee18c0305d7bac91 Mon Sep 17 00:00:00 2001 From: vitaxa Date: Tue, 18 Nov 2025 12:53:47 +0300 Subject: [PATCH 2/3] fix dominant event [2] --- src/main/java/dev/vality/daway/config/KafkaConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/dev/vality/daway/config/KafkaConfig.java b/src/main/java/dev/vality/daway/config/KafkaConfig.java index e08169c6..f6b50739 100644 --- a/src/main/java/dev/vality/daway/config/KafkaConfig.java +++ b/src/main/java/dev/vality/daway/config/KafkaConfig.java @@ -221,7 +221,7 @@ private long resolveDominantMaxAttempts(long configuredMaxAttempts) { return configuredMaxAttempts < 0 ? FixedBackOff.UNLIMITED_ATTEMPTS : configuredMaxAttempts; } - private KafkaListenerContainerFactory> createConcurrentFactory( + private ConcurrentKafkaListenerContainerFactory createConcurrentFactory( ConsumerFactory consumerFactory, int threadsNumber) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); From 4b287dba59615cd1a4e8f5b472c366ea3a5ad873 Mon Sep 17 00:00:00 2001 From: vitaxa Date: Tue, 18 Nov 2025 15:30:31 +0300 Subject: [PATCH 3/3] fix dominant event [3] --- .../daway/listener/DominantKafkaListener.java | 43 +++++-------------- 1 file changed, 10 insertions(+), 33 deletions(-) diff --git a/src/main/java/dev/vality/daway/listener/DominantKafkaListener.java b/src/main/java/dev/vality/daway/listener/DominantKafkaListener.java index 29dd9cb1..1529abfb 100644 --- a/src/main/java/dev/vality/daway/listener/DominantKafkaListener.java +++ b/src/main/java/dev/vality/daway/listener/DominantKafkaListener.java @@ -1,18 +1,15 @@ package dev.vality.daway.listener; -import java.util.List; -import java.util.stream.Collectors; - +import dev.vality.damsel.domain_config_v2.HistoricalCommit; +import dev.vality.daway.service.DominantService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Service; -import dev.vality.damsel.domain_config_v2.HistoricalCommit; -import dev.vality.daway.service.DominantService; -import dev.vality.daway.util.JsonUtil; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; +import java.util.List; @Slf4j @RequiredArgsConstructor @@ -28,30 +25,10 @@ public class DominantKafkaListener { public void handle(List> messages, Acknowledgment ack) { log.info("Got historicalCommit batch with size: {}", messages.size()); log.debug("HistoricalCommit messages: {}", messages); - try { - dominantService.processCommit(messages.stream() - .map(ConsumerRecord::value) - .toList()); - ack.acknowledge(); - log.info("Batch has been committed, size={}", messages.size()); - } catch (Exception ex) { - log.error("Failed to process dominant batch: {}", stringifyMessages(messages), ex); - throw ex; - } - } - - private String stringifyMessages(List> messages) { - return messages.stream() - .map(record -> String.format("topic=%s partition=%d offset=%d key=%s payload=%s", - record.topic(), - record.partition(), - record.offset(), - record.key(), - stringifyCommit(record.value()))) - .collect(Collectors.joining("; ")); - } - - private String stringifyCommit(HistoricalCommit historicalCommit) { - return historicalCommit == null ? "null" : JsonUtil.thriftBaseToJsonString(historicalCommit); + dominantService.processCommit(messages.stream() + .map(ConsumerRecord::value) + .toList()); + ack.acknowledge(); + log.info("Batch has been committed, size={}", messages.size()); } }