From ff94c06ea738a399c0b3c4a0f9e07a61813108b7 Mon Sep 17 00:00:00 2001 From: fatemeh imanipour Date: Wed, 31 Dec 2025 21:48:25 +0330 Subject: [PATCH 1/9] calculate user's deposits per day --- .../opex/accountant/app/config/AppConfig.kt | 8 + .../app/listener/DepositEventListener.kt | 38 ++ .../core/spi/UserDepositVolumePersister.kt | 15 + .../listener/config/AccountantKafkaConfig.kt | 32 +- .../listener/consumer/DepositKafkaListener.kt | 8 + .../kafka/listener/inout/DepositEvent.kt | 12 + .../kafka/listener/spi/DepositListener.kt | 5 + .../dao/UserDepositVolumeRepository.kt | 44 +++ .../impl/UserDepositVolumePersisterImpl.kt | 56 +++ .../postgres/model/UserDepositVolumeModel.kt | 16 + .../src/main/resources/schema.sql | 10 + .../co/nilin/opex/auth/proxy/KeycloakProxy.kt | 2 +- .../controller/PaymentGatewayController.kt | 71 +--- .../app/controller/TransferController.kt | 4 +- .../opex/wallet/app/service/DepositService.kt | 364 ++++++++++++------ .../src/main/resources/application-otc.yml | 4 +- .../src/main/resources/application.yml | 3 + .../wallet/core/spi/DepositEventSubmitter.kt | 13 + .../listener/config/KafkaProducerConfig.kt | 12 +- .../kafka/listener/config/KafkaTopicConfig.kt | 11 + .../kafka/listener/model/DepositEvent.kt | 13 + .../listener/submitter/DepositSubmitter.kt | 48 +++ .../submitter/DummyDepositSubmitter.kt | 22 ++ 23 files changed, 618 insertions(+), 193 deletions(-) create mode 100644 accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/DepositEventListener.kt create mode 100644 accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserDepositVolumePersister.kt create mode 100644 accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/DepositKafkaListener.kt create mode 100644 accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/DepositEvent.kt create mode 100644 accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/DepositListener.kt create mode 100644 accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserDepositVolumeRepository.kt create mode 100644 accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserDepositVolumePersisterImpl.kt create mode 100644 accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/UserDepositVolumeModel.kt create mode 100644 wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/DepositEventSubmitter.kt create mode 100644 wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/DepositEvent.kt create mode 100644 wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/submitter/DepositSubmitter.kt create mode 100644 wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/submitter/DummyDepositSubmitter.kt diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt index 9ee0a1322..f1e0a70f9 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt @@ -157,4 +157,12 @@ class AppConfig( withdrawRequestKafkaListener.addListener(withdrawRequestEventListener) } + + @Autowired + fun configureDepositEventListener( + depositKafkaListener: DepositKafkaListener, + depositEventListener: DepositEventListener + ) { + depositKafkaListener.addListener(depositEventListener) + } } \ No newline at end of file diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/DepositEventListener.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/DepositEventListener.kt new file mode 100644 index 000000000..01a2662d0 --- /dev/null +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/DepositEventListener.kt @@ -0,0 +1,38 @@ +package co.nilin.opex.accountant.app.listener + +import co.nilin.opex.accountant.core.spi.UserDepositVolumePersister +import co.nilin.opex.accountant.ports.kafka.listener.inout.DepositEvent +import co.nilin.opex.accountant.ports.kafka.listener.inout.WithdrawRequestEvent +import co.nilin.opex.accountant.ports.kafka.listener.spi.DepositListener +import co.nilin.opex.accountant.ports.kafka.listener.spi.WithdrawRequestListener +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Component + +@Component +class DepositEventListener(private val userDepositVolumePersister: UserDepositVolumePersister) : + DepositListener { + + private val logger = LoggerFactory.getLogger(DepositEventListener::class.java) + val scope = CoroutineScope(Dispatchers.IO) + override fun id(): String { + return "DepositEventListener" + } + + override fun onEvent( + event: DepositEvent, + partition: Int, + offset: Long, + timestamp: Long + ) { + logger.info("==========================================================================") + logger.info("Incoming Deposit event: $event") + logger.info("==========================================================================") + scope.launch { + userDepositVolumePersister.update(event.uuid, event.currency, event.amount, event.createDate) + } + } + +} diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserDepositVolumePersister.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserDepositVolumePersister.kt new file mode 100644 index 000000000..441eca0f2 --- /dev/null +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserDepositVolumePersister.kt @@ -0,0 +1,15 @@ +package co.nilin.opex.accountant.core.spi + +import co.nilin.opex.accountant.core.model.WithdrawStatus +import java.math.BigDecimal +import java.time.LocalDateTime + +interface UserDepositVolumePersister { + suspend fun update( + userId: String, + currency: String, + amount: BigDecimal, + date: LocalDateTime) + + suspend fun getTotalValueByUserAndDateAfter(uuid: String, startDate: LocalDateTime): BigDecimal +} \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/config/AccountantKafkaConfig.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/config/AccountantKafkaConfig.kt index 75cb6dd41..6ae6f37dd 100644 --- a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/config/AccountantKafkaConfig.kt +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/config/AccountantKafkaConfig.kt @@ -2,6 +2,7 @@ package co.nilin.opex.accountant.ports.kafka.listener.config import co.nilin.opex.accountant.core.inout.KycLevelUpdatedEvent import co.nilin.opex.accountant.ports.kafka.listener.consumer.* +import co.nilin.opex.accountant.ports.kafka.listener.inout.DepositEvent import co.nilin.opex.accountant.ports.kafka.listener.inout.FinancialActionResponseEvent import co.nilin.opex.accountant.ports.kafka.listener.inout.WithdrawRequestEvent import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent @@ -37,7 +38,7 @@ class AccountantKafkaConfig { ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java, JsonDeserializer.TRUSTED_PACKAGES to "co.nilin.opex.*", - JsonDeserializer.TYPE_MAPPINGS to "order_request_event:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderRequestEvent,order_request_submit:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequestEvent,order_request_cancel:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderCancelRequestEvent,kyc_level_updated_event:co.nilin.opex.accountant.core.inout.KycLevelUpdatedEvent,fiAction_response_event:co.nilin.opex.accountant.ports.kafka.listener.inout.FinancialActionResponseEvent,withdrawRequestEvent:co.nilin.opex.accountant.ports.kafka.listener.inout.WithdrawRequestEvent" + JsonDeserializer.TYPE_MAPPINGS to "order_request_event:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderRequestEvent,order_request_submit:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequestEvent,order_request_cancel:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderCancelRequestEvent,kyc_level_updated_event:co.nilin.opex.accountant.core.inout.KycLevelUpdatedEvent,fiAction_response_event:co.nilin.opex.accountant.ports.kafka.listener.inout.FinancialActionResponseEvent,withdrawRequestEvent:co.nilin.opex.accountant.ports.kafka.listener.inout.WithdrawRequestEvent,depositEvent:co.nilin.opex.accountant.ports.kafka.listener.inout.DepositEvent" ) } @@ -60,6 +61,10 @@ class AccountantKafkaConfig { fun withdrawRequestConsumerFactory(@Qualifier("consumerConfig") consumerConfigs: Map): ConsumerFactory { return DefaultKafkaConsumerFactory(consumerConfigs) } + @Bean("depositConsumerFactory") + fun depositConsumerFactory(@Qualifier("consumerConfig") consumerConfigs: Map): ConsumerFactory { + return DefaultKafkaConsumerFactory(consumerConfigs) + } @Autowired @ConditionalOnBean(TradeKafkaListener::class) @@ -157,6 +162,16 @@ class AccountantKafkaConfig { return KafkaTemplate(producerFactory) } + @Bean("depositProducerFactory") + fun depositProducerFactory(@Qualifier("consumerConfig") producerConfigs: Map): ProducerFactory { + return DefaultKafkaProducerFactory(producerConfigs) + } + + @Bean("depositKafkaTemplate") + fun depositKafkaTemplate(@Qualifier("depositProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { + return KafkaTemplate(producerFactory) + } + @Autowired @ConditionalOnBean(KycLevelUpdatedKafkaListener::class) fun configureKycLevelUpdatedListener( @@ -187,6 +202,21 @@ class AccountantKafkaConfig { container.start() } + @Autowired + @ConditionalOnBean(DepositKafkaListener::class) + fun configureDepositRequestEventListener( + listener: DepositKafkaListener, + @Qualifier("depositKafkaTemplate") template: KafkaTemplate, + @Qualifier("depositConsumerFactory") consumerFactory: ConsumerFactory + ) { + val containerProps = ContainerProperties(Pattern.compile("deposit")) + containerProps.messageListener = listener + val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps) + container.setBeanName("DepositKafkaListenerContainer") + container.commonErrorHandler = createConsumerErrorHandler(template, "deposit.DLT") + container.start() + } + private fun createConsumerErrorHandler(kafkaTemplate: KafkaTemplate<*, *>, dltTopic: String): CommonErrorHandler { val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate) { cr, _ -> cr.headers().add("dlt-origin-module", "ACCOUNTANT".toByteArray()) diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/DepositKafkaListener.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/DepositKafkaListener.kt new file mode 100644 index 000000000..da4347aa9 --- /dev/null +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/DepositKafkaListener.kt @@ -0,0 +1,8 @@ +package co.nilin.opex.accountant.ports.kafka.listener.consumer + +import co.nilin.opex.accountant.ports.kafka.listener.inout.DepositEvent +import co.nilin.opex.accountant.ports.kafka.listener.spi.DepositListener +import org.springframework.stereotype.Component + +@Component +class DepositKafkaListener : EventConsumer() \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/DepositEvent.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/DepositEvent.kt new file mode 100644 index 000000000..37624a599 --- /dev/null +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/DepositEvent.kt @@ -0,0 +1,12 @@ +package co.nilin.opex.accountant.ports.kafka.listener.inout + +import java.math.BigDecimal +import java.time.LocalDateTime + +data class DepositEvent( + val uuid: String, + val depositRef: String? = null, + val currency: String, + val amount: BigDecimal, + val createDate: LocalDateTime, +) diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/DepositListener.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/DepositListener.kt new file mode 100644 index 000000000..765899b4f --- /dev/null +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/DepositListener.kt @@ -0,0 +1,5 @@ +package co.nilin.opex.accountant.ports.kafka.listener.spi + +import co.nilin.opex.accountant.ports.kafka.listener.inout.DepositEvent + +interface DepositListener : Listener \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserDepositVolumeRepository.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserDepositVolumeRepository.kt new file mode 100644 index 000000000..dccd2d98d --- /dev/null +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserDepositVolumeRepository.kt @@ -0,0 +1,44 @@ +package co.nilin.opex.accountant.ports.postgres.dao + +import co.nilin.opex.accountant.ports.postgres.model.UserDepositVolumeModel +import org.springframework.data.r2dbc.repository.Query +import org.springframework.data.repository.reactive.ReactiveCrudRepository +import org.springframework.stereotype.Repository +import reactor.core.publisher.Mono +import java.math.BigDecimal +import java.time.LocalDate + +@Repository +interface UserDepositVolumeRepository : ReactiveCrudRepository { + + @Query( + """ + insert into user_deposit_volume (user_id, date, total_amount, quote_currency) + values (:userId, :date, :totalAmount, :quoteCurrency) + on conflict (user_id, date,quote_currency) + do update + set total_amount = user_deposit_volume.total_amount + EXCLUDED.total_amount + """ + ) + fun insertOrUpdate( + userId: String, + date: LocalDate, + totalAmount: BigDecimal, + quoteCurrency: String + ): Mono + + + @Query( + """ + select sum(total_amount) as total_Amount + from user_deposit_volume + where user_id = :userId and date >= :startDate and quote_currency=:quoteCurrency + group by user_id + """ + ) + fun findTotalValueByUserAndAndDateAfter( + userId: String, + startDate: LocalDate, + quoteCurrency: String + ): Mono +} \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserDepositVolumePersisterImpl.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserDepositVolumePersisterImpl.kt new file mode 100644 index 000000000..6afff2d53 --- /dev/null +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserDepositVolumePersisterImpl.kt @@ -0,0 +1,56 @@ +package co.nilin.opex.accountant.ports.postgres.impl + +import co.nilin.opex.accountant.core.model.WithdrawStatus +import co.nilin.opex.accountant.core.spi.CurrencyRatePersister +import co.nilin.opex.accountant.core.spi.UserDepositVolumePersister +import co.nilin.opex.accountant.ports.postgres.dao.UserDepositVolumeRepository +import kotlinx.coroutines.reactive.awaitFirstOrNull +import kotlinx.coroutines.reactor.awaitSingleOrNull +import org.springframework.beans.factory.annotation.Value +import org.springframework.stereotype.Component +import java.math.BigDecimal +import java.time.LocalDateTime +import java.time.ZoneOffset +import java.math.RoundingMode + + +@Component +class UserDepositVolumePersisterImpl( + private val repository: UserDepositVolumeRepository, + private val currencyRatePersister: CurrencyRatePersister, + @Value("\${app.zone-offset}") private val zoneOffsetString: String, + @Value("\${app.withdraw-volume-calculation-currency}") private val calculationCurrency: String, + @Value("\${app.deposit-volume-calculation-currency-precision:2}") private val calculationCurrencyPrecision: Int + +) : UserDepositVolumePersister { + + override suspend fun update( + userId: String, + currency: String, + amount: BigDecimal, + date: LocalDateTime + ) { + val rate = if (currency == calculationCurrency) BigDecimal.ONE + else currencyRatePersister.getRate(currency, calculationCurrency) + + val signedAmount = amount.multiply(rate).setScale(calculationCurrencyPrecision, RoundingMode.DOWN) + + repository.insertOrUpdate( + userId, + date.atOffset(ZoneOffset.of(zoneOffsetString)).toLocalDate(), + signedAmount, + calculationCurrency + ).awaitSingleOrNull() + } + + override suspend fun getTotalValueByUserAndDateAfter( + uuid: String, + startDate: LocalDateTime + ): BigDecimal { + return repository.findTotalValueByUserAndAndDateAfter( + uuid, + startDate.atOffset(ZoneOffset.of(zoneOffsetString)).toLocalDate(), + calculationCurrency + ).awaitFirstOrNull() ?: BigDecimal.ZERO + } +} diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/UserDepositVolumeModel.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/UserDepositVolumeModel.kt new file mode 100644 index 000000000..5f0cbead1 --- /dev/null +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/UserDepositVolumeModel.kt @@ -0,0 +1,16 @@ +package co.nilin.opex.accountant.ports.postgres.model + +import org.springframework.data.annotation.Id +import org.springframework.data.relational.core.mapping.Column +import org.springframework.data.relational.core.mapping.Table +import java.math.BigDecimal +import java.time.LocalDate + +@Table("user_deposit_volume") +data class UserDepositVolumeModel( + @Id val id: Long? = null, + val userId: String, + val date: LocalDate, + val totalAmount: BigDecimal, + val quoteCurrency: String, +) diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/resources/schema.sql b/accountant/accountant-ports/accountant-persister-postgres/src/main/resources/schema.sql index a217b6e39..c4d6999c6 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/resources/schema.sql +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/resources/schema.sql @@ -149,4 +149,14 @@ CREATE TABLE IF NOT EXISTS withdraw_limit_config daily_max_amount decimal not null ); +CREATE TABLE IF NOT EXISTS user_deposit_volume +( + id SERIAL PRIMARY KEY, + user_id VARCHAR(36) NOT NULL, + date DATE not null, + total_amount decimal not null, + quote_currency VARCHAR(50) NOT NULL, + unique (user_id, date,quote_currency) +); + COMMIT; diff --git a/auth-gateway/auth-gateway-app/src/main/kotlin/co/nilin/opex/auth/proxy/KeycloakProxy.kt b/auth-gateway/auth-gateway-app/src/main/kotlin/co/nilin/opex/auth/proxy/KeycloakProxy.kt index f983badff..5fe5e21b7 100644 --- a/auth-gateway/auth-gateway-app/src/main/kotlin/co/nilin/opex/auth/proxy/KeycloakProxy.kt +++ b/auth-gateway/auth-gateway-app/src/main/kotlin/co/nilin/opex/auth/proxy/KeycloakProxy.kt @@ -60,7 +60,7 @@ class KeycloakProxy( .bodyValue("client_id=${clientId}&client_secret=${clientSecret}&grant_type=password&username=${users[0].username}&password=${password}") .retrieve() .onStatus({ it == HttpStatus.valueOf(401) }) { - throw OpexError.InvalidUserCredentials.exception() + throw OpexError.UsernameOrPasswordIsIncorrect.exception() } .awaitBody() } diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/PaymentGatewayController.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/PaymentGatewayController.kt index 737c40e13..ffe8962a1 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/PaymentGatewayController.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/PaymentGatewayController.kt @@ -1,35 +1,18 @@ package co.nilin.opex.wallet.app.controller -import co.nilin.opex.common.OpexError import co.nilin.opex.wallet.app.dto.PaymentDepositRequest import co.nilin.opex.wallet.app.dto.PaymentDepositResponse -import co.nilin.opex.wallet.app.service.TraceDepositService -import co.nilin.opex.wallet.core.inout.Deposit -import co.nilin.opex.wallet.core.inout.TransferCommand -import co.nilin.opex.wallet.core.inout.TransferMethod -import co.nilin.opex.wallet.core.model.* -import co.nilin.opex.wallet.core.service.GatewayService -import co.nilin.opex.wallet.core.spi.CurrencyServiceManager -import co.nilin.opex.wallet.core.spi.TransferManager -import co.nilin.opex.wallet.core.spi.WalletManager -import co.nilin.opex.wallet.core.spi.WalletOwnerManager +import co.nilin.opex.wallet.app.service.DepositService import org.springframework.transaction.annotation.Transactional import org.springframework.web.bind.annotation.PostMapping import org.springframework.web.bind.annotation.RequestBody import org.springframework.web.bind.annotation.RequestMapping import org.springframework.web.bind.annotation.RestController -import java.math.BigDecimal -import java.util.* @RestController @RequestMapping("/payment") class PaymentGatewayController( - val transferManager: TransferManager, - val currencyService: CurrencyServiceManager, - val walletManager: WalletManager, - val walletOwnerManager: WalletOwnerManager, - val traceDepositService: TraceDepositService, - val gatewayService: GatewayService + val depositService: DepositService ) { @@ -38,55 +21,7 @@ class PaymentGatewayController( @PostMapping("/internal/deposit") @Transactional suspend fun paymentDeposit(@RequestBody request: PaymentDepositRequest): PaymentDepositResponse { - val receiverWalletType = WalletType.MAIN - - val currency = - currencyService.fetchCurrency(FetchCurrency(symbol = request.currency.name)) - ?: throw OpexError.CurrencyNotFound.exception() - val sourceOwner = walletOwnerManager.findWalletOwner(walletOwnerManager.systemUuid) - ?: throw OpexError.WalletOwnerNotFound.exception() - val sourceWallet = walletManager.findWalletByOwnerAndCurrencyAndType(sourceOwner, WalletType.MAIN, currency) - ?: walletManager.createWallet(sourceOwner, Amount(currency, BigDecimal.ZERO), currency, WalletType.MAIN) - - val receiverOwner = walletOwnerManager.findWalletOwner(request.userId) - ?: walletOwnerManager.createWalletOwner(request.userId, "not set", "") - - val receiverWallet = walletManager.findWalletByOwnerAndCurrencyAndType( - receiverOwner, - receiverWalletType, - currency - ) ?: walletManager.createWallet( - receiverOwner, - Amount(currency, BigDecimal.ZERO), - currency, - receiverWalletType - ) - transferManager.transfer( - TransferCommand( - sourceWallet, - receiverWallet, - Amount(sourceWallet.currency, request.amount), - request.description, - request.reference, - TransferCategory.DEPOSIT - ) - ) - var depositCommand = Deposit( - receiverOwner.uuid, - UUID.randomUUID().toString(), - currency.symbol, - request.amount, - note = request.description, - transactionRef = request.reference, - status = DepositStatus.DONE, - depositType = DepositType.OFF_CHAIN, - network = null, - attachment = null, - transferMethod = if (request.isIPG == true) TransferMethod.IPG else TransferMethod.MPG - ) - traceDepositService.saveDepositInNewTransaction(depositCommand) - - return PaymentDepositResponse(true) + return depositService.commitPaymentDeposit(request) } } \ No newline at end of file diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/TransferController.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/TransferController.kt index d32e0b205..a98e6e8d0 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/TransferController.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/TransferController.kt @@ -13,9 +13,7 @@ import java.math.BigDecimal @RestController class TransferController( - private val transferService: TransferService, - private val depositService: DepositService -) { + private val transferService: TransferService) { data class TransferBody( val description: String?, diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/DepositService.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/DepositService.kt index 7ccf88410..331a3a4cd 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/DepositService.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/DepositService.kt @@ -3,14 +3,15 @@ package co.nilin.opex.wallet.app.service import co.nilin.opex.common.OpexError import co.nilin.opex.utility.error.data.OpexException import co.nilin.opex.wallet.app.dto.ManualTransferRequest +import co.nilin.opex.wallet.app.dto.PaymentDepositRequest +import co.nilin.opex.wallet.app.dto.PaymentDepositResponse import co.nilin.opex.wallet.core.inout.* -import co.nilin.opex.wallet.core.model.DepositStatus +import co.nilin.opex.wallet.core.model.* import co.nilin.opex.wallet.core.model.DepositType -import co.nilin.opex.wallet.core.model.TransferCategory -import co.nilin.opex.wallet.core.model.WalletType -import co.nilin.opex.wallet.core.spi.DepositPersister -import co.nilin.opex.wallet.core.spi.WalletOwnerManager +import co.nilin.opex.wallet.core.spi.* +import co.nilin.opex.wallet.ports.kafka.listener.submitter.DepositSubmitter import org.slf4j.LoggerFactory +import org.springframework.beans.factory.annotation.Value import org.springframework.stereotype.Service import org.springframework.transaction.annotation.Transactional import java.math.BigDecimal @@ -20,15 +21,48 @@ import java.util.* @Service class DepositService( private val walletOwnerManager: WalletOwnerManager, - private val currencyService: CurrencyServiceV2, + private val walletManager: WalletManager, private val depositPersister: DepositPersister, private val transferService: TransferService, private val traceDepositService: TraceDepositService, private val currencyServiceV2: CurrencyServiceV2, + private val depositEventSubmitter: DepositEventSubmitter, + private val transferManager: TransferManager, + private val currencyService: CurrencyServiceManager, + @Value("\${app.deposit.snapshot.enabled:true}") + private val depositSnapshotEnabled: Boolean) { - ) { private val logger = LoggerFactory.getLogger(DepositService::class.java) + // ------------------------------------------------------------------------- + // Helpers (NO LOGIC CHANGE) + // ------------------------------------------------------------------------- + + private suspend fun getOrCreateWalletOwner(ownerUuid: String): WalletOwner { + return walletOwnerManager.findWalletOwner(ownerUuid) + ?: walletOwnerManager.createWalletOwner(ownerUuid, "not set", "") + } + + private suspend fun getOrCreateMainWallet( + owner: WalletOwner, + currency: CurrencyCommand + ): Wallet { + return walletManager.findWalletByOwnerAndCurrencyAndType( + owner, + WalletType.MAIN, + currency + ) ?: walletManager.createWallet( + owner, + Amount(currency, BigDecimal.ZERO), + currency, + WalletType.MAIN + ) + } + + // ------------------------------------------------------------------------- + // Manual Deposit + // ------------------------------------------------------------------------- + @Transactional suspend fun depositManually( symbol: String, @@ -37,41 +71,46 @@ class DepositService( amount: BigDecimal, request: ManualTransferRequest, ): TransferResult? { - logger.info("deposit manually: $senderUuid to $receiverUuid on $symbol at ${LocalDateTime.now()}") - val gateway = currencyServiceV2.fetchCurrencyGateway(request.gatewayUuid, symbol) + logger.info( + "deposit manually: $senderUuid to $receiverUuid on $symbol at ${LocalDateTime.now()}" + ) + + val gateway = currencyServiceV2 + .fetchCurrencyGateway(request.gatewayUuid, symbol) ?: throw OpexError.GatewayNotFount.exception() if (gateway !is OffChainGatewayCommand || gateway.transferMethod != TransferMethod.MANUALLY) { throw OpexError.GatewayNotFount.exception() } - walletOwnerManager.findWalletOwner(senderUuid)?.let { it.level } + walletOwnerManager.findWalletOwner(senderUuid) + ?.level ?: throw OpexException(OpexError.WalletOwnerNotFound) - walletOwnerManager.findWalletOwner(receiverUuid)?.let { it.level } - ?: walletOwnerManager.createWalletOwner( - receiverUuid, - "not set", - "1" - ).level + walletOwnerManager.findWalletOwner(receiverUuid) + ?.level + ?: walletOwnerManager.createWalletOwner(receiverUuid, "not set", "1").level return deposit( - symbol, - receiverUuid, - WalletType.MAIN, - senderUuid, - amount, - request.description, - request.ref, - null, - request.attachment, - DepositType.OFF_CHAIN, - request.gatewayUuid, - TransferMethod.MANUALLY + symbol = symbol, + receiverUuid = receiverUuid, + receiverWalletType = WalletType.MAIN, + senderUuid = senderUuid, + amount = amount, + description = request.description, + transferRef = request.ref, + chain = null, + attachment = request.attachment, + depositType = DepositType.OFF_CHAIN, + gatewayUuid = request.gatewayUuid, + transferMethod = TransferMethod.MANUALLY ) } + // ------------------------------------------------------------------------- + // Core Deposit + // ------------------------------------------------------------------------- @Transactional suspend fun deposit( @@ -84,17 +123,20 @@ class DepositService( transferRef: String?, chain: String?, attachment: String?, - depositType: DepositType, + depositType: co.nilin.opex.wallet.core.model.DepositType, gatewayUuid: String?, transferMethod: TransferMethod?, ): TransferResult? { - logger.info("A ${depositType.name} deposit tx on $symbol-$chain was received for $receiverUuid .......") - var depositCommand = Deposit( - receiverUuid, - UUID.randomUUID().toString(), - symbol, - amount, + logger.info( + "A ${depositType.name} deposit tx on $symbol-$chain was received for $receiverUuid ......." + ) + + val depositCommand = Deposit( + ownerUuid = receiverUuid, + depositUuid = UUID.randomUUID().toString(), + currency = symbol, + amount = amount, note = description, transactionRef = transferRef, status = DepositStatus.DONE, @@ -104,100 +146,128 @@ class DepositService( transferMethod = transferMethod ) - val gatewayData = fetchDepositData(gatewayUuid, symbol, depositType, depositCommand) + val gatewayData = fetchDepositData( + gatewayUuid = gatewayUuid, + symbol = symbol, + depositType = depositType, + depositCommand = depositCommand + ) - if (depositCommand.depositType != depositType) + if (depositCommand.depositType != depositType) { throw OpexError.GatewayNotFount.exception() + } - val validDeposit = isValidDeposit(depositCommand, gatewayData) - if (!validDeposit) { - logger.info("An invalid deposit command : $symbol-$chain-$receiverUuid-$amount") + val isValid = isValidDeposit(depositCommand, gatewayData) + if (!isValid) { + logger.info( + "An invalid deposit command : $symbol-$chain-$receiverUuid-$amount" + ) depositCommand.status = DepositStatus.INVALID } - //todo add statusReason field to be able to trace invalid deposit's reason - if (validDeposit || depositCommand.depositType == DepositType.ON_CHAIN) + // todo add statusReason field + if (isValid || depositCommand.depositType == co.nilin.opex.wallet.core.model.DepositType.ON_CHAIN) { traceDepositService.saveDepositInNewTransaction(depositCommand) + } - if (depositCommand.status == DepositStatus.DONE) { - logger.info("Going to charge wallet on a ${depositType.name} deposit event :$symbol-$chain-$receiverUuid-$amount") - val (actualSenderUuid, actualTransferCategory) = if ( + if (depositCommand.status != DepositStatus.DONE) { + throw OpexError.InvalidDeposit.exception() + } + + logger.info( + "Going to charge wallet on a ${depositType.name} deposit event :" + + "$symbol-$chain-$receiverUuid-$amount" + ) + + val (actualSenderUuid, transferCategory) = + if ( senderUuid != null && - depositType == DepositType.OFF_CHAIN && + depositType == co.nilin.opex.wallet.core.model.DepositType.OFF_CHAIN && transferMethod == TransferMethod.MANUALLY ) { senderUuid to TransferCategory.DEPOSIT_MANUALLY } else { walletOwnerManager.systemUuid to TransferCategory.DEPOSIT } - return transferService.transfer( - symbol, - WalletType.MAIN, - actualSenderUuid, - receiverWalletType, - receiverUuid, - amount, - description, - transferRef, - actualTransferCategory, - ).transferResult - } else throw OpexError.InvalidDeposit.exception() + val transferResult = transferService.transfer( + symbol = symbol, + senderWalletType = WalletType.MAIN, + senderUuid = actualSenderUuid, + receiverWalletType = receiverWalletType, + receiverUuid = receiverUuid, + amount = amount, + description = description, + transferRef = transferRef, + transferCategory = transferCategory + ).transferResult + + publishDepositEvent(depositCommand) + return transferResult } + // ------------------------------------------------------------------------- + // Validation & Gateway + // ------------------------------------------------------------------------- - fun isValidDeposit(depositCommand: Deposit, gatewayData: GatewayData): Boolean { - return gatewayData.isEnabled && depositCommand.amount >= gatewayData.minimum && depositCommand.amount <= gatewayData.maximum + fun isValidDeposit(deposit: Deposit, gatewayData: GatewayData): Boolean { + return gatewayData.isEnabled && + deposit.amount >= gatewayData.minimum && + deposit.amount <= gatewayData.maximum } suspend fun fetchDepositData( gatewayUuid: String?, symbol: String, - depositType: DepositType, + depositType: co.nilin.opex.wallet.core.model.DepositType, depositCommand: Deposit, ): GatewayData { - gatewayUuid?.let { - currencyService.fetchCurrencyGateway(gatewayUuid, symbol)?.let { - when (it) { - is OnChainGatewayCommand -> { - depositCommand.depositType = DepositType.ON_CHAIN - depositCommand.currency = it.currencySymbol!! - depositCommand.network = it.chain - return GatewayData( - it.isDepositActive ?: true && it.depositAllowed ?: true, - BigDecimal.ZERO, - it.depositMin ?: BigDecimal.ZERO, - it.depositMax - ) - } - - is OffChainGatewayCommand -> { - depositCommand.depositType = DepositType.OFF_CHAIN - depositCommand.currency = it.currencySymbol!! - depositCommand.network = null - depositCommand.transferMethod = it.transferMethod - return GatewayData( - it.isDepositActive ?: true && it.depositAllowed ?: true, - BigDecimal.ZERO, - it.depositMin ?: BigDecimal.ZERO, - it.depositMax - ) - } - - else -> { - throw OpexError.GatewayNotFount.exception() - } - - - } - - - } ?: throw OpexError.GatewayNotFount.exception() - - } ?: return GatewayData(true, BigDecimal.ZERO, BigDecimal.ZERO, null) + if (gatewayUuid == null) { + return GatewayData(true, BigDecimal.ZERO, BigDecimal.ZERO, null) + } + + val gateway = currencyServiceV2 + .fetchCurrencyGateway(gatewayUuid, symbol) + ?: throw OpexError.GatewayNotFount.exception() + + return when (gateway) { + + is OnChainGatewayCommand -> { + depositCommand.depositType = co.nilin.opex.wallet.core.model.DepositType.ON_CHAIN + depositCommand.currency = gateway.currencySymbol!! + depositCommand.network = gateway.chain + + GatewayData( + gateway.isDepositActive ?: true && gateway.depositAllowed ?: true, + BigDecimal.ZERO, + gateway.depositMin ?: BigDecimal.ZERO, + gateway.depositMax + ) + } + + is OffChainGatewayCommand -> { + depositCommand.depositType = co.nilin.opex.wallet.core.model.DepositType.OFF_CHAIN + depositCommand.currency = gateway.currencySymbol!! + depositCommand.network = null + depositCommand.transferMethod = gateway.transferMethod + + GatewayData( + gateway.isDepositActive ?: true && gateway.depositAllowed ?: true, + BigDecimal.ZERO, + gateway.depositMin ?: BigDecimal.ZERO, + gateway.depositMax + ) + } + + else -> throw OpexError.GatewayNotFount.exception() + } } + // ------------------------------------------------------------------------- + // Queries + // ------------------------------------------------------------------------- + suspend fun findDepositHistory( uuid: String, symbol: String?, @@ -206,8 +276,9 @@ class DepositService( limit: Int?, size: Int?, ascendingByTime: Boolean?, - ): List { - return depositPersister.findDepositHistory(uuid, symbol, startTime, endTime, limit, size, ascendingByTime) + ): List = + depositPersister + .findDepositHistory(uuid, symbol, startTime, endTime, limit, size, ascendingByTime) .map { DepositResponse( it.depositUuid, @@ -225,17 +296,14 @@ class DepositService( it.transferMethod ) } - } suspend fun getDepositHistoryCount( uuid: String, symbol: String?, startTime: LocalDateTime?, endTime: LocalDateTime?, - ): Long { - return depositPersister.getDepositHistoryCount(uuid, symbol, startTime, endTime) - } - + ): Long = + depositPersister.getDepositHistoryCount(uuid, symbol, startTime, endTime) suspend fun searchDeposit( ownerUuid: String?, @@ -248,9 +316,8 @@ class DepositService( offset: Int?, size: Int?, ascendingByTime: Boolean?, - ): List { - - return depositPersister.findByCriteria( + ): List = + depositPersister.findByCriteria( ownerUuid, symbol, sourceAddress, @@ -262,19 +329,80 @@ class DepositService( size, ascendingByTime ) - } suspend fun getDepositSummary( uuid: String, startTime: LocalDateTime?, endTime: LocalDateTime?, limit: Int?, - ): List { - return depositPersister.getDepositSummary( - uuid, - startTime, - endTime, - limit, + ): List = + depositPersister.getDepositSummary(uuid, startTime, endTime, limit) + + // ------------------------------------------------------------------------- + // Payment Deposit + // ------------------------------------------------------------------------- + + suspend fun commitPaymentDeposit( + request: PaymentDepositRequest + ): PaymentDepositResponse { + + val currency = currencyService + .fetchCurrency(FetchCurrency(symbol = request.currency.name)) + ?: throw OpexError.CurrencyNotFound.exception() + + val sourceOwner = walletOwnerManager + .findWalletOwner(walletOwnerManager.systemUuid) + ?: throw OpexError.WalletOwnerNotFound.exception() + + val sourceWallet = getOrCreateMainWallet(sourceOwner, currency) + + val receiverOwner = getOrCreateWalletOwner(request.userId) + val receiverWallet = getOrCreateMainWallet(receiverOwner, currency) + + transferManager.transfer( + TransferCommand( + sourceWallet, + receiverWallet, + Amount(currency, request.amount), + request.description, + request.reference, + TransferCategory.DEPOSIT + ) ) + + val deposit = Deposit( + ownerUuid = receiverOwner.uuid, + depositUuid = UUID.randomUUID().toString(), + currency = currency.symbol, + amount = request.amount, + note = request.description, + transactionRef = request.reference, + status = DepositStatus.DONE, + depositType = co.nilin.opex.wallet.core.model.DepositType.OFF_CHAIN, + network = null, + attachment = null, + transferMethod = + if (request.isIPG == true) TransferMethod.IPG else TransferMethod.MPG + ) + + traceDepositService.saveDepositInNewTransaction(deposit) + publishDepositEvent(deposit) + + return PaymentDepositResponse(true) + } + + // ------------------------------------------------------------------------- + // Events + // ------------------------------------------------------------------------- + + suspend fun publishDepositEvent(deposit: Deposit) { + if (depositSnapshotEnabled) { + depositEventSubmitter.send( + deposit.ownerUuid, + deposit.transactionRef, + deposit.currency, + deposit.amount + ) + } } -} \ No newline at end of file +} diff --git a/wallet/wallet-app/src/main/resources/application-otc.yml b/wallet/wallet-app/src/main/resources/application-otc.yml index 47217af27..bf489d739 100644 --- a/wallet/wallet-app/src/main/resources/application-otc.yml +++ b/wallet/wallet-app/src/main/resources/application-otc.yml @@ -110,10 +110,12 @@ app: enabled: ${WITHDRAW_LIMIT_ENABLED:false} otp-required-count: ${WITHDRAW_OTP_REQUIRED_COUNT:0} bank-account-validation: ${WITHDRAW_BANK_ACCOUNT_VALIDATION:false} + deposit: + snapshot: + enabled: ${DEPOSIT_SNAPSHOT_ENABLED:false} custom-message: enabled: ${CUSTOM_MESSAGE_ENABLED:false} base-url: ${CUSTOM_MESSAGE_URL} - logging: level: root: INFO diff --git a/wallet/wallet-app/src/main/resources/application.yml b/wallet/wallet-app/src/main/resources/application.yml index d993574e8..4f12fa225 100644 --- a/wallet/wallet-app/src/main/resources/application.yml +++ b/wallet/wallet-app/src/main/resources/application.yml @@ -154,6 +154,9 @@ app: enabled: ${WITHDRAW_LIMIT_ENABLED:false} otp-required-count: ${WITHDRAW_OTP_REQUIRED_COUNT:0} bank-account-validation: ${WITHDRAW_BANK_ACCOUNT_VALIDATION:false} + deposit: + snapshot: + enabled: ${DEPOSIT_SNAPSHOT_ENABLED:true} custom-message: enabled: ${CUSTOM_MESSAGE_ENABLED:false} base-url: ${CUSTOM_MESSAGE_URL} diff --git a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/DepositEventSubmitter.kt b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/DepositEventSubmitter.kt new file mode 100644 index 000000000..3109003ec --- /dev/null +++ b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/DepositEventSubmitter.kt @@ -0,0 +1,13 @@ +package co.nilin.opex.wallet.core.spi +import java.math.BigDecimal +import java.time.LocalDateTime + +interface DepositEventSubmitter { + suspend fun send( + uuid: String, + depositRef: String?, + currency: String, + amount: BigDecimal, + createDate: LocalDateTime?= LocalDateTime.now() + ) +} \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaProducerConfig.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaProducerConfig.kt index 2acd93193..b4db5991a 100644 --- a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaProducerConfig.kt +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaProducerConfig.kt @@ -29,7 +29,7 @@ class KafkaProducerConfig { ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java, ProducerConfig.ACKS_CONFIG to "all", - JsonSerializer.TYPE_MAPPINGS to "fiAction_response_event:co.nilin.opex.wallet.core.inout.FinancialActionResponseEvent,withdrawRequestEvent:co.nilin.opex.wallet.ports.kafka.listener.model.WithdrawRequestEvent" + JsonSerializer.TYPE_MAPPINGS to "fiAction_response_event:co.nilin.opex.wallet.core.inout.FinancialActionResponseEvent,withdrawRequestEvent:co.nilin.opex.wallet.ports.kafka.listener.model.WithdrawRequestEvent,depositEvent:co.nilin.opex.wallet.ports.kafka.listener.model.DepositEvent" ) } @@ -68,4 +68,14 @@ class KafkaProducerConfig { return DefaultKafkaProducerFactory(producerConfigs) } + @Bean + fun depositKafkaTemplate(@Qualifier("depositProducerFactory") factory: ProducerFactory): KafkaTemplate { + return KafkaTemplate(factory) + } + + @Bean + fun depositProducerFactory(@Qualifier("producerConfigs") producerConfigs: Map): ProducerFactory { + return DefaultKafkaProducerFactory(producerConfigs) + } + } \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaTopicConfig.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaTopicConfig.kt index 3f7a237de..aafc3193b 100644 --- a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaTopicConfig.kt +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaTopicConfig.kt @@ -30,6 +30,17 @@ class KafkaTopicConfig { .replicas(1) .build() }) + + } + + @Autowired + fun depositTopics(applicationContext: GenericApplicationContext) { + applicationContext.registerBean("topic_deposit", NewTopic::class.java, Supplier { + TopicBuilder.name("deposit") + .partitions(1) + .replicas(1) + .build() + }) } } \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/DepositEvent.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/DepositEvent.kt new file mode 100644 index 000000000..29f49e745 --- /dev/null +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/DepositEvent.kt @@ -0,0 +1,13 @@ +package co.nilin.opex.wallet.ports.kafka.listener.model + +import co.nilin.opex.wallet.core.model.WithdrawStatus +import java.math.BigDecimal +import java.time.LocalDateTime + +data class DepositEvent( + val uuid: String, + val depositRef: String? = null, + val currency: String, + val amount: BigDecimal, + val createDate: LocalDateTime?= LocalDateTime.now(), +) \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/submitter/DepositSubmitter.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/submitter/DepositSubmitter.kt new file mode 100644 index 000000000..4a5f2d293 --- /dev/null +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/submitter/DepositSubmitter.kt @@ -0,0 +1,48 @@ +package co.nilin.opex.wallet.ports.kafka.listener.submitter + +import co.nilin.opex.common.utils.LoggerDelegate +import co.nilin.opex.wallet.core.spi.DepositEventSubmitter +import co.nilin.opex.wallet.ports.kafka.listener.model.DepositEvent +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.retry.support.RetryTemplate +import org.springframework.stereotype.Component +import java.math.BigDecimal +import java.time.LocalDateTime + +@Component +@ConditionalOnProperty(name = ["app.deposit.snapshot.enabled"], havingValue = "true") +class DepositSubmitter( + @Qualifier("depositKafkaTemplate") private val template: KafkaTemplate +) : DepositEventSubmitter { + + private val logger by LoggerDelegate() + + private val retryTemplate = RetryTemplate.builder() + .maxAttempts(10) + .exponentialBackoff(1000, 1.8, 5 * 60 * 1000) + .retryOn(Exception::class.java) + .build() + + override suspend fun send( + uuid: String, + depositRef: String?, + currency: String, + amount: BigDecimal, + createDate: LocalDateTime? + ) { + retryTemplate.execute { + try { + template.send( + "deposit", + DepositEvent(uuid, depositRef, currency, amount, createDate) + ).get() + logger.info("Deposit event sent") + } catch (ex: Exception) { + logger.error("Error sending deposit event", ex) + throw ex + } + } + } +} \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/submitter/DummyDepositSubmitter.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/submitter/DummyDepositSubmitter.kt new file mode 100644 index 000000000..c723e884a --- /dev/null +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/submitter/DummyDepositSubmitter.kt @@ -0,0 +1,22 @@ +package co.nilin.opex.wallet.ports.kafka.listener.submitter + +import co.nilin.opex.wallet.core.spi.DepositEventSubmitter +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.stereotype.Component +import java.math.BigDecimal +import java.time.LocalDateTime + +@Component +@ConditionalOnProperty(name = ["app.deposit.snapshot.enabled"], havingValue = "false") +class DummyDepositSubmitter( +) : DepositEventSubmitter { + override suspend fun send( + uuid: String, + depositRef: String?, + currency: String, + amount: BigDecimal, + createDate: LocalDateTime? + ) { + + } +} \ No newline at end of file From f602852e2bbe09a4f6f8b1d50e8261f8b3656d3a Mon Sep 17 00:00:00 2001 From: fatemeh imanipour Date: Sat, 3 Jan 2026 19:55:45 +0330 Subject: [PATCH 2/9] Develop the service(s) to return users' total volume activity per day --- .../opex/accountant/app/config/AppConfig.kt | 2 +- .../controller/UserDailyActivityController.kt | 46 ++++++++++++ .../app/controller/UserDataController.kt | 5 +- .../src/main/resources/application.yml | 1 + accountant/accountant-core/pom.xml | 6 ++ .../core/api/DepositActivityManager.kt | 11 +++ .../core/api/TradeActivityManager.kt | 11 +++ .../core/api/WithdrawActivityManager.kt | 11 +++ .../opex/accountant/core/model/DailyAmount.kt | 9 +++ .../service/DepositActivityManagerImpl.kt | 72 ++++++++++++++++++ .../core/service/FeeCalculatorImpl.kt | 4 +- .../core/service/TradeActivityManagerImpl.kt | 73 +++++++++++++++++++ .../core/service/TradeManagerImpl.kt | 3 +- .../service/WithdrawActivityManagerImpl.kt | 72 ++++++++++++++++++ .../core/spi/UserDepositVolumePersister.kt | 5 ++ ...rsister.kt => UserTradeVolumePersister.kt} | 5 +- .../core/spi/UserWithdrawVolumePersister.kt | 10 +-- .../core/service/FeeCalculatorImplTest.kt | 4 +- .../core/service/TradeManagerImplTest.kt | 2 +- .../dao/UserDepositVolumeRepository.kt | 19 +++++ .../postgres/dao/UserTradeVolumeRepository.kt | 18 +++++ .../dao/UserWithdrawVolumeRepository.kt | 18 +++++ .../impl/UserDepositVolumePersisterImpl.kt | 28 ++++++- ...mpl.kt => UserTradeVolumePersisterImpl.kt} | 36 ++++++++- .../impl/UserWithdrawVolumePersisterImpl.kt | 28 ++++++- 25 files changed, 479 insertions(+), 20 deletions(-) create mode 100644 accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/controller/UserDailyActivityController.kt create mode 100644 accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/DepositActivityManager.kt create mode 100644 accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/TradeActivityManager.kt create mode 100644 accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/WithdrawActivityManager.kt create mode 100644 accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/model/DailyAmount.kt create mode 100644 accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/DepositActivityManagerImpl.kt create mode 100644 accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeActivityManagerImpl.kt create mode 100644 accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/WithdrawActivityManagerImpl.kt rename accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/{UserVolumePersister.kt => UserTradeVolumePersister.kt} (72%) rename accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/{UserVolumePersisterImpl.kt => UserTradeVolumePersisterImpl.kt} (50%) diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt index f1e0a70f9..4a160cefb 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt @@ -74,7 +74,7 @@ class AppConfig( feeCalculator: FeeCalculator, financialActionPublisher: FinancialActionPublisher, currencyRatePersister: CurrencyRatePersister, - userVolumePersister: UserVolumePersister + userVolumePersister: UserTradeVolumePersister ): TradeManager { return TradeManagerImpl( financeActionPersister, diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/controller/UserDailyActivityController.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/controller/UserDailyActivityController.kt new file mode 100644 index 000000000..e447f7646 --- /dev/null +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/controller/UserDailyActivityController.kt @@ -0,0 +1,46 @@ +package co.nilin.opex.accountant.app.controller + +import co.nilin.opex.accountant.core.api.DepositActivityManager +import co.nilin.opex.accountant.core.api.TradeActivityManager +import co.nilin.opex.accountant.core.api.WithdrawActivityManager +import co.nilin.opex.accountant.core.model.DailyAmount +import org.springframework.web.bind.annotation.GetMapping +import org.springframework.web.bind.annotation.PathVariable +import org.springframework.web.bind.annotation.RequestMapping +import org.springframework.web.bind.annotation.RestController + +@RestController +@RequestMapping("/user-activity") +class UserDailyActivityController( + private val withdrawManager: WithdrawActivityManager, + private val depositManager: DepositActivityManager, + private val TradeManager: TradeActivityManager +) { + + @GetMapping("/withdraw/{userId}") + suspend fun getDailyWithdrawLast31Days( + @PathVariable userId: String + ): List { + return withdrawManager.getLastDaysWithdrawActivity( + userId = userId + ) + } + + @GetMapping("/deposit/{userId}") + suspend fun getDailyDepositLast31Days( + @PathVariable userId: String + ): List { + return depositManager.getLastDaysDepositActivity( + userId = userId + ) + } + + @GetMapping("/trade/{userId}") + suspend fun getDailyTradeLast31Days( + @PathVariable userId: String + ): List { + return TradeManager.getLastDaysTradeActivity( + userId = userId + ) + } +} \ No newline at end of file diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/controller/UserDataController.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/controller/UserDataController.kt index 49b9bdfde..4066e8730 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/controller/UserDataController.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/controller/UserDataController.kt @@ -2,7 +2,7 @@ package co.nilin.opex.accountant.app.controller import co.nilin.opex.accountant.core.api.FeeCalculator import co.nilin.opex.accountant.core.model.UserFee -import co.nilin.opex.accountant.core.spi.UserVolumePersister +import co.nilin.opex.accountant.core.spi.UserTradeVolumePersister import co.nilin.opex.accountant.core.spi.UserWithdrawVolumePersister import co.nilin.opex.common.utils.Interval import org.springframework.beans.factory.annotation.Value @@ -13,7 +13,7 @@ import java.time.LocalDateTime @RestController @RequestMapping("/user/data") class UserDataController( - private val userVolumePersister: UserVolumePersister, + private val userVolumePersister: UserTradeVolumePersister, private val feeCalculator: FeeCalculator, private val userWithdrawVolumePersister: UserWithdrawVolumePersister, @Value("\${app.trade-volume-calculation-currency}") @@ -60,4 +60,5 @@ class UserDataController( (interval?.getLocalDateTime() ?: LocalDateTime.now()) ) } + } \ No newline at end of file diff --git a/accountant/accountant-app/src/main/resources/application.yml b/accountant/accountant-app/src/main/resources/application.yml index 0d588412a..a2b0499a6 100644 --- a/accountant/accountant-app/src/main/resources/application.yml +++ b/accountant/accountant-app/src/main/resources/application.yml @@ -108,6 +108,7 @@ app: zone-offset: +03:30 trade-volume-calculation-currency: ${TRADE_VOLUME_CALCULATION_CURRENCY:USDT} withdraw-volume-calculation-currency: ${WITHDRAW_VOLUME_CALCULATION_CURRENCY:USDT} + deposit-volume-calculation-currency: ${DEPOSIT_VOLUME_CALCULATION_CURRENCY:USDT} custom-message: enabled: ${CUSTOM_MESSAGE_ENABLED:false} base-url: ${CUSTOM_MESSAGE_URL} diff --git a/accountant/accountant-core/pom.xml b/accountant/accountant-core/pom.xml index 4ef1fd63d..9e689f4d0 100644 --- a/accountant/accountant-core/pom.xml +++ b/accountant/accountant-core/pom.xml @@ -66,5 +66,11 @@ jackson-datatype-jsr310 2.15.2 + + com.sun.mail + mailapi + 1.6.2 + compile + diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/DepositActivityManager.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/DepositActivityManager.kt new file mode 100644 index 000000000..b655d77ac --- /dev/null +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/DepositActivityManager.kt @@ -0,0 +1,11 @@ +package co.nilin.opex.accountant.core.api + +import co.nilin.opex.accountant.core.model.DailyAmount + +interface DepositActivityManager { + suspend fun getLastDaysDepositActivity( + userId: String, + quoteCurrency: String? = null, + n: Int = 31 + ): List +} \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/TradeActivityManager.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/TradeActivityManager.kt new file mode 100644 index 000000000..c19491256 --- /dev/null +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/TradeActivityManager.kt @@ -0,0 +1,11 @@ +package co.nilin.opex.accountant.core.api + +import co.nilin.opex.accountant.core.model.DailyAmount + +interface TradeActivityManager { + suspend fun getLastDaysTradeActivity( + userId: String, + quoteCurrency: String? = null, + n: Int = 31 + ): List +} \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/WithdrawActivityManager.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/WithdrawActivityManager.kt new file mode 100644 index 000000000..c21b4eb56 --- /dev/null +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/WithdrawActivityManager.kt @@ -0,0 +1,11 @@ +package co.nilin.opex.accountant.core.api + +import co.nilin.opex.accountant.core.model.DailyAmount + +interface WithdrawActivityManager { + suspend fun getLastDaysWithdrawActivity( + userId: String, + quoteCurrency: String? = null, + n: Int = 31 + ): List +} \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/model/DailyAmount.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/model/DailyAmount.kt new file mode 100644 index 000000000..c30d8c689 --- /dev/null +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/model/DailyAmount.kt @@ -0,0 +1,9 @@ +package co.nilin.opex.accountant.core.model + +import java.math.BigDecimal +import java.time.LocalDate + +data class DailyAmount( + val date: LocalDate, + val totalAmount: BigDecimal +) diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/DepositActivityManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/DepositActivityManagerImpl.kt new file mode 100644 index 000000000..58907c55e --- /dev/null +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/DepositActivityManagerImpl.kt @@ -0,0 +1,72 @@ +package co.nilin.opex.accountant.core.service + +import co.nilin.opex.accountant.core.api.DepositActivityManager +import co.nilin.opex.accountant.core.model.DailyAmount +import co.nilin.opex.accountant.core.spi.UserDepositVolumePersister +import co.nilin.opex.common.utils.CacheManager +import org.springframework.beans.factory.annotation.Value +import org.springframework.stereotype.Service +import java.math.BigDecimal +import java.time.LocalDate +import java.time.ZoneOffset +import java.util.concurrent.TimeUnit +import java.util.stream.Collectors +@Service +class DepositActivityManagerImpl( + private val cacheManager: CacheManager, + private val depositVolumePersister: UserDepositVolumePersister, + @Value("\${app.zone-offset}") private val zoneOffsetString: String, +) : DepositActivityManager { + override suspend fun getLastDaysDepositActivity( + userId: String, + quoteCurrency: String?, + n: Int + ): List { + val today = LocalDate.now(ZoneOffset.of(zoneOffsetString)) + val dates = (0..n - 1).map { today.minusDays(it.toLong()) } + + val result = mutableMapOf() + val missingDates = mutableListOf() + + for (date in dates) { + val cacheKey = "deposit:daily:$userId:$date" + val cached = cacheManager.get(cacheKey) + + if (cached != null) { + result[date] = cached + } else { + missingDates.add(date) + } + } + + if (missingDates.isNotEmpty()) { + val startDate = missingDates.minOrNull()!! + + val dbData = depositVolumePersister.getLastDaysDeposit(userId, startDate,quoteCurrency) + .stream().collect(Collectors.toMap(DailyAmount::date, DailyAmount::totalAmount)); + + for (date in missingDates) { + val value = dbData[date] ?: BigDecimal.ZERO + val (ttl, unit) = ttlFor(date, today) + val cacheKey = "deposit:daily:$userId:$date" + + cacheManager.put(cacheKey, value, ttl, unit) + result[date] = value + } + } + + return result + .map { DailyAmount(it.key, it.value) } + .sortedBy { it.date } + + } + + + private fun ttlFor(date: LocalDate, today: LocalDate): Pair = + if (date == today) { + 15L to TimeUnit.MINUTES + } else { + 100L to TimeUnit.DAYS + } + +} \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FeeCalculatorImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FeeCalculatorImpl.kt index 6e5b29273..789cc388f 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FeeCalculatorImpl.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FeeCalculatorImpl.kt @@ -4,7 +4,7 @@ import co.nilin.opex.accountant.core.api.FeeCalculator import co.nilin.opex.accountant.core.model.* import co.nilin.opex.accountant.core.spi.FeeConfigService import co.nilin.opex.accountant.core.spi.JsonMapper -import co.nilin.opex.accountant.core.spi.UserVolumePersister +import co.nilin.opex.accountant.core.spi.UserTradeVolumePersister import co.nilin.opex.accountant.core.spi.WalletProxy import co.nilin.opex.common.utils.CacheManager import co.nilin.opex.matching.engine.core.eventh.events.TradeEvent @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit class FeeCalculatorImpl( private val walletProxy: WalletProxy, private val feeConfigService: FeeConfigService, - private val userVolumePersister: UserVolumePersister, + private val userVolumePersister: UserTradeVolumePersister, @Qualifier("appCacheManager") private val cacheManager: CacheManager, @Value("\${app.address}") private val platformAddress: String, @Value("\${app.zone-offset}") private val zoneOffsetString: String, diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeActivityManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeActivityManagerImpl.kt new file mode 100644 index 000000000..51ed4dbd8 --- /dev/null +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeActivityManagerImpl.kt @@ -0,0 +1,73 @@ +package co.nilin.opex.accountant.core.service + +import co.nilin.opex.accountant.core.api.TradeActivityManager +import co.nilin.opex.accountant.core.api.WithdrawActivityManager +import co.nilin.opex.accountant.core.model.DailyAmount +import co.nilin.opex.accountant.core.spi.UserTradeVolumePersister +import co.nilin.opex.common.utils.CacheManager +import org.springframework.beans.factory.annotation.Value +import org.springframework.stereotype.Service +import java.math.BigDecimal +import java.time.LocalDate +import java.time.ZoneOffset +import java.util.concurrent.TimeUnit +import java.util.stream.Collectors +@Service +class TradeActivityManagerImpl( + private val cacheManager: CacheManager, + private val tradeVolumePersister: UserTradeVolumePersister, + @Value("\${app.zone-offset}") private val zoneOffsetString: String, +) : TradeActivityManager { + override suspend fun getLastDaysTradeActivity( + userId: String, + quoteCurrency: String?, + n: Int + ): List { + val today = LocalDate.now(ZoneOffset.of(zoneOffsetString)) + val dates = (0..n - 1).map { today.minusDays(it.toLong()) } + + val result = mutableMapOf() + val missingDates = mutableListOf() + + for (date in dates) { + val cacheKey = "trade:daily:$userId:$date" + val cached = cacheManager.get(cacheKey) + + if (cached != null) { + result[date] = cached + } else { + missingDates.add(date) + } + } + + if (missingDates.isNotEmpty()) { + val startDate = missingDates.minOrNull()!! + + val dbData = tradeVolumePersister.getLastDaysTrade(userId, startDate,quoteCurrency) + .stream().collect(Collectors.toMap(DailyAmount::date, DailyAmount::totalAmount)); + + for (date in missingDates) { + val value = dbData[date] ?: BigDecimal.ZERO + val (ttl, unit) = ttlFor(date, today) + val cacheKey = "trade:daily:$userId:$date" + + cacheManager.put(cacheKey, value, ttl, unit) + result[date] = value + } + } + + return result + .map { DailyAmount(it.key, it.value) } + .sortedBy { it.date } + + } + + + private fun ttlFor(date: LocalDate, today: LocalDate): Pair = + if (date == today) { + 15L to TimeUnit.MINUTES + } else { + 100L to TimeUnit.DAYS + } + +} \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt index 005008af0..f6ce24662 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt @@ -9,7 +9,6 @@ import co.nilin.opex.accountant.core.model.* import co.nilin.opex.accountant.core.spi.* import co.nilin.opex.matching.engine.core.eventh.events.TradeEvent import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Value import org.springframework.transaction.annotation.Transactional import java.math.BigDecimal import java.time.LocalDateTime @@ -25,7 +24,7 @@ open class TradeManagerImpl( private val feeCalculator: FeeCalculator, private val financialActionPublisher: FinancialActionPublisher, private val currencyRatePersister: CurrencyRatePersister, - private val userVolumePersister: UserVolumePersister, + private val userVolumePersister: UserTradeVolumePersister, private val tradeVolumeCalculationCurrency: String, private val zoneOffsetString: String, ) : TradeManager { diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/WithdrawActivityManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/WithdrawActivityManagerImpl.kt new file mode 100644 index 000000000..cbf8dd491 --- /dev/null +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/WithdrawActivityManagerImpl.kt @@ -0,0 +1,72 @@ +package co.nilin.opex.accountant.core.service + +import co.nilin.opex.accountant.core.api.WithdrawActivityManager +import co.nilin.opex.accountant.core.model.DailyAmount +import co.nilin.opex.accountant.core.spi.UserWithdrawVolumePersister +import co.nilin.opex.common.utils.CacheManager +import org.springframework.beans.factory.annotation.Value +import org.springframework.stereotype.Service +import java.math.BigDecimal +import java.time.LocalDate +import java.time.ZoneOffset +import java.util.concurrent.TimeUnit +import java.util.stream.Collectors +@Service +class WithdrawActivityManagerImpl( + private val cacheManager: CacheManager, + private val withdrawVolumePersister: UserWithdrawVolumePersister, + @Value("\${app.zone-offset}") private val zoneOffsetString: String, +) : WithdrawActivityManager { + override suspend fun getLastDaysWithdrawActivity( + userId: String, + quoteCurrency: String?, + n: Int + ): List { + val today = LocalDate.now(ZoneOffset.of(zoneOffsetString)) + val dates = (0..n - 1).map { today.minusDays(it.toLong()) } + + val result = mutableMapOf() + val missingDates = mutableListOf() + + for (date in dates) { + val cacheKey = "withdraw:daily:$userId:$date" + val cached = cacheManager.get(cacheKey) + + if (cached != null) { + result[date] = cached + } else { + missingDates.add(date) + } + } + + if (missingDates.isNotEmpty()) { + val startDate = missingDates.minOrNull()!! + + val dbData = withdrawVolumePersister.getLastDaysWithdraw(userId, startDate,quoteCurrency) + .stream().collect(Collectors.toMap(DailyAmount::date, DailyAmount::totalAmount)); + + for (date in missingDates) { + val value = dbData[date] ?: BigDecimal.ZERO + val (ttl, unit) = ttlFor(date, today) + val cacheKey = "withdraw:daily:$userId:$date" + + cacheManager.put(cacheKey, value, ttl, unit) + result[date] = value + } + } + + return result + .map { DailyAmount(it.key, it.value) } + .sortedBy { it.date } + + } + + + private fun ttlFor(date: LocalDate, today: LocalDate): Pair = + if (date == today) { + 15L to TimeUnit.MINUTES + } else { + 100L to TimeUnit.DAYS + } + +} \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserDepositVolumePersister.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserDepositVolumePersister.kt index 441eca0f2..496ccf3a4 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserDepositVolumePersister.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserDepositVolumePersister.kt @@ -1,7 +1,9 @@ package co.nilin.opex.accountant.core.spi +import co.nilin.opex.accountant.core.model.DailyAmount import co.nilin.opex.accountant.core.model.WithdrawStatus import java.math.BigDecimal +import java.time.LocalDate import java.time.LocalDateTime interface UserDepositVolumePersister { @@ -12,4 +14,7 @@ interface UserDepositVolumePersister { date: LocalDateTime) suspend fun getTotalValueByUserAndDateAfter(uuid: String, startDate: LocalDateTime): BigDecimal + + suspend fun getLastDaysDeposit(userId: String, startDate: LocalDate?, quatCurrency: String?, lastDays: Long = 31): List + } \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserVolumePersister.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserTradeVolumePersister.kt similarity index 72% rename from accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserVolumePersister.kt rename to accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserTradeVolumePersister.kt index ba96e888c..5e6de6bbe 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserVolumePersister.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserTradeVolumePersister.kt @@ -1,9 +1,10 @@ package co.nilin.opex.accountant.core.spi +import co.nilin.opex.accountant.core.model.DailyAmount import java.math.BigDecimal import java.time.LocalDate -interface UserVolumePersister { +interface UserTradeVolumePersister { suspend fun update( userId: String, @@ -21,4 +22,6 @@ interface UserVolumePersister { startDate: LocalDate, quoteCurrency: String ): BigDecimal? + suspend fun getLastDaysTrade(userId: String, startDate: LocalDate?, quatCurrency: String?, lastDays: Long = 31): List + } \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserWithdrawVolumePersister.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserWithdrawVolumePersister.kt index e283bf6e2..7bc056c49 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserWithdrawVolumePersister.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserWithdrawVolumePersister.kt @@ -1,17 +1,17 @@ package co.nilin.opex.accountant.core.spi +import co.nilin.opex.accountant.core.model.DailyAmount import co.nilin.opex.accountant.core.model.WithdrawStatus import java.math.BigDecimal +import java.time.LocalDate import java.time.LocalDateTime interface UserWithdrawVolumePersister { suspend fun update( - userId: String, - currency: String, - amount: BigDecimal, - date: LocalDateTime, - withdrawStatus: WithdrawStatus + userId: String, currency: String, amount: BigDecimal, date: LocalDateTime, withdrawStatus: WithdrawStatus ) suspend fun getTotalValueByUserAndDateAfter(uuid: String, startDate: LocalDateTime): BigDecimal + + suspend fun getLastDaysWithdraw(userId: String, startDate: LocalDate?, quatCurrency: String?, lastDays: Long = 31): List } \ No newline at end of file diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FeeCalculatorImplTest.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FeeCalculatorImplTest.kt index 2c1ad86d3..55e69d80d 100644 --- a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FeeCalculatorImplTest.kt +++ b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FeeCalculatorImplTest.kt @@ -5,7 +5,7 @@ import co.nilin.opex.accountant.core.model.FinancialActionCategory import co.nilin.opex.accountant.core.model.UserFee import co.nilin.opex.accountant.core.model.WalletType import co.nilin.opex.accountant.core.spi.FeeConfigService -import co.nilin.opex.accountant.core.spi.UserVolumePersister +import co.nilin.opex.accountant.core.spi.UserTradeVolumePersister import co.nilin.opex.accountant.core.spi.WalletProxy import co.nilin.opex.common.utils.CacheManager import co.nilin.opex.matching.engine.core.eventh.events.TradeEvent @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit internal class FeeCalculatorImplTest { private val walletProxy = mockk() private val feeConfigService = mockk() - private val userVolumePersister = mockk() + private val userVolumePersister = mockk() @Qualifier("appCacheManager") private val cacheManager = mockk>() diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt index e6d3f2f29..455204684 100644 --- a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt +++ b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt @@ -30,7 +30,7 @@ internal class TradeManagerImplTest { private val richTradePublisher = mockk() private val financialActionPublisher = mockk() private val currencyRatePersister = mockk() - private val userVolumePersister = mockk() + private val userVolumePersister = mockk() private val feeCalculator = mockk() private val walletProxy = mockk() private val feeConfigService = mockk() diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserDepositVolumeRepository.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserDepositVolumeRepository.kt index dccd2d98d..56f5f4c03 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserDepositVolumeRepository.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserDepositVolumeRepository.kt @@ -1,9 +1,11 @@ package co.nilin.opex.accountant.ports.postgres.dao +import co.nilin.opex.accountant.core.model.DailyAmount import co.nilin.opex.accountant.ports.postgres.model.UserDepositVolumeModel import org.springframework.data.r2dbc.repository.Query import org.springframework.data.repository.reactive.ReactiveCrudRepository import org.springframework.stereotype.Repository +import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.math.BigDecimal import java.time.LocalDate @@ -41,4 +43,21 @@ interface UserDepositVolumeRepository : ReactiveCrudRepository + + + @Query( + """ + select date, total_amount + from user_deposit_volume + where user_id = :userId + and date >= :startDate + and quote_currency = :quoteCurrency + order by date desc + """ + ) + fun findDailyDepositVolume( + userId: String, + startDate: LocalDate, + quoteCurrency: String + ): Flux } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserTradeVolumeRepository.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserTradeVolumeRepository.kt index 953c45d9f..730452f0e 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserTradeVolumeRepository.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserTradeVolumeRepository.kt @@ -1,9 +1,11 @@ package co.nilin.opex.accountant.ports.postgres.dao +import co.nilin.opex.accountant.core.model.DailyAmount import co.nilin.opex.accountant.ports.postgres.model.UserTradeVolumeModel import org.springframework.data.r2dbc.repository.Query import org.springframework.data.repository.reactive.ReactiveCrudRepository import org.springframework.stereotype.Repository +import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.math.BigDecimal import java.time.LocalDate @@ -58,4 +60,20 @@ interface UserTradeVolumeRepository : ReactiveCrudRepository + + @Query( + """ + select date, total_amount + from user_trade_volume + where user_id = :userId + and date >= :startDate + and quote_currency = :quoteCurrency + order by date desc + """ + ) + fun findDailyTradeVolume( + userId: String, + startDate: LocalDate, + quoteCurrency: String + ): Flux } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserWithdrawVolumeRepository.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserWithdrawVolumeRepository.kt index 36a158e73..8f65a4a38 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserWithdrawVolumeRepository.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserWithdrawVolumeRepository.kt @@ -1,9 +1,11 @@ package co.nilin.opex.accountant.ports.postgres.dao +import co.nilin.opex.accountant.core.model.DailyAmount import co.nilin.opex.accountant.ports.postgres.model.UserWithdrawVolumeModel import org.springframework.data.r2dbc.repository.Query import org.springframework.data.repository.reactive.ReactiveCrudRepository import org.springframework.stereotype.Repository +import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.math.BigDecimal import java.time.LocalDate @@ -41,4 +43,20 @@ interface UserWithdrawVolumeRepository : ReactiveCrudRepository + + @Query( + """ + select date, total_amount + from user_withdraw_volume + where user_id = :userId + and date >= :startDate + and quote_currency = :quoteCurrency + order by date desc + """ + ) + fun findDailyWithdrawVolume( + userId: String, + startDate: LocalDate, + quoteCurrency: String + ): Flux } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserDepositVolumePersisterImpl.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserDepositVolumePersisterImpl.kt index 6afff2d53..580c3ca36 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserDepositVolumePersisterImpl.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserDepositVolumePersisterImpl.kt @@ -1,10 +1,12 @@ package co.nilin.opex.accountant.ports.postgres.impl +import co.nilin.opex.accountant.core.model.DailyAmount import co.nilin.opex.accountant.core.model.WithdrawStatus import co.nilin.opex.accountant.core.spi.CurrencyRatePersister import co.nilin.opex.accountant.core.spi.UserDepositVolumePersister import co.nilin.opex.accountant.ports.postgres.dao.UserDepositVolumeRepository import kotlinx.coroutines.reactive.awaitFirstOrNull +import kotlinx.coroutines.reactor.awaitSingle import kotlinx.coroutines.reactor.awaitSingleOrNull import org.springframework.beans.factory.annotation.Value import org.springframework.stereotype.Component @@ -12,6 +14,7 @@ import java.math.BigDecimal import java.time.LocalDateTime import java.time.ZoneOffset import java.math.RoundingMode +import java.time.LocalDate @Component @@ -19,7 +22,7 @@ class UserDepositVolumePersisterImpl( private val repository: UserDepositVolumeRepository, private val currencyRatePersister: CurrencyRatePersister, @Value("\${app.zone-offset}") private val zoneOffsetString: String, - @Value("\${app.withdraw-volume-calculation-currency}") private val calculationCurrency: String, + @Value("\${app.deposit-volume-calculation-currency}") private val calculationCurrency: String, @Value("\${app.deposit-volume-calculation-currency-precision:2}") private val calculationCurrencyPrecision: Int ) : UserDepositVolumePersister { @@ -53,4 +56,27 @@ class UserDepositVolumePersisterImpl( calculationCurrency ).awaitFirstOrNull() ?: BigDecimal.ZERO } + + override suspend fun getLastDaysDeposit( + userId: String, + startDate: LocalDate?, + quatCurrency: String?, + lastDays: Long + ): List { + + val startDate = startDate ?: LocalDate + .now(ZoneOffset.of(zoneOffsetString)) + .minusDays(lastDays) + + return repository + .findDailyDepositVolume(userId, startDate, quatCurrency?:calculationCurrency) + .map { + DailyAmount( + date = it.date, + totalAmount = it.totalAmount + ) + } + .collectList() + .awaitSingle() + } } diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserVolumePersisterImpl.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserTradeVolumePersisterImpl.kt similarity index 50% rename from accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserVolumePersisterImpl.kt rename to accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserTradeVolumePersisterImpl.kt index 16fd79f1a..9291c0d36 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserVolumePersisterImpl.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserTradeVolumePersisterImpl.kt @@ -1,14 +1,23 @@ package co.nilin.opex.accountant.ports.postgres.impl -import co.nilin.opex.accountant.core.spi.UserVolumePersister +import co.nilin.opex.accountant.core.model.DailyAmount +import co.nilin.opex.accountant.core.spi.UserTradeVolumePersister import co.nilin.opex.accountant.ports.postgres.dao.UserTradeVolumeRepository +import kotlinx.coroutines.reactor.awaitSingle import kotlinx.coroutines.reactor.awaitSingleOrNull +import org.springframework.beans.factory.annotation.Value import org.springframework.stereotype.Component import java.math.BigDecimal import java.time.LocalDate +import java.time.ZoneOffset @Component -class UserVolumePersisterImpl(private val repository: UserTradeVolumeRepository) : UserVolumePersister { +class UserTradeVolumePersisterImpl( + private val repository: UserTradeVolumeRepository, + @Value("\${app.zone-offset}") private val zoneOffsetString: String, + @Value("\${app.trade-volume-calculation-currency}") private val calculationCurrency: String, + + ) : UserTradeVolumePersister { override suspend fun update( userId: String, @@ -38,4 +47,27 @@ class UserVolumePersisterImpl(private val repository: UserTradeVolumeRepository) return repository.findTotalValueByUserAndAndDateAfterAndCurrency(uuid, currency, startDate, quoteCurrency) .awaitSingleOrNull() } + + override suspend fun getLastDaysTrade( + userId: String, + startDate: LocalDate?, + quatCurrency: String?, + lastDays: Long + ): List { + + val startDate = startDate ?: LocalDate + .now(ZoneOffset.of(zoneOffsetString)) + .minusDays(lastDays) + + return repository + .findDailyTradeVolume(userId, startDate, quatCurrency ?: calculationCurrency) + .map { + DailyAmount( + date = it.date, + totalAmount = it.totalAmount + ) + } + .collectList() + .awaitSingle() + } } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserWithdrawVolumePersisterImpl.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserWithdrawVolumePersisterImpl.kt index dc02a6874..6e72f5bcc 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserWithdrawVolumePersisterImpl.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserWithdrawVolumePersisterImpl.kt @@ -1,17 +1,20 @@ package co.nilin.opex.accountant.ports.postgres.impl +import co.nilin.opex.accountant.core.model.DailyAmount import co.nilin.opex.accountant.core.model.WithdrawStatus import co.nilin.opex.accountant.core.spi.CurrencyRatePersister import co.nilin.opex.accountant.core.spi.UserWithdrawVolumePersister import co.nilin.opex.accountant.ports.postgres.dao.UserWithdrawVolumeRepository import kotlinx.coroutines.reactive.awaitFirstOrNull +import kotlinx.coroutines.reactor.awaitSingle import kotlinx.coroutines.reactor.awaitSingleOrNull import org.springframework.beans.factory.annotation.Value import org.springframework.stereotype.Component import java.math.BigDecimal +import java.math.RoundingMode +import java.time.LocalDate import java.time.LocalDateTime import java.time.ZoneOffset -import java.math.RoundingMode @Component @@ -55,4 +58,27 @@ class UserWithdrawVolumePersisterImpl( calculationCurrency ).awaitFirstOrNull() ?: BigDecimal.ZERO } + + override suspend fun getLastDaysWithdraw( + userId: String, + startDate: LocalDate?, + quatCurrency: String?, + lastDays: Long + ): List { + + val startDate = startDate ?: LocalDate + .now(ZoneOffset.of(zoneOffsetString)) + .minusDays(lastDays) + + return repository + .findDailyWithdrawVolume(userId, startDate, quatCurrency?:calculationCurrency) + .map { + DailyAmount( + date = it.date, + totalAmount = it.totalAmount + ) + } + .collectList() + .awaitSingle() + } } From 8c682cd18bc410144f631d714bed68fb3d43d6aa Mon Sep 17 00:00:00 2001 From: fatemeh imanipour Date: Sun, 4 Jan 2026 18:39:41 +0330 Subject: [PATCH 3/9] Aggregate user activity behind an API module --- .../service/DepositActivityManagerImpl.kt | 3 +- .../src/main/resources/application.yml | 4 +- .../opex/api/core/inout/MarketBasicData.kt | 4 +- .../core/inout/analytics/ActivityTotals.kt | 2 +- .../api/core/inout/analytics/DailyAmount.kt | 6 ++ .../opex/api/core/spi/AccountantProxy.kt | 6 +- .../co/nilin/opex/api/core/spi/WalletProxy.kt | 3 + .../ports/opex/controller/MarketController.kt | 13 ++-- .../controller/UserAnalyticsController.kt | 72 ++----------------- .../service/UserActivityAggregationService.kt | 51 +++++++++++++ .../ports/opex/util/ConvertorExtenstions.kt | 49 ++++++++++++- .../ports/proxy/impl/AccountantProxyImpl.kt | 46 ++++++++++++ .../api/ports/proxy/impl/WalletProxyImpl.kt | 20 ++++++ .../app/controller/MarketStatsController.kt | 2 - .../app/controller/WalletStatController.kt | 10 +++ .../opex/wallet/app/service/DepositService.kt | 12 ++-- .../opex/wallet/core/inout/DailyAmount.kt | 9 +++ .../core/spi/TotalAssetsSnapshotManager.kt | 10 +++ .../opex/wallet/core/spi/WalletDataManager.kt | 7 ++ .../opex/wallet/core/spi/WalletManager.kt | 4 ++ .../dao/TotalAssetsSnapshotRepository.kt | 20 ++++++ .../postgres/impl/TotalAssetsSnapshotImpl.kt | 30 +++++++- .../postgres/impl/WalletDataManagerImpl.kt | 71 ++++++++++++++++-- .../postgres/impl/WalletManagerImplV2.kt | 9 ++- 24 files changed, 359 insertions(+), 104 deletions(-) create mode 100644 api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/analytics/DailyAmount.kt create mode 100644 api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/service/UserActivityAggregationService.kt create mode 100644 wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/inout/DailyAmount.kt diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/DepositActivityManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/DepositActivityManagerImpl.kt index 58907c55e..22c5fa720 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/DepositActivityManagerImpl.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/DepositActivityManagerImpl.kt @@ -11,6 +11,7 @@ import java.time.LocalDate import java.time.ZoneOffset import java.util.concurrent.TimeUnit import java.util.stream.Collectors + @Service class DepositActivityManagerImpl( private val cacheManager: CacheManager, @@ -42,7 +43,7 @@ class DepositActivityManagerImpl( if (missingDates.isNotEmpty()) { val startDate = missingDates.minOrNull()!! - val dbData = depositVolumePersister.getLastDaysDeposit(userId, startDate,quoteCurrency) + val dbData = depositVolumePersister.getLastDaysDeposit(userId, startDate, quoteCurrency) .stream().collect(Collectors.toMap(DailyAmount::date, DailyAmount::totalAmount)); for (date in missingDates) { diff --git a/api/api-app/src/main/resources/application.yml b/api/api-app/src/main/resources/application.yml index 80a1b7cc9..b4df93b34 100644 --- a/api/api-app/src/main/resources/application.yml +++ b/api/api-app/src/main/resources/application.yml @@ -118,9 +118,7 @@ app: id: opex-api-key binance: api-url: https://api1.binance.com - trade-volume-calculation-currency: ${TRADE_VOLUME_CALCULATION_CURRENCY:USDT} - withdraw-volume-calculation-currency: ${WITHDRAW_VOLUME_CALCULATION_CURRENCY:USDT} - total-asset-calculation-currency: ${TOTAL_ASSET_CALCULATION_CURRENCY:USDT} + user-activity-reference-currency: ${USER_ACTIVITY_REFERENCE_CURRENCY:USDT} custom-message: enabled: ${CUSTOM_MESSAGE_ENABLED:false} base-url: ${CUSTOM_MESSAGE_URL} diff --git a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/MarketBasicData.kt b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/MarketBasicData.kt index de0882559..de478609e 100644 --- a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/MarketBasicData.kt +++ b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/MarketBasicData.kt @@ -3,7 +3,5 @@ package co.nilin.opex.api.core.inout data class MarketBasicData( val quoteCurrencies: List, val referenceCurrencies: List, - val withdrawReferenceCurrency: String, - val tradeReferenceCurrency: String, - val totalAssetReferenceCurrency: String, + val userActivityReferenceCurrency: String ) diff --git a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/analytics/ActivityTotals.kt b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/analytics/ActivityTotals.kt index 5799b1044..43203da9d 100644 --- a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/analytics/ActivityTotals.kt +++ b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/analytics/ActivityTotals.kt @@ -10,5 +10,5 @@ data class ActivityTotals( val totalWithdraw: BigDecimal, val totalDeposit: BigDecimal, val totalTrade: BigDecimal, - val totalSwap: BigDecimal + val totalOrder: BigDecimal ) diff --git a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/analytics/DailyAmount.kt b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/analytics/DailyAmount.kt new file mode 100644 index 000000000..f07212c51 --- /dev/null +++ b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/analytics/DailyAmount.kt @@ -0,0 +1,6 @@ +package co.nilin.opex.api.core.inout.analytics + +import java.math.BigDecimal +import java.time.LocalDate + +data class DailyAmount(val date: LocalDate, var totalAmount: BigDecimal) diff --git a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/AccountantProxy.kt b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/AccountantProxy.kt index a98dfcc3d..1497d7a32 100644 --- a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/AccountantProxy.kt +++ b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/AccountantProxy.kt @@ -1,11 +1,10 @@ package co.nilin.opex.api.core.spi -import co.nilin.opex.api.core.inout.Chain -import co.nilin.opex.api.core.inout.ChainInfo import co.nilin.opex.api.core.inout.FeeConfig import co.nilin.opex.api.core.inout.PairConfigResponse import co.nilin.opex.api.core.inout.UserFee import co.nilin.opex.api.core.inout.WithdrawLimitConfig +import co.nilin.opex.api.core.inout.analytics.DailyAmount import co.nilin.opex.common.utils.Interval import java.math.BigDecimal @@ -24,4 +23,7 @@ interface AccountantProxy { suspend fun getWithdrawLimitConfigs(): List suspend fun getTotalWithdrawVolumeValue(uuid: String, interval: Interval?): BigDecimal + suspend fun getDailyDepositLast31Days(uuid: String): List + suspend fun getDailyWithdrawLast31Days(uuid: String): List + suspend fun getDailyTradeLast31Days(uuid: String): List } \ No newline at end of file diff --git a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/WalletProxy.kt b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/WalletProxy.kt index 7726d3940..c64ea66bb 100644 --- a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/WalletProxy.kt +++ b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/WalletProxy.kt @@ -1,6 +1,7 @@ package co.nilin.opex.api.core.spi import co.nilin.opex.api.core.inout.* +import co.nilin.opex.api.core.inout.analytics.DailyAmount import java.math.BigDecimal interface WalletProxy { @@ -197,4 +198,6 @@ interface WalletProxy { amount: BigDecimal, request: ManualTransferRequest ): TransferResult + + suspend fun getDailyBalanceLast31Days(token: String, uuid: String): List } \ No newline at end of file diff --git a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/MarketController.kt b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/MarketController.kt index 037565ee5..234d4f6ad 100644 --- a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/MarketController.kt +++ b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/MarketController.kt @@ -25,12 +25,8 @@ class MarketController( private val walletProxy: WalletProxy, private val matchingGatewayProxy: MatchingGatewayProxy, private val blockChainGatewayProxy: BlockchainGatewayProxy, - @Value("\${app.trade-volume-calculation-currency}") - private val tradeVolumeCalculationCurrency: String, - @Value("\${app.withdraw-volume-calculation-currency}") - private val withdrawVolumeCalculationCurrency: String, - @Value("\${app.total-asset-calculation-currency}") - private val totalAssetCalculationCurrency: String + @Value("\${app.user-activity-reference-currency}") + private val userActivityReferenceCurrency: String, ) { private val orderBookValidLimits = arrayListOf(5, 10, 20, 50, 100, 500, 1000, 5000) private val validDurations = arrayListOf("24h", "7d", "1M") @@ -266,9 +262,8 @@ class MarketController( return MarketBasicData( (quoteCurrencies.map { it.currency }), (quoteCurrencies.filter { it.isReference }.map { it.currency }), - withdrawVolumeCalculationCurrency, - tradeVolumeCalculationCurrency, - totalAssetCalculationCurrency + userActivityReferenceCurrency + ) } diff --git a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/UserAnalyticsController.kt b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/UserAnalyticsController.kt index 6c86f7eaf..1451528d7 100644 --- a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/UserAnalyticsController.kt +++ b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/UserAnalyticsController.kt @@ -1,7 +1,9 @@ package co.nilin.opex.api.ports.opex.controller import co.nilin.opex.api.core.inout.analytics.ActivityTotals +import co.nilin.opex.api.ports.opex.service.UserActivityAggregationService import co.nilin.opex.api.ports.opex.util.jwtAuthentication +import co.nilin.opex.api.ports.opex.util.tokenValue import org.springframework.security.core.annotation.CurrentSecurityContext import org.springframework.security.core.context.SecurityContext import org.springframework.web.bind.annotation.GetMapping @@ -14,73 +16,11 @@ import kotlin.random.Random @RestController @RequestMapping("/opex/v1/analytics") -class UserAnalyticsController { +class UserAnalyticsController(private val userActivityAggregationService: UserActivityAggregationService) { @GetMapping("/user-activity") - suspend fun userActivity(@CurrentSecurityContext securityContext: SecurityContext): Map { - val jwt = securityContext.jwtAuthentication().token - val uuid = jwt.subject ?: "unknown" - - val zone = ZoneId.systemDefault() - val todayStart = LocalDate.now(zone).atStartOfDay(zone).toInstant() - - val days = 30 - val result = LinkedHashMap(days) - - // Initial balance seeded by user hash - var runningBalance = BigDecimal.valueOf(100 + (uuid.hashCode() and Int.MAX_VALUE) % 900L) - - for (i in (days - 1) downTo 0) { - val dayInstant = todayStart.minusSeconds(86400L * i) - val dayKey = dayInstant.toEpochMilli().toString() - - // deterministic seed per user+day - val seed = deterministicSeed(uuid.hashCode(), dayInstant.toEpochMilli()) - val rnd = Random(seed) - - val base = BigDecimal.valueOf((50 + rnd.nextInt(0, 950)).toLong()) // 50..999 - - val deposit = scaleMoney(base.multiply(BigDecimal.valueOf(rnd.nextDouble(0.0, 5.0)))) - val withdraw = scaleMoney(deposit.multiply(BigDecimal.valueOf(rnd.nextDouble(0.0, 0.9)))) - - val trade = scaleMoney(base.multiply(BigDecimal.valueOf(rnd.nextDouble(0.5, 8.0)))) - val swap = scaleMoney(base.multiply(BigDecimal.valueOf(rnd.nextDouble(0.0, 3.0)))) - - val pnlDrift = scaleMoney(trade.multiply(BigDecimal.valueOf(rnd.nextDouble(-0.01, 0.01)))) - - runningBalance = (runningBalance + deposit - withdraw + pnlDrift).coerceAtLeast(BigDecimal.ZERO) - - result[dayKey] = ActivityTotals( - totalBalance = runningBalance.min(MAX_BALANCE), - totalWithdraw = withdraw, - totalDeposit = deposit, - totalTrade = trade.min(MAX_TRADE), - totalSwap = swap.min(MAX_SWAP) - ) - } - - return result - } - - private fun scaleMoney(v: BigDecimal) = v.setScale(2, RoundingMode.HALF_UP) - - private val MAX_BALANCE = BigDecimal("10000000") - private val MAX_TRADE = BigDecimal("250000") - private val MAX_SWAP = BigDecimal("100000") - - /** - * Simple deterministic seed generator for a user and day. - * Combines user hash and day epoch millis into a reproducible seed. - */ - private fun deterministicSeed(userHash: Int, dayEpochMillis: Long): Long { - var seed = userHash.toLong() * 31 + dayEpochMillis - // simple bit mixing - seed = seed xor (seed shr 33) - seed *= 0xBF58476D1CE4E5BL - seed = seed xor (seed shr 33) - seed *= 0xc4ceb9fe1a85ec5L - seed = seed xor (seed shr 33) - return seed + suspend fun userActivity(@CurrentSecurityContext securityContext: SecurityContext): Map { + val auth=securityContext.jwtAuthentication() + return userActivityAggregationService.getLast31DaysUserStats(auth.tokenValue(),auth.name) } - } diff --git a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/service/UserActivityAggregationService.kt b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/service/UserActivityAggregationService.kt new file mode 100644 index 000000000..02705f24e --- /dev/null +++ b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/service/UserActivityAggregationService.kt @@ -0,0 +1,51 @@ +package co.nilin.opex.api.ports.opex.service + +import co.nilin.opex.api.core.inout.analytics.ActivityTotals +import co.nilin.opex.api.core.spi.AccountantProxy +import co.nilin.opex.api.core.spi.WalletProxy +import co.nilin.opex.api.ports.opex.util.toTimestamp +import org.springframework.stereotype.Service +import java.math.BigDecimal + +@Service +class UserActivityAggregationService( + private val walletProxy: WalletProxy, + private val accountantProxy: AccountantProxy +) { + + suspend fun getLast31DaysUserStats( + token: String, + userId: String + ): Map { + + val balances = walletProxy.getDailyBalanceLast31Days(token, userId) + val withdraws = accountantProxy.getDailyWithdrawLast31Days(userId) + val deposits = accountantProxy.getDailyDepositLast31Days(userId) + val trades = accountantProxy.getDailyTradeLast31Days(userId) + + // Collect all dates + val allDates = ( + balances.map { it.date } + + withdraws.map { it.date } + + deposits.map { it.date } + + trades.map { it.date } + ).toSet() + + // Index by date for fast lookup + val balanceMap = balances.associateBy { it.date } + val withdrawMap = withdraws.associateBy { it.date } + val depositMap = deposits.associateBy { it.date } + val tradeMap = trades.associateBy { it.date } + + // Build final map + return allDates.associateWith { date -> + ActivityTotals( + totalBalance = balanceMap[date]?.totalAmount ?: BigDecimal.ZERO, + totalWithdraw = withdrawMap[date]?.totalAmount ?: BigDecimal.ZERO, + totalDeposit = depositMap[date]?.totalAmount ?: BigDecimal.ZERO, + totalTrade = tradeMap[date]?.totalAmount ?: BigDecimal.ZERO, + totalOrder = BigDecimal.ZERO + ) + }.mapKeys { (key, value) -> key.toTimestamp() }.toSortedMap() + } +} \ No newline at end of file diff --git a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/util/ConvertorExtenstions.kt b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/util/ConvertorExtenstions.kt index ca232e74b..ad76448bc 100644 --- a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/util/ConvertorExtenstions.kt +++ b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/util/ConvertorExtenstions.kt @@ -3,6 +3,10 @@ package co.nilin.opex.api.ports.opex.util import co.nilin.opex.api.core.inout.OrderData import co.nilin.opex.api.core.inout.OrderStatus import co.nilin.opex.api.ports.opex.data.OrderDataResponse +import java.time.Instant +import java.time.LocalDate +import java.time.LocalDateTime +import java.time.ZoneId fun OrderData.toResponse(): OrderDataResponse { return OrderDataResponse( @@ -20,4 +24,47 @@ fun OrderData.toResponse(): OrderDataResponse { createDate = this.createDate, updateDate = this.updateDate, ) -} \ No newline at end of file +} + + +/** + * LocalDate -> epoch millis (start of day) + */ +fun LocalDate.toTimestamp( + zoneId: ZoneId = ZoneId.of("UTC") +): Long = + this + .atStartOfDay(zoneId) + .toInstant() + .toEpochMilli() + +/** + * LocalDateTime -> epoch millis + */ +fun LocalDateTime.toTimestamp( + zoneId: ZoneId = ZoneId.of("UTC") +): Long = + this + .atZone(zoneId) + .toInstant() + .toEpochMilli() + +/** + * epoch millis -> LocalDate + */ +fun Long.toLocalDate( + zoneId: ZoneId = ZoneId.of("UTC") +): LocalDate = + Instant.ofEpochMilli(this) + .atZone(zoneId) + .toLocalDate() + +/** + * epoch millis -> LocalDateTime + */ +fun Long.toLocalDateTime( + zoneId: ZoneId = ZoneId.of("UTC") +): LocalDateTime = + Instant.ofEpochMilli(this) + .atZone(zoneId) + .toLocalDateTime() diff --git a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/AccountantProxyImpl.kt b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/AccountantProxyImpl.kt index db020e061..a712bc771 100644 --- a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/AccountantProxyImpl.kt +++ b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/AccountantProxyImpl.kt @@ -4,6 +4,7 @@ import co.nilin.opex.api.core.inout.FeeConfig import co.nilin.opex.api.core.inout.PairConfigResponse import co.nilin.opex.api.core.inout.UserFee import co.nilin.opex.api.core.inout.WithdrawLimitConfig +import co.nilin.opex.api.core.inout.analytics.DailyAmount import co.nilin.opex.api.core.spi.AccountantProxy import co.nilin.opex.api.ports.proxy.config.ProxyDispatchers import co.nilin.opex.common.utils.Interval @@ -139,5 +140,50 @@ class AccountantProxyImpl(@Qualifier("generalWebClient") private val webClient: } } + override suspend fun getDailyWithdrawLast31Days( + uuid: String): List { + logger.info("fetching daily withdraw stats for {}", uuid) + return withContext(ProxyDispatchers.general) { + webClient.get() + .uri("$baseUrl/user-activity/withdraw/$uuid") + .accept(MediaType.APPLICATION_JSON) + .retrieve() + .onStatus({ it.isError }) { it.createException() } + .bodyToMono>() + .awaitSingle() + } + } + + override suspend fun getDailyDepositLast31Days( + uuid: String): List { + + logger.info("fetching daily deposit stats for {}", uuid) + + return withContext(ProxyDispatchers.general) { + webClient.get() + .uri("$baseUrl/user-activity/deposit/$uuid") + .accept(MediaType.APPLICATION_JSON) + .retrieve() + .onStatus({ it.isError }) { it.createException() } + .bodyToMono>() + .awaitSingle() + } + } + + override suspend fun getDailyTradeLast31Days( + uuid: String): List { + + logger.info("fetching daily trade stats for {}", uuid) + + return withContext(ProxyDispatchers.general) { + webClient.get() + .uri("$baseUrl/user-activity/trade/$uuid") + .accept(MediaType.APPLICATION_JSON) + .retrieve() + .onStatus({ it.isError }) { it.createException() } + .bodyToMono>() + .awaitSingle() + } + } } \ No newline at end of file diff --git a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/WalletProxyImpl.kt b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/WalletProxyImpl.kt index 65740dbb7..c58755aa5 100644 --- a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/WalletProxyImpl.kt +++ b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/WalletProxyImpl.kt @@ -1,6 +1,7 @@ package co.nilin.opex.api.ports.proxy.impl import co.nilin.opex.api.core.inout.* +import co.nilin.opex.api.core.inout.analytics.DailyAmount import co.nilin.opex.api.core.spi.WalletProxy import co.nilin.opex.api.ports.proxy.config.ProxyDispatchers import co.nilin.opex.api.ports.proxy.data.TransactionRequest @@ -719,5 +720,24 @@ class WalletProxyImpl(@Qualifier("generalWebClient") private val webClient: WebC .bodyToMono() .awaitFirstOrElse { throw OpexError.BadRequest.exception() } } + + override suspend fun getDailyBalanceLast31Days( + token: String, + uuid: String + ): List { + + logger.info("fetching daily balance stats for {}", uuid) + + return withContext(ProxyDispatchers.wallet) { + webClient.get() + .uri("$baseUrl/stats/balance/$uuid") + .accept(MediaType.APPLICATION_JSON) + .header(HttpHeaders.AUTHORIZATION, "Bearer $token") + .retrieve() + .onStatus({ it.isError }) { it.createException() } + .bodyToMono>() + .awaitSingle() + } + } } diff --git a/market/market-app/src/main/kotlin/co/nilin/opex/market/app/controller/MarketStatsController.kt b/market/market-app/src/main/kotlin/co/nilin/opex/market/app/controller/MarketStatsController.kt index a3c125119..12a44e8b2 100644 --- a/market/market-app/src/main/kotlin/co/nilin/opex/market/app/controller/MarketStatsController.kt +++ b/market/market-app/src/main/kotlin/co/nilin/opex/market/app/controller/MarketStatsController.kt @@ -33,6 +33,4 @@ class MarketStatsController(private val marketQueryHandler: MarketQueryHandler) suspend fun getMostTrades(@RequestParam interval: Interval): TradeVolumeStat? { return marketQueryHandler.mostTrades(interval) } - - } \ No newline at end of file diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/WalletStatController.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/WalletStatController.kt index 648b3a70f..58af21af5 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/WalletStatController.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/WalletStatController.kt @@ -1,5 +1,6 @@ package co.nilin.opex.wallet.app.controller +import co.nilin.opex.wallet.core.inout.DailyAmount import co.nilin.opex.wallet.core.inout.WalletData import co.nilin.opex.wallet.core.inout.WalletDataResponse import co.nilin.opex.wallet.core.inout.WalletTotal @@ -55,4 +56,13 @@ class WalletStatController( ): TotalAssetsSnapshot? { return totalAssetsSnapshotManager.getUserLastSnapshot(uuid) } + + @GetMapping("/balance/{userId}") + suspend fun getDailyBalanceLast31Days( + @PathVariable userId: String + ): List { + return walletDataManager.getLastDaysBalance( + userId = userId + ) + } } \ No newline at end of file diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/DepositService.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/DepositService.kt index 331a3a4cd..ba0635db6 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/DepositService.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/DepositService.kt @@ -9,7 +9,6 @@ import co.nilin.opex.wallet.core.inout.* import co.nilin.opex.wallet.core.model.* import co.nilin.opex.wallet.core.model.DepositType import co.nilin.opex.wallet.core.spi.* -import co.nilin.opex.wallet.ports.kafka.listener.submitter.DepositSubmitter import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Value import org.springframework.stereotype.Service @@ -30,7 +29,8 @@ class DepositService( private val transferManager: TransferManager, private val currencyService: CurrencyServiceManager, @Value("\${app.deposit.snapshot.enabled:true}") - private val depositSnapshotEnabled: Boolean) { + private val depositSnapshotEnabled: Boolean +) { private val logger = LoggerFactory.getLogger(DepositService::class.java) @@ -123,7 +123,7 @@ class DepositService( transferRef: String?, chain: String?, attachment: String?, - depositType: co.nilin.opex.wallet.core.model.DepositType, + depositType: DepositType, gatewayUuid: String?, transferMethod: TransferMethod?, ): TransferResult? { @@ -166,12 +166,12 @@ class DepositService( } // todo add statusReason field - if (isValid || depositCommand.depositType == co.nilin.opex.wallet.core.model.DepositType.ON_CHAIN) { + if (isValid || depositCommand.depositType == DepositType.ON_CHAIN) { traceDepositService.saveDepositInNewTransaction(depositCommand) } - if (depositCommand.status != DepositStatus.DONE) { - throw OpexError.InvalidDeposit.exception() + if (!isValid && depositCommand.depositType != DepositType.OFF_CHAIN) { + return null } logger.info( diff --git a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/inout/DailyAmount.kt b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/inout/DailyAmount.kt new file mode 100644 index 000000000..8bef67de8 --- /dev/null +++ b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/inout/DailyAmount.kt @@ -0,0 +1,9 @@ +package co.nilin.opex.wallet.core.inout + +import java.math.BigDecimal +import java.time.LocalDate + +data class DailyAmount( + val date: LocalDate, + val totalAmount: BigDecimal +) diff --git a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/TotalAssetsSnapshotManager.kt b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/TotalAssetsSnapshotManager.kt index 333e2c95c..afcc58233 100644 --- a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/TotalAssetsSnapshotManager.kt +++ b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/TotalAssetsSnapshotManager.kt @@ -1,10 +1,20 @@ package co.nilin.opex.wallet.core.spi +import co.nilin.opex.wallet.core.inout.DailyAmount import co.nilin.opex.wallet.core.model.TotalAssetsSnapshot +import java.time.LocalDate interface TotalAssetsSnapshotManager { suspend fun createSnapshot() suspend fun getUserLastSnapshot( uuid: String ): TotalAssetsSnapshot? + suspend fun getLastDaysBalance( + userId: String, + startDate: LocalDate?, + quatCurrency: String?, + lastDays: Long = 31 + ): List + + } \ No newline at end of file diff --git a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/WalletDataManager.kt b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/WalletDataManager.kt index a957e072b..c5901b03d 100644 --- a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/WalletDataManager.kt +++ b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/WalletDataManager.kt @@ -1,5 +1,6 @@ package co.nilin.opex.wallet.core.spi +import co.nilin.opex.wallet.core.inout.DailyAmount import co.nilin.opex.wallet.core.inout.WalletData import co.nilin.opex.wallet.core.inout.WalletDataResponse import co.nilin.opex.wallet.core.inout.WalletTotal @@ -27,4 +28,10 @@ interface WalletDataManager { suspend fun findSystemWalletsTotal(): List suspend fun findUserWalletsTotal(): List? + + suspend fun getLastDaysBalance( + userId: String, + quoteCurrency: String? = null, + n: Int = 31 + ): List } \ No newline at end of file diff --git a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/WalletManager.kt b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/WalletManager.kt index 7473644fe..fe2f24c8f 100644 --- a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/WalletManager.kt +++ b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/WalletManager.kt @@ -1,8 +1,10 @@ package co.nilin.opex.wallet.core.spi import co.nilin.opex.wallet.core.inout.CurrencyCommand +import co.nilin.opex.wallet.core.inout.DailyAmount import co.nilin.opex.wallet.core.model.* import java.math.BigDecimal +import java.time.LocalDate interface WalletManager { @@ -40,4 +42,6 @@ interface WalletManager { suspend fun findWalletById(walletId: Long): Wallet? suspend fun findAllWalletsBriefNotZero(ownerId: Long): List + + } \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/TotalAssetsSnapshotRepository.kt b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/TotalAssetsSnapshotRepository.kt index 67c7edc02..1b7259ff3 100644 --- a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/TotalAssetsSnapshotRepository.kt +++ b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/TotalAssetsSnapshotRepository.kt @@ -1,12 +1,15 @@ package co.nilin.opex.wallet.ports.postgres.dao +import co.nilin.opex.wallet.core.inout.DailyAmount import co.nilin.opex.wallet.ports.postgres.model.TotalAssetsSnapshotModel import org.springframework.data.r2dbc.repository.Modifying import org.springframework.data.r2dbc.repository.Query import org.springframework.data.repository.reactive.ReactiveCrudRepository import org.springframework.stereotype.Repository +import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.math.BigDecimal +import java.time.LocalDate @Repository interface TotalAssetsSnapshotRepository : ReactiveCrudRepository { @@ -46,4 +49,21 @@ interface TotalAssetsSnapshotRepository : ReactiveCrudRepository + + @Query( + """ + select snapshot_date as date, total_amount + from total_assets_snapshot + where uuid = :userId + and snapshot_date >= :startDate + and quote_currency = :quoteCurrency + order by snapshot_date desc + """ + ) + fun findDailyBalance( + userId: String, + startDate: LocalDate, + quoteCurrency: String + ): Flux + } \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/TotalAssetsSnapshotImpl.kt b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/TotalAssetsSnapshotImpl.kt index 3bb827f64..9358ea5a3 100644 --- a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/TotalAssetsSnapshotImpl.kt +++ b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/TotalAssetsSnapshotImpl.kt @@ -1,22 +1,27 @@ package co.nilin.opex.wallet.ports.postgres.impl import co.nilin.opex.common.OpexError +import co.nilin.opex.wallet.core.inout.DailyAmount import co.nilin.opex.wallet.core.model.TotalAssetsSnapshot import co.nilin.opex.wallet.core.spi.TotalAssetsSnapshotManager import co.nilin.opex.wallet.ports.postgres.dao.CurrencyRepositoryV2 import co.nilin.opex.wallet.ports.postgres.dao.TotalAssetsSnapshotRepository import co.nilin.opex.wallet.ports.postgres.util.toTotalAssetsSnapshot import kotlinx.coroutines.reactive.awaitFirstOrNull +import kotlinx.coroutines.reactor.awaitSingle import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Value import org.springframework.stereotype.Service +import java.time.LocalDate +import java.time.ZoneOffset @Service class TotalAssetsSnapshotImpl( private val currencyRepository: CurrencyRepositoryV2, private val totalAssetsSnapshotRepository: TotalAssetsSnapshotRepository, @Value("\${app.snapshot-currency}") - private val snapshotCurrency: String + private val snapshotCurrency: String, + @Value("\${app.zone-offset}") private val zoneOffsetString: String ) : TotalAssetsSnapshotManager { private val logger = LoggerFactory.getLogger(TotalAssetsSnapshotImpl::class.java) @@ -38,4 +43,27 @@ class TotalAssetsSnapshotImpl( ): TotalAssetsSnapshot? { return totalAssetsSnapshotRepository.findLastSnapshotByUuid(uuid).awaitFirstOrNull()?.toTotalAssetsSnapshot() } + + override suspend fun getLastDaysBalance( + userId: String, + startDate: LocalDate?, + quatCurrency: String?, + lastDays: Long + ): List { + + val startDate = startDate ?: LocalDate + .now(ZoneOffset.of(zoneOffsetString)) + .minusDays(lastDays) + + return totalAssetsSnapshotRepository.findDailyBalance(userId, startDate, quatCurrency ?: snapshotCurrency) + .map { + DailyAmount( + date = it.date, + totalAmount = it.totalAmount + ) + } + .collectList() + .awaitSingle() + } + } diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletDataManagerImpl.kt b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletDataManagerImpl.kt index 0d1483a43..96fc114ce 100644 --- a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletDataManagerImpl.kt +++ b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletDataManagerImpl.kt @@ -1,9 +1,7 @@ package co.nilin.opex.wallet.ports.postgres.impl -import co.nilin.opex.wallet.core.inout.WalletCurrencyData -import co.nilin.opex.wallet.core.inout.WalletData -import co.nilin.opex.wallet.core.inout.WalletDataResponse -import co.nilin.opex.wallet.core.inout.WalletTotal +import co.nilin.opex.common.utils.CacheManager +import co.nilin.opex.wallet.core.inout.* import co.nilin.opex.wallet.core.model.WalletType import co.nilin.opex.wallet.core.spi.WalletDataManager import co.nilin.opex.wallet.ports.postgres.dao.CurrencyRepositoryV2 @@ -13,14 +11,23 @@ import com.fasterxml.jackson.core.type.TypeReference import com.fasterxml.jackson.databind.ObjectMapper import kotlinx.coroutines.reactive.awaitFirstOrElse import kotlinx.coroutines.reactive.awaitFirstOrNull +import org.springframework.beans.factory.annotation.Value import org.springframework.stereotype.Component +import java.math.BigDecimal +import java.time.LocalDate +import java.time.ZoneOffset +import java.util.concurrent.TimeUnit +import java.util.stream.Collectors @Component class WalletDataManagerImpl( private val walletRepository: WalletRepository, + private val totalAssetsSnapshotImpl: TotalAssetsSnapshotImpl, private val currencyRepositoryV2: CurrencyRepositoryV2, - private val objectMapper: ObjectMapper -) : WalletDataManager { + private val objectMapper: ObjectMapper, + private val cacheManager: CacheManager, + @Value("\${app.zone-offset}") private val zoneOffsetString: String, + ) : WalletDataManager { override suspend fun findWalletDataByCriteria( uuid: String?, @@ -89,4 +96,56 @@ class WalletDataManagerImpl( WalletTotal(c, (allDepositedCurrency.filter { it.currency == c }?.firstOrNull()?.balance) ?: 0.0) }?.collectList()?.awaitFirstOrNull() } + + override suspend fun getLastDaysBalance( + userId: String, + quoteCurrency: String?, + n: Int + ): List { + val today = LocalDate.now(ZoneOffset.of(zoneOffsetString)) + val dates = (0..n - 1).map { today.minusDays(it.toLong()) } + + val result = mutableMapOf() + val missingDates = mutableListOf() + + for (date in dates) { + val cacheKey = "trade:daily:$userId:$date" + val cached = cacheManager.get(cacheKey) + + if (cached != null) { + result[date] = cached + } else { + missingDates.add(date) + } + } + + if (missingDates.isNotEmpty()) { + val startDate = missingDates.minOrNull()!! + + val dbData = totalAssetsSnapshotImpl.getLastDaysBalance(userId, startDate, quoteCurrency) + .stream().collect(Collectors.toMap(DailyAmount::date, DailyAmount::totalAmount)); + + for (date in missingDates) { + val value = dbData[date] ?: BigDecimal.ZERO + val (ttl, unit) = ttlFor(date, today) + val cacheKey = "trade:daily:$userId:$date" + + cacheManager.put(cacheKey, value, ttl, unit) + result[date] = value + } + } + + return result + .map { DailyAmount(it.key, it.value) } + .sortedBy { it.date } + + } + + + private fun ttlFor(date: LocalDate, today: LocalDate): Pair = + if (date == today) { + 15L to TimeUnit.MINUTES + } else { + 100L to TimeUnit.DAYS + } } \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletManagerImplV2.kt b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletManagerImplV2.kt index c50d859b0..90623da02 100644 --- a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletManagerImplV2.kt +++ b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletManagerImplV2.kt @@ -3,6 +3,7 @@ package co.nilin.opex.wallet.ports.postgres.impl import co.nilin.opex.common.OpexError import co.nilin.opex.wallet.core.exc.ConcurrentBalanceChangException import co.nilin.opex.wallet.core.inout.CurrencyCommand +import co.nilin.opex.wallet.core.inout.DailyAmount import co.nilin.opex.wallet.core.model.* import co.nilin.opex.wallet.core.spi.WalletManager import co.nilin.opex.wallet.ports.postgres.dao.* @@ -13,12 +14,17 @@ import kotlinx.coroutines.reactive.awaitFirst import kotlinx.coroutines.reactive.awaitFirstOrElse import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.coroutines.reactive.awaitSingle +import kotlinx.coroutines.reactor.awaitSingle import kotlinx.coroutines.reactor.awaitSingleOrNull import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Value import org.springframework.stereotype.Service import java.math.BigDecimal +import java.time.LocalDate import java.time.LocalDateTime +import java.time.ZoneOffset +import java.util.concurrent.TimeUnit +import java.util.stream.Collectors @Service class WalletManagerImplV2( @@ -294,12 +300,9 @@ class WalletManagerImplV2( ) } } - override suspend fun findWallet(ownerId: Long, currency: String, walletType: WalletType): BriefWallet? { val wallet = walletRepository.findByOwnerAndTypeAndCurrency(ownerId, walletType, currency) .awaitSingleOrNull() ?: return null return BriefWallet(wallet.id, wallet.owner, wallet.balance, wallet.currency, wallet.type) } - - } From da631767513a4ef8d07f73baab8f0bbcdc61fe9c Mon Sep 17 00:00:00 2001 From: fatemeh imanipour Date: Sun, 4 Jan 2026 19:10:15 +0330 Subject: [PATCH 4/9] Change the property of accountant in test profile --- accountant/accountant-app/src/test/resources/application.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/accountant/accountant-app/src/test/resources/application.yml b/accountant/accountant-app/src/test/resources/application.yml index 60c9a7a91..f495cb20e 100644 --- a/accountant/accountant-app/src/test/resources/application.yml +++ b/accountant/accountant-app/src/test/resources/application.yml @@ -57,4 +57,5 @@ app: delay-multiplier: 3 zone-offset: +03:30 trade-volume-calculation-currency: USDT - withdraw-volume-calculation-currency: USDT \ No newline at end of file + withdraw-volume-calculation-currency: USDT + deposit-volume-calculation-currency: USDT \ No newline at end of file From a2fdd4b2964b7580d2782a573ce99ef67d2561e3 Mon Sep 17 00:00:00 2001 From: fatemeh imanipour Date: Sun, 4 Jan 2026 19:37:23 +0330 Subject: [PATCH 5/9] Change the property of wallet in test profile --- wallet/wallet-app/src/test/resources/application.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/wallet/wallet-app/src/test/resources/application.yml b/wallet/wallet-app/src/test/resources/application.yml index 9efe60395..48f868180 100644 --- a/wallet/wallet-app/src/test/resources/application.yml +++ b/wallet/wallet-app/src/test/resources/application.yml @@ -72,6 +72,9 @@ app: enabled: false otp-required-count: 0 bank-account-validation : false + deposit: + snapshot: + enabled: ${DEPOSIT_SNAPSHOT_ENABLED:false} testcontainers: db: username: ${dbusername:opex} From a11178c95c42870856ff3aba160fd952ddf5946d Mon Sep 17 00:00:00 2001 From: fatemeh imanipour Date: Sun, 4 Jan 2026 19:44:30 +0330 Subject: [PATCH 6/9] Set transferMethod in the service instead of controller --- .../kotlin/co/nilin/opex/wallet/app/service/DepositService.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/DepositService.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/DepositService.kt index ba0635db6..2cb75f481 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/DepositService.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/DepositService.kt @@ -381,8 +381,9 @@ class DepositService( depositType = co.nilin.opex.wallet.core.model.DepositType.OFF_CHAIN, network = null, attachment = null, - transferMethod = + transferMethod = if (request.transferMethod == TransferMethod.REWARD) TransferMethod.REWARD else { if (request.isIPG == true) TransferMethod.IPG else TransferMethod.MPG + } ) traceDepositService.saveDepositInNewTransaction(deposit) From dfdcc1bc6e0fd5e97d6e57ce8319d0de475e3959 Mon Sep 17 00:00:00 2001 From: fatemeh imanipour Date: Sun, 4 Jan 2026 20:21:59 +0330 Subject: [PATCH 7/9] Fix bug --- .../kotlin/co/nilin/opex/wallet/app/service/DepositService.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/DepositService.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/DepositService.kt index 2cb75f481..79d6bebb2 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/DepositService.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/DepositService.kt @@ -347,7 +347,7 @@ class DepositService( ): PaymentDepositResponse { val currency = currencyService - .fetchCurrency(FetchCurrency(symbol = request.currency.name)) + .fetchCurrency(FetchCurrency(symbol = request.currency)) ?: throw OpexError.CurrencyNotFound.exception() val sourceOwner = walletOwnerManager From db46daf9b5782bb01c0a14ba3b480013bb3bdd3c Mon Sep 17 00:00:00 2001 From: fatemeh imanipour Date: Tue, 6 Jan 2026 15:26:26 +0330 Subject: [PATCH 8/9] Calculate the cachout wallets in balance snapshot --- .../ports/postgres/dao/UserTradeVolumeRepository.kt | 4 ++-- .../kotlin/co/nilin/opex/wallet/app/config/SecurityConfig.kt | 1 - .../nilin/opex/wallet/app/controller/WalletStatController.kt | 1 - .../nilin/opex/wallet/ports/postgres/dao/WalletRepository.kt | 2 +- 4 files changed, 3 insertions(+), 5 deletions(-) diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserTradeVolumeRepository.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserTradeVolumeRepository.kt index 730452f0e..d52675cb0 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserTradeVolumeRepository.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserTradeVolumeRepository.kt @@ -63,12 +63,12 @@ interface UserTradeVolumeRepository : ReactiveCrudRepository= :startDate and quote_currency = :quoteCurrency - order by date desc + group by date """ ) fun findDailyTradeVolume( diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/config/SecurityConfig.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/config/SecurityConfig.kt index ef1548906..ce6d27821 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/config/SecurityConfig.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/config/SecurityConfig.kt @@ -47,7 +47,6 @@ class SecurityConfig(private val webClient: WebClient) { .pathMatchers("/admin/v1/swap/history").hasAnyAuthority("ROLE_monitoring", "ROLE_admin") .pathMatchers("/admin/**").hasAuthority("ROLE_admin") .pathMatchers("/stats/total-assets/**").permitAll() - .pathMatchers("/stats/**").hasAuthority("ROLE_admin") .pathMatchers(HttpMethod.GET, "/currency/**").permitAll() .pathMatchers("/actuator/**").permitAll() .pathMatchers("/storage/**").hasAuthority("ROLE_admin") diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/WalletStatController.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/WalletStatController.kt index 019da597c..6af632f02 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/WalletStatController.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/WalletStatController.kt @@ -74,4 +74,3 @@ class WalletStatController( } - diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/WalletRepository.kt b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/WalletRepository.kt index 20c81d23a..ee560ff6d 100644 --- a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/WalletRepository.kt +++ b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/WalletRepository.kt @@ -133,7 +133,7 @@ interface WalletRepository : ReactiveCrudRepository { """ select currency, sum(balance) as balance from wallet w join wallet_owner wo on w.owner = wo.id - where wallet_type in ('MAIN', 'EXCHANGE') + where wallet_type in ('MAIN', 'EXCHANGE', 'CASHOUT') and wo.uuid != '1' and w.id not in (select wallet_id from wallet_stat_exclusion) group by currency From b59e6cfc8f45a47e9908df731e55e0337c3bd029 Mon Sep 17 00:00:00 2001 From: fatemeh imanipour Date: Tue, 6 Jan 2026 18:18:17 +0330 Subject: [PATCH 9/9] Handle exception in Api filter --- .../opex/api/app/config/RateLimitConfig.kt | 15 ++++---- .../api/app/interceptor/APIKeyFilterImpl.kt | 12 ++++--- .../app/service/OpexFilterExceptionHandler.kt | 36 +++++++++++++++++++ .../kotlin/co/nilin/opex/common/OpexError.kt | 4 +++ .../opex/wallet/app/service/DepositService.kt | 9 ++--- 5 files changed, 58 insertions(+), 18 deletions(-) create mode 100644 api/api-app/src/main/kotlin/co/nilin/opex/api/app/service/OpexFilterExceptionHandler.kt diff --git a/api/api-app/src/main/kotlin/co/nilin/opex/api/app/config/RateLimitConfig.kt b/api/api-app/src/main/kotlin/co/nilin/opex/api/app/config/RateLimitConfig.kt index fa67d5e65..e11c77efa 100644 --- a/api/api-app/src/main/kotlin/co/nilin/opex/api/app/config/RateLimitConfig.kt +++ b/api/api-app/src/main/kotlin/co/nilin/opex/api/app/config/RateLimitConfig.kt @@ -108,12 +108,13 @@ class RateLimitConfig( retryAfterSeconds: Int ): Mono { logger.info("Rate limit exceeded ($identity) -- $method:$url") - exchange.response.statusCode = HttpStatus.TOO_MANY_REQUESTS - return exchange.response.writeWith( - Mono.just( - exchange.response.bufferFactory() - .wrap("Rate limit exceeded ($identity) -- $method:$url -- Retry-After, $retryAfterSeconds".toByteArray()) - ) - ) +// exchange.response.statusCode = HttpStatus.TOO_MANY_REQUESTS + throw OpexError.RateLimit.exception() +// return exchange.response.writeWith( +// Mono.just( +// exchange.response.bufferFactory() +// .wrap("Rate limit exceeded ($identity) -- $method:$url -- Retry-After, $retryAfterSeconds".toByteArray()) +// ) +// ) } } \ No newline at end of file diff --git a/api/api-app/src/main/kotlin/co/nilin/opex/api/app/interceptor/APIKeyFilterImpl.kt b/api/api-app/src/main/kotlin/co/nilin/opex/api/app/interceptor/APIKeyFilterImpl.kt index a02319314..6b505346d 100644 --- a/api/api-app/src/main/kotlin/co/nilin/opex/api/app/interceptor/APIKeyFilterImpl.kt +++ b/api/api-app/src/main/kotlin/co/nilin/opex/api/app/interceptor/APIKeyFilterImpl.kt @@ -4,8 +4,10 @@ import co.nilin.opex.api.app.security.ClientCredentialsTokenService import co.nilin.opex.api.app.security.HmacVerifier import co.nilin.opex.api.core.spi.APIKeyService import co.nilin.opex.api.core.spi.APIKeyFilter +import co.nilin.opex.common.OpexError import kotlinx.coroutines.reactor.mono import org.slf4j.LoggerFactory +import org.springframework.http.HttpStatus import org.springframework.stereotype.Component import org.springframework.web.server.ServerWebExchange import org.springframework.web.server.WebFilter @@ -42,17 +44,17 @@ class APIKeyFilterImpl( val sourceIp = request.remoteAddress?.address?.hostAddress if (!entry.allowedIps.isNullOrEmpty() && (sourceIp == null || !entry.allowedIps!!.contains(sourceIp))) { logger.warn("API key {} request from disallowed IP {}", apiKeyId, sourceIp) - null + throw OpexError.Forbidden.exception() } if (!entry.allowedEndpoints.isNullOrEmpty() && ( !entry.allowedEndpoints!!.contains(path))) { logger.warn("API key {} request to unauthorized resource {}", apiKeyId, path) - null + throw OpexError.Forbidden.exception() } else { val ts = tsHeader.toLongOrNull() val bodyHash = request.headers["X-API-BODY-SHA256"]?.firstOrNull() if (ts == null) { logger.warn("Invalid timestamp header for bot {}", apiKeyId) - null + throw OpexError.InvalidTime.exception() } else { val ok = hmacVerifier.verify( entry.secret, @@ -67,12 +69,12 @@ class APIKeyFilterImpl( ) if (!ok) { logger.warn("Invalid signature for apiKey {}", apiKeyId) - null + throw OpexError.InvalidSignature.exception() } else { val userId = entry.keycloakUserId if (userId.isNullOrBlank()) { logger.warn("API key {} has no mapped Keycloak userId; rejecting", apiKeyId) - null + throw OpexError.UnAuthorized.exception() } else { val bearer = clientTokenService.exchangeToUserToken(userId) val req = request.mutate() diff --git a/api/api-app/src/main/kotlin/co/nilin/opex/api/app/service/OpexFilterExceptionHandler.kt b/api/api-app/src/main/kotlin/co/nilin/opex/api/app/service/OpexFilterExceptionHandler.kt new file mode 100644 index 000000000..131fb0522 --- /dev/null +++ b/api/api-app/src/main/kotlin/co/nilin/opex/api/app/service/OpexFilterExceptionHandler.kt @@ -0,0 +1,36 @@ +package co.nilin.opex.api.app.service + +import co.nilin.opex.utility.error.data.OpexException +import co.nilin.opex.utility.error.spi.ErrorTranslator +import com.fasterxml.jackson.databind.ObjectMapper +import org.springframework.boot.web.reactive.error.ErrorWebExceptionHandler +import org.springframework.core.annotation.Order +import org.springframework.http.HttpStatusCode +import org.springframework.http.MediaType +import org.springframework.stereotype.Component +import org.springframework.web.server.ServerWebExchange +import reactor.core.publisher.Mono + +@Component +@Order(-2) +class OpexFilterExceptionHandler( + private val translator: ErrorTranslator, + private val objectMapper: ObjectMapper +) : ErrorWebExceptionHandler { + + override fun handle(exchange: ServerWebExchange, ex: Throwable): Mono { + + if (ex is OpexException) { + return translator.translate(ex).flatMap { error -> + exchange.response.statusCode = HttpStatusCode.valueOf(error.status.value()) + exchange.response.headers.contentType = MediaType.APPLICATION_JSON + + val bytes = objectMapper.writeValueAsBytes(error) + val buffer = exchange.response.bufferFactory().wrap(bytes) + + exchange.response.writeWith(Mono.just(buffer)) + } + } + return Mono.error(ex) + } +} \ No newline at end of file diff --git a/common/src/main/kotlin/co/nilin/opex/common/OpexError.kt b/common/src/main/kotlin/co/nilin/opex/common/OpexError.kt index 2e44f81fa..8fab4bbe5 100644 --- a/common/src/main/kotlin/co/nilin/opex/common/OpexError.kt +++ b/common/src/main/kotlin/co/nilin/opex/common/OpexError.kt @@ -18,6 +18,10 @@ enum class OpexError(val code: Int, val message: String?, val status: HttpStatus InvalidRequestBody(1021, "Request body is invalid", HttpStatus.BAD_REQUEST), NoRecordFound(1022, "No record found for this service", HttpStatus.NOT_FOUND), ServiceDeprecated(1023, "Service deprecated", HttpStatus.SERVICE_UNAVAILABLE), + RateLimit(1024, null, HttpStatus.TOO_MANY_REQUESTS), + InvalidSignature(1025, null, HttpStatus.BAD_REQUEST), + InvalidTime(1026, null, HttpStatus.BAD_REQUEST), + // code 2000: accountant InvalidPair(2001, "%s is not available", HttpStatus.BAD_REQUEST), diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/DepositService.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/DepositService.kt index 79d6bebb2..bc9500e14 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/DepositService.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/DepositService.kt @@ -165,12 +165,9 @@ class DepositService( depositCommand.status = DepositStatus.INVALID } - // todo add statusReason field - if (isValid || depositCommand.depositType == DepositType.ON_CHAIN) { - traceDepositService.saveDepositInNewTransaction(depositCommand) - } + traceDepositService.saveDepositInNewTransaction(depositCommand) - if (!isValid && depositCommand.depositType != DepositType.OFF_CHAIN) { + if (!isValid) { return null } @@ -381,7 +378,7 @@ class DepositService( depositType = co.nilin.opex.wallet.core.model.DepositType.OFF_CHAIN, network = null, attachment = null, - transferMethod = if (request.transferMethod == TransferMethod.REWARD) TransferMethod.REWARD else { + transferMethod = if (request.transferMethod == TransferMethod.REWARD) TransferMethod.REWARD else { if (request.isIPG == true) TransferMethod.IPG else TransferMethod.MPG } )