diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt index 9ee0a1322..4a160cefb 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/config/AppConfig.kt @@ -74,7 +74,7 @@ class AppConfig( feeCalculator: FeeCalculator, financialActionPublisher: FinancialActionPublisher, currencyRatePersister: CurrencyRatePersister, - userVolumePersister: UserVolumePersister + userVolumePersister: UserTradeVolumePersister ): TradeManager { return TradeManagerImpl( financeActionPersister, @@ -157,4 +157,12 @@ class AppConfig( withdrawRequestKafkaListener.addListener(withdrawRequestEventListener) } + + @Autowired + fun configureDepositEventListener( + depositKafkaListener: DepositKafkaListener, + depositEventListener: DepositEventListener + ) { + depositKafkaListener.addListener(depositEventListener) + } } \ No newline at end of file diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/controller/UserDailyActivityController.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/controller/UserDailyActivityController.kt new file mode 100644 index 000000000..e447f7646 --- /dev/null +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/controller/UserDailyActivityController.kt @@ -0,0 +1,46 @@ +package co.nilin.opex.accountant.app.controller + +import co.nilin.opex.accountant.core.api.DepositActivityManager +import co.nilin.opex.accountant.core.api.TradeActivityManager +import co.nilin.opex.accountant.core.api.WithdrawActivityManager +import co.nilin.opex.accountant.core.model.DailyAmount +import org.springframework.web.bind.annotation.GetMapping +import org.springframework.web.bind.annotation.PathVariable +import org.springframework.web.bind.annotation.RequestMapping +import org.springframework.web.bind.annotation.RestController + +@RestController +@RequestMapping("/user-activity") +class UserDailyActivityController( + private val withdrawManager: WithdrawActivityManager, + private val depositManager: DepositActivityManager, + private val TradeManager: TradeActivityManager +) { + + @GetMapping("/withdraw/{userId}") + suspend fun getDailyWithdrawLast31Days( + @PathVariable userId: String + ): List { + return withdrawManager.getLastDaysWithdrawActivity( + userId = userId + ) + } + + @GetMapping("/deposit/{userId}") + suspend fun getDailyDepositLast31Days( + @PathVariable userId: String + ): List { + return depositManager.getLastDaysDepositActivity( + userId = userId + ) + } + + @GetMapping("/trade/{userId}") + suspend fun getDailyTradeLast31Days( + @PathVariable userId: String + ): List { + return TradeManager.getLastDaysTradeActivity( + userId = userId + ) + } +} \ No newline at end of file diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/controller/UserDataController.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/controller/UserDataController.kt index 49b9bdfde..4066e8730 100644 --- a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/controller/UserDataController.kt +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/controller/UserDataController.kt @@ -2,7 +2,7 @@ package co.nilin.opex.accountant.app.controller import co.nilin.opex.accountant.core.api.FeeCalculator import co.nilin.opex.accountant.core.model.UserFee -import co.nilin.opex.accountant.core.spi.UserVolumePersister +import co.nilin.opex.accountant.core.spi.UserTradeVolumePersister import co.nilin.opex.accountant.core.spi.UserWithdrawVolumePersister import co.nilin.opex.common.utils.Interval import org.springframework.beans.factory.annotation.Value @@ -13,7 +13,7 @@ import java.time.LocalDateTime @RestController @RequestMapping("/user/data") class UserDataController( - private val userVolumePersister: UserVolumePersister, + private val userVolumePersister: UserTradeVolumePersister, private val feeCalculator: FeeCalculator, private val userWithdrawVolumePersister: UserWithdrawVolumePersister, @Value("\${app.trade-volume-calculation-currency}") @@ -60,4 +60,5 @@ class UserDataController( (interval?.getLocalDateTime() ?: LocalDateTime.now()) ) } + } \ No newline at end of file diff --git a/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/DepositEventListener.kt b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/DepositEventListener.kt new file mode 100644 index 000000000..01a2662d0 --- /dev/null +++ b/accountant/accountant-app/src/main/kotlin/co/nilin/opex/accountant/app/listener/DepositEventListener.kt @@ -0,0 +1,38 @@ +package co.nilin.opex.accountant.app.listener + +import co.nilin.opex.accountant.core.spi.UserDepositVolumePersister +import co.nilin.opex.accountant.ports.kafka.listener.inout.DepositEvent +import co.nilin.opex.accountant.ports.kafka.listener.inout.WithdrawRequestEvent +import co.nilin.opex.accountant.ports.kafka.listener.spi.DepositListener +import co.nilin.opex.accountant.ports.kafka.listener.spi.WithdrawRequestListener +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Component + +@Component +class DepositEventListener(private val userDepositVolumePersister: UserDepositVolumePersister) : + DepositListener { + + private val logger = LoggerFactory.getLogger(DepositEventListener::class.java) + val scope = CoroutineScope(Dispatchers.IO) + override fun id(): String { + return "DepositEventListener" + } + + override fun onEvent( + event: DepositEvent, + partition: Int, + offset: Long, + timestamp: Long + ) { + logger.info("==========================================================================") + logger.info("Incoming Deposit event: $event") + logger.info("==========================================================================") + scope.launch { + userDepositVolumePersister.update(event.uuid, event.currency, event.amount, event.createDate) + } + } + +} diff --git a/accountant/accountant-app/src/main/resources/application.yml b/accountant/accountant-app/src/main/resources/application.yml index 0d588412a..a2b0499a6 100644 --- a/accountant/accountant-app/src/main/resources/application.yml +++ b/accountant/accountant-app/src/main/resources/application.yml @@ -108,6 +108,7 @@ app: zone-offset: +03:30 trade-volume-calculation-currency: ${TRADE_VOLUME_CALCULATION_CURRENCY:USDT} withdraw-volume-calculation-currency: ${WITHDRAW_VOLUME_CALCULATION_CURRENCY:USDT} + deposit-volume-calculation-currency: ${DEPOSIT_VOLUME_CALCULATION_CURRENCY:USDT} custom-message: enabled: ${CUSTOM_MESSAGE_ENABLED:false} base-url: ${CUSTOM_MESSAGE_URL} diff --git a/accountant/accountant-app/src/test/resources/application.yml b/accountant/accountant-app/src/test/resources/application.yml index 60c9a7a91..f495cb20e 100644 --- a/accountant/accountant-app/src/test/resources/application.yml +++ b/accountant/accountant-app/src/test/resources/application.yml @@ -57,4 +57,5 @@ app: delay-multiplier: 3 zone-offset: +03:30 trade-volume-calculation-currency: USDT - withdraw-volume-calculation-currency: USDT \ No newline at end of file + withdraw-volume-calculation-currency: USDT + deposit-volume-calculation-currency: USDT \ No newline at end of file diff --git a/accountant/accountant-core/pom.xml b/accountant/accountant-core/pom.xml index 4ef1fd63d..9e689f4d0 100644 --- a/accountant/accountant-core/pom.xml +++ b/accountant/accountant-core/pom.xml @@ -66,5 +66,11 @@ jackson-datatype-jsr310 2.15.2 + + com.sun.mail + mailapi + 1.6.2 + compile + diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/DepositActivityManager.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/DepositActivityManager.kt new file mode 100644 index 000000000..b655d77ac --- /dev/null +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/DepositActivityManager.kt @@ -0,0 +1,11 @@ +package co.nilin.opex.accountant.core.api + +import co.nilin.opex.accountant.core.model.DailyAmount + +interface DepositActivityManager { + suspend fun getLastDaysDepositActivity( + userId: String, + quoteCurrency: String? = null, + n: Int = 31 + ): List +} \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/TradeActivityManager.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/TradeActivityManager.kt new file mode 100644 index 000000000..c19491256 --- /dev/null +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/TradeActivityManager.kt @@ -0,0 +1,11 @@ +package co.nilin.opex.accountant.core.api + +import co.nilin.opex.accountant.core.model.DailyAmount + +interface TradeActivityManager { + suspend fun getLastDaysTradeActivity( + userId: String, + quoteCurrency: String? = null, + n: Int = 31 + ): List +} \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/WithdrawActivityManager.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/WithdrawActivityManager.kt new file mode 100644 index 000000000..c21b4eb56 --- /dev/null +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/api/WithdrawActivityManager.kt @@ -0,0 +1,11 @@ +package co.nilin.opex.accountant.core.api + +import co.nilin.opex.accountant.core.model.DailyAmount + +interface WithdrawActivityManager { + suspend fun getLastDaysWithdrawActivity( + userId: String, + quoteCurrency: String? = null, + n: Int = 31 + ): List +} \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/model/DailyAmount.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/model/DailyAmount.kt new file mode 100644 index 000000000..c30d8c689 --- /dev/null +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/model/DailyAmount.kt @@ -0,0 +1,9 @@ +package co.nilin.opex.accountant.core.model + +import java.math.BigDecimal +import java.time.LocalDate + +data class DailyAmount( + val date: LocalDate, + val totalAmount: BigDecimal +) diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/DepositActivityManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/DepositActivityManagerImpl.kt new file mode 100644 index 000000000..22c5fa720 --- /dev/null +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/DepositActivityManagerImpl.kt @@ -0,0 +1,73 @@ +package co.nilin.opex.accountant.core.service + +import co.nilin.opex.accountant.core.api.DepositActivityManager +import co.nilin.opex.accountant.core.model.DailyAmount +import co.nilin.opex.accountant.core.spi.UserDepositVolumePersister +import co.nilin.opex.common.utils.CacheManager +import org.springframework.beans.factory.annotation.Value +import org.springframework.stereotype.Service +import java.math.BigDecimal +import java.time.LocalDate +import java.time.ZoneOffset +import java.util.concurrent.TimeUnit +import java.util.stream.Collectors + +@Service +class DepositActivityManagerImpl( + private val cacheManager: CacheManager, + private val depositVolumePersister: UserDepositVolumePersister, + @Value("\${app.zone-offset}") private val zoneOffsetString: String, +) : DepositActivityManager { + override suspend fun getLastDaysDepositActivity( + userId: String, + quoteCurrency: String?, + n: Int + ): List { + val today = LocalDate.now(ZoneOffset.of(zoneOffsetString)) + val dates = (0..n - 1).map { today.minusDays(it.toLong()) } + + val result = mutableMapOf() + val missingDates = mutableListOf() + + for (date in dates) { + val cacheKey = "deposit:daily:$userId:$date" + val cached = cacheManager.get(cacheKey) + + if (cached != null) { + result[date] = cached + } else { + missingDates.add(date) + } + } + + if (missingDates.isNotEmpty()) { + val startDate = missingDates.minOrNull()!! + + val dbData = depositVolumePersister.getLastDaysDeposit(userId, startDate, quoteCurrency) + .stream().collect(Collectors.toMap(DailyAmount::date, DailyAmount::totalAmount)); + + for (date in missingDates) { + val value = dbData[date] ?: BigDecimal.ZERO + val (ttl, unit) = ttlFor(date, today) + val cacheKey = "deposit:daily:$userId:$date" + + cacheManager.put(cacheKey, value, ttl, unit) + result[date] = value + } + } + + return result + .map { DailyAmount(it.key, it.value) } + .sortedBy { it.date } + + } + + + private fun ttlFor(date: LocalDate, today: LocalDate): Pair = + if (date == today) { + 15L to TimeUnit.MINUTES + } else { + 100L to TimeUnit.DAYS + } + +} \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FeeCalculatorImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FeeCalculatorImpl.kt index 6e5b29273..789cc388f 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FeeCalculatorImpl.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/FeeCalculatorImpl.kt @@ -4,7 +4,7 @@ import co.nilin.opex.accountant.core.api.FeeCalculator import co.nilin.opex.accountant.core.model.* import co.nilin.opex.accountant.core.spi.FeeConfigService import co.nilin.opex.accountant.core.spi.JsonMapper -import co.nilin.opex.accountant.core.spi.UserVolumePersister +import co.nilin.opex.accountant.core.spi.UserTradeVolumePersister import co.nilin.opex.accountant.core.spi.WalletProxy import co.nilin.opex.common.utils.CacheManager import co.nilin.opex.matching.engine.core.eventh.events.TradeEvent @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit class FeeCalculatorImpl( private val walletProxy: WalletProxy, private val feeConfigService: FeeConfigService, - private val userVolumePersister: UserVolumePersister, + private val userVolumePersister: UserTradeVolumePersister, @Qualifier("appCacheManager") private val cacheManager: CacheManager, @Value("\${app.address}") private val platformAddress: String, @Value("\${app.zone-offset}") private val zoneOffsetString: String, diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeActivityManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeActivityManagerImpl.kt new file mode 100644 index 000000000..51ed4dbd8 --- /dev/null +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeActivityManagerImpl.kt @@ -0,0 +1,73 @@ +package co.nilin.opex.accountant.core.service + +import co.nilin.opex.accountant.core.api.TradeActivityManager +import co.nilin.opex.accountant.core.api.WithdrawActivityManager +import co.nilin.opex.accountant.core.model.DailyAmount +import co.nilin.opex.accountant.core.spi.UserTradeVolumePersister +import co.nilin.opex.common.utils.CacheManager +import org.springframework.beans.factory.annotation.Value +import org.springframework.stereotype.Service +import java.math.BigDecimal +import java.time.LocalDate +import java.time.ZoneOffset +import java.util.concurrent.TimeUnit +import java.util.stream.Collectors +@Service +class TradeActivityManagerImpl( + private val cacheManager: CacheManager, + private val tradeVolumePersister: UserTradeVolumePersister, + @Value("\${app.zone-offset}") private val zoneOffsetString: String, +) : TradeActivityManager { + override suspend fun getLastDaysTradeActivity( + userId: String, + quoteCurrency: String?, + n: Int + ): List { + val today = LocalDate.now(ZoneOffset.of(zoneOffsetString)) + val dates = (0..n - 1).map { today.minusDays(it.toLong()) } + + val result = mutableMapOf() + val missingDates = mutableListOf() + + for (date in dates) { + val cacheKey = "trade:daily:$userId:$date" + val cached = cacheManager.get(cacheKey) + + if (cached != null) { + result[date] = cached + } else { + missingDates.add(date) + } + } + + if (missingDates.isNotEmpty()) { + val startDate = missingDates.minOrNull()!! + + val dbData = tradeVolumePersister.getLastDaysTrade(userId, startDate,quoteCurrency) + .stream().collect(Collectors.toMap(DailyAmount::date, DailyAmount::totalAmount)); + + for (date in missingDates) { + val value = dbData[date] ?: BigDecimal.ZERO + val (ttl, unit) = ttlFor(date, today) + val cacheKey = "trade:daily:$userId:$date" + + cacheManager.put(cacheKey, value, ttl, unit) + result[date] = value + } + } + + return result + .map { DailyAmount(it.key, it.value) } + .sortedBy { it.date } + + } + + + private fun ttlFor(date: LocalDate, today: LocalDate): Pair = + if (date == today) { + 15L to TimeUnit.MINUTES + } else { + 100L to TimeUnit.DAYS + } + +} \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt index 005008af0..f6ce24662 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImpl.kt @@ -9,7 +9,6 @@ import co.nilin.opex.accountant.core.model.* import co.nilin.opex.accountant.core.spi.* import co.nilin.opex.matching.engine.core.eventh.events.TradeEvent import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Value import org.springframework.transaction.annotation.Transactional import java.math.BigDecimal import java.time.LocalDateTime @@ -25,7 +24,7 @@ open class TradeManagerImpl( private val feeCalculator: FeeCalculator, private val financialActionPublisher: FinancialActionPublisher, private val currencyRatePersister: CurrencyRatePersister, - private val userVolumePersister: UserVolumePersister, + private val userVolumePersister: UserTradeVolumePersister, private val tradeVolumeCalculationCurrency: String, private val zoneOffsetString: String, ) : TradeManager { diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/WithdrawActivityManagerImpl.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/WithdrawActivityManagerImpl.kt new file mode 100644 index 000000000..cbf8dd491 --- /dev/null +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/service/WithdrawActivityManagerImpl.kt @@ -0,0 +1,72 @@ +package co.nilin.opex.accountant.core.service + +import co.nilin.opex.accountant.core.api.WithdrawActivityManager +import co.nilin.opex.accountant.core.model.DailyAmount +import co.nilin.opex.accountant.core.spi.UserWithdrawVolumePersister +import co.nilin.opex.common.utils.CacheManager +import org.springframework.beans.factory.annotation.Value +import org.springframework.stereotype.Service +import java.math.BigDecimal +import java.time.LocalDate +import java.time.ZoneOffset +import java.util.concurrent.TimeUnit +import java.util.stream.Collectors +@Service +class WithdrawActivityManagerImpl( + private val cacheManager: CacheManager, + private val withdrawVolumePersister: UserWithdrawVolumePersister, + @Value("\${app.zone-offset}") private val zoneOffsetString: String, +) : WithdrawActivityManager { + override suspend fun getLastDaysWithdrawActivity( + userId: String, + quoteCurrency: String?, + n: Int + ): List { + val today = LocalDate.now(ZoneOffset.of(zoneOffsetString)) + val dates = (0..n - 1).map { today.minusDays(it.toLong()) } + + val result = mutableMapOf() + val missingDates = mutableListOf() + + for (date in dates) { + val cacheKey = "withdraw:daily:$userId:$date" + val cached = cacheManager.get(cacheKey) + + if (cached != null) { + result[date] = cached + } else { + missingDates.add(date) + } + } + + if (missingDates.isNotEmpty()) { + val startDate = missingDates.minOrNull()!! + + val dbData = withdrawVolumePersister.getLastDaysWithdraw(userId, startDate,quoteCurrency) + .stream().collect(Collectors.toMap(DailyAmount::date, DailyAmount::totalAmount)); + + for (date in missingDates) { + val value = dbData[date] ?: BigDecimal.ZERO + val (ttl, unit) = ttlFor(date, today) + val cacheKey = "withdraw:daily:$userId:$date" + + cacheManager.put(cacheKey, value, ttl, unit) + result[date] = value + } + } + + return result + .map { DailyAmount(it.key, it.value) } + .sortedBy { it.date } + + } + + + private fun ttlFor(date: LocalDate, today: LocalDate): Pair = + if (date == today) { + 15L to TimeUnit.MINUTES + } else { + 100L to TimeUnit.DAYS + } + +} \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserDepositVolumePersister.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserDepositVolumePersister.kt new file mode 100644 index 000000000..496ccf3a4 --- /dev/null +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserDepositVolumePersister.kt @@ -0,0 +1,20 @@ +package co.nilin.opex.accountant.core.spi + +import co.nilin.opex.accountant.core.model.DailyAmount +import co.nilin.opex.accountant.core.model.WithdrawStatus +import java.math.BigDecimal +import java.time.LocalDate +import java.time.LocalDateTime + +interface UserDepositVolumePersister { + suspend fun update( + userId: String, + currency: String, + amount: BigDecimal, + date: LocalDateTime) + + suspend fun getTotalValueByUserAndDateAfter(uuid: String, startDate: LocalDateTime): BigDecimal + + suspend fun getLastDaysDeposit(userId: String, startDate: LocalDate?, quatCurrency: String?, lastDays: Long = 31): List + +} \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserVolumePersister.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserTradeVolumePersister.kt similarity index 72% rename from accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserVolumePersister.kt rename to accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserTradeVolumePersister.kt index ba96e888c..5e6de6bbe 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserVolumePersister.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserTradeVolumePersister.kt @@ -1,9 +1,10 @@ package co.nilin.opex.accountant.core.spi +import co.nilin.opex.accountant.core.model.DailyAmount import java.math.BigDecimal import java.time.LocalDate -interface UserVolumePersister { +interface UserTradeVolumePersister { suspend fun update( userId: String, @@ -21,4 +22,6 @@ interface UserVolumePersister { startDate: LocalDate, quoteCurrency: String ): BigDecimal? + suspend fun getLastDaysTrade(userId: String, startDate: LocalDate?, quatCurrency: String?, lastDays: Long = 31): List + } \ No newline at end of file diff --git a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserWithdrawVolumePersister.kt b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserWithdrawVolumePersister.kt index e283bf6e2..7bc056c49 100644 --- a/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserWithdrawVolumePersister.kt +++ b/accountant/accountant-core/src/main/kotlin/co/nilin/opex/accountant/core/spi/UserWithdrawVolumePersister.kt @@ -1,17 +1,17 @@ package co.nilin.opex.accountant.core.spi +import co.nilin.opex.accountant.core.model.DailyAmount import co.nilin.opex.accountant.core.model.WithdrawStatus import java.math.BigDecimal +import java.time.LocalDate import java.time.LocalDateTime interface UserWithdrawVolumePersister { suspend fun update( - userId: String, - currency: String, - amount: BigDecimal, - date: LocalDateTime, - withdrawStatus: WithdrawStatus + userId: String, currency: String, amount: BigDecimal, date: LocalDateTime, withdrawStatus: WithdrawStatus ) suspend fun getTotalValueByUserAndDateAfter(uuid: String, startDate: LocalDateTime): BigDecimal + + suspend fun getLastDaysWithdraw(userId: String, startDate: LocalDate?, quatCurrency: String?, lastDays: Long = 31): List } \ No newline at end of file diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FeeCalculatorImplTest.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FeeCalculatorImplTest.kt index 2c1ad86d3..55e69d80d 100644 --- a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FeeCalculatorImplTest.kt +++ b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/FeeCalculatorImplTest.kt @@ -5,7 +5,7 @@ import co.nilin.opex.accountant.core.model.FinancialActionCategory import co.nilin.opex.accountant.core.model.UserFee import co.nilin.opex.accountant.core.model.WalletType import co.nilin.opex.accountant.core.spi.FeeConfigService -import co.nilin.opex.accountant.core.spi.UserVolumePersister +import co.nilin.opex.accountant.core.spi.UserTradeVolumePersister import co.nilin.opex.accountant.core.spi.WalletProxy import co.nilin.opex.common.utils.CacheManager import co.nilin.opex.matching.engine.core.eventh.events.TradeEvent @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit internal class FeeCalculatorImplTest { private val walletProxy = mockk() private val feeConfigService = mockk() - private val userVolumePersister = mockk() + private val userVolumePersister = mockk() @Qualifier("appCacheManager") private val cacheManager = mockk>() diff --git a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt index e6d3f2f29..455204684 100644 --- a/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt +++ b/accountant/accountant-core/src/test/kotlin/co/nilin/opex/accountant/core/service/TradeManagerImplTest.kt @@ -30,7 +30,7 @@ internal class TradeManagerImplTest { private val richTradePublisher = mockk() private val financialActionPublisher = mockk() private val currencyRatePersister = mockk() - private val userVolumePersister = mockk() + private val userVolumePersister = mockk() private val feeCalculator = mockk() private val walletProxy = mockk() private val feeConfigService = mockk() diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/config/AccountantKafkaConfig.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/config/AccountantKafkaConfig.kt index 75cb6dd41..6ae6f37dd 100644 --- a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/config/AccountantKafkaConfig.kt +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/config/AccountantKafkaConfig.kt @@ -2,6 +2,7 @@ package co.nilin.opex.accountant.ports.kafka.listener.config import co.nilin.opex.accountant.core.inout.KycLevelUpdatedEvent import co.nilin.opex.accountant.ports.kafka.listener.consumer.* +import co.nilin.opex.accountant.ports.kafka.listener.inout.DepositEvent import co.nilin.opex.accountant.ports.kafka.listener.inout.FinancialActionResponseEvent import co.nilin.opex.accountant.ports.kafka.listener.inout.WithdrawRequestEvent import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent @@ -37,7 +38,7 @@ class AccountantKafkaConfig { ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java, JsonDeserializer.TRUSTED_PACKAGES to "co.nilin.opex.*", - JsonDeserializer.TYPE_MAPPINGS to "order_request_event:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderRequestEvent,order_request_submit:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequestEvent,order_request_cancel:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderCancelRequestEvent,kyc_level_updated_event:co.nilin.opex.accountant.core.inout.KycLevelUpdatedEvent,fiAction_response_event:co.nilin.opex.accountant.ports.kafka.listener.inout.FinancialActionResponseEvent,withdrawRequestEvent:co.nilin.opex.accountant.ports.kafka.listener.inout.WithdrawRequestEvent" + JsonDeserializer.TYPE_MAPPINGS to "order_request_event:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderRequestEvent,order_request_submit:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderSubmitRequestEvent,order_request_cancel:co.nilin.opex.accountant.ports.kafka.listener.inout.OrderCancelRequestEvent,kyc_level_updated_event:co.nilin.opex.accountant.core.inout.KycLevelUpdatedEvent,fiAction_response_event:co.nilin.opex.accountant.ports.kafka.listener.inout.FinancialActionResponseEvent,withdrawRequestEvent:co.nilin.opex.accountant.ports.kafka.listener.inout.WithdrawRequestEvent,depositEvent:co.nilin.opex.accountant.ports.kafka.listener.inout.DepositEvent" ) } @@ -60,6 +61,10 @@ class AccountantKafkaConfig { fun withdrawRequestConsumerFactory(@Qualifier("consumerConfig") consumerConfigs: Map): ConsumerFactory { return DefaultKafkaConsumerFactory(consumerConfigs) } + @Bean("depositConsumerFactory") + fun depositConsumerFactory(@Qualifier("consumerConfig") consumerConfigs: Map): ConsumerFactory { + return DefaultKafkaConsumerFactory(consumerConfigs) + } @Autowired @ConditionalOnBean(TradeKafkaListener::class) @@ -157,6 +162,16 @@ class AccountantKafkaConfig { return KafkaTemplate(producerFactory) } + @Bean("depositProducerFactory") + fun depositProducerFactory(@Qualifier("consumerConfig") producerConfigs: Map): ProducerFactory { + return DefaultKafkaProducerFactory(producerConfigs) + } + + @Bean("depositKafkaTemplate") + fun depositKafkaTemplate(@Qualifier("depositProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { + return KafkaTemplate(producerFactory) + } + @Autowired @ConditionalOnBean(KycLevelUpdatedKafkaListener::class) fun configureKycLevelUpdatedListener( @@ -187,6 +202,21 @@ class AccountantKafkaConfig { container.start() } + @Autowired + @ConditionalOnBean(DepositKafkaListener::class) + fun configureDepositRequestEventListener( + listener: DepositKafkaListener, + @Qualifier("depositKafkaTemplate") template: KafkaTemplate, + @Qualifier("depositConsumerFactory") consumerFactory: ConsumerFactory + ) { + val containerProps = ContainerProperties(Pattern.compile("deposit")) + containerProps.messageListener = listener + val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps) + container.setBeanName("DepositKafkaListenerContainer") + container.commonErrorHandler = createConsumerErrorHandler(template, "deposit.DLT") + container.start() + } + private fun createConsumerErrorHandler(kafkaTemplate: KafkaTemplate<*, *>, dltTopic: String): CommonErrorHandler { val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate) { cr, _ -> cr.headers().add("dlt-origin-module", "ACCOUNTANT".toByteArray()) diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/DepositKafkaListener.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/DepositKafkaListener.kt new file mode 100644 index 000000000..da4347aa9 --- /dev/null +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/consumer/DepositKafkaListener.kt @@ -0,0 +1,8 @@ +package co.nilin.opex.accountant.ports.kafka.listener.consumer + +import co.nilin.opex.accountant.ports.kafka.listener.inout.DepositEvent +import co.nilin.opex.accountant.ports.kafka.listener.spi.DepositListener +import org.springframework.stereotype.Component + +@Component +class DepositKafkaListener : EventConsumer() \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/DepositEvent.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/DepositEvent.kt new file mode 100644 index 000000000..37624a599 --- /dev/null +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/inout/DepositEvent.kt @@ -0,0 +1,12 @@ +package co.nilin.opex.accountant.ports.kafka.listener.inout + +import java.math.BigDecimal +import java.time.LocalDateTime + +data class DepositEvent( + val uuid: String, + val depositRef: String? = null, + val currency: String, + val amount: BigDecimal, + val createDate: LocalDateTime, +) diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/DepositListener.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/DepositListener.kt new file mode 100644 index 000000000..765899b4f --- /dev/null +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/spi/DepositListener.kt @@ -0,0 +1,5 @@ +package co.nilin.opex.accountant.ports.kafka.listener.spi + +import co.nilin.opex.accountant.ports.kafka.listener.inout.DepositEvent + +interface DepositListener : Listener \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserDepositVolumeRepository.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserDepositVolumeRepository.kt new file mode 100644 index 000000000..56f5f4c03 --- /dev/null +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserDepositVolumeRepository.kt @@ -0,0 +1,63 @@ +package co.nilin.opex.accountant.ports.postgres.dao + +import co.nilin.opex.accountant.core.model.DailyAmount +import co.nilin.opex.accountant.ports.postgres.model.UserDepositVolumeModel +import org.springframework.data.r2dbc.repository.Query +import org.springframework.data.repository.reactive.ReactiveCrudRepository +import org.springframework.stereotype.Repository +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import java.math.BigDecimal +import java.time.LocalDate + +@Repository +interface UserDepositVolumeRepository : ReactiveCrudRepository { + + @Query( + """ + insert into user_deposit_volume (user_id, date, total_amount, quote_currency) + values (:userId, :date, :totalAmount, :quoteCurrency) + on conflict (user_id, date,quote_currency) + do update + set total_amount = user_deposit_volume.total_amount + EXCLUDED.total_amount + """ + ) + fun insertOrUpdate( + userId: String, + date: LocalDate, + totalAmount: BigDecimal, + quoteCurrency: String + ): Mono + + + @Query( + """ + select sum(total_amount) as total_Amount + from user_deposit_volume + where user_id = :userId and date >= :startDate and quote_currency=:quoteCurrency + group by user_id + """ + ) + fun findTotalValueByUserAndAndDateAfter( + userId: String, + startDate: LocalDate, + quoteCurrency: String + ): Mono + + + @Query( + """ + select date, total_amount + from user_deposit_volume + where user_id = :userId + and date >= :startDate + and quote_currency = :quoteCurrency + order by date desc + """ + ) + fun findDailyDepositVolume( + userId: String, + startDate: LocalDate, + quoteCurrency: String + ): Flux +} \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserTradeVolumeRepository.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserTradeVolumeRepository.kt index 953c45d9f..730452f0e 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserTradeVolumeRepository.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserTradeVolumeRepository.kt @@ -1,9 +1,11 @@ package co.nilin.opex.accountant.ports.postgres.dao +import co.nilin.opex.accountant.core.model.DailyAmount import co.nilin.opex.accountant.ports.postgres.model.UserTradeVolumeModel import org.springframework.data.r2dbc.repository.Query import org.springframework.data.repository.reactive.ReactiveCrudRepository import org.springframework.stereotype.Repository +import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.math.BigDecimal import java.time.LocalDate @@ -58,4 +60,20 @@ interface UserTradeVolumeRepository : ReactiveCrudRepository + + @Query( + """ + select date, total_amount + from user_trade_volume + where user_id = :userId + and date >= :startDate + and quote_currency = :quoteCurrency + order by date desc + """ + ) + fun findDailyTradeVolume( + userId: String, + startDate: LocalDate, + quoteCurrency: String + ): Flux } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserWithdrawVolumeRepository.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserWithdrawVolumeRepository.kt index 36a158e73..8f65a4a38 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserWithdrawVolumeRepository.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/dao/UserWithdrawVolumeRepository.kt @@ -1,9 +1,11 @@ package co.nilin.opex.accountant.ports.postgres.dao +import co.nilin.opex.accountant.core.model.DailyAmount import co.nilin.opex.accountant.ports.postgres.model.UserWithdrawVolumeModel import org.springframework.data.r2dbc.repository.Query import org.springframework.data.repository.reactive.ReactiveCrudRepository import org.springframework.stereotype.Repository +import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.math.BigDecimal import java.time.LocalDate @@ -41,4 +43,20 @@ interface UserWithdrawVolumeRepository : ReactiveCrudRepository + + @Query( + """ + select date, total_amount + from user_withdraw_volume + where user_id = :userId + and date >= :startDate + and quote_currency = :quoteCurrency + order by date desc + """ + ) + fun findDailyWithdrawVolume( + userId: String, + startDate: LocalDate, + quoteCurrency: String + ): Flux } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserDepositVolumePersisterImpl.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserDepositVolumePersisterImpl.kt new file mode 100644 index 000000000..580c3ca36 --- /dev/null +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserDepositVolumePersisterImpl.kt @@ -0,0 +1,82 @@ +package co.nilin.opex.accountant.ports.postgres.impl + +import co.nilin.opex.accountant.core.model.DailyAmount +import co.nilin.opex.accountant.core.model.WithdrawStatus +import co.nilin.opex.accountant.core.spi.CurrencyRatePersister +import co.nilin.opex.accountant.core.spi.UserDepositVolumePersister +import co.nilin.opex.accountant.ports.postgres.dao.UserDepositVolumeRepository +import kotlinx.coroutines.reactive.awaitFirstOrNull +import kotlinx.coroutines.reactor.awaitSingle +import kotlinx.coroutines.reactor.awaitSingleOrNull +import org.springframework.beans.factory.annotation.Value +import org.springframework.stereotype.Component +import java.math.BigDecimal +import java.time.LocalDateTime +import java.time.ZoneOffset +import java.math.RoundingMode +import java.time.LocalDate + + +@Component +class UserDepositVolumePersisterImpl( + private val repository: UserDepositVolumeRepository, + private val currencyRatePersister: CurrencyRatePersister, + @Value("\${app.zone-offset}") private val zoneOffsetString: String, + @Value("\${app.deposit-volume-calculation-currency}") private val calculationCurrency: String, + @Value("\${app.deposit-volume-calculation-currency-precision:2}") private val calculationCurrencyPrecision: Int + +) : UserDepositVolumePersister { + + override suspend fun update( + userId: String, + currency: String, + amount: BigDecimal, + date: LocalDateTime + ) { + val rate = if (currency == calculationCurrency) BigDecimal.ONE + else currencyRatePersister.getRate(currency, calculationCurrency) + + val signedAmount = amount.multiply(rate).setScale(calculationCurrencyPrecision, RoundingMode.DOWN) + + repository.insertOrUpdate( + userId, + date.atOffset(ZoneOffset.of(zoneOffsetString)).toLocalDate(), + signedAmount, + calculationCurrency + ).awaitSingleOrNull() + } + + override suspend fun getTotalValueByUserAndDateAfter( + uuid: String, + startDate: LocalDateTime + ): BigDecimal { + return repository.findTotalValueByUserAndAndDateAfter( + uuid, + startDate.atOffset(ZoneOffset.of(zoneOffsetString)).toLocalDate(), + calculationCurrency + ).awaitFirstOrNull() ?: BigDecimal.ZERO + } + + override suspend fun getLastDaysDeposit( + userId: String, + startDate: LocalDate?, + quatCurrency: String?, + lastDays: Long + ): List { + + val startDate = startDate ?: LocalDate + .now(ZoneOffset.of(zoneOffsetString)) + .minusDays(lastDays) + + return repository + .findDailyDepositVolume(userId, startDate, quatCurrency?:calculationCurrency) + .map { + DailyAmount( + date = it.date, + totalAmount = it.totalAmount + ) + } + .collectList() + .awaitSingle() + } +} diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserVolumePersisterImpl.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserTradeVolumePersisterImpl.kt similarity index 50% rename from accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserVolumePersisterImpl.kt rename to accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserTradeVolumePersisterImpl.kt index 16fd79f1a..9291c0d36 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserVolumePersisterImpl.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserTradeVolumePersisterImpl.kt @@ -1,14 +1,23 @@ package co.nilin.opex.accountant.ports.postgres.impl -import co.nilin.opex.accountant.core.spi.UserVolumePersister +import co.nilin.opex.accountant.core.model.DailyAmount +import co.nilin.opex.accountant.core.spi.UserTradeVolumePersister import co.nilin.opex.accountant.ports.postgres.dao.UserTradeVolumeRepository +import kotlinx.coroutines.reactor.awaitSingle import kotlinx.coroutines.reactor.awaitSingleOrNull +import org.springframework.beans.factory.annotation.Value import org.springframework.stereotype.Component import java.math.BigDecimal import java.time.LocalDate +import java.time.ZoneOffset @Component -class UserVolumePersisterImpl(private val repository: UserTradeVolumeRepository) : UserVolumePersister { +class UserTradeVolumePersisterImpl( + private val repository: UserTradeVolumeRepository, + @Value("\${app.zone-offset}") private val zoneOffsetString: String, + @Value("\${app.trade-volume-calculation-currency}") private val calculationCurrency: String, + + ) : UserTradeVolumePersister { override suspend fun update( userId: String, @@ -38,4 +47,27 @@ class UserVolumePersisterImpl(private val repository: UserTradeVolumeRepository) return repository.findTotalValueByUserAndAndDateAfterAndCurrency(uuid, currency, startDate, quoteCurrency) .awaitSingleOrNull() } + + override suspend fun getLastDaysTrade( + userId: String, + startDate: LocalDate?, + quatCurrency: String?, + lastDays: Long + ): List { + + val startDate = startDate ?: LocalDate + .now(ZoneOffset.of(zoneOffsetString)) + .minusDays(lastDays) + + return repository + .findDailyTradeVolume(userId, startDate, quatCurrency ?: calculationCurrency) + .map { + DailyAmount( + date = it.date, + totalAmount = it.totalAmount + ) + } + .collectList() + .awaitSingle() + } } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserWithdrawVolumePersisterImpl.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserWithdrawVolumePersisterImpl.kt index dc02a6874..6e72f5bcc 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserWithdrawVolumePersisterImpl.kt +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/impl/UserWithdrawVolumePersisterImpl.kt @@ -1,17 +1,20 @@ package co.nilin.opex.accountant.ports.postgres.impl +import co.nilin.opex.accountant.core.model.DailyAmount import co.nilin.opex.accountant.core.model.WithdrawStatus import co.nilin.opex.accountant.core.spi.CurrencyRatePersister import co.nilin.opex.accountant.core.spi.UserWithdrawVolumePersister import co.nilin.opex.accountant.ports.postgres.dao.UserWithdrawVolumeRepository import kotlinx.coroutines.reactive.awaitFirstOrNull +import kotlinx.coroutines.reactor.awaitSingle import kotlinx.coroutines.reactor.awaitSingleOrNull import org.springframework.beans.factory.annotation.Value import org.springframework.stereotype.Component import java.math.BigDecimal +import java.math.RoundingMode +import java.time.LocalDate import java.time.LocalDateTime import java.time.ZoneOffset -import java.math.RoundingMode @Component @@ -55,4 +58,27 @@ class UserWithdrawVolumePersisterImpl( calculationCurrency ).awaitFirstOrNull() ?: BigDecimal.ZERO } + + override suspend fun getLastDaysWithdraw( + userId: String, + startDate: LocalDate?, + quatCurrency: String?, + lastDays: Long + ): List { + + val startDate = startDate ?: LocalDate + .now(ZoneOffset.of(zoneOffsetString)) + .minusDays(lastDays) + + return repository + .findDailyWithdrawVolume(userId, startDate, quatCurrency?:calculationCurrency) + .map { + DailyAmount( + date = it.date, + totalAmount = it.totalAmount + ) + } + .collectList() + .awaitSingle() + } } diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/UserDepositVolumeModel.kt b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/UserDepositVolumeModel.kt new file mode 100644 index 000000000..5f0cbead1 --- /dev/null +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/kotlin/co/nilin/opex/accountant/ports/postgres/model/UserDepositVolumeModel.kt @@ -0,0 +1,16 @@ +package co.nilin.opex.accountant.ports.postgres.model + +import org.springframework.data.annotation.Id +import org.springframework.data.relational.core.mapping.Column +import org.springframework.data.relational.core.mapping.Table +import java.math.BigDecimal +import java.time.LocalDate + +@Table("user_deposit_volume") +data class UserDepositVolumeModel( + @Id val id: Long? = null, + val userId: String, + val date: LocalDate, + val totalAmount: BigDecimal, + val quoteCurrency: String, +) diff --git a/accountant/accountant-ports/accountant-persister-postgres/src/main/resources/schema.sql b/accountant/accountant-ports/accountant-persister-postgres/src/main/resources/schema.sql index a217b6e39..c4d6999c6 100644 --- a/accountant/accountant-ports/accountant-persister-postgres/src/main/resources/schema.sql +++ b/accountant/accountant-ports/accountant-persister-postgres/src/main/resources/schema.sql @@ -149,4 +149,14 @@ CREATE TABLE IF NOT EXISTS withdraw_limit_config daily_max_amount decimal not null ); +CREATE TABLE IF NOT EXISTS user_deposit_volume +( + id SERIAL PRIMARY KEY, + user_id VARCHAR(36) NOT NULL, + date DATE not null, + total_amount decimal not null, + quote_currency VARCHAR(50) NOT NULL, + unique (user_id, date,quote_currency) +); + COMMIT; diff --git a/api/api-app/src/main/resources/application.yml b/api/api-app/src/main/resources/application.yml index 8b654ad05..63ec4437e 100644 --- a/api/api-app/src/main/resources/application.yml +++ b/api/api-app/src/main/resources/application.yml @@ -134,9 +134,7 @@ app: id: opex-api-key binance: api-url: https://api1.binance.com - trade-volume-calculation-currency: ${TRADE_VOLUME_CALCULATION_CURRENCY:USDT} - withdraw-volume-calculation-currency: ${WITHDRAW_VOLUME_CALCULATION_CURRENCY:USDT} - total-asset-calculation-currency: ${TOTAL_ASSET_CALCULATION_CURRENCY:USDT} + user-activity-reference-currency: ${USER_ACTIVITY_REFERENCE_CURRENCY:USDT} custom-message: enabled: ${CUSTOM_MESSAGE_ENABLED:false} base-url: ${CUSTOM_MESSAGE_URL} diff --git a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/MarketBasicData.kt b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/MarketBasicData.kt index de0882559..de478609e 100644 --- a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/MarketBasicData.kt +++ b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/MarketBasicData.kt @@ -3,7 +3,5 @@ package co.nilin.opex.api.core.inout data class MarketBasicData( val quoteCurrencies: List, val referenceCurrencies: List, - val withdrawReferenceCurrency: String, - val tradeReferenceCurrency: String, - val totalAssetReferenceCurrency: String, + val userActivityReferenceCurrency: String ) diff --git a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/analytics/ActivityTotals.kt b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/analytics/ActivityTotals.kt index 5799b1044..43203da9d 100644 --- a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/analytics/ActivityTotals.kt +++ b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/analytics/ActivityTotals.kt @@ -10,5 +10,5 @@ data class ActivityTotals( val totalWithdraw: BigDecimal, val totalDeposit: BigDecimal, val totalTrade: BigDecimal, - val totalSwap: BigDecimal + val totalOrder: BigDecimal ) diff --git a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/analytics/DailyAmount.kt b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/analytics/DailyAmount.kt new file mode 100644 index 000000000..f07212c51 --- /dev/null +++ b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/analytics/DailyAmount.kt @@ -0,0 +1,6 @@ +package co.nilin.opex.api.core.inout.analytics + +import java.math.BigDecimal +import java.time.LocalDate + +data class DailyAmount(val date: LocalDate, var totalAmount: BigDecimal) diff --git a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/AccountantProxy.kt b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/AccountantProxy.kt index a98dfcc3d..1497d7a32 100644 --- a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/AccountantProxy.kt +++ b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/AccountantProxy.kt @@ -1,11 +1,10 @@ package co.nilin.opex.api.core.spi -import co.nilin.opex.api.core.inout.Chain -import co.nilin.opex.api.core.inout.ChainInfo import co.nilin.opex.api.core.inout.FeeConfig import co.nilin.opex.api.core.inout.PairConfigResponse import co.nilin.opex.api.core.inout.UserFee import co.nilin.opex.api.core.inout.WithdrawLimitConfig +import co.nilin.opex.api.core.inout.analytics.DailyAmount import co.nilin.opex.common.utils.Interval import java.math.BigDecimal @@ -24,4 +23,7 @@ interface AccountantProxy { suspend fun getWithdrawLimitConfigs(): List suspend fun getTotalWithdrawVolumeValue(uuid: String, interval: Interval?): BigDecimal + suspend fun getDailyDepositLast31Days(uuid: String): List + suspend fun getDailyWithdrawLast31Days(uuid: String): List + suspend fun getDailyTradeLast31Days(uuid: String): List } \ No newline at end of file diff --git a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/WalletProxy.kt b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/WalletProxy.kt index 7726d3940..c64ea66bb 100644 --- a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/WalletProxy.kt +++ b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/WalletProxy.kt @@ -1,6 +1,7 @@ package co.nilin.opex.api.core.spi import co.nilin.opex.api.core.inout.* +import co.nilin.opex.api.core.inout.analytics.DailyAmount import java.math.BigDecimal interface WalletProxy { @@ -197,4 +198,6 @@ interface WalletProxy { amount: BigDecimal, request: ManualTransferRequest ): TransferResult + + suspend fun getDailyBalanceLast31Days(token: String, uuid: String): List } \ No newline at end of file diff --git a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/MarketController.kt b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/MarketController.kt index 037565ee5..234d4f6ad 100644 --- a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/MarketController.kt +++ b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/MarketController.kt @@ -25,12 +25,8 @@ class MarketController( private val walletProxy: WalletProxy, private val matchingGatewayProxy: MatchingGatewayProxy, private val blockChainGatewayProxy: BlockchainGatewayProxy, - @Value("\${app.trade-volume-calculation-currency}") - private val tradeVolumeCalculationCurrency: String, - @Value("\${app.withdraw-volume-calculation-currency}") - private val withdrawVolumeCalculationCurrency: String, - @Value("\${app.total-asset-calculation-currency}") - private val totalAssetCalculationCurrency: String + @Value("\${app.user-activity-reference-currency}") + private val userActivityReferenceCurrency: String, ) { private val orderBookValidLimits = arrayListOf(5, 10, 20, 50, 100, 500, 1000, 5000) private val validDurations = arrayListOf("24h", "7d", "1M") @@ -266,9 +262,8 @@ class MarketController( return MarketBasicData( (quoteCurrencies.map { it.currency }), (quoteCurrencies.filter { it.isReference }.map { it.currency }), - withdrawVolumeCalculationCurrency, - tradeVolumeCalculationCurrency, - totalAssetCalculationCurrency + userActivityReferenceCurrency + ) } diff --git a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/UserAnalyticsController.kt b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/UserAnalyticsController.kt index 6c86f7eaf..1451528d7 100644 --- a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/UserAnalyticsController.kt +++ b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/UserAnalyticsController.kt @@ -1,7 +1,9 @@ package co.nilin.opex.api.ports.opex.controller import co.nilin.opex.api.core.inout.analytics.ActivityTotals +import co.nilin.opex.api.ports.opex.service.UserActivityAggregationService import co.nilin.opex.api.ports.opex.util.jwtAuthentication +import co.nilin.opex.api.ports.opex.util.tokenValue import org.springframework.security.core.annotation.CurrentSecurityContext import org.springframework.security.core.context.SecurityContext import org.springframework.web.bind.annotation.GetMapping @@ -14,73 +16,11 @@ import kotlin.random.Random @RestController @RequestMapping("/opex/v1/analytics") -class UserAnalyticsController { +class UserAnalyticsController(private val userActivityAggregationService: UserActivityAggregationService) { @GetMapping("/user-activity") - suspend fun userActivity(@CurrentSecurityContext securityContext: SecurityContext): Map { - val jwt = securityContext.jwtAuthentication().token - val uuid = jwt.subject ?: "unknown" - - val zone = ZoneId.systemDefault() - val todayStart = LocalDate.now(zone).atStartOfDay(zone).toInstant() - - val days = 30 - val result = LinkedHashMap(days) - - // Initial balance seeded by user hash - var runningBalance = BigDecimal.valueOf(100 + (uuid.hashCode() and Int.MAX_VALUE) % 900L) - - for (i in (days - 1) downTo 0) { - val dayInstant = todayStart.minusSeconds(86400L * i) - val dayKey = dayInstant.toEpochMilli().toString() - - // deterministic seed per user+day - val seed = deterministicSeed(uuid.hashCode(), dayInstant.toEpochMilli()) - val rnd = Random(seed) - - val base = BigDecimal.valueOf((50 + rnd.nextInt(0, 950)).toLong()) // 50..999 - - val deposit = scaleMoney(base.multiply(BigDecimal.valueOf(rnd.nextDouble(0.0, 5.0)))) - val withdraw = scaleMoney(deposit.multiply(BigDecimal.valueOf(rnd.nextDouble(0.0, 0.9)))) - - val trade = scaleMoney(base.multiply(BigDecimal.valueOf(rnd.nextDouble(0.5, 8.0)))) - val swap = scaleMoney(base.multiply(BigDecimal.valueOf(rnd.nextDouble(0.0, 3.0)))) - - val pnlDrift = scaleMoney(trade.multiply(BigDecimal.valueOf(rnd.nextDouble(-0.01, 0.01)))) - - runningBalance = (runningBalance + deposit - withdraw + pnlDrift).coerceAtLeast(BigDecimal.ZERO) - - result[dayKey] = ActivityTotals( - totalBalance = runningBalance.min(MAX_BALANCE), - totalWithdraw = withdraw, - totalDeposit = deposit, - totalTrade = trade.min(MAX_TRADE), - totalSwap = swap.min(MAX_SWAP) - ) - } - - return result - } - - private fun scaleMoney(v: BigDecimal) = v.setScale(2, RoundingMode.HALF_UP) - - private val MAX_BALANCE = BigDecimal("10000000") - private val MAX_TRADE = BigDecimal("250000") - private val MAX_SWAP = BigDecimal("100000") - - /** - * Simple deterministic seed generator for a user and day. - * Combines user hash and day epoch millis into a reproducible seed. - */ - private fun deterministicSeed(userHash: Int, dayEpochMillis: Long): Long { - var seed = userHash.toLong() * 31 + dayEpochMillis - // simple bit mixing - seed = seed xor (seed shr 33) - seed *= 0xBF58476D1CE4E5BL - seed = seed xor (seed shr 33) - seed *= 0xc4ceb9fe1a85ec5L - seed = seed xor (seed shr 33) - return seed + suspend fun userActivity(@CurrentSecurityContext securityContext: SecurityContext): Map { + val auth=securityContext.jwtAuthentication() + return userActivityAggregationService.getLast31DaysUserStats(auth.tokenValue(),auth.name) } - } diff --git a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/service/UserActivityAggregationService.kt b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/service/UserActivityAggregationService.kt new file mode 100644 index 000000000..02705f24e --- /dev/null +++ b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/service/UserActivityAggregationService.kt @@ -0,0 +1,51 @@ +package co.nilin.opex.api.ports.opex.service + +import co.nilin.opex.api.core.inout.analytics.ActivityTotals +import co.nilin.opex.api.core.spi.AccountantProxy +import co.nilin.opex.api.core.spi.WalletProxy +import co.nilin.opex.api.ports.opex.util.toTimestamp +import org.springframework.stereotype.Service +import java.math.BigDecimal + +@Service +class UserActivityAggregationService( + private val walletProxy: WalletProxy, + private val accountantProxy: AccountantProxy +) { + + suspend fun getLast31DaysUserStats( + token: String, + userId: String + ): Map { + + val balances = walletProxy.getDailyBalanceLast31Days(token, userId) + val withdraws = accountantProxy.getDailyWithdrawLast31Days(userId) + val deposits = accountantProxy.getDailyDepositLast31Days(userId) + val trades = accountantProxy.getDailyTradeLast31Days(userId) + + // Collect all dates + val allDates = ( + balances.map { it.date } + + withdraws.map { it.date } + + deposits.map { it.date } + + trades.map { it.date } + ).toSet() + + // Index by date for fast lookup + val balanceMap = balances.associateBy { it.date } + val withdrawMap = withdraws.associateBy { it.date } + val depositMap = deposits.associateBy { it.date } + val tradeMap = trades.associateBy { it.date } + + // Build final map + return allDates.associateWith { date -> + ActivityTotals( + totalBalance = balanceMap[date]?.totalAmount ?: BigDecimal.ZERO, + totalWithdraw = withdrawMap[date]?.totalAmount ?: BigDecimal.ZERO, + totalDeposit = depositMap[date]?.totalAmount ?: BigDecimal.ZERO, + totalTrade = tradeMap[date]?.totalAmount ?: BigDecimal.ZERO, + totalOrder = BigDecimal.ZERO + ) + }.mapKeys { (key, value) -> key.toTimestamp() }.toSortedMap() + } +} \ No newline at end of file diff --git a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/util/ConvertorExtenstions.kt b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/util/ConvertorExtenstions.kt index ca232e74b..ad76448bc 100644 --- a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/util/ConvertorExtenstions.kt +++ b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/util/ConvertorExtenstions.kt @@ -3,6 +3,10 @@ package co.nilin.opex.api.ports.opex.util import co.nilin.opex.api.core.inout.OrderData import co.nilin.opex.api.core.inout.OrderStatus import co.nilin.opex.api.ports.opex.data.OrderDataResponse +import java.time.Instant +import java.time.LocalDate +import java.time.LocalDateTime +import java.time.ZoneId fun OrderData.toResponse(): OrderDataResponse { return OrderDataResponse( @@ -20,4 +24,47 @@ fun OrderData.toResponse(): OrderDataResponse { createDate = this.createDate, updateDate = this.updateDate, ) -} \ No newline at end of file +} + + +/** + * LocalDate -> epoch millis (start of day) + */ +fun LocalDate.toTimestamp( + zoneId: ZoneId = ZoneId.of("UTC") +): Long = + this + .atStartOfDay(zoneId) + .toInstant() + .toEpochMilli() + +/** + * LocalDateTime -> epoch millis + */ +fun LocalDateTime.toTimestamp( + zoneId: ZoneId = ZoneId.of("UTC") +): Long = + this + .atZone(zoneId) + .toInstant() + .toEpochMilli() + +/** + * epoch millis -> LocalDate + */ +fun Long.toLocalDate( + zoneId: ZoneId = ZoneId.of("UTC") +): LocalDate = + Instant.ofEpochMilli(this) + .atZone(zoneId) + .toLocalDate() + +/** + * epoch millis -> LocalDateTime + */ +fun Long.toLocalDateTime( + zoneId: ZoneId = ZoneId.of("UTC") +): LocalDateTime = + Instant.ofEpochMilli(this) + .atZone(zoneId) + .toLocalDateTime() diff --git a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/AccountantProxyImpl.kt b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/AccountantProxyImpl.kt index db020e061..a712bc771 100644 --- a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/AccountantProxyImpl.kt +++ b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/AccountantProxyImpl.kt @@ -4,6 +4,7 @@ import co.nilin.opex.api.core.inout.FeeConfig import co.nilin.opex.api.core.inout.PairConfigResponse import co.nilin.opex.api.core.inout.UserFee import co.nilin.opex.api.core.inout.WithdrawLimitConfig +import co.nilin.opex.api.core.inout.analytics.DailyAmount import co.nilin.opex.api.core.spi.AccountantProxy import co.nilin.opex.api.ports.proxy.config.ProxyDispatchers import co.nilin.opex.common.utils.Interval @@ -139,5 +140,50 @@ class AccountantProxyImpl(@Qualifier("generalWebClient") private val webClient: } } + override suspend fun getDailyWithdrawLast31Days( + uuid: String): List { + logger.info("fetching daily withdraw stats for {}", uuid) + return withContext(ProxyDispatchers.general) { + webClient.get() + .uri("$baseUrl/user-activity/withdraw/$uuid") + .accept(MediaType.APPLICATION_JSON) + .retrieve() + .onStatus({ it.isError }) { it.createException() } + .bodyToMono>() + .awaitSingle() + } + } + + override suspend fun getDailyDepositLast31Days( + uuid: String): List { + + logger.info("fetching daily deposit stats for {}", uuid) + + return withContext(ProxyDispatchers.general) { + webClient.get() + .uri("$baseUrl/user-activity/deposit/$uuid") + .accept(MediaType.APPLICATION_JSON) + .retrieve() + .onStatus({ it.isError }) { it.createException() } + .bodyToMono>() + .awaitSingle() + } + } + + override suspend fun getDailyTradeLast31Days( + uuid: String): List { + + logger.info("fetching daily trade stats for {}", uuid) + + return withContext(ProxyDispatchers.general) { + webClient.get() + .uri("$baseUrl/user-activity/trade/$uuid") + .accept(MediaType.APPLICATION_JSON) + .retrieve() + .onStatus({ it.isError }) { it.createException() } + .bodyToMono>() + .awaitSingle() + } + } } \ No newline at end of file diff --git a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/WalletProxyImpl.kt b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/WalletProxyImpl.kt index 65740dbb7..c58755aa5 100644 --- a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/WalletProxyImpl.kt +++ b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/WalletProxyImpl.kt @@ -1,6 +1,7 @@ package co.nilin.opex.api.ports.proxy.impl import co.nilin.opex.api.core.inout.* +import co.nilin.opex.api.core.inout.analytics.DailyAmount import co.nilin.opex.api.core.spi.WalletProxy import co.nilin.opex.api.ports.proxy.config.ProxyDispatchers import co.nilin.opex.api.ports.proxy.data.TransactionRequest @@ -719,5 +720,24 @@ class WalletProxyImpl(@Qualifier("generalWebClient") private val webClient: WebC .bodyToMono() .awaitFirstOrElse { throw OpexError.BadRequest.exception() } } + + override suspend fun getDailyBalanceLast31Days( + token: String, + uuid: String + ): List { + + logger.info("fetching daily balance stats for {}", uuid) + + return withContext(ProxyDispatchers.wallet) { + webClient.get() + .uri("$baseUrl/stats/balance/$uuid") + .accept(MediaType.APPLICATION_JSON) + .header(HttpHeaders.AUTHORIZATION, "Bearer $token") + .retrieve() + .onStatus({ it.isError }) { it.createException() } + .bodyToMono>() + .awaitSingle() + } + } } diff --git a/auth-gateway/auth-gateway-app/src/main/kotlin/co/nilin/opex/auth/proxy/KeycloakProxy.kt b/auth-gateway/auth-gateway-app/src/main/kotlin/co/nilin/opex/auth/proxy/KeycloakProxy.kt index f983badff..5fe5e21b7 100644 --- a/auth-gateway/auth-gateway-app/src/main/kotlin/co/nilin/opex/auth/proxy/KeycloakProxy.kt +++ b/auth-gateway/auth-gateway-app/src/main/kotlin/co/nilin/opex/auth/proxy/KeycloakProxy.kt @@ -60,7 +60,7 @@ class KeycloakProxy( .bodyValue("client_id=${clientId}&client_secret=${clientSecret}&grant_type=password&username=${users[0].username}&password=${password}") .retrieve() .onStatus({ it == HttpStatus.valueOf(401) }) { - throw OpexError.InvalidUserCredentials.exception() + throw OpexError.UsernameOrPasswordIsIncorrect.exception() } .awaitBody() } diff --git a/market/market-app/src/main/kotlin/co/nilin/opex/market/app/controller/MarketStatsController.kt b/market/market-app/src/main/kotlin/co/nilin/opex/market/app/controller/MarketStatsController.kt index a3c125119..12a44e8b2 100644 --- a/market/market-app/src/main/kotlin/co/nilin/opex/market/app/controller/MarketStatsController.kt +++ b/market/market-app/src/main/kotlin/co/nilin/opex/market/app/controller/MarketStatsController.kt @@ -33,6 +33,4 @@ class MarketStatsController(private val marketQueryHandler: MarketQueryHandler) suspend fun getMostTrades(@RequestParam interval: Interval): TradeVolumeStat? { return marketQueryHandler.mostTrades(interval) } - - } \ No newline at end of file diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/PaymentGatewayController.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/PaymentGatewayController.kt index 1e154a587..ffe8962a1 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/PaymentGatewayController.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/PaymentGatewayController.kt @@ -1,35 +1,18 @@ package co.nilin.opex.wallet.app.controller -import co.nilin.opex.common.OpexError import co.nilin.opex.wallet.app.dto.PaymentDepositRequest import co.nilin.opex.wallet.app.dto.PaymentDepositResponse -import co.nilin.opex.wallet.app.service.TraceDepositService -import co.nilin.opex.wallet.core.inout.Deposit -import co.nilin.opex.wallet.core.inout.TransferCommand -import co.nilin.opex.wallet.core.inout.TransferMethod -import co.nilin.opex.wallet.core.model.* -import co.nilin.opex.wallet.core.service.GatewayService -import co.nilin.opex.wallet.core.spi.CurrencyServiceManager -import co.nilin.opex.wallet.core.spi.TransferManager -import co.nilin.opex.wallet.core.spi.WalletManager -import co.nilin.opex.wallet.core.spi.WalletOwnerManager +import co.nilin.opex.wallet.app.service.DepositService import org.springframework.transaction.annotation.Transactional import org.springframework.web.bind.annotation.PostMapping import org.springframework.web.bind.annotation.RequestBody import org.springframework.web.bind.annotation.RequestMapping import org.springframework.web.bind.annotation.RestController -import java.math.BigDecimal -import java.util.* @RestController @RequestMapping("/payment") class PaymentGatewayController( - val transferManager: TransferManager, - val currencyService: CurrencyServiceManager, - val walletManager: WalletManager, - val walletOwnerManager: WalletOwnerManager, - val traceDepositService: TraceDepositService, - val gatewayService: GatewayService + val depositService: DepositService ) { @@ -38,57 +21,7 @@ class PaymentGatewayController( @PostMapping("/internal/deposit") @Transactional suspend fun paymentDeposit(@RequestBody request: PaymentDepositRequest): PaymentDepositResponse { - val receiverWalletType = WalletType.MAIN - - val currency = - currencyService.fetchCurrency(FetchCurrency(symbol = request.currency)) - ?: throw OpexError.CurrencyNotFound.exception() - val sourceOwner = walletOwnerManager.findWalletOwner(walletOwnerManager.systemUuid) - ?: throw OpexError.WalletOwnerNotFound.exception() - val sourceWallet = walletManager.findWalletByOwnerAndCurrencyAndType(sourceOwner, WalletType.MAIN, currency) - ?: walletManager.createWallet(sourceOwner, Amount(currency, BigDecimal.ZERO), currency, WalletType.MAIN) - - val receiverOwner = walletOwnerManager.findWalletOwner(request.userId) - ?: walletOwnerManager.createWalletOwner(request.userId, "not set", "") - - val receiverWallet = walletManager.findWalletByOwnerAndCurrencyAndType( - receiverOwner, - receiverWalletType, - currency - ) ?: walletManager.createWallet( - receiverOwner, - Amount(currency, BigDecimal.ZERO), - currency, - receiverWalletType - ) - transferManager.transfer( - TransferCommand( - sourceWallet, - receiverWallet, - Amount(sourceWallet.currency, request.amount), - request.description, - request.reference, - TransferCategory.DEPOSIT - ) - ) - var depositCommand = Deposit( - receiverOwner.uuid, - UUID.randomUUID().toString(), - currency.symbol, - request.amount, - note = request.description, - transactionRef = request.reference, - status = DepositStatus.DONE, - depositType = DepositType.OFF_CHAIN, - network = null, - attachment = null, - transferMethod = if (request.transferMethod == TransferMethod.REWARD) TransferMethod.REWARD else { - if (request.isIPG == true) TransferMethod.IPG else TransferMethod.MPG - } - ) - traceDepositService.saveDepositInNewTransaction(depositCommand) - - return PaymentDepositResponse(true) + return depositService.commitPaymentDeposit(request) } } \ No newline at end of file diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/TransferController.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/TransferController.kt index d32e0b205..a98e6e8d0 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/TransferController.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/TransferController.kt @@ -13,9 +13,7 @@ import java.math.BigDecimal @RestController class TransferController( - private val transferService: TransferService, - private val depositService: DepositService -) { + private val transferService: TransferService) { data class TransferBody( val description: String?, diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/WalletStatController.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/WalletStatController.kt index 648b3a70f..58af21af5 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/WalletStatController.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/WalletStatController.kt @@ -1,5 +1,6 @@ package co.nilin.opex.wallet.app.controller +import co.nilin.opex.wallet.core.inout.DailyAmount import co.nilin.opex.wallet.core.inout.WalletData import co.nilin.opex.wallet.core.inout.WalletDataResponse import co.nilin.opex.wallet.core.inout.WalletTotal @@ -55,4 +56,13 @@ class WalletStatController( ): TotalAssetsSnapshot? { return totalAssetsSnapshotManager.getUserLastSnapshot(uuid) } + + @GetMapping("/balance/{userId}") + suspend fun getDailyBalanceLast31Days( + @PathVariable userId: String + ): List { + return walletDataManager.getLastDaysBalance( + userId = userId + ) + } } \ No newline at end of file diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/DepositService.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/DepositService.kt index 7ccf88410..79d6bebb2 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/DepositService.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/DepositService.kt @@ -3,14 +3,14 @@ package co.nilin.opex.wallet.app.service import co.nilin.opex.common.OpexError import co.nilin.opex.utility.error.data.OpexException import co.nilin.opex.wallet.app.dto.ManualTransferRequest +import co.nilin.opex.wallet.app.dto.PaymentDepositRequest +import co.nilin.opex.wallet.app.dto.PaymentDepositResponse import co.nilin.opex.wallet.core.inout.* -import co.nilin.opex.wallet.core.model.DepositStatus +import co.nilin.opex.wallet.core.model.* import co.nilin.opex.wallet.core.model.DepositType -import co.nilin.opex.wallet.core.model.TransferCategory -import co.nilin.opex.wallet.core.model.WalletType -import co.nilin.opex.wallet.core.spi.DepositPersister -import co.nilin.opex.wallet.core.spi.WalletOwnerManager +import co.nilin.opex.wallet.core.spi.* import org.slf4j.LoggerFactory +import org.springframework.beans.factory.annotation.Value import org.springframework.stereotype.Service import org.springframework.transaction.annotation.Transactional import java.math.BigDecimal @@ -20,15 +20,49 @@ import java.util.* @Service class DepositService( private val walletOwnerManager: WalletOwnerManager, - private val currencyService: CurrencyServiceV2, + private val walletManager: WalletManager, private val depositPersister: DepositPersister, private val transferService: TransferService, private val traceDepositService: TraceDepositService, private val currencyServiceV2: CurrencyServiceV2, + private val depositEventSubmitter: DepositEventSubmitter, + private val transferManager: TransferManager, + private val currencyService: CurrencyServiceManager, + @Value("\${app.deposit.snapshot.enabled:true}") + private val depositSnapshotEnabled: Boolean +) { - ) { private val logger = LoggerFactory.getLogger(DepositService::class.java) + // ------------------------------------------------------------------------- + // Helpers (NO LOGIC CHANGE) + // ------------------------------------------------------------------------- + + private suspend fun getOrCreateWalletOwner(ownerUuid: String): WalletOwner { + return walletOwnerManager.findWalletOwner(ownerUuid) + ?: walletOwnerManager.createWalletOwner(ownerUuid, "not set", "") + } + + private suspend fun getOrCreateMainWallet( + owner: WalletOwner, + currency: CurrencyCommand + ): Wallet { + return walletManager.findWalletByOwnerAndCurrencyAndType( + owner, + WalletType.MAIN, + currency + ) ?: walletManager.createWallet( + owner, + Amount(currency, BigDecimal.ZERO), + currency, + WalletType.MAIN + ) + } + + // ------------------------------------------------------------------------- + // Manual Deposit + // ------------------------------------------------------------------------- + @Transactional suspend fun depositManually( symbol: String, @@ -37,41 +71,46 @@ class DepositService( amount: BigDecimal, request: ManualTransferRequest, ): TransferResult? { - logger.info("deposit manually: $senderUuid to $receiverUuid on $symbol at ${LocalDateTime.now()}") - val gateway = currencyServiceV2.fetchCurrencyGateway(request.gatewayUuid, symbol) + logger.info( + "deposit manually: $senderUuid to $receiverUuid on $symbol at ${LocalDateTime.now()}" + ) + + val gateway = currencyServiceV2 + .fetchCurrencyGateway(request.gatewayUuid, symbol) ?: throw OpexError.GatewayNotFount.exception() if (gateway !is OffChainGatewayCommand || gateway.transferMethod != TransferMethod.MANUALLY) { throw OpexError.GatewayNotFount.exception() } - walletOwnerManager.findWalletOwner(senderUuid)?.let { it.level } + walletOwnerManager.findWalletOwner(senderUuid) + ?.level ?: throw OpexException(OpexError.WalletOwnerNotFound) - walletOwnerManager.findWalletOwner(receiverUuid)?.let { it.level } - ?: walletOwnerManager.createWalletOwner( - receiverUuid, - "not set", - "1" - ).level + walletOwnerManager.findWalletOwner(receiverUuid) + ?.level + ?: walletOwnerManager.createWalletOwner(receiverUuid, "not set", "1").level return deposit( - symbol, - receiverUuid, - WalletType.MAIN, - senderUuid, - amount, - request.description, - request.ref, - null, - request.attachment, - DepositType.OFF_CHAIN, - request.gatewayUuid, - TransferMethod.MANUALLY + symbol = symbol, + receiverUuid = receiverUuid, + receiverWalletType = WalletType.MAIN, + senderUuid = senderUuid, + amount = amount, + description = request.description, + transferRef = request.ref, + chain = null, + attachment = request.attachment, + depositType = DepositType.OFF_CHAIN, + gatewayUuid = request.gatewayUuid, + transferMethod = TransferMethod.MANUALLY ) } + // ------------------------------------------------------------------------- + // Core Deposit + // ------------------------------------------------------------------------- @Transactional suspend fun deposit( @@ -88,13 +127,16 @@ class DepositService( gatewayUuid: String?, transferMethod: TransferMethod?, ): TransferResult? { - logger.info("A ${depositType.name} deposit tx on $symbol-$chain was received for $receiverUuid .......") - var depositCommand = Deposit( - receiverUuid, - UUID.randomUUID().toString(), - symbol, - amount, + logger.info( + "A ${depositType.name} deposit tx on $symbol-$chain was received for $receiverUuid ......." + ) + + val depositCommand = Deposit( + ownerUuid = receiverUuid, + depositUuid = UUID.randomUUID().toString(), + currency = symbol, + amount = amount, note = description, transactionRef = transferRef, status = DepositStatus.DONE, @@ -104,100 +146,128 @@ class DepositService( transferMethod = transferMethod ) - val gatewayData = fetchDepositData(gatewayUuid, symbol, depositType, depositCommand) + val gatewayData = fetchDepositData( + gatewayUuid = gatewayUuid, + symbol = symbol, + depositType = depositType, + depositCommand = depositCommand + ) - if (depositCommand.depositType != depositType) + if (depositCommand.depositType != depositType) { throw OpexError.GatewayNotFount.exception() + } - val validDeposit = isValidDeposit(depositCommand, gatewayData) - if (!validDeposit) { - logger.info("An invalid deposit command : $symbol-$chain-$receiverUuid-$amount") + val isValid = isValidDeposit(depositCommand, gatewayData) + if (!isValid) { + logger.info( + "An invalid deposit command : $symbol-$chain-$receiverUuid-$amount" + ) depositCommand.status = DepositStatus.INVALID } - //todo add statusReason field to be able to trace invalid deposit's reason - if (validDeposit || depositCommand.depositType == DepositType.ON_CHAIN) + // todo add statusReason field + if (isValid || depositCommand.depositType == DepositType.ON_CHAIN) { traceDepositService.saveDepositInNewTransaction(depositCommand) + } + + if (!isValid && depositCommand.depositType != DepositType.OFF_CHAIN) { + return null + } - if (depositCommand.status == DepositStatus.DONE) { - logger.info("Going to charge wallet on a ${depositType.name} deposit event :$symbol-$chain-$receiverUuid-$amount") - val (actualSenderUuid, actualTransferCategory) = if ( + logger.info( + "Going to charge wallet on a ${depositType.name} deposit event :" + + "$symbol-$chain-$receiverUuid-$amount" + ) + + val (actualSenderUuid, transferCategory) = + if ( senderUuid != null && - depositType == DepositType.OFF_CHAIN && + depositType == co.nilin.opex.wallet.core.model.DepositType.OFF_CHAIN && transferMethod == TransferMethod.MANUALLY ) { senderUuid to TransferCategory.DEPOSIT_MANUALLY } else { walletOwnerManager.systemUuid to TransferCategory.DEPOSIT } - return transferService.transfer( - symbol, - WalletType.MAIN, - actualSenderUuid, - receiverWalletType, - receiverUuid, - amount, - description, - transferRef, - actualTransferCategory, - ).transferResult - } else throw OpexError.InvalidDeposit.exception() + val transferResult = transferService.transfer( + symbol = symbol, + senderWalletType = WalletType.MAIN, + senderUuid = actualSenderUuid, + receiverWalletType = receiverWalletType, + receiverUuid = receiverUuid, + amount = amount, + description = description, + transferRef = transferRef, + transferCategory = transferCategory + ).transferResult + + publishDepositEvent(depositCommand) + return transferResult } + // ------------------------------------------------------------------------- + // Validation & Gateway + // ------------------------------------------------------------------------- - fun isValidDeposit(depositCommand: Deposit, gatewayData: GatewayData): Boolean { - return gatewayData.isEnabled && depositCommand.amount >= gatewayData.minimum && depositCommand.amount <= gatewayData.maximum + fun isValidDeposit(deposit: Deposit, gatewayData: GatewayData): Boolean { + return gatewayData.isEnabled && + deposit.amount >= gatewayData.minimum && + deposit.amount <= gatewayData.maximum } suspend fun fetchDepositData( gatewayUuid: String?, symbol: String, - depositType: DepositType, + depositType: co.nilin.opex.wallet.core.model.DepositType, depositCommand: Deposit, ): GatewayData { - gatewayUuid?.let { - currencyService.fetchCurrencyGateway(gatewayUuid, symbol)?.let { - when (it) { - is OnChainGatewayCommand -> { - depositCommand.depositType = DepositType.ON_CHAIN - depositCommand.currency = it.currencySymbol!! - depositCommand.network = it.chain - return GatewayData( - it.isDepositActive ?: true && it.depositAllowed ?: true, - BigDecimal.ZERO, - it.depositMin ?: BigDecimal.ZERO, - it.depositMax - ) - } - - is OffChainGatewayCommand -> { - depositCommand.depositType = DepositType.OFF_CHAIN - depositCommand.currency = it.currencySymbol!! - depositCommand.network = null - depositCommand.transferMethod = it.transferMethod - return GatewayData( - it.isDepositActive ?: true && it.depositAllowed ?: true, - BigDecimal.ZERO, - it.depositMin ?: BigDecimal.ZERO, - it.depositMax - ) - } - - else -> { - throw OpexError.GatewayNotFount.exception() - } - - - } - - - } ?: throw OpexError.GatewayNotFount.exception() - - } ?: return GatewayData(true, BigDecimal.ZERO, BigDecimal.ZERO, null) + if (gatewayUuid == null) { + return GatewayData(true, BigDecimal.ZERO, BigDecimal.ZERO, null) + } + + val gateway = currencyServiceV2 + .fetchCurrencyGateway(gatewayUuid, symbol) + ?: throw OpexError.GatewayNotFount.exception() + + return when (gateway) { + + is OnChainGatewayCommand -> { + depositCommand.depositType = co.nilin.opex.wallet.core.model.DepositType.ON_CHAIN + depositCommand.currency = gateway.currencySymbol!! + depositCommand.network = gateway.chain + + GatewayData( + gateway.isDepositActive ?: true && gateway.depositAllowed ?: true, + BigDecimal.ZERO, + gateway.depositMin ?: BigDecimal.ZERO, + gateway.depositMax + ) + } + + is OffChainGatewayCommand -> { + depositCommand.depositType = co.nilin.opex.wallet.core.model.DepositType.OFF_CHAIN + depositCommand.currency = gateway.currencySymbol!! + depositCommand.network = null + depositCommand.transferMethod = gateway.transferMethod + + GatewayData( + gateway.isDepositActive ?: true && gateway.depositAllowed ?: true, + BigDecimal.ZERO, + gateway.depositMin ?: BigDecimal.ZERO, + gateway.depositMax + ) + } + + else -> throw OpexError.GatewayNotFount.exception() + } } + // ------------------------------------------------------------------------- + // Queries + // ------------------------------------------------------------------------- + suspend fun findDepositHistory( uuid: String, symbol: String?, @@ -206,8 +276,9 @@ class DepositService( limit: Int?, size: Int?, ascendingByTime: Boolean?, - ): List { - return depositPersister.findDepositHistory(uuid, symbol, startTime, endTime, limit, size, ascendingByTime) + ): List = + depositPersister + .findDepositHistory(uuid, symbol, startTime, endTime, limit, size, ascendingByTime) .map { DepositResponse( it.depositUuid, @@ -225,17 +296,14 @@ class DepositService( it.transferMethod ) } - } suspend fun getDepositHistoryCount( uuid: String, symbol: String?, startTime: LocalDateTime?, endTime: LocalDateTime?, - ): Long { - return depositPersister.getDepositHistoryCount(uuid, symbol, startTime, endTime) - } - + ): Long = + depositPersister.getDepositHistoryCount(uuid, symbol, startTime, endTime) suspend fun searchDeposit( ownerUuid: String?, @@ -248,9 +316,8 @@ class DepositService( offset: Int?, size: Int?, ascendingByTime: Boolean?, - ): List { - - return depositPersister.findByCriteria( + ): List = + depositPersister.findByCriteria( ownerUuid, symbol, sourceAddress, @@ -262,19 +329,81 @@ class DepositService( size, ascendingByTime ) - } suspend fun getDepositSummary( uuid: String, startTime: LocalDateTime?, endTime: LocalDateTime?, limit: Int?, - ): List { - return depositPersister.getDepositSummary( - uuid, - startTime, - endTime, - limit, + ): List = + depositPersister.getDepositSummary(uuid, startTime, endTime, limit) + + // ------------------------------------------------------------------------- + // Payment Deposit + // ------------------------------------------------------------------------- + + suspend fun commitPaymentDeposit( + request: PaymentDepositRequest + ): PaymentDepositResponse { + + val currency = currencyService + .fetchCurrency(FetchCurrency(symbol = request.currency)) + ?: throw OpexError.CurrencyNotFound.exception() + + val sourceOwner = walletOwnerManager + .findWalletOwner(walletOwnerManager.systemUuid) + ?: throw OpexError.WalletOwnerNotFound.exception() + + val sourceWallet = getOrCreateMainWallet(sourceOwner, currency) + + val receiverOwner = getOrCreateWalletOwner(request.userId) + val receiverWallet = getOrCreateMainWallet(receiverOwner, currency) + + transferManager.transfer( + TransferCommand( + sourceWallet, + receiverWallet, + Amount(currency, request.amount), + request.description, + request.reference, + TransferCategory.DEPOSIT + ) ) + + val deposit = Deposit( + ownerUuid = receiverOwner.uuid, + depositUuid = UUID.randomUUID().toString(), + currency = currency.symbol, + amount = request.amount, + note = request.description, + transactionRef = request.reference, + status = DepositStatus.DONE, + depositType = co.nilin.opex.wallet.core.model.DepositType.OFF_CHAIN, + network = null, + attachment = null, + transferMethod = if (request.transferMethod == TransferMethod.REWARD) TransferMethod.REWARD else { + if (request.isIPG == true) TransferMethod.IPG else TransferMethod.MPG + } + ) + + traceDepositService.saveDepositInNewTransaction(deposit) + publishDepositEvent(deposit) + + return PaymentDepositResponse(true) + } + + // ------------------------------------------------------------------------- + // Events + // ------------------------------------------------------------------------- + + suspend fun publishDepositEvent(deposit: Deposit) { + if (depositSnapshotEnabled) { + depositEventSubmitter.send( + deposit.ownerUuid, + deposit.transactionRef, + deposit.currency, + deposit.amount + ) + } } -} \ No newline at end of file +} diff --git a/wallet/wallet-app/src/main/resources/application-otc.yml b/wallet/wallet-app/src/main/resources/application-otc.yml index 47217af27..bf489d739 100644 --- a/wallet/wallet-app/src/main/resources/application-otc.yml +++ b/wallet/wallet-app/src/main/resources/application-otc.yml @@ -110,10 +110,12 @@ app: enabled: ${WITHDRAW_LIMIT_ENABLED:false} otp-required-count: ${WITHDRAW_OTP_REQUIRED_COUNT:0} bank-account-validation: ${WITHDRAW_BANK_ACCOUNT_VALIDATION:false} + deposit: + snapshot: + enabled: ${DEPOSIT_SNAPSHOT_ENABLED:false} custom-message: enabled: ${CUSTOM_MESSAGE_ENABLED:false} base-url: ${CUSTOM_MESSAGE_URL} - logging: level: root: INFO diff --git a/wallet/wallet-app/src/main/resources/application.yml b/wallet/wallet-app/src/main/resources/application.yml index d993574e8..4f12fa225 100644 --- a/wallet/wallet-app/src/main/resources/application.yml +++ b/wallet/wallet-app/src/main/resources/application.yml @@ -154,6 +154,9 @@ app: enabled: ${WITHDRAW_LIMIT_ENABLED:false} otp-required-count: ${WITHDRAW_OTP_REQUIRED_COUNT:0} bank-account-validation: ${WITHDRAW_BANK_ACCOUNT_VALIDATION:false} + deposit: + snapshot: + enabled: ${DEPOSIT_SNAPSHOT_ENABLED:true} custom-message: enabled: ${CUSTOM_MESSAGE_ENABLED:false} base-url: ${CUSTOM_MESSAGE_URL} diff --git a/wallet/wallet-app/src/test/resources/application.yml b/wallet/wallet-app/src/test/resources/application.yml index 9efe60395..48f868180 100644 --- a/wallet/wallet-app/src/test/resources/application.yml +++ b/wallet/wallet-app/src/test/resources/application.yml @@ -72,6 +72,9 @@ app: enabled: false otp-required-count: 0 bank-account-validation : false + deposit: + snapshot: + enabled: ${DEPOSIT_SNAPSHOT_ENABLED:false} testcontainers: db: username: ${dbusername:opex} diff --git a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/inout/DailyAmount.kt b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/inout/DailyAmount.kt new file mode 100644 index 000000000..8bef67de8 --- /dev/null +++ b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/inout/DailyAmount.kt @@ -0,0 +1,9 @@ +package co.nilin.opex.wallet.core.inout + +import java.math.BigDecimal +import java.time.LocalDate + +data class DailyAmount( + val date: LocalDate, + val totalAmount: BigDecimal +) diff --git a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/DepositEventSubmitter.kt b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/DepositEventSubmitter.kt new file mode 100644 index 000000000..3109003ec --- /dev/null +++ b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/DepositEventSubmitter.kt @@ -0,0 +1,13 @@ +package co.nilin.opex.wallet.core.spi +import java.math.BigDecimal +import java.time.LocalDateTime + +interface DepositEventSubmitter { + suspend fun send( + uuid: String, + depositRef: String?, + currency: String, + amount: BigDecimal, + createDate: LocalDateTime?= LocalDateTime.now() + ) +} \ No newline at end of file diff --git a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/TotalAssetsSnapshotManager.kt b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/TotalAssetsSnapshotManager.kt index 333e2c95c..afcc58233 100644 --- a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/TotalAssetsSnapshotManager.kt +++ b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/TotalAssetsSnapshotManager.kt @@ -1,10 +1,20 @@ package co.nilin.opex.wallet.core.spi +import co.nilin.opex.wallet.core.inout.DailyAmount import co.nilin.opex.wallet.core.model.TotalAssetsSnapshot +import java.time.LocalDate interface TotalAssetsSnapshotManager { suspend fun createSnapshot() suspend fun getUserLastSnapshot( uuid: String ): TotalAssetsSnapshot? + suspend fun getLastDaysBalance( + userId: String, + startDate: LocalDate?, + quatCurrency: String?, + lastDays: Long = 31 + ): List + + } \ No newline at end of file diff --git a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/WalletDataManager.kt b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/WalletDataManager.kt index a957e072b..c5901b03d 100644 --- a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/WalletDataManager.kt +++ b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/WalletDataManager.kt @@ -1,5 +1,6 @@ package co.nilin.opex.wallet.core.spi +import co.nilin.opex.wallet.core.inout.DailyAmount import co.nilin.opex.wallet.core.inout.WalletData import co.nilin.opex.wallet.core.inout.WalletDataResponse import co.nilin.opex.wallet.core.inout.WalletTotal @@ -27,4 +28,10 @@ interface WalletDataManager { suspend fun findSystemWalletsTotal(): List suspend fun findUserWalletsTotal(): List? + + suspend fun getLastDaysBalance( + userId: String, + quoteCurrency: String? = null, + n: Int = 31 + ): List } \ No newline at end of file diff --git a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/WalletManager.kt b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/WalletManager.kt index 7473644fe..fe2f24c8f 100644 --- a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/WalletManager.kt +++ b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/WalletManager.kt @@ -1,8 +1,10 @@ package co.nilin.opex.wallet.core.spi import co.nilin.opex.wallet.core.inout.CurrencyCommand +import co.nilin.opex.wallet.core.inout.DailyAmount import co.nilin.opex.wallet.core.model.* import java.math.BigDecimal +import java.time.LocalDate interface WalletManager { @@ -40,4 +42,6 @@ interface WalletManager { suspend fun findWalletById(walletId: Long): Wallet? suspend fun findAllWalletsBriefNotZero(ownerId: Long): List + + } \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaProducerConfig.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaProducerConfig.kt index 2acd93193..b4db5991a 100644 --- a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaProducerConfig.kt +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaProducerConfig.kt @@ -29,7 +29,7 @@ class KafkaProducerConfig { ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java, ProducerConfig.ACKS_CONFIG to "all", - JsonSerializer.TYPE_MAPPINGS to "fiAction_response_event:co.nilin.opex.wallet.core.inout.FinancialActionResponseEvent,withdrawRequestEvent:co.nilin.opex.wallet.ports.kafka.listener.model.WithdrawRequestEvent" + JsonSerializer.TYPE_MAPPINGS to "fiAction_response_event:co.nilin.opex.wallet.core.inout.FinancialActionResponseEvent,withdrawRequestEvent:co.nilin.opex.wallet.ports.kafka.listener.model.WithdrawRequestEvent,depositEvent:co.nilin.opex.wallet.ports.kafka.listener.model.DepositEvent" ) } @@ -68,4 +68,14 @@ class KafkaProducerConfig { return DefaultKafkaProducerFactory(producerConfigs) } + @Bean + fun depositKafkaTemplate(@Qualifier("depositProducerFactory") factory: ProducerFactory): KafkaTemplate { + return KafkaTemplate(factory) + } + + @Bean + fun depositProducerFactory(@Qualifier("producerConfigs") producerConfigs: Map): ProducerFactory { + return DefaultKafkaProducerFactory(producerConfigs) + } + } \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaTopicConfig.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaTopicConfig.kt index 3f7a237de..aafc3193b 100644 --- a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaTopicConfig.kt +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaTopicConfig.kt @@ -30,6 +30,17 @@ class KafkaTopicConfig { .replicas(1) .build() }) + + } + + @Autowired + fun depositTopics(applicationContext: GenericApplicationContext) { + applicationContext.registerBean("topic_deposit", NewTopic::class.java, Supplier { + TopicBuilder.name("deposit") + .partitions(1) + .replicas(1) + .build() + }) } } \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/DepositEvent.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/DepositEvent.kt new file mode 100644 index 000000000..29f49e745 --- /dev/null +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/DepositEvent.kt @@ -0,0 +1,13 @@ +package co.nilin.opex.wallet.ports.kafka.listener.model + +import co.nilin.opex.wallet.core.model.WithdrawStatus +import java.math.BigDecimal +import java.time.LocalDateTime + +data class DepositEvent( + val uuid: String, + val depositRef: String? = null, + val currency: String, + val amount: BigDecimal, + val createDate: LocalDateTime?= LocalDateTime.now(), +) \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/submitter/DepositSubmitter.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/submitter/DepositSubmitter.kt new file mode 100644 index 000000000..4a5f2d293 --- /dev/null +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/submitter/DepositSubmitter.kt @@ -0,0 +1,48 @@ +package co.nilin.opex.wallet.ports.kafka.listener.submitter + +import co.nilin.opex.common.utils.LoggerDelegate +import co.nilin.opex.wallet.core.spi.DepositEventSubmitter +import co.nilin.opex.wallet.ports.kafka.listener.model.DepositEvent +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.retry.support.RetryTemplate +import org.springframework.stereotype.Component +import java.math.BigDecimal +import java.time.LocalDateTime + +@Component +@ConditionalOnProperty(name = ["app.deposit.snapshot.enabled"], havingValue = "true") +class DepositSubmitter( + @Qualifier("depositKafkaTemplate") private val template: KafkaTemplate +) : DepositEventSubmitter { + + private val logger by LoggerDelegate() + + private val retryTemplate = RetryTemplate.builder() + .maxAttempts(10) + .exponentialBackoff(1000, 1.8, 5 * 60 * 1000) + .retryOn(Exception::class.java) + .build() + + override suspend fun send( + uuid: String, + depositRef: String?, + currency: String, + amount: BigDecimal, + createDate: LocalDateTime? + ) { + retryTemplate.execute { + try { + template.send( + "deposit", + DepositEvent(uuid, depositRef, currency, amount, createDate) + ).get() + logger.info("Deposit event sent") + } catch (ex: Exception) { + logger.error("Error sending deposit event", ex) + throw ex + } + } + } +} \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/submitter/DummyDepositSubmitter.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/submitter/DummyDepositSubmitter.kt new file mode 100644 index 000000000..c723e884a --- /dev/null +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/submitter/DummyDepositSubmitter.kt @@ -0,0 +1,22 @@ +package co.nilin.opex.wallet.ports.kafka.listener.submitter + +import co.nilin.opex.wallet.core.spi.DepositEventSubmitter +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.stereotype.Component +import java.math.BigDecimal +import java.time.LocalDateTime + +@Component +@ConditionalOnProperty(name = ["app.deposit.snapshot.enabled"], havingValue = "false") +class DummyDepositSubmitter( +) : DepositEventSubmitter { + override suspend fun send( + uuid: String, + depositRef: String?, + currency: String, + amount: BigDecimal, + createDate: LocalDateTime? + ) { + + } +} \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/TotalAssetsSnapshotRepository.kt b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/TotalAssetsSnapshotRepository.kt index 67c7edc02..1b7259ff3 100644 --- a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/TotalAssetsSnapshotRepository.kt +++ b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/TotalAssetsSnapshotRepository.kt @@ -1,12 +1,15 @@ package co.nilin.opex.wallet.ports.postgres.dao +import co.nilin.opex.wallet.core.inout.DailyAmount import co.nilin.opex.wallet.ports.postgres.model.TotalAssetsSnapshotModel import org.springframework.data.r2dbc.repository.Modifying import org.springframework.data.r2dbc.repository.Query import org.springframework.data.repository.reactive.ReactiveCrudRepository import org.springframework.stereotype.Repository +import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.math.BigDecimal +import java.time.LocalDate @Repository interface TotalAssetsSnapshotRepository : ReactiveCrudRepository { @@ -46,4 +49,21 @@ interface TotalAssetsSnapshotRepository : ReactiveCrudRepository + + @Query( + """ + select snapshot_date as date, total_amount + from total_assets_snapshot + where uuid = :userId + and snapshot_date >= :startDate + and quote_currency = :quoteCurrency + order by snapshot_date desc + """ + ) + fun findDailyBalance( + userId: String, + startDate: LocalDate, + quoteCurrency: String + ): Flux + } \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/TotalAssetsSnapshotImpl.kt b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/TotalAssetsSnapshotImpl.kt index 3bb827f64..9358ea5a3 100644 --- a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/TotalAssetsSnapshotImpl.kt +++ b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/TotalAssetsSnapshotImpl.kt @@ -1,22 +1,27 @@ package co.nilin.opex.wallet.ports.postgres.impl import co.nilin.opex.common.OpexError +import co.nilin.opex.wallet.core.inout.DailyAmount import co.nilin.opex.wallet.core.model.TotalAssetsSnapshot import co.nilin.opex.wallet.core.spi.TotalAssetsSnapshotManager import co.nilin.opex.wallet.ports.postgres.dao.CurrencyRepositoryV2 import co.nilin.opex.wallet.ports.postgres.dao.TotalAssetsSnapshotRepository import co.nilin.opex.wallet.ports.postgres.util.toTotalAssetsSnapshot import kotlinx.coroutines.reactive.awaitFirstOrNull +import kotlinx.coroutines.reactor.awaitSingle import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Value import org.springframework.stereotype.Service +import java.time.LocalDate +import java.time.ZoneOffset @Service class TotalAssetsSnapshotImpl( private val currencyRepository: CurrencyRepositoryV2, private val totalAssetsSnapshotRepository: TotalAssetsSnapshotRepository, @Value("\${app.snapshot-currency}") - private val snapshotCurrency: String + private val snapshotCurrency: String, + @Value("\${app.zone-offset}") private val zoneOffsetString: String ) : TotalAssetsSnapshotManager { private val logger = LoggerFactory.getLogger(TotalAssetsSnapshotImpl::class.java) @@ -38,4 +43,27 @@ class TotalAssetsSnapshotImpl( ): TotalAssetsSnapshot? { return totalAssetsSnapshotRepository.findLastSnapshotByUuid(uuid).awaitFirstOrNull()?.toTotalAssetsSnapshot() } + + override suspend fun getLastDaysBalance( + userId: String, + startDate: LocalDate?, + quatCurrency: String?, + lastDays: Long + ): List { + + val startDate = startDate ?: LocalDate + .now(ZoneOffset.of(zoneOffsetString)) + .minusDays(lastDays) + + return totalAssetsSnapshotRepository.findDailyBalance(userId, startDate, quatCurrency ?: snapshotCurrency) + .map { + DailyAmount( + date = it.date, + totalAmount = it.totalAmount + ) + } + .collectList() + .awaitSingle() + } + } diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletDataManagerImpl.kt b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletDataManagerImpl.kt index 0d1483a43..96fc114ce 100644 --- a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletDataManagerImpl.kt +++ b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletDataManagerImpl.kt @@ -1,9 +1,7 @@ package co.nilin.opex.wallet.ports.postgres.impl -import co.nilin.opex.wallet.core.inout.WalletCurrencyData -import co.nilin.opex.wallet.core.inout.WalletData -import co.nilin.opex.wallet.core.inout.WalletDataResponse -import co.nilin.opex.wallet.core.inout.WalletTotal +import co.nilin.opex.common.utils.CacheManager +import co.nilin.opex.wallet.core.inout.* import co.nilin.opex.wallet.core.model.WalletType import co.nilin.opex.wallet.core.spi.WalletDataManager import co.nilin.opex.wallet.ports.postgres.dao.CurrencyRepositoryV2 @@ -13,14 +11,23 @@ import com.fasterxml.jackson.core.type.TypeReference import com.fasterxml.jackson.databind.ObjectMapper import kotlinx.coroutines.reactive.awaitFirstOrElse import kotlinx.coroutines.reactive.awaitFirstOrNull +import org.springframework.beans.factory.annotation.Value import org.springframework.stereotype.Component +import java.math.BigDecimal +import java.time.LocalDate +import java.time.ZoneOffset +import java.util.concurrent.TimeUnit +import java.util.stream.Collectors @Component class WalletDataManagerImpl( private val walletRepository: WalletRepository, + private val totalAssetsSnapshotImpl: TotalAssetsSnapshotImpl, private val currencyRepositoryV2: CurrencyRepositoryV2, - private val objectMapper: ObjectMapper -) : WalletDataManager { + private val objectMapper: ObjectMapper, + private val cacheManager: CacheManager, + @Value("\${app.zone-offset}") private val zoneOffsetString: String, + ) : WalletDataManager { override suspend fun findWalletDataByCriteria( uuid: String?, @@ -89,4 +96,56 @@ class WalletDataManagerImpl( WalletTotal(c, (allDepositedCurrency.filter { it.currency == c }?.firstOrNull()?.balance) ?: 0.0) }?.collectList()?.awaitFirstOrNull() } + + override suspend fun getLastDaysBalance( + userId: String, + quoteCurrency: String?, + n: Int + ): List { + val today = LocalDate.now(ZoneOffset.of(zoneOffsetString)) + val dates = (0..n - 1).map { today.minusDays(it.toLong()) } + + val result = mutableMapOf() + val missingDates = mutableListOf() + + for (date in dates) { + val cacheKey = "trade:daily:$userId:$date" + val cached = cacheManager.get(cacheKey) + + if (cached != null) { + result[date] = cached + } else { + missingDates.add(date) + } + } + + if (missingDates.isNotEmpty()) { + val startDate = missingDates.minOrNull()!! + + val dbData = totalAssetsSnapshotImpl.getLastDaysBalance(userId, startDate, quoteCurrency) + .stream().collect(Collectors.toMap(DailyAmount::date, DailyAmount::totalAmount)); + + for (date in missingDates) { + val value = dbData[date] ?: BigDecimal.ZERO + val (ttl, unit) = ttlFor(date, today) + val cacheKey = "trade:daily:$userId:$date" + + cacheManager.put(cacheKey, value, ttl, unit) + result[date] = value + } + } + + return result + .map { DailyAmount(it.key, it.value) } + .sortedBy { it.date } + + } + + + private fun ttlFor(date: LocalDate, today: LocalDate): Pair = + if (date == today) { + 15L to TimeUnit.MINUTES + } else { + 100L to TimeUnit.DAYS + } } \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletManagerImplV2.kt b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletManagerImplV2.kt index c50d859b0..90623da02 100644 --- a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletManagerImplV2.kt +++ b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletManagerImplV2.kt @@ -3,6 +3,7 @@ package co.nilin.opex.wallet.ports.postgres.impl import co.nilin.opex.common.OpexError import co.nilin.opex.wallet.core.exc.ConcurrentBalanceChangException import co.nilin.opex.wallet.core.inout.CurrencyCommand +import co.nilin.opex.wallet.core.inout.DailyAmount import co.nilin.opex.wallet.core.model.* import co.nilin.opex.wallet.core.spi.WalletManager import co.nilin.opex.wallet.ports.postgres.dao.* @@ -13,12 +14,17 @@ import kotlinx.coroutines.reactive.awaitFirst import kotlinx.coroutines.reactive.awaitFirstOrElse import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.coroutines.reactive.awaitSingle +import kotlinx.coroutines.reactor.awaitSingle import kotlinx.coroutines.reactor.awaitSingleOrNull import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Value import org.springframework.stereotype.Service import java.math.BigDecimal +import java.time.LocalDate import java.time.LocalDateTime +import java.time.ZoneOffset +import java.util.concurrent.TimeUnit +import java.util.stream.Collectors @Service class WalletManagerImplV2( @@ -294,12 +300,9 @@ class WalletManagerImplV2( ) } } - override suspend fun findWallet(ownerId: Long, currency: String, walletType: WalletType): BriefWallet? { val wallet = walletRepository.findByOwnerAndTypeAndCurrency(ownerId, walletType, currency) .awaitSingleOrNull() ?: return null return BriefWallet(wallet.id, wallet.owner, wallet.balance, wallet.currency, wallet.type) } - - }