diff --git a/pom.xml b/pom.xml
index d4738450..e6bce753 100644
--- a/pom.xml
+++ b/pom.xml
@@ -165,6 +165,7 @@
        <dependency>
            <groupId>dev.vality</groupId>
            <artifactId>damsel</artifactId>
+            <version>1.674-368f05b</version>
        </dependency>
        <dependency>
            <groupId>dev.vality</groupId>
diff --git a/src/main/java/dev/vality/daway/config/KafkaConfig.java b/src/main/java/dev/vality/daway/config/KafkaConfig.java
index 82b7a8ed..f6b50739 100644
--- a/src/main/java/dev/vality/daway/config/KafkaConfig.java
+++ b/src/main/java/dev/vality/daway/config/KafkaConfig.java
@@ -1,15 +1,8 @@
package dev.vality.daway.config;
-import dev.vality.damsel.domain_config_v2.HistoricalCommit;
-import dev.vality.daway.config.properties.KafkaConsumerProperties;
-import dev.vality.daway.serde.CurrencyExchangeRateEventDeserializer;
-import dev.vality.daway.serde.HistoricalCommitDeserializer;
-import dev.vality.daway.serde.SinkEventDeserializer;
-import dev.vality.daway.service.FileService;
-import dev.vality.exrates.events.CurrencyEvent;
-import dev.vality.kafka.common.util.ExponentialBackOffDefaultErrorHandlerFactory;
-import dev.vality.machinegun.eventsink.MachineEvent;
-import lombok.RequiredArgsConstructor;
+import java.util.Map;
+import java.util.Objects;
+
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
@@ -22,12 +15,25 @@
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.util.backoff.FixedBackOff;
-import java.util.Map;
-import java.util.Objects;
+import dev.vality.damsel.domain_config_v2.HistoricalCommit;
+import dev.vality.daway.config.properties.KafkaConsumerProperties;
+import dev.vality.daway.serde.CurrencyExchangeRateEventDeserializer;
+import dev.vality.daway.serde.HistoricalCommitDeserializer;
+import dev.vality.daway.serde.SinkEventDeserializer;
+import dev.vality.daway.service.FileService;
+import dev.vality.daway.util.JsonUtil;
+import dev.vality.exrates.events.CurrencyEvent;
+import dev.vality.kafka.common.util.ExponentialBackOffDefaultErrorHandlerFactory;
+import dev.vality.machinegun.eventsink.MachineEvent;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
@Configuration
@RequiredArgsConstructor
+@Slf4j
@SuppressWarnings("LineLength")
public class KafkaConfig {
@@ -117,8 +123,35 @@
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, HistoricalCommit>> dominantContainerFactory(
-            ConsumerFactory<String, HistoricalCommit> consumerFactory) {
-        return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getDominantConcurrency());
+            ConsumerFactory<String, HistoricalCommit> consumerFactory,
+            DefaultErrorHandler dominantErrorHandler) {
+        ConcurrentKafkaListenerContainerFactory<String, HistoricalCommit> factory =
+                createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getDominantConcurrency());
+        factory.setCommonErrorHandler(dominantErrorHandler);
+        return factory;
+    }
+
+    @Bean
+    public DefaultErrorHandler dominantErrorHandler() {
+        long interval = kafkaConsumerProperties.getDominantErrorBackoffIntervalMs();
+        long maxAttempts = kafkaConsumerProperties.getDominantErrorMaxAttempts();
+        FixedBackOff backOff = new FixedBackOff(interval, resolveDominantMaxAttempts(maxAttempts));
+        DefaultErrorHandler errorHandler = new DefaultErrorHandler(backOff);
+        errorHandler.setRetryListeners((record, ex, deliveryAttempt) -> {
+            if (record != null) {
+                log.error("Failed to process HistoricalCommit, attempt={}, topic={}, partition={}, offset={}, key={}, payload={}",
+                        deliveryAttempt,
+                        record.topic(),
+                        record.partition(),
+                        record.offset(),
+                        record.key(),
+                        JsonUtil.thriftBaseToJsonString((HistoricalCommit) record.value()),
+                        ex);
+            } else {
+                log.error("Failed to process HistoricalCommit, attempt={}", deliveryAttempt, ex);
+            }
+        });
+        return errorHandler;
    }
@Bean
@@ -184,7 +217,11 @@
-    private <T> KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, T>> createConcurrentFactory(
+    private long resolveDominantMaxAttempts(long configuredMaxAttempts) {
+        return configuredMaxAttempts < 0 ? FixedBackOff.UNLIMITED_ATTEMPTS : configuredMaxAttempts;
+    }
+
+    private <T> ConcurrentKafkaListenerContainerFactory<String, T> createConcurrentFactory(
            ConsumerFactory<String, T> consumerFactory, int threadsNumber) {
        ConcurrentKafkaListenerContainerFactory<String, T> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
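The `dominantErrorHandler` bean above changes the failure mode of the dominant consumer: instead of the container default (a short fixed number of retries, then log-and-skip), a failing `HistoricalCommit` record is redelivered on a fixed interval, and a negative max-attempts value maps to `FixedBackOff.UNLIMITED_ATTEMPTS`, i.e. retry until the record succeeds. A minimal standalone sketch of that wiring outside the Spring context (the class name and log format are illustrative, not part of this change):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

public class DominantErrorHandlerSketch {

    public static void main(String[] args) {
        long intervalMs = 30_000L; // pause between redeliveries of the failed record
        long maxAttempts = -1L;    // negative -> retry forever, never skip the commit

        FixedBackOff backOff = new FixedBackOff(
                intervalMs,
                maxAttempts < 0 ? FixedBackOff.UNLIMITED_ATTEMPTS : maxAttempts);

        DefaultErrorHandler handler = new DefaultErrorHandler(backOff);

        // Each failed delivery is reported before the next retry, mirroring
        // the retry listener registered in KafkaConfig above.
        handler.setRetryListeners((ConsumerRecord<?, ?> record, Exception ex, int attempt) ->
                System.err.printf("attempt %d failed at %s-%d@%d%n",
                        attempt, record.topic(), record.partition(), record.offset()));
    }
}
```

The blocking retry keeps the partition paused, so later commits are not consumed ahead of a failed one; with a finite back-off the handler would instead fall through to its default recoverer, which logs the record and skips it.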
diff --git a/src/main/java/dev/vality/daway/config/properties/KafkaConsumerProperties.java b/src/main/java/dev/vality/daway/config/properties/KafkaConsumerProperties.java
index 81024b85..caa24793 100644
--- a/src/main/java/dev/vality/daway/config/properties/KafkaConsumerProperties.java
+++ b/src/main/java/dev/vality/daway/config/properties/KafkaConsumerProperties.java
@@ -26,5 +26,7 @@ public class KafkaConsumerProperties {
    private int limitConfigConcurrency;
    private int exrateConcurrency;
    private int withdrawalAdjustmentConcurrency;
+    private long dominantErrorBackoffIntervalMs = 5000L;
+    private long dominantErrorMaxAttempts = -1L;
}
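The new properties default to a 5-second pause and unlimited attempts, so deployments that set neither key keep retrying forever. A quick standalone check of the negative-to-unlimited mapping (the helper mirrors `resolveDominantMaxAttempts` from `KafkaConfig`; the class itself is hypothetical):

```java
import org.springframework.util.backoff.FixedBackOff;

public class MaxAttemptsMappingSketch {

    // Same rule as resolveDominantMaxAttempts in KafkaConfig: any negative
    // configured value means "retry without limit".
    static long resolve(long configuredMaxAttempts) {
        return configuredMaxAttempts < 0 ? FixedBackOff.UNLIMITED_ATTEMPTS : configuredMaxAttempts;
    }

    public static void main(String[] args) {
        System.out.println(resolve(-1L)); // 9223372036854775807 (Long.MAX_VALUE)
        System.out.println(resolve(3L));  // three redeliveries, then the record is recovered
    }
}
```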
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index a38a6d75..a3c4a332 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -69,6 +69,7 @@ kafka:
    limit-config-concurrency: 7
    exrate-concurrency: 7
    withdrawal-adjustment-concurrency: 7
+    dominant-error-backoff-interval-ms: 30000
  topics:
    invoice:
      id: mg-invoice-100-2
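The yml override only touches the interval (30 s instead of the 5 s field default); `dominant-error-max-attempts` stays unset, so the `-1L` default keeps retries unlimited. For reference, a small sketch of how Spring Boot's relaxed binding ties the kebab-case key to the camelCase field (the sketch class is hypothetical; only the key and value come from this change):

```java
import java.util.Map;

import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;

public class RelaxedBindingSketch {

    public static void main(String[] args) {
        // The key exactly as written in application.yml.
        MapConfigurationPropertySource source = new MapConfigurationPropertySource(
                Map.of("dominant-error-backoff-interval-ms", "30000"));

        // When KafkaConsumerProperties is bound as a bean, relaxed binding
        // matches this key to the field dominantErrorBackoffIntervalMs.
        long interval = new Binder(source)
                .bind("dominant-error-backoff-interval-ms", Bindable.of(Long.class))
                .get();

        System.out.println(interval); // 30000
    }
}
```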