1 change: 1 addition & 0 deletions pom.xml
@@ -165,6 +165,7 @@
<dependency>
<groupId>dev.vality</groupId>
<artifactId>damsel</artifactId>
<version>1.674-368f05b</version>
</dependency>
<dependency>
<groupId>dev.vality</groupId>
67 changes: 52 additions & 15 deletions src/main/java/dev/vality/daway/config/KafkaConfig.java
@@ -1,15 +1,8 @@
package dev.vality.daway.config;

import dev.vality.damsel.domain_config_v2.HistoricalCommit;
import dev.vality.daway.config.properties.KafkaConsumerProperties;
import dev.vality.daway.serde.CurrencyExchangeRateEventDeserializer;
import dev.vality.daway.serde.HistoricalCommitDeserializer;
import dev.vality.daway.serde.SinkEventDeserializer;
import dev.vality.daway.service.FileService;
import dev.vality.exrates.events.CurrencyEvent;
import dev.vality.kafka.common.util.ExponentialBackOffDefaultErrorHandlerFactory;
import dev.vality.machinegun.eventsink.MachineEvent;
import lombok.RequiredArgsConstructor;
import java.util.Map;
import java.util.Objects;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
@@ -22,12 +15,25 @@
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

import java.util.Map;
import java.util.Objects;
import dev.vality.damsel.domain_config_v2.HistoricalCommit;
import dev.vality.daway.config.properties.KafkaConsumerProperties;
import dev.vality.daway.serde.CurrencyExchangeRateEventDeserializer;
import dev.vality.daway.serde.HistoricalCommitDeserializer;
import dev.vality.daway.serde.SinkEventDeserializer;
import dev.vality.daway.service.FileService;
import dev.vality.daway.util.JsonUtil;
import dev.vality.exrates.events.CurrencyEvent;
import dev.vality.kafka.common.util.ExponentialBackOffDefaultErrorHandlerFactory;
import dev.vality.machinegun.eventsink.MachineEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Configuration
@RequiredArgsConstructor
@Slf4j
@SuppressWarnings("LineLength")
public class KafkaConfig {

@@ -117,8 +123,35 @@ public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, HistoricalCommit>> dominantContainerFactory(
ConsumerFactory<String, HistoricalCommit> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getDominantConcurrency());
ConsumerFactory<String, HistoricalCommit> consumerFactory,
DefaultErrorHandler dominantErrorHandler) {
ConcurrentKafkaListenerContainerFactory<String, HistoricalCommit> factory =
createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getDominantConcurrency());
factory.setCommonErrorHandler(dominantErrorHandler);
return factory;
}

@Bean
public DefaultErrorHandler dominantErrorHandler() {
long interval = kafkaConsumerProperties.getDominantErrorBackoffIntervalMs();
long maxAttempts = kafkaConsumerProperties.getDominantErrorMaxAttempts();
FixedBackOff backOff = new FixedBackOff(interval, resolveDominantMaxAttempts(maxAttempts));
DefaultErrorHandler errorHandler = new DefaultErrorHandler(backOff);
errorHandler.setRetryListeners((record, ex, deliveryAttempt) -> {
if (record != null) {
log.error("Failed to process HistoricalCommit, attempt={}, topic={}, partition={}, offset={}, key={}, payload={}",
deliveryAttempt,
record.topic(),
record.partition(),
record.offset(),
record.key(),
JsonUtil.thriftBaseToJsonString((HistoricalCommit) record.value()),
ex);
} else {
log.error("Failed to process HistoricalCommit, attempt={}", deliveryAttempt, ex);
}
});
return errorHandler;
}

@Bean
@@ -184,7 +217,11 @@ public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getExrateConcurrency());
}

private <T> KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, T>> createConcurrentFactory(
private long resolveDominantMaxAttempts(long configuredMaxAttempts) {
return configuredMaxAttempts < 0 ? FixedBackOff.UNLIMITED_ATTEMPTS : configuredMaxAttempts;
}

private <T> ConcurrentKafkaListenerContainerFactory<String, T> createConcurrentFactory(
ConsumerFactory<String, T> consumerFactory, int threadsNumber) {
ConcurrentKafkaListenerContainerFactory<String, T> factory =
new ConcurrentKafkaListenerContainerFactory<>();
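Note on the retry behaviour introduced above: DefaultErrorHandler retries the failing HistoricalCommit in place, so the consumer does not advance past the record and the partition stays blocked until processing succeeds or the back-off is exhausted. The sketch below illustrates the FixedBackOff semantics the handler relies on; it is not part of the PR, the class name and printed values are illustrative, and it only assumes spring-core's org.springframework.util.backoff API.

import org.springframework.util.backoff.BackOffExecution;
import org.springframework.util.backoff.FixedBackOff;

public class DominantBackOffSketch {

    public static void main(String[] args) {
        // Bounded back-off: 3 attempts, 5 s apart. After the third attempt nextBackOff()
        // returns BackOffExecution.STOP (-1) and DefaultErrorHandler hands the record to
        // its recoverer, which by default logs the failure and skips the record.
        BackOffExecution bounded = new FixedBackOff(5_000L, 3L).start();
        System.out.println(bounded.nextBackOff()); // 5000
        System.out.println(bounded.nextBackOff()); // 5000
        System.out.println(bounded.nextBackOff()); // 5000
        System.out.println(bounded.nextBackOff()); // -1, retries exhausted

        // Unlimited back-off: what a negative dominantErrorMaxAttempts resolves to via
        // resolveDominantMaxAttempts, so a poison HistoricalCommit is never skipped and
        // keeps being retried at the configured interval.
        BackOffExecution unlimited =
                new FixedBackOff(30_000L, FixedBackOff.UNLIMITED_ATTEMPTS).start();
        System.out.println(unlimited.nextBackOff()); // always 30000
    }
}

With the shipped defaults the dominant consumer therefore trades partition lag for ordering: a commit that keeps failing is logged on every attempt (including its JSON payload via JsonUtil) but never dropped.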
2 changes: 2 additions & 0 deletions src/main/java/dev/vality/daway/config/properties/KafkaConsumerProperties.java
@@ -26,5 +26,7 @@ public class KafkaConsumerProperties {
private int limitConfigConcurrency;
private int exrateConcurrency;
private int withdrawalAdjustmentConcurrency;
private long dominantErrorBackoffIntervalMs = 5000L;
private long dominantErrorMaxAttempts = -1L;

}
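The two new properties ship with in-code defaults: a 5000 ms back-off interval and -1 attempts, which resolveDominantMaxAttempts maps to FixedBackOff.UNLIMITED_ATTEMPTS. Unless dominant-error-max-attempts is overridden with a non-negative value, retries never stop.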
1 change: 1 addition & 0 deletions src/main/resources/application.yml
@@ -69,6 +69,7 @@ kafka:
limit-config-concurrency: 7
exrate-concurrency: 7
withdrawal-adjustment-concurrency: 7
dominant-error-backoff-interval-ms: 30000
topics:
invoice:
id: mg-invoice-100-2
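The yml change only raises the dominant back-off interval to 30 s; dominant-error-max-attempts is left at its -1 default, so retries remain unlimited. Assuming the same relaxed binding as the existing kebab-case keys, adding dominant-error-max-attempts with a non-negative value (for example 3) under this section would cap the retries and let the default recoverer skip the record afterwards.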