diff --git a/.DS_Store b/.DS_Store
new file mode 100644
index 0000000..94535e0
Binary files /dev/null and b/.DS_Store differ
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..549e00a
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,33 @@
+HELP.md
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
diff --git a/antifraud-service/Dockerfile b/antifraud-service/Dockerfile
new file mode 100644
index 0000000..5edc16c
--- /dev/null
+++ b/antifraud-service/Dockerfile
@@ -0,0 +1,4 @@
+FROM openjdk:11-jre-slim
+#COPY wait-for-kafka.sh /wait-for-kafka.sh
+COPY target/antifraud-service-0.0.1-SNAPSHOT.jar antifraud-service.jar
+ENTRYPOINT ["java", "-jar", "/antifraud-service.jar"]
\ No newline at end of file
diff --git a/antifraud-service/pom.xml b/antifraud-service/pom.xml
new file mode 100644
index 0000000..9c5e342
--- /dev/null
+++ b/antifraud-service/pom.xml
@@ -0,0 +1,129 @@
+
+
+ 4.0.0
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.2.0.RELEASE
+
+
+ com.yape.antifraud
+ antifraud-service
+ 0.0.1-SNAPSHOT
+ antifraud-service
+ antifraud
+
+ 11
+
+
+
+ org.springframework.boot
+ spring-boot-starter-webflux
+
+
+ org.springframework.boot
+ spring-boot-starter-data-mongodb-reactive
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+ org.springframework.boot
+ spring-boot-starter
+
+
+ org.projectlombok
+ lombok
+ 1.18.28
+
+
+
+ io.springfox
+ springfox-swagger2
+ 2.9.2
+
+
+ io.springfox
+ springfox-swagger-ui
+ 2.9.2
+
+
+ org.springframework.plugin
+ spring-plugin-core
+ 1.2.0.RELEASE
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ io.projectreactor
+ reactor-test
+ test
+
+
+ org.springframework.boot
+ spring-boot-starter-validation
+
+
+ javax.validation
+ validation-api
+ 2.0.1.Final
+
+
+
+ org.springframework.kafka
+ spring-kafka
+
+
+
+ io.projectreactor.kafka
+ reactor-kafka
+
+
+ org.springframework.kafka
+ spring-kafka
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ 2.7.3
+
+
+
+ org.projectlombok
+ lombok
+
+
+
+
+
+ org.jacoco
+ jacoco-maven-plugin
+ 0.8.5
+
+
+
+ prepare-agent
+
+
+
+ report
+ test
+
+ report
+
+
+
+
+
+
+
+
diff --git a/antifraud-service/src/main/java/com/yape/antifraud/AntifraudServiceApplication.java b/antifraud-service/src/main/java/com/yape/antifraud/AntifraudServiceApplication.java
new file mode 100644
index 0000000..840f929
--- /dev/null
+++ b/antifraud-service/src/main/java/com/yape/antifraud/AntifraudServiceApplication.java
@@ -0,0 +1,38 @@
+package com.yape.antifraud;
+
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.core.env.Environment;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+@Slf4j
+@SpringBootApplication
+public class AntifraudServiceApplication {
+ static { System.setProperty("os.arch", "i686_64"); }
+ public static void main(String[] args) throws UnknownHostException {
+
+ Environment env = SpringApplication.run(AntifraudServiceApplication.class, args).getEnvironment();
+ log.info("\n----------------------------------------------------------\n\t"
+ .concat("Application '{}' is running! Access URLs:\n\t")
+ .concat("Local: \t\thttp://localhost:{}\n\t")
+ .concat("External: \thttp://{}:{}\n\t")
+ .concat("DB: \t{}\n\t")
+ .concat("Profile(s): \t{}\n----------------------------------------------------------"),
+ env.getProperty("spring.application.name"),
+ env.getProperty("server.port"),
+ InetAddress.getLocalHost().getHostAddress(),
+ env.getProperty("server.port"),
+ env.getProperty("spring.data.mongodb.database"),
+ env.getActiveProfiles());
+
+ String configServerStatus = env.getProperty("configserver.status");
+ log.info("\n----------------------------------------------------------\n\t"
+ .concat("Config Server: \t{}\n----------------------------------------------------------"),
+ configServerStatus == null ? "Not found or not setup for this application" : configServerStatus);
+ }
+
+}
diff --git a/antifraud-service/src/main/java/com/yape/antifraud/config/JsonMessageConverterConfig.java b/antifraud-service/src/main/java/com/yape/antifraud/config/JsonMessageConverterConfig.java
new file mode 100644
index 0000000..9a64c1c
--- /dev/null
+++ b/antifraud-service/src/main/java/com/yape/antifraud/config/JsonMessageConverterConfig.java
@@ -0,0 +1,14 @@
+package com.yape.antifraud.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.converter.ByteArrayJsonMessageConverter;
+import org.springframework.kafka.support.converter.JsonMessageConverter;
+
+@Configuration
+public class JsonMessageConverterConfig {
+ @Bean
+ public JsonMessageConverter jsonMessageConverter() {
+ return new ByteArrayJsonMessageConverter();
+ }
+}
\ No newline at end of file
diff --git a/antifraud-service/src/main/java/com/yape/antifraud/config/KafkaConfig.java b/antifraud-service/src/main/java/com/yape/antifraud/config/KafkaConfig.java
new file mode 100644
index 0000000..829555f
--- /dev/null
+++ b/antifraud-service/src/main/java/com/yape/antifraud/config/KafkaConfig.java
@@ -0,0 +1,98 @@
+package com.yape.antifraud.config;
+
+import com.yape.antifraud.model.entity.Transaction;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.KafkaAdmin;
+import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
+import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
+import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import reactor.kafka.sender.SenderOptions;
+import reactor.kafka.receiver.ReceiverOptions;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+public class KafkaConfig {
+
+ @Value("${spring.kafka.bootstrap-servers}")
+ private String bootstrapServers;
+
+ @Bean
+ public ReactiveKafkaProducerTemplate reactiveKafkaProducerTemplate() {
+ Map configProps = new HashMap<>();
+ configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+ configProps.put(JsonDeserializer.TRUSTED_PACKAGES,"*");
+ configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); // Disable type info headers
+
+ SenderOptions senderOptions = SenderOptions.create(configProps);
+ return new ReactiveKafkaProducerTemplate<>(senderOptions);
+ }
+
+ @Bean
+ public ReactiveKafkaConsumerTemplate reactiveKafkaConsumerTemplate() {
+ Map props = new HashMap<>();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class.getName());
+ props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
+ props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Transaction.class.getName());
+ props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.yape.transaction.model.entity,com.yape.antifraud.model.entity");
+
+ ReceiverOptions receiverOptions = ReceiverOptions.create(props);
+ return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
+ }
+ @Bean
+ public ConsumerFactory consumerFactory() {
+ Map props = new HashMap<>();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
+ props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
+ props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.yape.antifraud.model.entity.Transaction");
+ props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.yape.antifraud.model.entity");
+ return new DefaultKafkaConsumerFactory<>(props);
+ }
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
+ ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(consumerFactory());
+ return factory;
+ }
+
+ @Bean
+ public KafkaAdmin kafkaAdmin() {
+ KafkaAdmin kafkaAdmin = new KafkaAdmin(Collections.singletonMap("bootstrap.servers", bootstrapServers));
+ kafkaAdmin.setAutoCreate(true);
+ return kafkaAdmin;
+ }
+
+ @Bean
+ public AdminClient adminClient() {
+ return AdminClient.create(kafkaAdmin().getConfig());
+ }
+
+ @Bean
+ public NewTopic topic() {
+ return new NewTopic("transactionCreated", 1, (short) 1);
+ }
+
+}
+
+
diff --git a/antifraud-service/src/main/java/com/yape/antifraud/model/entity/Transaction.java b/antifraud-service/src/main/java/com/yape/antifraud/model/entity/Transaction.java
new file mode 100644
index 0000000..67791c4
--- /dev/null
+++ b/antifraud-service/src/main/java/com/yape/antifraud/model/entity/Transaction.java
@@ -0,0 +1,23 @@
+package com.yape.antifraud.model.entity;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+import java.time.LocalDateTime;
+
+@Document(collection = "transactions")
+@Getter
+@Setter
+public class Transaction {
+ @Id
+ private String transactionExternalId;
+ private String accountExternalIdDebit;
+ private String accountExternalIdCredit;
+ private String transferTypeId;
+ private Double value;
+ private String status;
+ private LocalDateTime createdAt;
+
+}
diff --git a/antifraud-service/src/main/java/com/yape/antifraud/model/request/TransactionRequest.java b/antifraud-service/src/main/java/com/yape/antifraud/model/request/TransactionRequest.java
new file mode 100644
index 0000000..896e2fe
--- /dev/null
+++ b/antifraud-service/src/main/java/com/yape/antifraud/model/request/TransactionRequest.java
@@ -0,0 +1,18 @@
+package com.yape.antifraud.model.request;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+@AllArgsConstructor
+@Builder
+public class TransactionRequest {
+
+ private String accountExternalIdDebit;
+ private String accountExternalIdCredit;
+ private Integer transferTypeId;
+ private Double value;
+}
diff --git a/antifraud-service/src/main/java/com/yape/antifraud/repository/TransactionRepository.java b/antifraud-service/src/main/java/com/yape/antifraud/repository/TransactionRepository.java
new file mode 100644
index 0000000..0f07094
--- /dev/null
+++ b/antifraud-service/src/main/java/com/yape/antifraud/repository/TransactionRepository.java
@@ -0,0 +1,12 @@
+package com.yape.antifraud.repository;
+
+import com.yape.antifraud.model.entity.Transaction;
+import org.apache.kafka.common.protocol.types.Field;
+import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
+import org.springframework.stereotype.Repository;
+
+import java.util.UUID;
+
+@Repository
+public interface TransactionRepository extends ReactiveMongoRepository {
+}
diff --git a/antifraud-service/src/main/java/com/yape/antifraud/service/AntiFraudService.java b/antifraud-service/src/main/java/com/yape/antifraud/service/AntiFraudService.java
new file mode 100644
index 0000000..82b05c1
--- /dev/null
+++ b/antifraud-service/src/main/java/com/yape/antifraud/service/AntiFraudService.java
@@ -0,0 +1,8 @@
+package com.yape.antifraud.service;
+
+import com.yape.antifraud.model.entity.Transaction;
+import reactor.core.publisher.Mono;
+
+public interface AntiFraudService {
+ Mono validateTransaction(Transaction transaction);
+}
diff --git a/antifraud-service/src/main/java/com/yape/antifraud/service/KafkaConsumerService.java b/antifraud-service/src/main/java/com/yape/antifraud/service/KafkaConsumerService.java
new file mode 100644
index 0000000..3339ebd
--- /dev/null
+++ b/antifraud-service/src/main/java/com/yape/antifraud/service/KafkaConsumerService.java
@@ -0,0 +1,7 @@
+package com.yape.antifraud.service;
+
+import com.yape.antifraud.model.entity.Transaction;
+
+public interface KafkaConsumerService {
+ void consume(Transaction transaction);
+}
diff --git a/antifraud-service/src/main/java/com/yape/antifraud/service/KafkaProducerService.java b/antifraud-service/src/main/java/com/yape/antifraud/service/KafkaProducerService.java
new file mode 100644
index 0000000..71f985a
--- /dev/null
+++ b/antifraud-service/src/main/java/com/yape/antifraud/service/KafkaProducerService.java
@@ -0,0 +1,7 @@
+package com.yape.antifraud.service;
+
+import com.yape.antifraud.model.entity.Transaction;
+
+public interface KafkaProducerService {
+ void sendTransactionStatusEvent(Transaction transaction);
+}
diff --git a/antifraud-service/src/main/java/com/yape/antifraud/service/impl/AntifraudServiceImpl.java b/antifraud-service/src/main/java/com/yape/antifraud/service/impl/AntifraudServiceImpl.java
new file mode 100644
index 0000000..a461c9d
--- /dev/null
+++ b/antifraud-service/src/main/java/com/yape/antifraud/service/impl/AntifraudServiceImpl.java
@@ -0,0 +1,27 @@
+package com.yape.antifraud.service.impl;
+
+import com.yape.antifraud.model.entity.Transaction;
+import com.yape.antifraud.service.AntiFraudService;
+import com.yape.antifraud.service.KafkaProducerService;
+import lombok.AllArgsConstructor;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.Mono;
+
+@Service
+@AllArgsConstructor
+public class AntifraudServiceImpl implements AntiFraudService {
+
+ private final KafkaProducerService kafkaProducerService;
+
+ @Override
+ public Mono validateTransaction(Transaction transaction) {
+ if (transaction.getValue() > 1000) {
+ transaction.setStatus("REJECTED");
+ } else {
+ transaction.setStatus("APPROVED");
+ }
+ return Mono.just(transaction)
+ .doOnSuccess(kafkaProducerService::sendTransactionStatusEvent);
+ }
+}
diff --git a/antifraud-service/src/main/java/com/yape/antifraud/service/impl/KafkaConsumerServiceImpl.java b/antifraud-service/src/main/java/com/yape/antifraud/service/impl/KafkaConsumerServiceImpl.java
new file mode 100644
index 0000000..cf8c1f4
--- /dev/null
+++ b/antifraud-service/src/main/java/com/yape/antifraud/service/impl/KafkaConsumerServiceImpl.java
@@ -0,0 +1,24 @@
+package com.yape.antifraud.service.impl;
+
+import com.yape.antifraud.model.entity.Transaction;
+import com.yape.antifraud.service.AntiFraudService;
+import com.yape.antifraud.service.KafkaConsumerService;
+import lombok.AllArgsConstructor;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.stereotype.Service;
+
+@Service
+@AllArgsConstructor
+public class KafkaConsumerServiceImpl implements KafkaConsumerService {
+
+ private final AntiFraudService antiFraudService;
+
+ @Override
+ @KafkaListener(topics = "transactionCreated", groupId = "group_id")
+ public void consume(@Payload Transaction transaction) {
+ System.out.println("inicio de consumo");
+ antiFraudService.validateTransaction(transaction).subscribe();
+ System.out.println("consumio validacion");
+ }
+}
diff --git a/antifraud-service/src/main/java/com/yape/antifraud/service/impl/KafkaProducerServiceImpl.java b/antifraud-service/src/main/java/com/yape/antifraud/service/impl/KafkaProducerServiceImpl.java
new file mode 100644
index 0000000..5a7601e
--- /dev/null
+++ b/antifraud-service/src/main/java/com/yape/antifraud/service/impl/KafkaProducerServiceImpl.java
@@ -0,0 +1,19 @@
+package com.yape.antifraud.service.impl;
+
+import com.yape.antifraud.model.entity.Transaction;
+import com.yape.antifraud.service.KafkaProducerService;
+import lombok.AllArgsConstructor;
+import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
+import org.springframework.stereotype.Service;
+
+@Service
+@AllArgsConstructor
+public class KafkaProducerServiceImpl implements KafkaProducerService {
+
+ private ReactiveKafkaProducerTemplate kafkaTemplate;
+
+ @Override
+ public void sendTransactionStatusEvent(Transaction transaction) {
+ kafkaTemplate.send("transactionStatusUpdated", transaction).subscribe();
+ }
+}
diff --git a/antifraud-service/src/main/java/com/yape/antifraud/util/DateUtil.java b/antifraud-service/src/main/java/com/yape/antifraud/util/DateUtil.java
new file mode 100644
index 0000000..2e7ef58
--- /dev/null
+++ b/antifraud-service/src/main/java/com/yape/antifraud/util/DateUtil.java
@@ -0,0 +1,16 @@
+package com.yape.antifraud.util;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class DateUtil {
+ public static final String DATE_TIME_AMPM_FORMAT = "dd/MM/yyyy hh:mm:ss";
+
+ public static String parseDateToString(LocalDateTime date) {
+ return date != null ? date.format(DateTimeFormatter.ofPattern(DATE_TIME_AMPM_FORMAT)) : null;
+ }
+}
diff --git a/antifraud-service/src/main/resources/application.yml b/antifraud-service/src/main/resources/application.yml
new file mode 100644
index 0000000..d7dc178
--- /dev/null
+++ b/antifraud-service/src/main/resources/application.yml
@@ -0,0 +1,48 @@
+spring:
+ application:
+ name: antifraud-service
+ data:
+ mongodb:
+ database: embeded_db
+ port: 27017
+ host: localhost
+ kafka:
+ bootstrap-servers: localhost9092
+ consumer:
+ group-id: group_id
+ auto-offset-reset: earliest
+ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
+ properties:
+ spring:
+ deserializer:
+ value:
+ delegate:
+ class: org.springframework.kafka.support.serializer.JsonDeserializer
+ json:
+ trusted:
+ packages: "com.yape.transaction.model.entity,com.yape.antifraud.model.entity"
+ producer:
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
+ admin:
+ auto-create-topics: true
+ topic:
+ transactionCreated:
+ partitions: 1
+ replicas: 1
+ listener:
+ missing-topics-fatal: false
+ template:
+ default-topic: transactionCreated
+ profiles:
+ active: local
+
+server:
+ port: 8080
+
+logging:
+ level:
+ org.springframework.data.mongodb.core.ReactiveMongoTemplate: DEBUG
+ web: TRACE
+ org.springframework.web: TRACE
diff --git a/antifraud-service/wait-for-kafka.sh b/antifraud-service/wait-for-kafka.sh
new file mode 100644
index 0000000..5a7c1f1
--- /dev/null
+++ b/antifraud-service/wait-for-kafka.sh
@@ -0,0 +1,17 @@
+#!/bin/sh
+
+set -e
+
+host="$1"
+shift
+cmd="$@"
+
+until nc -z "$host" 9092; do
+ >&2 echo "Kafka is unavailable - sleeping"
+ sleep 5
+done
+# Crear el tema transactionCreated
+kafka-topics.sh --create --bootstrap-server kafka:9092 --replication-factor 1 --partitions 1 --topic transactionCreated
+
+>&2 echo "Kafka is up - executing command"
+exec $cmd
diff --git a/docker-compose.yml b/docker-compose.yml
index 6e9a9c5..5d7ca10 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,25 +1,76 @@
-version: "3.7"
+version: '3.8'
services:
- postgres:
- image: postgres:14
+ mongodb:
+ image: mongo:4.4
+ container_name: mongo
ports:
- - "5432:5432"
- environment:
- - POSTGRES_USER=postgres
- - POSTGRES_PASSWORD=postgres
+ - "27017:27017"
+ volumes:
+ - mongo-data:/data/db
+ networks:
+ - challenge-network
+
zookeeper:
- image: confluentinc/cp-zookeeper:5.5.3
+ image: bitnami/zookeeper:latest
+ container_name: zookeeper
+ ports:
+ - "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
+ ZOOKEEPER_TICK_TIME: 2000
+ ALLOW_ANONYMOUS_LOGIN: "yes"
+ networks:
+ - challenge-network
+
kafka:
- image: confluentinc/cp-enterprise-kafka:5.5.3
- depends_on: [zookeeper]
+ image: bitnami/kafka:latest
+ container_name: kafka
+ ports:
+ - "9092:9092"
environment:
- KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_BROKER_ID: 1
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
- KAFKA_JMX_PORT: 9991
+ depends_on:
+ - zookeeper
+ networks:
+ - challenge-network
+
+
+ transaction-service:
+ build:
+ context: ./transaction-service
+ container_name: transaction-service
ports:
- - 9092:9092
\ No newline at end of file
+ - "8081:8081"
+ environment:
+ SPRING_PROFILES_ACTIVE: docker
+ SPRING_DATA_MONGODB_URI: mongodb://mongo:27017/transactions
+ KAFKA_BOOTSTRAP_SERVERS: kafka:9092
+ depends_on:
+ - mongodb
+ - kafka
+ networks:
+ - challenge-network
+
+ antifraud-service:
+ build:
+ context: ./antifraud-service
+ container_name: antifraud-service
+ ports:
+ - "8000:8080"
+ environment:
+ SPRING_PROFILES_ACTIVE: docker
+ KAFKA_BOOTSTRAP_SERVERS: kafka:9092
+
+ depends_on:
+ - mongodb
+ - kafka
+ networks:
+ - challenge-network
+
+volumes:
+ mongo-data:
+networks:
+ challenge-network:
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..affd6f4
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,60 @@
+
+
+ 4.0.0
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.2.0.RELEASE
+
+
+ com.yape
+ app-java-codechanllenge
+ 0.0.1-SNAPSHOT
+ app-java-codechanllenge
+ challenge yape
+
+ 11
+
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ 2.7.3
+
+
+
+ org.projectlombok
+ lombok
+
+
+
+
+
+ org.jacoco
+ jacoco-maven-plugin
+ 0.8.5
+
+
+
+ prepare-agent
+
+
+
+ report
+ test
+
+ report
+
+
+
+
+
+
+
+
diff --git a/transaction-service/Dockerfile b/transaction-service/Dockerfile
new file mode 100644
index 0000000..1b7b025
--- /dev/null
+++ b/transaction-service/Dockerfile
@@ -0,0 +1,4 @@
+FROM openjdk:11-jre-slim
+#COPY wait-for-kafka.sh /wait-for-kafka.sh
+COPY target/transaction-service-0.0.1-SNAPSHOT.jar transaction-service.jar
+ENTRYPOINT ["java", "-jar", "/transaction-service.jar"]
diff --git a/transaction-service/pom.xml b/transaction-service/pom.xml
new file mode 100644
index 0000000..b43ae7f
--- /dev/null
+++ b/transaction-service/pom.xml
@@ -0,0 +1,135 @@
+
+
+ 4.0.0
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.7.0
+
+
+ com.yape.transaction
+ transaction-service
+ 0.0.1-SNAPSHOT
+ transaction-service
+ transaction
+
+ 11
+
+
+
+ org.springframework.boot
+ spring-boot-starter-webflux
+
+
+ org.springframework.boot
+ spring-boot-starter-data-mongodb-reactive
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+ org.springframework.boot
+ spring-boot-starter
+
+
+ org.projectlombok
+ lombok
+ 1.18.28
+
+
+
+ io.springfox
+ springfox-swagger2
+ 2.9.2
+
+
+ io.springfox
+ springfox-swagger-ui
+ 2.9.2
+
+
+ org.springframework.plugin
+ spring-plugin-core
+ 1.2.0.RELEASE
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ io.projectreactor
+ reactor-test
+ test
+
+
+ org.springframework.boot
+ spring-boot-starter-validation
+
+
+ javax.validation
+ validation-api
+ 2.0.1.Final
+
+
+
+ org.springframework.kafka
+ spring-kafka
+
+
+
+ io.projectreactor.kafka
+ reactor-kafka
+
+
+ org.springframework.boot
+ spring-boot-starter-graphql
+
+
+ org.springframework.graphql
+ spring-graphql-test
+ test
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ 2.7.3
+
+
+
+ org.projectlombok
+ lombok
+
+
+
+
+
+ org.jacoco
+ jacoco-maven-plugin
+ 0.8.5
+
+
+
+ prepare-agent
+
+
+
+ report
+ test
+
+ report
+
+
+
+
+
+
+
+
diff --git a/transaction-service/src/main/java/com/yape/transaction/TransactionServiceApplication.java b/transaction-service/src/main/java/com/yape/transaction/TransactionServiceApplication.java
new file mode 100644
index 0000000..1186f15
--- /dev/null
+++ b/transaction-service/src/main/java/com/yape/transaction/TransactionServiceApplication.java
@@ -0,0 +1,38 @@
+package com.yape.transaction;
+
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.core.env.Environment;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+@Slf4j
+@SpringBootApplication
+public class TransactionServiceApplication {
+ static { System.setProperty("os.arch", "i686_64"); }
+ public static void main(String[] args) throws UnknownHostException {
+
+ Environment env = SpringApplication.run(TransactionServiceApplication.class, args).getEnvironment();
+ log.info("\n----------------------------------------------------------\n\t"
+ .concat("Application '{}' is running! Access URLs:\n\t")
+ .concat("Local: \t\thttp://localhost:{}\n\t")
+ .concat("External: \thttp://{}:{}\n\t")
+ .concat("DB: \t{}\n\t")
+ .concat("Profile(s): \t{}\n----------------------------------------------------------"),
+ env.getProperty("spring.application.name"),
+ env.getProperty("server.port"),
+ InetAddress.getLocalHost().getHostAddress(),
+ env.getProperty("server.port"),
+ env.getProperty("spring.data.mongodb.database"),
+ env.getActiveProfiles());
+
+ String configServerStatus = env.getProperty("configserver.status");
+ log.info("\n----------------------------------------------------------\n\t"
+ .concat("Config Server: \t{}\n----------------------------------------------------------"),
+ configServerStatus == null ? "Not found or not setup for this application" : configServerStatus);
+ }
+
+}
diff --git a/transaction-service/src/main/java/com/yape/transaction/builder/TransactionBuilder.java b/transaction-service/src/main/java/com/yape/transaction/builder/TransactionBuilder.java
new file mode 100644
index 0000000..39c5a59
--- /dev/null
+++ b/transaction-service/src/main/java/com/yape/transaction/builder/TransactionBuilder.java
@@ -0,0 +1,36 @@
+package com.yape.transaction.builder;
+
+import com.yape.transaction.model.entity.Transaction;
+import com.yape.transaction.model.request.TransactionRequest;
+import com.yape.transaction.model.response.TransactionResponse;
+import com.yape.transaction.model.response.ValueName;
+
+import java.time.LocalDateTime;
+import java.util.UUID;
+
+import static com.yape.transaction.util.DateUtil.parseDateToString;
+
+public class TransactionBuilder {
+
+ public static TransactionResponse buildTransactionToResponse(Transaction transaction) {
+ return TransactionResponse.builder()
+ .transactionExternalId(transaction.getTransactionExternalId())
+ .transactionStatus(ValueName.builder().name(transaction.getStatus()).build())
+ .transactionType(ValueName.builder().name(transaction.getTransferTypeId()).build())
+ .value(transaction.getValue())
+ .createdAt(parseDateToString(transaction.getCreatedAt()))
+ .build();
+ }
+
+ public static Transaction buildTransactionFromRequest(TransactionRequest transactionRequest){
+ return Transaction.builder()
+ .transactionExternalId(UUID.randomUUID().toString())
+ .transferTypeId(transactionRequest.getTransferTypeId())
+ .accountExternalIdCredit(transactionRequest.getAccountExternalIdCredit())
+ .accountExternalIdDebit(transactionRequest.getAccountExternalIdDebit())
+ .status("PENDING")
+ .value(transactionRequest.getValue())
+ .createdAt(LocalDateTime.now())
+ .build();
+ }
+}
diff --git a/transaction-service/src/main/java/com/yape/transaction/config/JsonMessageConverterConfig.java b/transaction-service/src/main/java/com/yape/transaction/config/JsonMessageConverterConfig.java
new file mode 100644
index 0000000..267d621
--- /dev/null
+++ b/transaction-service/src/main/java/com/yape/transaction/config/JsonMessageConverterConfig.java
@@ -0,0 +1,14 @@
+package com.yape.transaction.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.converter.ByteArrayJsonMessageConverter;
+import org.springframework.kafka.support.converter.JsonMessageConverter;
+
+@Configuration
+public class JsonMessageConverterConfig {
+ @Bean
+ public JsonMessageConverter jsonMessageConverter() {
+ return new ByteArrayJsonMessageConverter();
+ }
+}
\ No newline at end of file
diff --git a/transaction-service/src/main/java/com/yape/transaction/config/KafkaConfig.java b/transaction-service/src/main/java/com/yape/transaction/config/KafkaConfig.java
new file mode 100644
index 0000000..14a0fd1
--- /dev/null
+++ b/transaction-service/src/main/java/com/yape/transaction/config/KafkaConfig.java
@@ -0,0 +1,78 @@
+package com.yape.transaction.config;
+
+import com.yape.transaction.model.entity.Transaction;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
+import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
+import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import reactor.kafka.receiver.ReceiverOptions;
+import reactor.kafka.sender.SenderOptions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+public class KafkaConfig {
+
+ @Value("${spring.kafka.bootstrap-servers}")
+ private String bootstrapServers;
+
+ @Bean
+ public ReactiveKafkaProducerTemplate reactiveKafkaProducerTemplate() {
+ Map configProps = new HashMap<>();
+ configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+ configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
+ configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); // Disable type info headers
+
+ SenderOptions senderOptions = SenderOptions.create(configProps);
+ return new ReactiveKafkaProducerTemplate<>(senderOptions);
+ }
+
+ @Bean
+ public ReactiveKafkaConsumerTemplate reactiveKafkaConsumerTemplate() {
+ Map props = new HashMap<>();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class.getName());
+ props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
+ props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Transaction.class.getName());
+ props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.yape.antifraud.model.entity,com.yape.transaction.model.entity");
+
+ ReceiverOptions receiverOptions = ReceiverOptions.create(props);
+ return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
+ }
+
+ @Bean
+ public ConsumerFactory consumerFactory() {
+ Map props = new HashMap<>();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
+ props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
+ props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.yape.transaction.model.entity.Transaction");
+ props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.yape.transaction.model.entity");
+ return new DefaultKafkaConsumerFactory<>(props);
+ }
+
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
+ ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(consumerFactory());
+ return factory;
+ }
+}
+
+
diff --git a/transaction-service/src/main/java/com/yape/transaction/controller/GraphQLTransactionController.java b/transaction-service/src/main/java/com/yape/transaction/controller/GraphQLTransactionController.java
new file mode 100644
index 0000000..6b4a4a7
--- /dev/null
+++ b/transaction-service/src/main/java/com/yape/transaction/controller/GraphQLTransactionController.java
@@ -0,0 +1,44 @@
+package com.yape.transaction.controller;
+
+import com.yape.transaction.model.entity.Transaction;
+import com.yape.transaction.model.request.TransactionRequest;
+import com.yape.transaction.model.response.TransactionResponse;
+import com.yape.transaction.service.TransactionService;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.graphql.data.method.annotation.Argument;
+import org.springframework.graphql.data.method.annotation.MutationMapping;
+import org.springframework.graphql.data.method.annotation.QueryMapping;
+import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
+
+@RestController
+@Slf4j
+@AllArgsConstructor
+public class GraphQLTransactionController {
+
+ private TransactionService transactionService;
+
+ @QueryMapping
+ public Mono getTransaction(@Argument String transactionExternalId) {
+ return transactionService.getTransaction(transactionExternalId);
+ }
+
+
+ @MutationMapping
+ public Mono createTransaction(@Argument String accountExternalIdDebit,
+ @Argument String accountExternalIdCredit,
+ @Argument String transferTypeId,
+ @Argument Double value) {
+
+ TransactionRequest transactionRequest =
+ TransactionRequest.builder()
+ .accountExternalIdCredit(accountExternalIdCredit)
+ .accountExternalIdDebit(accountExternalIdDebit)
+ .transferTypeId(transferTypeId)
+ .value(value)
+ .build();
+
+ return transactionService.createTransaction(transactionRequest);
+ }
+}
diff --git a/transaction-service/src/main/java/com/yape/transaction/controller/TransactionController.java b/transaction-service/src/main/java/com/yape/transaction/controller/TransactionController.java
new file mode 100644
index 0000000..71d8844
--- /dev/null
+++ b/transaction-service/src/main/java/com/yape/transaction/controller/TransactionController.java
@@ -0,0 +1,31 @@
+package com.yape.transaction.controller;
+
+import com.yape.transaction.model.entity.Transaction;
+import com.yape.transaction.model.request.TransactionRequest;
+import com.yape.transaction.model.response.TransactionResponse;
+import com.yape.transaction.service.TransactionService;
+import lombok.AllArgsConstructor;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.*;
+import reactor.core.publisher.Mono;
+
+@RestController
+@RequestMapping("/api/transactions")
+@AllArgsConstructor
+public class TransactionController {
+
+ private final TransactionService transactionService;
+
+
+ @PostMapping
+ @ResponseStatus(HttpStatus.CREATED)
+ public Mono createTransaction(@RequestBody TransactionRequest transactionRequest) {
+ return transactionService.createTransaction(transactionRequest);
+ }
+
+ @GetMapping("/{transactionExternalId}")
+ @ResponseStatus(HttpStatus.OK)
+ public Mono getTransaction(@PathVariable String transactionExternalId) {
+ return transactionService.getTransaction(transactionExternalId);
+ }
+}
diff --git a/transaction-service/src/main/java/com/yape/transaction/model/entity/Transaction.java b/transaction-service/src/main/java/com/yape/transaction/model/entity/Transaction.java
new file mode 100644
index 0000000..36accfc
--- /dev/null
+++ b/transaction-service/src/main/java/com/yape/transaction/model/entity/Transaction.java
@@ -0,0 +1,26 @@
+package com.yape.transaction.model.entity;
+
+import lombok.*;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+import java.time.LocalDateTime;
+import java.util.UUID;
+
+@Document(collection = "transactions")
+@Getter
+@Setter
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class Transaction {
+ @Id
+ private String transactionExternalId;
+ private String accountExternalIdDebit;
+ private String accountExternalIdCredit;
+ private String transferTypeId;
+ private Double value;
+ private String status;
+ private LocalDateTime createdAt;
+
+}
diff --git a/transaction-service/src/main/java/com/yape/transaction/model/request/TransactionRequest.java b/transaction-service/src/main/java/com/yape/transaction/model/request/TransactionRequest.java
new file mode 100644
index 0000000..d655af2
--- /dev/null
+++ b/transaction-service/src/main/java/com/yape/transaction/model/request/TransactionRequest.java
@@ -0,0 +1,18 @@
+package com.yape.transaction.model.request;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+@AllArgsConstructor
+@Builder
+public class TransactionRequest {
+
+ private String accountExternalIdDebit;
+ private String accountExternalIdCredit;
+ private String transferTypeId;
+ private Double value;
+}
diff --git a/transaction-service/src/main/java/com/yape/transaction/model/response/TransactionResponse.java b/transaction-service/src/main/java/com/yape/transaction/model/response/TransactionResponse.java
new file mode 100644
index 0000000..39fbbe4
--- /dev/null
+++ b/transaction-service/src/main/java/com/yape/transaction/model/response/TransactionResponse.java
@@ -0,0 +1,16 @@
+package com.yape.transaction.model.response;
+
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+@Builder
+public class TransactionResponse {
+ private String transactionExternalId;
+ private ValueName transactionType;
+ private ValueName transactionStatus;
+ private Double value;
+ private String createdAt;
+}
diff --git a/transaction-service/src/main/java/com/yape/transaction/model/response/ValueName.java b/transaction-service/src/main/java/com/yape/transaction/model/response/ValueName.java
new file mode 100644
index 0000000..a6fe20e
--- /dev/null
+++ b/transaction-service/src/main/java/com/yape/transaction/model/response/ValueName.java
@@ -0,0 +1,13 @@
+package com.yape.transaction.model.response;
+
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.kafka.common.protocol.types.Field;
+
+@Getter
+@Setter
+@Builder
+public class ValueName {
+ private String name;
+}
diff --git a/transaction-service/src/main/java/com/yape/transaction/repository/TransactionRepository.java b/transaction-service/src/main/java/com/yape/transaction/repository/TransactionRepository.java
new file mode 100644
index 0000000..dd1ae7a
--- /dev/null
+++ b/transaction-service/src/main/java/com/yape/transaction/repository/TransactionRepository.java
@@ -0,0 +1,9 @@
+package com.yape.transaction.repository;
+
+import com.yape.transaction.model.entity.Transaction;
+import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
+import org.springframework.stereotype.Repository;
+
+@Repository
+public interface TransactionRepository extends ReactiveMongoRepository {
+}
diff --git a/transaction-service/src/main/java/com/yape/transaction/service/KafkaConsumerService.java b/transaction-service/src/main/java/com/yape/transaction/service/KafkaConsumerService.java
new file mode 100644
index 0000000..9ab53a0
--- /dev/null
+++ b/transaction-service/src/main/java/com/yape/transaction/service/KafkaConsumerService.java
@@ -0,0 +1,8 @@
+package com.yape.transaction.service;
+
+import com.yape.transaction.model.entity.Transaction;
+
+public interface KafkaConsumerService {
+
+ void consume(Transaction transaction);
+}
diff --git a/transaction-service/src/main/java/com/yape/transaction/service/KafkaProducerService.java b/transaction-service/src/main/java/com/yape/transaction/service/KafkaProducerService.java
new file mode 100644
index 0000000..b783aa6
--- /dev/null
+++ b/transaction-service/src/main/java/com/yape/transaction/service/KafkaProducerService.java
@@ -0,0 +1,7 @@
+package com.yape.transaction.service;
+
+import com.yape.transaction.model.entity.Transaction;
+
+public interface KafkaProducerService {
+ void sendTransactionCreatedEvent(Transaction transaction);
+}
diff --git a/transaction-service/src/main/java/com/yape/transaction/service/TransactionService.java b/transaction-service/src/main/java/com/yape/transaction/service/TransactionService.java
new file mode 100644
index 0000000..ac823db
--- /dev/null
+++ b/transaction-service/src/main/java/com/yape/transaction/service/TransactionService.java
@@ -0,0 +1,14 @@
+package com.yape.transaction.service;
+
+import com.yape.transaction.model.entity.Transaction;
+import com.yape.transaction.model.request.TransactionRequest;
+import com.yape.transaction.model.response.TransactionResponse;
+import reactor.core.publisher.Mono;
+
+public interface TransactionService {
+ Mono createTransaction(TransactionRequest transactionDto);
+
+ Mono getTransaction(String transactionExternalId);
+
+ Mono updateTransactionStatus(String transactionExternalId, String status);
+}
diff --git a/transaction-service/src/main/java/com/yape/transaction/service/impl/KafkaConsumerServiceImpl.java b/transaction-service/src/main/java/com/yape/transaction/service/impl/KafkaConsumerServiceImpl.java
new file mode 100644
index 0000000..f3d738e
--- /dev/null
+++ b/transaction-service/src/main/java/com/yape/transaction/service/impl/KafkaConsumerServiceImpl.java
@@ -0,0 +1,20 @@
+package com.yape.transaction.service.impl;
+
+import com.yape.transaction.model.entity.Transaction;
+import com.yape.transaction.service.TransactionService;
+import lombok.AllArgsConstructor;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Service;
+
+@Service
+@AllArgsConstructor
+public class KafkaConsumerServiceImpl {
+
+
+ private final TransactionService transactionService;
+
+ @KafkaListener(topics = "transactionStatusUpdated", groupId = "group_id")
+ public void consume(Transaction transaction) {
+ transactionService.updateTransactionStatus(transaction.getTransactionExternalId(), transaction.getStatus()).subscribe();
+ }
+}
diff --git a/transaction-service/src/main/java/com/yape/transaction/service/impl/KafkaProducerServiceImpl.java b/transaction-service/src/main/java/com/yape/transaction/service/impl/KafkaProducerServiceImpl.java
new file mode 100644
index 0000000..1c1c03b
--- /dev/null
+++ b/transaction-service/src/main/java/com/yape/transaction/service/impl/KafkaProducerServiceImpl.java
@@ -0,0 +1,19 @@
+package com.yape.transaction.service.impl;
+
+import com.yape.transaction.model.entity.Transaction;
+import com.yape.transaction.service.KafkaProducerService;
+import lombok.AllArgsConstructor;
+import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
+import org.springframework.stereotype.Service;
+
+@Service
+@AllArgsConstructor
+public class KafkaProducerServiceImpl implements KafkaProducerService {
+
+ private final ReactiveKafkaProducerTemplate kafkaTemplate;
+
+ @Override
+ public void sendTransactionCreatedEvent(Transaction transaction) {
+ kafkaTemplate.send("transactionCreated", transaction).subscribe();
+ }
+}
diff --git a/transaction-service/src/main/java/com/yape/transaction/service/impl/TransactionServiceImpl.java b/transaction-service/src/main/java/com/yape/transaction/service/impl/TransactionServiceImpl.java
new file mode 100644
index 0000000..312c89d
--- /dev/null
+++ b/transaction-service/src/main/java/com/yape/transaction/service/impl/TransactionServiceImpl.java
@@ -0,0 +1,59 @@
+package com.yape.transaction.service.impl;
+
+import com.yape.transaction.builder.TransactionBuilder;
+import com.yape.transaction.model.entity.Transaction;
+import com.yape.transaction.model.request.TransactionRequest;
+import com.yape.transaction.model.response.TransactionResponse;
+import com.yape.transaction.repository.TransactionRepository;
+import com.yape.transaction.service.KafkaProducerService;
+import com.yape.transaction.service.TransactionService;
+import lombok.AllArgsConstructor;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
+import java.util.UUID;
+
+import static com.yape.transaction.builder.TransactionBuilder.buildTransactionFromRequest;
+
+@Service
+@AllArgsConstructor
+public class TransactionServiceImpl implements TransactionService {
+
+
+ private final TransactionRepository transactionRepository;
+
+
+ private final KafkaProducerService kafkaProducer;
+
+ @Override
+ public Mono createTransaction(TransactionRequest transactionRequest) {
+
+ Transaction transaction = new Transaction();
+ transaction.setTransactionExternalId(UUID.randomUUID().toString());
+ transaction.setAccountExternalIdDebit(transactionRequest.getAccountExternalIdDebit());
+ transaction.setAccountExternalIdCredit(transactionRequest.getAccountExternalIdCredit());
+ transaction.setTransferTypeId(transactionRequest.getTransferTypeId());
+ transaction.setValue(transactionRequest.getValue());
+ transaction.setStatus("PENDING");
+ transaction.setCreatedAt(LocalDateTime.now());
+
+ return transactionRepository.save(buildTransactionFromRequest(transactionRequest))
+ .doOnSuccess(kafkaProducer::sendTransactionCreatedEvent);
+ }
+
+ @Override
+ public Mono getTransaction(String transactionExternalId) {
+ return transactionRepository.findById(transactionExternalId)
+ .map(TransactionBuilder::buildTransactionToResponse);
+ }
+
+ @Override
+ public Mono updateTransactionStatus(String transactionExternalId, String status) {
+ return transactionRepository.findById(transactionExternalId)
+ .flatMap(transaction -> {
+ transaction.setStatus(status);
+ return transactionRepository.save(transaction);
+ });
+ }
+}
diff --git a/transaction-service/src/main/java/com/yape/transaction/util/DateUtil.java b/transaction-service/src/main/java/com/yape/transaction/util/DateUtil.java
new file mode 100644
index 0000000..4c75656
--- /dev/null
+++ b/transaction-service/src/main/java/com/yape/transaction/util/DateUtil.java
@@ -0,0 +1,16 @@
+package com.yape.transaction.util;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class DateUtil {
+ public static final String DATE_TIME_AMPM_FORMAT = "dd/MM/yyyy hh:mm:ss";
+
+ public static String parseDateToString(LocalDateTime date) {
+ return date != null ? date.format(DateTimeFormatter.ofPattern(DATE_TIME_AMPM_FORMAT)) : null;
+ }
+}
diff --git a/transaction-service/src/main/resources/application.yml b/transaction-service/src/main/resources/application.yml
new file mode 100644
index 0000000..e375596
--- /dev/null
+++ b/transaction-service/src/main/resources/application.yml
@@ -0,0 +1,44 @@
+spring:
+ application:
+ name: transaction-service
+ data:
+ mongodb:
+ database: transactions
+ uri: mongodb://localhost:27017/transactions
+ kafka:
+ bootstrap-servers: localhost:9092
+ consumer:
+ group-id: group_id
+ auto-offset-reset: earliest
+ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
+ properties:
+ spring:
+ deserializer:
+ value:
+ delegate:
+ class: org.springframework.kafka.support.serializer.JsonDeserializer
+ json:
+ trusted:
+ packages: "com.yape.transaction.model.entity,com.yape.antifraud.model.entity"
+ producer:
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
+ listener:
+ missing-topics-fatal: false
+ graphql:
+ graphiql:
+ enabled: true
+ path: /graphiql
+ profiles:
+ active: local
+
+server:
+ port: 8081
+
+logging:
+ level:
+ org.springframework.data.mongodb.core.ReactiveMongoTemplate: DEBUG
+ web: TRACE
+ org.springframework.web: TRACE
+
diff --git a/transaction-service/src/main/resources/graphql/QueryTest b/transaction-service/src/main/resources/graphql/QueryTest
new file mode 100644
index 0000000..f44927b
--- /dev/null
+++ b/transaction-service/src/main/resources/graphql/QueryTest
@@ -0,0 +1,22 @@
+query GetTransaction {
+ getTransaction(transactionExternalId: "bb86c138-df4e-4f77-a4b0-71121394064d"){
+ transactionExternalId
+ accountExternalIdDebit
+ transactionExternalId
+ transferTypeId
+ value
+ status
+ createdAt
+ }
+}
+
+mutation CreateTransaction{
+
+ createTransaction(accountExternalIdDebit: "123123", accountExternalIdCredit: "321312", transferTypeId: "12", value: 999){
+ transactionExternalId
+ accountExternalIdDebit
+ value
+
+ }
+
+}
\ No newline at end of file
diff --git a/transaction-service/src/main/resources/graphql/transaction.graphqls b/transaction-service/src/main/resources/graphql/transaction.graphqls
new file mode 100644
index 0000000..478b993
--- /dev/null
+++ b/transaction-service/src/main/resources/graphql/transaction.graphqls
@@ -0,0 +1,24 @@
+type Transaction {
+ transactionExternalId: String
+ accountExternalIdDebit: String
+ accountExternalIdCredit: String
+ transferTypeId: String
+ value: Float
+ status: String
+ createdAt: String
+}
+
+input TransactionInput {
+ accountExternalIdDebit: String
+ accountExternalIdCredit: String
+ transferTypeId: String
+ value: Float
+}
+
+type Query {
+ getTransaction(transactionExternalId: String): Transaction
+}
+
+type Mutation {
+ createTransaction(accountExternalIdDebit: String, accountExternalIdCredit: String, transferTypeId: String, value: Float): Transaction
+}
diff --git a/transaction-service/wait-for-kafka.sh b/transaction-service/wait-for-kafka.sh
new file mode 100644
index 0000000..773dcef
--- /dev/null
+++ b/transaction-service/wait-for-kafka.sh
@@ -0,0 +1,15 @@
+#!/bin/sh
+
+set -e
+
+host="$1"
+shift
+cmd="$@"
+
+until nc -z "$host" 9092; do
+ >&2 echo "Kafka is unavailable - sleeping"
+ sleep 5
+done
+
+>&2 echo "Kafka is up - executing command"
+exec $cmd