From 603f4ef2564997ec7b514b0c9f3a29959b9c0d48 Mon Sep 17 00:00:00 2001 From: Pavlo Liapota Date: Sat, 10 Feb 2024 16:53:41 +0100 Subject: [PATCH 1/7] Remove resultConverter from ReactiveStatementFactory --- .../se/fortnox/reactivewizard/db/DbProxy.java | 19 ---------------- .../db/ReactiveStatementFactory.java | 22 +++++++++---------- .../db/ReactiveStatementFactoryTest.java | 2 +- 3 files changed, 12 insertions(+), 31 deletions(-) diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/DbProxy.java b/dao/src/main/java/se/fortnox/reactivewizard/db/DbProxy.java index b7246750..597f2159 100644 --- a/dao/src/main/java/se/fortnox/reactivewizard/db/DbProxy.java +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/DbProxy.java @@ -3,9 +3,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import jakarta.inject.Inject; import jakarta.inject.Singleton; -import org.reactivestreams.Publisher; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import se.fortnox.reactivewizard.db.config.DatabaseConfig; @@ -123,7 +120,6 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl pagingOutput, createMetrics(method), databaseConfig, - converterFromPublisher(method), method); statementFactories.put(method, reactiveStatementFactory); } @@ -131,21 +127,6 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl return reactiveStatementFactory.create(args, connectionScheduler); } - private static Function converterFromPublisher(Method method) { - Class returnType = method.getReturnType(); - - if (Flux.class.isAssignableFrom(returnType)) { - return flux -> flux; - } else if (Mono.class.isAssignableFrom(returnType)) { - return Mono::from; - } else { - throw new IllegalArgumentException(String.format("DAO method %s::%s must return a Flux or Mono. Found %s", - method.getDeclaringClass().getName(), - method.getName(), - method.getReturnType().getName())); - } - } - private Metrics createMetrics(Method method) { String type = method.isAnnotationPresent(Query.class) ? "query" : "update"; String metricsName = format( diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/ReactiveStatementFactory.java b/dao/src/main/java/se/fortnox/reactivewizard/db/ReactiveStatementFactory.java index 2ae4628b..1bddbba0 100644 --- a/dao/src/main/java/se/fortnox/reactivewizard/db/ReactiveStatementFactory.java +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/ReactiveStatementFactory.java @@ -1,6 +1,5 @@ package se.fortnox.reactivewizard.db; -import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; @@ -18,7 +17,6 @@ import java.lang.reflect.Method; import java.sql.Connection; import java.sql.SQLException; -import java.util.function.Function; import static java.lang.String.format; import static se.fortnox.reactivewizard.util.ReactiveDecorator.decorated; @@ -30,25 +28,22 @@ public class ReactiveStatementFactory { private static final String QUERY_FAILED = "Query failed"; private final DbStatementFactory statementFactory; private final PagingOutput pagingOutput; - private final Function resultConverter; + private final Method method; private final Metrics metrics; private final DatabaseConfig config; - private final boolean isReturnTypeMono; public ReactiveStatementFactory( DbStatementFactory statementFactory, PagingOutput pagingOutput, Metrics metrics, DatabaseConfig config, - Function resultConverter, Method method) { this.statementFactory = statementFactory; this.pagingOutput = pagingOutput; this.metrics = metrics; this.config = config; - this.resultConverter = resultConverter; - isReturnTypeMono = Mono.class.isAssignableFrom(method.getReturnType()); + this.method = method; } @@ -101,7 +96,7 @@ private Mono getResultMono(StatementContext statementContext) { */ public Object create(Object[] args, ConnectionScheduler connectionScheduler) { StatementContext statementContext = new StatementContext(() -> statementFactory.create(args), connectionScheduler); - if (isReturnTypeMono) { + if (Mono.class.isAssignableFrom(method.getReturnType())) { Mono resultMono = getResultMono(statementContext); if (shouldAddDebugErrorHandling()) { Exception queryFailure = new RuntimeException(QUERY_FAILED); @@ -111,8 +106,8 @@ public Object create(Object[] args, ConnectionScheduler connectionScheduler) { }); } resultMono = Mono.from(metrics.measure(resultMono, this::logSlowQuery)); - return decorated(resultConverter.apply(resultMono), statementContext); - } else { + return decorated(resultMono, statementContext); + } else if (Flux.class.isAssignableFrom(method.getReturnType())) { Flux resultFlux = getResultFlux(statementContext); if (shouldAddDebugErrorHandling()) { Exception queryFailure = new RuntimeException(QUERY_FAILED); @@ -124,7 +119,12 @@ public Object create(Object[] args, ConnectionScheduler connectionScheduler) { resultFlux = pagingOutput.apply(resultFlux, args); resultFlux = Flux.from(metrics.measure(resultFlux, this::logSlowQuery)); resultFlux = resultFlux.onBackpressureBuffer(RECORD_BUFFER_SIZE); - return decorated(resultConverter.apply(resultFlux), statementContext); + return decorated(resultFlux, statementContext); + } else { + throw new IllegalArgumentException(String.format("DAO method %s::%s must return a Flux or Mono. Found %s", + method.getDeclaringClass().getName(), + method.getName(), + method.getReturnType().getName())); } } diff --git a/dao/src/test/java/se/fortnox/reactivewizard/db/ReactiveStatementFactoryTest.java b/dao/src/test/java/se/fortnox/reactivewizard/db/ReactiveStatementFactoryTest.java index a7b23f85..c6991caa 100644 --- a/dao/src/test/java/se/fortnox/reactivewizard/db/ReactiveStatementFactoryTest.java +++ b/dao/src/test/java/se/fortnox/reactivewizard/db/ReactiveStatementFactoryTest.java @@ -100,7 +100,7 @@ public void setMonoSink(MonoSink monoSink) { }); - statementFactory = new ReactiveStatementFactory(dbStatementFactory, pagingOutput, Metrics.get("test"), databaseConfig, o -> o, getRequiredMethod(TestDao.class, "select")); + statementFactory = new ReactiveStatementFactory(dbStatementFactory, pagingOutput, Metrics.get("test"), databaseConfig, getRequiredMethod(TestDao.class, "select")); } @Test From 378654eb2502a4ec3a0455b4e50da11386dbd8b9 Mon Sep 17 00:00:00 2001 From: Pavlo Liapota Date: Sat, 10 Feb 2024 17:57:58 +0100 Subject: [PATCH 2/7] Remove paramSerializer from DbProxy --- .../se/fortnox/reactivewizard/db/DbProxy.java | 27 +++++-------------- .../se/fortnox/reactivewizard/db/DaoTest.java | 5 ++-- .../db/DaoTransactionsTest.java | 5 ++-- .../reactivewizard/db/DbProxyTest.java | 25 ----------------- 4 files changed, 11 insertions(+), 51 deletions(-) diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/DbProxy.java b/dao/src/main/java/se/fortnox/reactivewizard/db/DbProxy.java index 597f2159..de367113 100644 --- a/dao/src/main/java/se/fortnox/reactivewizard/db/DbProxy.java +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/DbProxy.java @@ -1,6 +1,5 @@ package se.fortnox.reactivewizard.db; -import com.fasterxml.jackson.core.type.TypeReference; import jakarta.inject.Inject; import jakarta.inject.Singleton; import reactor.core.scheduler.Scheduler; @@ -10,7 +9,6 @@ import se.fortnox.reactivewizard.db.statement.DbStatementFactory; import se.fortnox.reactivewizard.db.statement.DbStatementFactoryFactory; import se.fortnox.reactivewizard.db.transactions.ConnectionScheduler; -import se.fortnox.reactivewizard.json.JsonSerializerFactory; import se.fortnox.reactivewizard.metrics.Metrics; import se.fortnox.reactivewizard.util.DebugUtil; import se.fortnox.reactivewizard.util.ReflectionUtil; @@ -21,43 +19,35 @@ import java.lang.reflect.Proxy; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Function; import static java.text.MessageFormat.format; @Singleton public class DbProxy implements InvocationHandler { - private static final TypeReference OBJECT_ARRAY_TYPE_REFERENCE = new TypeReference<>() { - }; private final DbStatementFactoryFactory dbStatementFactoryFactory; private final Scheduler scheduler; protected final Map statementFactories; private final ConnectionScheduler connectionScheduler; - protected final Function paramSerializer; private final DatabaseConfig databaseConfig; @Inject public DbProxy(DatabaseConfig databaseConfig, @Nullable ConnectionProvider connectionProvider, - DbStatementFactoryFactory dbStatementFactoryFactory, - JsonSerializerFactory jsonSerializerFactory + DbStatementFactoryFactory dbStatementFactoryFactory ) { this(databaseConfig, threadPool(databaseConfig.getPoolSize()), connectionProvider, - dbStatementFactoryFactory, - jsonSerializerFactory); + dbStatementFactoryFactory); } public DbProxy(DatabaseConfig databaseConfig, Scheduler scheduler, ConnectionProvider connectionProvider, - DbStatementFactoryFactory dbStatementFactoryFactory, - JsonSerializerFactory jsonSerializerFactory + DbStatementFactoryFactory dbStatementFactoryFactory ) { this(databaseConfig, scheduler, connectionProvider, dbStatementFactoryFactory, - jsonSerializerFactory.createStringSerializer(OBJECT_ARRAY_TYPE_REFERENCE), new ConcurrentHashMap<>()); } @@ -65,20 +55,17 @@ public DbProxy(DatabaseConfig databaseConfig, ConnectionProvider connectionProvi this(databaseConfig, Schedulers.boundedElastic(), connectionProvider, - new DbStatementFactoryFactory(), - new JsonSerializerFactory()); + new DbStatementFactoryFactory()); } protected DbProxy(DatabaseConfig databaseConfig, Scheduler scheduler, ConnectionProvider connectionProvider, DbStatementFactoryFactory dbStatementFactoryFactory, - Function paramSerializer, Map statementFactories ) { this.scheduler = scheduler; this.dbStatementFactoryFactory = dbStatementFactoryFactory; - this.paramSerializer = paramSerializer; this.databaseConfig = databaseConfig; this.statementFactories = statementFactories; this.connectionScheduler = new ConnectionScheduler(connectionProvider, scheduler); @@ -136,15 +123,15 @@ private Metrics createMetrics(Method method) { } public DbProxy usingConnectionProvider(ConnectionProvider connectionProvider) { - return new DbProxy(databaseConfig, scheduler, connectionProvider, dbStatementFactoryFactory, paramSerializer, statementFactories); + return new DbProxy(databaseConfig, scheduler, connectionProvider, dbStatementFactoryFactory, statementFactories); } public DbProxy usingConnectionProvider(ConnectionProvider connectionProvider, DatabaseConfig databaseConfig) { - return new DbProxy(databaseConfig, scheduler, connectionProvider, dbStatementFactoryFactory, paramSerializer, statementFactories); + return new DbProxy(databaseConfig, scheduler, connectionProvider, dbStatementFactoryFactory, statementFactories); } public DbProxy usingConnectionProvider(ConnectionProvider newConnectionProvider, Scheduler newScheduler) { - return new DbProxy(databaseConfig, newScheduler, newConnectionProvider, dbStatementFactoryFactory, paramSerializer, statementFactories); + return new DbProxy(databaseConfig, newScheduler, newConnectionProvider, dbStatementFactoryFactory, statementFactories); } public DatabaseConfig getDatabaseConfig() { diff --git a/dao/src/test/java/se/fortnox/reactivewizard/db/DaoTest.java b/dao/src/test/java/se/fortnox/reactivewizard/db/DaoTest.java index e7c2eafd..c2356b80 100644 --- a/dao/src/test/java/se/fortnox/reactivewizard/db/DaoTest.java +++ b/dao/src/test/java/se/fortnox/reactivewizard/db/DaoTest.java @@ -8,7 +8,6 @@ import reactor.test.StepVerifier; import se.fortnox.reactivewizard.db.config.DatabaseConfig; import se.fortnox.reactivewizard.db.statement.DbStatementFactoryFactory; -import se.fortnox.reactivewizard.json.JsonSerializerFactory; import java.sql.SQLException; import java.util.concurrent.atomic.AtomicInteger; @@ -34,8 +33,8 @@ public void reset() { new DatabaseConfig(), Schedulers.newBoundedElastic(1, Integer.MAX_VALUE, "DaoTestDbProxy"), db.getConnectionProvider(), - new DbStatementFactoryFactory(), - new JsonSerializerFactory()).create(DaoTransactionsTest.TestDao.class); + new DbStatementFactoryFactory() + ).create(DaoTransactionsTest.TestDao.class); } @Test diff --git a/dao/src/test/java/se/fortnox/reactivewizard/db/DaoTransactionsTest.java b/dao/src/test/java/se/fortnox/reactivewizard/db/DaoTransactionsTest.java index d4100558..f25ff031 100644 --- a/dao/src/test/java/se/fortnox/reactivewizard/db/DaoTransactionsTest.java +++ b/dao/src/test/java/se/fortnox/reactivewizard/db/DaoTransactionsTest.java @@ -16,7 +16,6 @@ import se.fortnox.reactivewizard.db.transactions.DaoTransactions; import se.fortnox.reactivewizard.db.transactions.DaoTransactionsImpl; import se.fortnox.reactivewizard.db.transactions.StatementContext; -import se.fortnox.reactivewizard.json.JsonSerializerFactory; import se.fortnox.reactivewizard.util.ReactiveDecorator; import java.sql.Connection; @@ -432,7 +431,7 @@ void shouldUseSpecifiedConnectionAndScheduler() throws SQLException { VirtualTimeScheduler otherScheduler = VirtualTimeScheduler.create(); DbProxy otherDbProxy = new DbProxy( new DatabaseConfig(), otherScheduler, - null, new DbStatementFactoryFactory(), new JsonSerializerFactory()); + null, new DbStatementFactoryFactory()); otherDbProxy = otherDbProxy.usingConnectionProvider(otherConnectionProvider); when(otherDb.getPreparedStatement().executeBatch()) @@ -461,7 +460,7 @@ void shouldUseSecondConnectionAndSchedulerIfFirstPublisherHasNoConnectionProvide ConnectionProvider otherConnectionProvider = secondDb.getConnectionProvider(); DbProxy dbProxyWithoutConnectionProvider = new DbProxy( - new DatabaseConfig(), Schedulers.boundedElastic(), null, new DbStatementFactoryFactory(), new JsonSerializerFactory()); + new DatabaseConfig(), Schedulers.boundedElastic(), null, new DbStatementFactoryFactory()); DbProxy dbProxyWithConnectionProvider = dbProxyWithoutConnectionProvider.usingConnectionProvider(otherConnectionProvider); when(secondDb.getPreparedStatement().executeBatch()) diff --git a/dao/src/test/java/se/fortnox/reactivewizard/db/DbProxyTest.java b/dao/src/test/java/se/fortnox/reactivewizard/db/DbProxyTest.java index 36ef014c..e89f8cc2 100644 --- a/dao/src/test/java/se/fortnox/reactivewizard/db/DbProxyTest.java +++ b/dao/src/test/java/se/fortnox/reactivewizard/db/DbProxyTest.java @@ -1,6 +1,5 @@ package se.fortnox.reactivewizard.db; -import com.fasterxml.jackson.core.type.TypeReference; import com.google.inject.Injector; import org.assertj.core.api.ThrowableAssertAlternative; import org.junit.jupiter.api.BeforeEach; @@ -13,7 +12,6 @@ import reactor.test.StepVerifier; import se.fortnox.reactivewizard.config.TestInjector; import se.fortnox.reactivewizard.db.config.DatabaseConfig; -import se.fortnox.reactivewizard.json.JsonSerializerFactory; import se.fortnox.reactivewizard.util.DebugUtil; import java.sql.SQLException; @@ -21,7 +19,6 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.time.YearMonth; -import java.util.concurrent.atomic.AtomicReference; import static java.time.Duration.ofMillis; import static java.time.Duration.ofSeconds; @@ -29,13 +26,11 @@ import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.fail; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -141,26 +136,6 @@ record Wrapper(String nullableValue) {}; verify(mockDb.getConnection()).close(); } - @Test - void shouldReuseTypeReference() { - final JsonSerializerFactory jsonSerializerFactoryReal = new JsonSerializerFactory(); - final JsonSerializerFactory jsonSerializerFactory = spy(jsonSerializerFactoryReal); - AtomicReference> typeReferenceAtomicReference = new AtomicReference<>(); - - when(jsonSerializerFactory.createStringSerializer(any(TypeReference.class))).thenAnswer(invocationOnMock -> { - final TypeReference argument = invocationOnMock.getArgument(0); - typeReferenceAtomicReference.set(argument); - return jsonSerializerFactoryReal.createStringSerializer(argument); - }); - new DbProxy(new DatabaseConfig(), null, null, jsonSerializerFactory); - - final TypeReference firstTypeReference = typeReferenceAtomicReference.get(); - new DbProxy(new DatabaseConfig(), null, null, jsonSerializerFactory); - final TypeReference secondTypeReference = typeReferenceAtomicReference.get(); - - assertThat(firstTypeReference).isSameAs(secondTypeReference); - } - @Test void shouldReturnDataFromDbForQueryWithFlux() throws SQLException, InterruptedException { mockDb.addRowColumn(1, 1, "sql_val", String.class, "myname"); From 90d778f17ded5e8344dff1bab28d82ae6c813382 Mon Sep 17 00:00:00 2001 From: Pavlo Liapota Date: Sat, 10 Feb 2024 20:24:02 +0100 Subject: [PATCH 3/7] Move dao method related code from ReactiveStatementFactory to DaoMethodHandler --- .../reactivewizard/db/DaoMethodHandler.java | 50 ++++++ .../se/fortnox/reactivewizard/db/DbProxy.java | 97 ++++-------- .../db/ReactiveStatementFactory.java | 149 +++++++++++------- ...oryTest.java => DaoMethodHandlerTest.java} | 26 ++- 4 files changed, 192 insertions(+), 130 deletions(-) create mode 100644 dao/src/main/java/se/fortnox/reactivewizard/db/DaoMethodHandler.java rename dao/src/test/java/se/fortnox/reactivewizard/db/{ReactiveStatementFactoryTest.java => DaoMethodHandlerTest.java} (81%) diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/DaoMethodHandler.java b/dao/src/main/java/se/fortnox/reactivewizard/db/DaoMethodHandler.java new file mode 100644 index 00000000..81452b29 --- /dev/null +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/DaoMethodHandler.java @@ -0,0 +1,50 @@ +package se.fortnox.reactivewizard.db; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import se.fortnox.reactivewizard.db.paging.PagingOutput; +import se.fortnox.reactivewizard.db.statement.DbStatementFactory; +import se.fortnox.reactivewizard.metrics.Metrics; + +import java.lang.reflect.Method; + +public class DaoMethodHandler { + + private final Method method; + private final DbStatementFactory statementFactory; + private final PagingOutput pagingOutput; + private final Metrics metrics; + + public DaoMethodHandler(Method method, + DbStatementFactory statementFactory, PagingOutput pagingOutput, Metrics metrics) { + this.method = method; + this.statementFactory = statementFactory; + this.pagingOutput = pagingOutput; + this.metrics = metrics; + } + + public Publisher run(Object[] args, ReactiveStatementFactory reactiveStatementFactory) { + if (Mono.class.isAssignableFrom(method.getReturnType())) { + return reactiveStatementFactory.createMono( + metrics, + statementFactory.toString(), + () -> statementFactory.create(args) + ); + } else if (Flux.class.isAssignableFrom(method.getReturnType())) { + return reactiveStatementFactory.createFlux( + metrics, + statementFactory.toString(), + () -> statementFactory.create(args), + flux -> pagingOutput.apply(flux, args) + ); + } else { + throw new IllegalArgumentException(String.format( + "DAO method %s::%s must return a Flux or Mono. Found %s", + method.getDeclaringClass().getName(), + method.getName(), + method.getReturnType().getName() + )); + } + } +} diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/DbProxy.java b/dao/src/main/java/se/fortnox/reactivewizard/db/DbProxy.java index de367113..f62d9671 100644 --- a/dao/src/main/java/se/fortnox/reactivewizard/db/DbProxy.java +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/DbProxy.java @@ -6,14 +6,11 @@ import reactor.core.scheduler.Schedulers; import se.fortnox.reactivewizard.db.config.DatabaseConfig; import se.fortnox.reactivewizard.db.paging.PagingOutput; -import se.fortnox.reactivewizard.db.statement.DbStatementFactory; import se.fortnox.reactivewizard.db.statement.DbStatementFactoryFactory; -import se.fortnox.reactivewizard.db.transactions.ConnectionScheduler; import se.fortnox.reactivewizard.metrics.Metrics; import se.fortnox.reactivewizard.util.DebugUtil; import se.fortnox.reactivewizard.util.ReflectionUtil; -import javax.annotation.Nullable; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; @@ -26,56 +23,30 @@ public class DbProxy implements InvocationHandler { private final DbStatementFactoryFactory dbStatementFactoryFactory; - private final Scheduler scheduler; - protected final Map statementFactories; - private final ConnectionScheduler connectionScheduler; - private final DatabaseConfig databaseConfig; + private final Map handlers; + private final ReactiveStatementFactory reactiveStatementFactory; @Inject - public DbProxy(DatabaseConfig databaseConfig, - @Nullable ConnectionProvider connectionProvider, - DbStatementFactoryFactory dbStatementFactoryFactory - ) { - this(databaseConfig, - threadPool(databaseConfig.getPoolSize()), - connectionProvider, - dbStatementFactoryFactory); - } - - public DbProxy(DatabaseConfig databaseConfig, - Scheduler scheduler, - ConnectionProvider connectionProvider, - DbStatementFactoryFactory dbStatementFactoryFactory - ) { - this(databaseConfig, scheduler, connectionProvider, dbStatementFactoryFactory, - new ConcurrentHashMap<>()); + public DbProxy(ReactiveStatementFactory reactiveStatementFactory, + DbStatementFactoryFactory dbStatementFactoryFactory) { + this(reactiveStatementFactory, dbStatementFactoryFactory, new ConcurrentHashMap<>()); } public DbProxy(DatabaseConfig databaseConfig, ConnectionProvider connectionProvider) { - this(databaseConfig, - Schedulers.boundedElastic(), - connectionProvider, - new DbStatementFactoryFactory()); + this(databaseConfig, Schedulers.boundedElastic(), connectionProvider, new DbStatementFactoryFactory()); } - protected DbProxy(DatabaseConfig databaseConfig, - Scheduler scheduler, - ConnectionProvider connectionProvider, - DbStatementFactoryFactory dbStatementFactoryFactory, - Map statementFactories - ) { - this.scheduler = scheduler; - this.dbStatementFactoryFactory = dbStatementFactoryFactory; - this.databaseConfig = databaseConfig; - this.statementFactories = statementFactories; - this.connectionScheduler = new ConnectionScheduler(connectionProvider, scheduler); + public DbProxy(DatabaseConfig databaseConfig, Scheduler scheduler, ConnectionProvider connectionProvider, + DbStatementFactoryFactory dbStatementFactoryFactory) { + this(new ReactiveStatementFactory(databaseConfig, scheduler, connectionProvider), dbStatementFactoryFactory); } - private static Scheduler threadPool(int poolSize) { - if (poolSize == -1) { - return Schedulers.boundedElastic(); - } - return Schedulers.newBoundedElastic(10, Integer.MAX_VALUE, "DbProxy"); + protected DbProxy(ReactiveStatementFactory reactiveStatementFactory, + DbStatementFactoryFactory dbStatementFactoryFactory, + Map handlers) { + this.dbStatementFactoryFactory = dbStatementFactoryFactory; + this.reactiveStatementFactory = reactiveStatementFactory; + this.handlers = handlers; } /** @@ -87,55 +58,53 @@ private static Scheduler threadPool(int poolSize) { */ public T create(Class daoInterface) { return (T) Proxy.newProxyInstance(daoInterface.getClassLoader(), - new Class[]{daoInterface}, - this); + new Class[]{daoInterface}, + this); } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { - ReactiveStatementFactory reactiveStatementFactory = statementFactories.get(method); - if (reactiveStatementFactory == null || DebugUtil.IS_DEBUG) { + var handler = handlers.get(method); + if (handler == null || DebugUtil.IS_DEBUG) { if (DebugUtil.IS_DEBUG) { // Need to get the actual interface method in order to get updated annotations method = ReflectionUtil.getRedefinedMethod(method); } - DbStatementFactory statementFactory = dbStatementFactoryFactory.createStatementFactory(method); - PagingOutput pagingOutput = new PagingOutput(method); - reactiveStatementFactory = new ReactiveStatementFactory( - statementFactory, - pagingOutput, - createMetrics(method), - databaseConfig, - method); - statementFactories.put(method, reactiveStatementFactory); + handler = new DaoMethodHandler( + method, + new DbStatementFactoryFactory().createStatementFactory(method), + new PagingOutput(method), + createMetrics(method) + ); + handlers.put(method, handler); } - return reactiveStatementFactory.create(args, connectionScheduler); + return handler.run(args, reactiveStatementFactory); } private Metrics createMetrics(Method method) { String type = method.isAnnotationPresent(Query.class) ? "query" : "update"; String metricsName = format( - "DAO_type:{0}_method:{1}.{2}_{3}", - type, method.getDeclaringClass().getName(), method.getName(), method.getParameterCount()); + "DAO_type:{0}_method:{1}.{2}_{3}", + type, method.getDeclaringClass().getName(), method.getName(), method.getParameterCount()); return Metrics.get(metricsName); } public DbProxy usingConnectionProvider(ConnectionProvider connectionProvider) { - return new DbProxy(databaseConfig, scheduler, connectionProvider, dbStatementFactoryFactory, statementFactories); + return new DbProxy(reactiveStatementFactory.usingConnectionProvider(connectionProvider), dbStatementFactoryFactory, handlers); } public DbProxy usingConnectionProvider(ConnectionProvider connectionProvider, DatabaseConfig databaseConfig) { - return new DbProxy(databaseConfig, scheduler, connectionProvider, dbStatementFactoryFactory, statementFactories); + return new DbProxy(reactiveStatementFactory.usingConnectionProvider(connectionProvider, databaseConfig), dbStatementFactoryFactory, handlers); } public DbProxy usingConnectionProvider(ConnectionProvider newConnectionProvider, Scheduler newScheduler) { - return new DbProxy(databaseConfig, newScheduler, newConnectionProvider, dbStatementFactoryFactory, statementFactories); + return new DbProxy(reactiveStatementFactory.usingConnectionProvider(newConnectionProvider, newScheduler), dbStatementFactoryFactory, handlers); } public DatabaseConfig getDatabaseConfig() { - return databaseConfig; + return reactiveStatementFactory.getDatabaseConfig(); } } diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/ReactiveStatementFactory.java b/dao/src/main/java/se/fortnox/reactivewizard/db/ReactiveStatementFactory.java index 1bddbba0..a4dc9a5e 100644 --- a/dao/src/main/java/se/fortnox/reactivewizard/db/ReactiveStatementFactory.java +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/ReactiveStatementFactory.java @@ -1,22 +1,24 @@ package se.fortnox.reactivewizard.db; +import jakarta.inject.Inject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; import se.fortnox.reactivewizard.db.config.DatabaseConfig; -import se.fortnox.reactivewizard.db.paging.PagingOutput; -import se.fortnox.reactivewizard.db.statement.DbStatementFactory; import se.fortnox.reactivewizard.db.statement.Statement; import se.fortnox.reactivewizard.db.transactions.ConnectionScheduler; import se.fortnox.reactivewizard.db.transactions.StatementContext; import se.fortnox.reactivewizard.metrics.Metrics; import se.fortnox.reactivewizard.util.DebugUtil; -import java.lang.reflect.Method; import java.sql.Connection; import java.sql.SQLException; +import java.util.function.Function; +import java.util.function.Supplier; import static java.lang.String.format; import static se.fortnox.reactivewizard.util.ReactiveDecorator.decorated; @@ -24,28 +26,28 @@ public class ReactiveStatementFactory { private static final int RECORD_BUFFER_SIZE = 100000; + private static final Logger LOG = LoggerFactory.getLogger("Dao"); + private static final String QUERY_FAILED = "Query failed"; - private final DbStatementFactory statementFactory; - private final PagingOutput pagingOutput; - private final Method method; - private final Metrics metrics; private final DatabaseConfig config; - public ReactiveStatementFactory( - DbStatementFactory statementFactory, - PagingOutput pagingOutput, - Metrics metrics, - DatabaseConfig config, - Method method) { - this.statementFactory = statementFactory; - this.pagingOutput = pagingOutput; - this.metrics = metrics; - this.config = config; - this.method = method; + private final Scheduler scheduler; + + private final ConnectionScheduler connectionScheduler; + + @Inject + public ReactiveStatementFactory(DatabaseConfig config, ConnectionProvider connectionProvider) { + this(config, threadPool(config.getPoolSize()), connectionProvider); } + protected ReactiveStatementFactory(DatabaseConfig config, Scheduler scheduler, + ConnectionProvider connectionProvider) { + this.config = config; + this.scheduler = scheduler; + this.connectionScheduler = new ConnectionScheduler(connectionProvider, scheduler); + } private static void closeSilently(Connection connection) { try { @@ -55,7 +57,7 @@ private static void closeSilently(Connection connection) { } } - private Flux getResultFlux(StatementContext statementContext) { + private Flux getResultFlux(StatementContext statementContext) { return Flux.create(fluxSink -> { try { statementContext.getConnectionScheduler() @@ -72,7 +74,7 @@ private Flux getResultFlux(StatementContext statementContext) { }, FluxSink.OverflowStrategy.ERROR); } - private Mono getResultMono(StatementContext statementContext) { + private Mono getResultMono(StatementContext statementContext) { return Mono.create(monoSink -> monoSink.onRequest(unusedRequestedAmount -> { try { statementContext.getConnectionScheduler() @@ -87,50 +89,54 @@ private Mono getResultMono(StatementContext statementContext) { })); } - /** - * Create Flux statement. - * - * @param args the arguments - * @param connectionScheduler the scheduler - * @return the Flux statement - */ - public Object create(Object[] args, ConnectionScheduler connectionScheduler) { - StatementContext statementContext = new StatementContext(() -> statementFactory.create(args), connectionScheduler); - if (Mono.class.isAssignableFrom(method.getReturnType())) { - Mono resultMono = getResultMono(statementContext); - if (shouldAddDebugErrorHandling()) { - Exception queryFailure = new RuntimeException(QUERY_FAILED); - resultMono = resultMono.onErrorResume(thrown -> { - queryFailure.initCause(thrown); - return Mono.error(queryFailure); - }); - } - resultMono = Mono.from(metrics.measure(resultMono, this::logSlowQuery)); - return decorated(resultMono, statementContext); - } else if (Flux.class.isAssignableFrom(method.getReturnType())) { - Flux resultFlux = getResultFlux(statementContext); - if (shouldAddDebugErrorHandling()) { - Exception queryFailure = new RuntimeException(QUERY_FAILED); - resultFlux = resultFlux.onErrorResume(thrown -> { - queryFailure.initCause(thrown); - return Flux.error(queryFailure); - }); - } - resultFlux = pagingOutput.apply(resultFlux, args); - resultFlux = Flux.from(metrics.measure(resultFlux, this::logSlowQuery)); - resultFlux = resultFlux.onBackpressureBuffer(RECORD_BUFFER_SIZE); - return decorated(resultFlux, statementContext); - } else { - throw new IllegalArgumentException(String.format("DAO method %s::%s must return a Flux or Mono. Found %s", - method.getDeclaringClass().getName(), - method.getName(), - method.getReturnType().getName())); + public Mono createMono( + Metrics metrics, + String rawSql, + Supplier statementSupplier + ) { + var statementContext = new StatementContext(statementSupplier, connectionScheduler); + Mono resultMono = getResultMono(statementContext); + if (shouldAddDebugErrorHandling()) { + resultMono = resultMono.onErrorResume(thrown -> + Mono.error(new RuntimeException(QUERY_FAILED, thrown)) + ); + } + resultMono = Mono.from(metrics.measure(resultMono, (time) -> logIfSlowQuery(time, rawSql))); + return decorated(resultMono, statementContext); + } + + public Flux createFlux( + Metrics metrics, + String rawSql, + Supplier statementSupplier, + Function, Flux> fluxMapper + ) { + var statementContext = new StatementContext(statementSupplier, connectionScheduler); + Flux resultFlux = getResultFlux(statementContext); + if (shouldAddDebugErrorHandling()) { + resultFlux = resultFlux.onErrorResume(thrown -> + Flux.error(new RuntimeException(QUERY_FAILED, thrown)) + ); + } + if (fluxMapper != null) { + resultFlux = fluxMapper.apply(resultFlux); } + resultFlux = Flux.from(metrics.measure(resultFlux, (time) -> logIfSlowQuery(time, rawSql))); + resultFlux = resultFlux.onBackpressureBuffer(RECORD_BUFFER_SIZE); + return decorated(resultFlux, statementContext); } - private void logSlowQuery(long time) { + public Flux createFlux( + Metrics metrics, + String rawSql, + Supplier statementSupplier + ) { + return createFlux(metrics, rawSql, statementSupplier, null); + } + + private void logIfSlowQuery(long time, String rawSql) { if (time > config.getSlowQueryLogThreshold()) { - LOG.warn(format("Slow query: %s%ntime: %d", statementFactory, time)); + LOG.warn(format("Slow query: %s%ntime: %d", rawSql, time)); } } @@ -149,4 +155,27 @@ private void executeStatement(Statement dbStatement, Connection connection) { private static boolean shouldAddDebugErrorHandling() { return DebugUtil.IS_DEBUG || LOG.isDebugEnabled(); } + + private static Scheduler threadPool(int poolSize) { + if (poolSize == -1) { + return Schedulers.boundedElastic(); + } + return Schedulers.newBoundedElastic(10, Integer.MAX_VALUE, "DbProxy"); + } + + public ReactiveStatementFactory usingConnectionProvider(ConnectionProvider connectionProvider) { + return new ReactiveStatementFactory(config, scheduler, connectionProvider); + } + + public ReactiveStatementFactory usingConnectionProvider(ConnectionProvider connectionProvider, DatabaseConfig databaseConfig) { + return new ReactiveStatementFactory(databaseConfig, scheduler, connectionProvider); + } + + public ReactiveStatementFactory usingConnectionProvider(ConnectionProvider newConnectionProvider, Scheduler newScheduler) { + return new ReactiveStatementFactory(config, newScheduler, newConnectionProvider); + } + + public DatabaseConfig getDatabaseConfig() { + return config; + } } diff --git a/dao/src/test/java/se/fortnox/reactivewizard/db/ReactiveStatementFactoryTest.java b/dao/src/test/java/se/fortnox/reactivewizard/db/DaoMethodHandlerTest.java similarity index 81% rename from dao/src/test/java/se/fortnox/reactivewizard/db/ReactiveStatementFactoryTest.java rename to dao/src/test/java/se/fortnox/reactivewizard/db/DaoMethodHandlerTest.java index c6991caa..fb82ae5b 100644 --- a/dao/src/test/java/se/fortnox/reactivewizard/db/ReactiveStatementFactoryTest.java +++ b/dao/src/test/java/se/fortnox/reactivewizard/db/DaoMethodHandlerTest.java @@ -14,11 +14,11 @@ import se.fortnox.reactivewizard.db.paging.PagingOutput; import se.fortnox.reactivewizard.db.statement.DbStatementFactory; import se.fortnox.reactivewizard.db.statement.Statement; -import se.fortnox.reactivewizard.db.transactions.ConnectionScheduler; import se.fortnox.reactivewizard.metrics.Metrics; import java.sql.Connection; import java.sql.PreparedStatement; +import java.sql.SQLException; import static org.junit.platform.commons.util.ReflectionUtils.getRequiredMethod; import static org.mockito.ArgumentMatchers.any; @@ -27,7 +27,7 @@ import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) -class ReactiveStatementFactoryTest { +class DaoMethodHandlerTest { @Mock private PagingOutput pagingOutput; @@ -43,10 +43,12 @@ class ReactiveStatementFactoryTest { @Mock private Scheduler.Worker worker; - private ReactiveStatementFactory statementFactory; + private ReactiveStatementFactory reactiveStatementFactory; + + private DaoMethodHandler daoMethodHandler; @BeforeEach - public void setUp() { + public void setUp() throws SQLException { when(pagingOutput.apply(any(), any())).then(invocationOnMock -> invocationOnMock.getArgument(0, Flux.class)); when(scheduler.createWorker()).thenReturn(worker); @@ -100,12 +102,24 @@ public void setMonoSink(MonoSink monoSink) { }); - statementFactory = new ReactiveStatementFactory(dbStatementFactory, pagingOutput, Metrics.get("test"), databaseConfig, getRequiredMethod(TestDao.class, "select")); + reactiveStatementFactory = new ReactiveStatementFactory( + databaseConfig, + scheduler, + () -> mock(Connection.class) + ); + + var method = getRequiredMethod(TestDao.class, "select"); + daoMethodHandler = new DaoMethodHandler( + method, + dbStatementFactory, + pagingOutput, + Metrics.get("test") + ); } @Test void shouldReleaseSchedulerWorkers() { - Flux stmt = (Flux) statementFactory.create(new Object[0], new ConnectionScheduler(() -> mock(Connection.class), scheduler)); + Flux stmt = (Flux) daoMethodHandler.run(new Object[0], reactiveStatementFactory); stmt.blockFirst(); verify(scheduler).createWorker(); verify(worker).dispose(); From bf0f15c1fd927083c2865ac96eb3d1bb4f86b6d1 Mon Sep 17 00:00:00 2001 From: Pavlo Liapota Date: Sat, 10 Feb 2024 17:47:48 +0100 Subject: [PATCH 4/7] Rename DbResultSetDeserializer to DbResultSetDeserializerImpl --- ....java => DbResultSetDeserializerImpl.java} | 4 +- .../db/statement/SelectStatementFactory.java | 6 +- ...StatementReturningGeneratedKeyFactory.java | 7 +-- ...a => DbResultSetDeserializerImplTest.java} | 62 +++++++++---------- .../fortnox/reactivewizard/db/DbTestObj.java | 2 +- .../reactivewizard/db/DbTestObjRecord.java | 2 +- .../reactivewizard/db/ImmutableDbTestObj.java | 2 +- 7 files changed, 42 insertions(+), 43 deletions(-) rename dao/src/main/java/se/fortnox/reactivewizard/db/deserializing/{DbResultSetDeserializer.java => DbResultSetDeserializerImpl.java} (94%) rename dao/src/test/java/se/fortnox/reactivewizard/db/{DbResultSetDeserializerTest.java => DbResultSetDeserializerImplTest.java} (89%) diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/deserializing/DbResultSetDeserializer.java b/dao/src/main/java/se/fortnox/reactivewizard/db/deserializing/DbResultSetDeserializerImpl.java similarity index 94% rename from dao/src/main/java/se/fortnox/reactivewizard/db/deserializing/DbResultSetDeserializer.java rename to dao/src/main/java/se/fortnox/reactivewizard/db/deserializing/DbResultSetDeserializerImpl.java index 0b0398af..ef3c13c8 100644 --- a/dao/src/main/java/se/fortnox/reactivewizard/db/deserializing/DbResultSetDeserializer.java +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/deserializing/DbResultSetDeserializerImpl.java @@ -6,11 +6,11 @@ import static se.fortnox.reactivewizard.db.deserializing.MutabilityDetector.isImmutable; -public class DbResultSetDeserializer { +public class DbResultSetDeserializerImpl { private final Class cls; private Deserializer deserializer; - public DbResultSetDeserializer(Class cls) { + public DbResultSetDeserializerImpl(Class cls) { this.cls = cls; } diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/SelectStatementFactory.java b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/SelectStatementFactory.java index 9a67976b..23160e67 100644 --- a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/SelectStatementFactory.java +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/SelectStatementFactory.java @@ -2,7 +2,7 @@ import reactor.core.publisher.FluxSink; import reactor.core.publisher.MonoSink; -import se.fortnox.reactivewizard.db.deserializing.DbResultSetDeserializer; +import se.fortnox.reactivewizard.db.deserializing.DbResultSetDeserializerImpl; import se.fortnox.reactivewizard.db.query.ParameterizedQuery; import java.sql.Connection; @@ -24,13 +24,13 @@ record Wrapper(String nullableValue) {}; """; private static final String QUERY_PLACEHOLDER = "{{sqlQuery}}"; private static final String MONO_NEXT_ERROR = "%s returning a Mono received more than one result from the database"; - private final DbResultSetDeserializer deserializer; + private final DbResultSetDeserializerImpl deserializer; private final String methodName; public SelectStatementFactory(ParameterizedQuery parameterizedQuery, Class returnType) { super(parameterizedQuery); - this.deserializer = new DbResultSetDeserializer(returnType); + this.deserializer = new DbResultSetDeserializerImpl(returnType); this.methodName = parameterizedQuery.getMethodName(); } diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateStatementReturningGeneratedKeyFactory.java b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateStatementReturningGeneratedKeyFactory.java index e6743d7f..11883fb9 100644 --- a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateStatementReturningGeneratedKeyFactory.java +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateStatementReturningGeneratedKeyFactory.java @@ -1,10 +1,9 @@ package se.fortnox.reactivewizard.db.statement; -import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.MonoSink; import se.fortnox.reactivewizard.db.GeneratedKey; -import se.fortnox.reactivewizard.db.deserializing.DbResultSetDeserializer; +import se.fortnox.reactivewizard.db.deserializing.DbResultSetDeserializerImpl; import se.fortnox.reactivewizard.db.query.ParameterizedQuery; import java.sql.Connection; @@ -16,13 +15,13 @@ public class UpdateStatementReturningGeneratedKeyFactory extends AbstractUpdateStatementFactory { - private final DbResultSetDeserializer deserializer; + private final DbResultSetDeserializerImpl deserializer; public UpdateStatementReturningGeneratedKeyFactory(ParameterizedQuery parameterizedQuery, Class keyType, int minimumAffected ) { super(minimumAffected, parameterizedQuery); - this.deserializer = new DbResultSetDeserializer(keyType); + this.deserializer = new DbResultSetDeserializerImpl(keyType); } @Override diff --git a/dao/src/test/java/se/fortnox/reactivewizard/db/DbResultSetDeserializerTest.java b/dao/src/test/java/se/fortnox/reactivewizard/db/DbResultSetDeserializerImplTest.java similarity index 89% rename from dao/src/test/java/se/fortnox/reactivewizard/db/DbResultSetDeserializerTest.java rename to dao/src/test/java/se/fortnox/reactivewizard/db/DbResultSetDeserializerImplTest.java index b807ab7b..ce421902 100644 --- a/dao/src/test/java/se/fortnox/reactivewizard/db/DbResultSetDeserializerTest.java +++ b/dao/src/test/java/se/fortnox/reactivewizard/db/DbResultSetDeserializerImplTest.java @@ -5,7 +5,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import se.fortnox.reactivewizard.db.deserializing.DbResultSetDeserializer; +import se.fortnox.reactivewizard.db.deserializing.DbResultSetDeserializerImpl; import java.math.BigDecimal; import java.sql.Array; @@ -24,7 +24,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -class DbResultSetDeserializerTest { +class DbResultSetDeserializerImplTest { private final ResultSet rs = mock(ResultSet.class); private final ResultSetMetaData meta = mock(ResultSetMetaData.class); @@ -189,7 +189,7 @@ void shouldDeserializeNullEnum() throws SQLException { @Test void shouldDeserializeImmutableObject() throws SQLException { - DbResultSetDeserializer des = new DbResultSetDeserializer(ImmutableDbTestObj.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(ImmutableDbTestObj.class); when(meta.getColumnCount()).thenReturn(5); when(meta.getColumnLabel(1)).thenReturn("sql_val"); when(meta.getColumnLabel(2)).thenReturn("my_bool"); @@ -213,7 +213,7 @@ void shouldDeserializeImmutableObject() throws SQLException { @Test void shouldDeserializeObject() throws SQLException { - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObj.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObj.class); when(meta.getColumnCount()).thenReturn(3); when(meta.getColumnLabel(1)).thenReturn("sql_val"); when(meta.getColumnLabel(2)).thenReturn("my_bool"); @@ -230,7 +230,7 @@ void shouldDeserializeObject() throws SQLException { @Test void shouldDeserializeObjectRecord() throws SQLException { - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObjRecord.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObjRecord.class); when(meta.getColumnCount()).thenReturn(3); when(meta.getColumnLabel(1)).thenReturn("sql_val"); when(meta.getColumnLabel(2)).thenReturn("my_bool"); @@ -247,7 +247,7 @@ void shouldDeserializeObjectRecord() throws SQLException { @Test void shouldDeserializeChildObject() throws SQLException { - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObj.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObj.class); when(meta.getColumnCount()).thenReturn(1); when(meta.getColumnLabel(1)).thenReturn("child.sql_val"); when(rs.getString(1)).thenReturn("MyChildValue"); @@ -258,7 +258,7 @@ void shouldDeserializeChildObject() throws SQLException { @Test void shouldDeserializeChildObjectRecord() throws SQLException { - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObjRecord.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObjRecord.class); when(meta.getColumnCount()).thenReturn(1); when(meta.getColumnLabel(1)).thenReturn("child.sql_val"); when(rs.getString(1)).thenReturn("MyChildValue"); @@ -269,7 +269,7 @@ void shouldDeserializeChildObjectRecord() throws SQLException { @Test void shouldDeserializeGrandChildObject() throws SQLException { - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObj.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObj.class); when(meta.getColumnCount()).thenReturn(1); when(meta.getColumnLabel(1)).thenReturn("child.child.sql_val"); when(rs.getString(1)).thenReturn("MyChildValue"); @@ -280,7 +280,7 @@ void shouldDeserializeGrandChildObject() throws SQLException { @Test void shouldDeserializeGrandChildObjectRecord() throws SQLException { - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObjRecord.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObjRecord.class); when(meta.getColumnCount()).thenReturn(1); when(meta.getColumnLabel(1)).thenReturn("child.child.sql_val"); when(rs.getString(1)).thenReturn("MyChildValue"); @@ -291,7 +291,7 @@ void shouldDeserializeGrandChildObjectRecord() throws SQLException { @Test void shouldDeserializeChildObjectWithoutSetter() throws SQLException { - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObj.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObj.class); when(meta.getColumnCount()).thenReturn(1); when(meta.getColumnLabel(1)).thenReturn("child.no_getter"); when(rs.getString(1)).thenReturn("MyNonGettableChildValue"); @@ -302,7 +302,7 @@ void shouldDeserializeChildObjectWithoutSetter() throws SQLException { @Test void shouldDeserializePropertyWithoutSetter() throws SQLException { - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObj.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObj.class); when(meta.getColumnCount()).thenReturn(1); when(meta.getColumnLabel(1)).thenReturn("no_getter"); when(rs.getString(1)).thenReturn("MyNonGettableChildValue"); @@ -313,7 +313,7 @@ void shouldDeserializePropertyWithoutSetter() throws SQLException { @Test void shouldIgnoreUnknownColumnsForRecords() throws SQLException { - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObjRecord.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObjRecord.class); when(meta.getColumnCount()).thenReturn(2); when(meta.getColumnLabel(1)).thenReturn("sql_val"); when(meta.getColumnLabel(2)).thenReturn("non_existing_prop"); @@ -326,7 +326,7 @@ void shouldIgnoreUnknownColumnsForRecords() throws SQLException { @Test void shouldIgnoreColumnsWithoutSetters() throws SQLException { - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObj.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObj.class); when(meta.getColumnCount()).thenReturn(2); when(meta.getColumnLabel(1)).thenReturn("non_existing_prop"); when(meta.getColumnLabel(2)).thenReturn("no_getter"); @@ -339,7 +339,7 @@ void shouldIgnoreColumnsWithoutSetters() throws SQLException { @Test void shouldDeserializeObjectWithDoubleProperty() throws SQLException { - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObj.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObj.class); when(meta.getColumnCount()).thenReturn(1); when(meta.getColumnLabel(1)).thenReturn("double_val"); when(rs.getDouble(1)).thenReturn(63.53d); @@ -350,7 +350,7 @@ void shouldDeserializeObjectWithDoubleProperty() throws SQLException { @Test void shouldDeserializeObjectRecordWithDoubleProperty() throws SQLException { - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObjRecord.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObjRecord.class); when(meta.getColumnCount()).thenReturn(1); when(meta.getColumnLabel(1)).thenReturn("double_val"); when(rs.getDouble(1)).thenReturn(63.53d); @@ -361,7 +361,7 @@ void shouldDeserializeObjectRecordWithDoubleProperty() throws SQLException { @Test void shouldDeserializeObjectWithNullValues() throws SQLException { - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObj.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObj.class); when(meta.getColumnCount()).thenReturn(3); when(meta.getColumnLabel(1)).thenReturn("sql_val"); when(meta.getColumnLabel(2)).thenReturn("my_bool"); @@ -379,7 +379,7 @@ void shouldDeserializeObjectWithNullValues() throws SQLException { @Test void shouldDeserializeObjectRecordWithNullValues() throws SQLException { - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObjRecord.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObjRecord.class); when(meta.getColumnCount()).thenReturn(3); when(meta.getColumnLabel(1)).thenReturn("sql_val"); when(meta.getColumnLabel(2)).thenReturn("my_bool"); @@ -397,7 +397,7 @@ void shouldDeserializeObjectRecordWithNullValues() throws SQLException { @Test void shouldDeserializeCollectionAsJson() throws SQLException { - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObj.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObj.class); when(meta.getColumnCount()).thenReturn(1); when(meta.getColumnLabel(1)).thenReturn("list_of_strings"); when(rs.getString(1)).thenReturn("[\"one\",\"two\"]"); @@ -411,7 +411,7 @@ void shouldDeserializeCollectionAsJson() throws SQLException { @Test void shouldDeserializeRecordCollectionAsJson() throws SQLException { - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObjRecord.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObjRecord.class); when(meta.getColumnCount()).thenReturn(1); when(meta.getColumnLabel(1)).thenReturn("list_of_strings"); when(rs.getString(1)).thenReturn("[\"one\",\"two\"]"); @@ -425,7 +425,7 @@ void shouldDeserializeRecordCollectionAsJson() throws SQLException { @Test void shouldDeserializeCollectionAsArray() throws SQLException { - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObj.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObj.class); when(meta.getColumnCount()).thenReturn(1); when(meta.getColumnType(1)).thenReturn(Types.ARRAY); when(meta.getColumnLabel(1)).thenReturn("list_of_strings"); @@ -441,7 +441,7 @@ void shouldDeserializeCollectionAsArray() throws SQLException { @Test void shouldDeserializeRecordCollectionAsArray() throws SQLException { - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObjRecord.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObjRecord.class); when(meta.getColumnCount()).thenReturn(1); when(meta.getColumnType(1)).thenReturn(Types.ARRAY); when(meta.getColumnLabel(1)).thenReturn("list_of_strings"); @@ -463,7 +463,7 @@ void shouldDeserializeListOfEnums() throws SQLException { when(rs.getString(1)).thenReturn("[\"T1\", \"T2\"]"); when(rs.wasNull()).thenReturn(false); - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObj.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObj.class); DbTestObj myTestObj = (DbTestObj)des.deserialize(rs); assertThat(myTestObj.getListOfEnums()).hasSize(2); assertThat(myTestObj.getListOfEnums().getFirst()).isInstanceOf(TestEnum.class); @@ -477,7 +477,7 @@ void shouldDeserializeRecordListOfEnums() throws SQLException { when(rs.getString(1)).thenReturn("[\"T1\", \"T2\"]"); when(rs.wasNull()).thenReturn(false); - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObjRecord.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObjRecord.class); DbTestObjRecord myTestObj = (DbTestObjRecord)des.deserialize(rs); assertThat(myTestObj.listOfEnums()).hasSize(2); assertThat(myTestObj.listOfEnums().getFirst()).isInstanceOf(TestEnum.class); @@ -491,7 +491,7 @@ void shouldDeserializeListOfObjects() throws SQLException { when(rs.getString(1)).thenReturn("[{}, {}]"); when(rs.wasNull()).thenReturn(false); - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObj.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObj.class); DbTestObj myTestObj = (DbTestObj)des.deserialize(rs); assertThat(myTestObj.getListOfObjects()).hasSize(2); assertThat(myTestObj.getListOfObjects().getFirst()).isInstanceOf(DbTestObj.class); @@ -506,7 +506,7 @@ void shouldDeserializeRecordListOfObjects() throws SQLException { when(rs.getString(1)).thenReturn("[{}, {}]"); when(rs.wasNull()).thenReturn(false); - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObjRecord.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObjRecord.class); DbTestObjRecord myTestObj = (DbTestObjRecord)des.deserialize(rs); assertThat(myTestObj.listOfObjects()).hasSize(2); assertThat(myTestObj.listOfObjects().getFirst()).isInstanceOf(DbTestObjRecord.class); @@ -523,7 +523,7 @@ void shouldDeserializeJsonObject() throws SQLException { when(rs.getString(1)).thenReturn("{}"); when(rs.wasNull()).thenReturn(false); - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObj.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObj.class); DbTestObj myTestObj = (DbTestObj)des.deserialize(rs); assertThat(myTestObj.getChild()).isNotNull(); } @@ -538,14 +538,14 @@ void shouldDeserializeJsonObjectRecord() throws SQLException { when(rs.getString(1)).thenReturn("{}"); when(rs.wasNull()).thenReturn(false); - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObjRecord.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObjRecord.class); DbTestObjRecord myTestObj = (DbTestObjRecord)des.deserialize(rs); assertThat(myTestObj.child()).isNotNull(); } @Test void shouldDeserializeNullCollectionAsNull() throws SQLException { - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObj.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObj.class); when(meta.getColumnCount()).thenReturn(1); when(meta.getColumnLabel(1)).thenReturn("list_of_strings"); when(rs.getString(1)).thenReturn(null); @@ -557,7 +557,7 @@ void shouldDeserializeNullCollectionAsNull() throws SQLException { @Test void shouldDeserializeRecordNullCollectionAsNull() throws SQLException { - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObjRecord.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObjRecord.class); when(meta.getColumnCount()).thenReturn(1); when(meta.getColumnLabel(1)).thenReturn("list_of_strings"); when(rs.getString(1)).thenReturn(null); @@ -587,14 +587,14 @@ void shouldDeserializeByteArray() throws SQLException { when(rs.getBytes(1)).thenReturn("hello".getBytes()); when(rs.wasNull()).thenReturn(false); - DbResultSetDeserializer des = new DbResultSetDeserializer(DbTestObj.class); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(DbTestObj.class); DbTestObj myTestObj = (DbTestObj)des.deserialize(rs); assertThat(myTestObj.getBytes()).isNotNull(); assertThat(myTestObj.getBytes()).isEqualTo("hello".getBytes()); } private ObjectAssert thenDeserialized(Class cls) throws SQLException { - DbResultSetDeserializer des = new DbResultSetDeserializer(cls); + DbResultSetDeserializerImpl des = new DbResultSetDeserializerImpl(cls); when(meta.getColumnCount()).thenReturn(1); when(meta.getColumnLabel(1)).thenReturn("colname"); when(rs.getMetaData()).thenReturn(meta); diff --git a/dao/src/test/java/se/fortnox/reactivewizard/db/DbTestObj.java b/dao/src/test/java/se/fortnox/reactivewizard/db/DbTestObj.java index 3d6f6cab..55e31eaf 100644 --- a/dao/src/test/java/se/fortnox/reactivewizard/db/DbTestObj.java +++ b/dao/src/test/java/se/fortnox/reactivewizard/db/DbTestObj.java @@ -1,6 +1,6 @@ package se.fortnox.reactivewizard.db; -import se.fortnox.reactivewizard.db.DbResultSetDeserializerTest.TestEnum; +import se.fortnox.reactivewizard.db.DbResultSetDeserializerImplTest.TestEnum; import java.util.List; diff --git a/dao/src/test/java/se/fortnox/reactivewizard/db/DbTestObjRecord.java b/dao/src/test/java/se/fortnox/reactivewizard/db/DbTestObjRecord.java index 4e3d4ce7..bb207ce0 100644 --- a/dao/src/test/java/se/fortnox/reactivewizard/db/DbTestObjRecord.java +++ b/dao/src/test/java/se/fortnox/reactivewizard/db/DbTestObjRecord.java @@ -1,6 +1,6 @@ package se.fortnox.reactivewizard.db; -import se.fortnox.reactivewizard.db.DbResultSetDeserializerTest.TestEnum; +import se.fortnox.reactivewizard.db.DbResultSetDeserializerImplTest.TestEnum; import java.util.List; diff --git a/dao/src/test/java/se/fortnox/reactivewizard/db/ImmutableDbTestObj.java b/dao/src/test/java/se/fortnox/reactivewizard/db/ImmutableDbTestObj.java index e578a7c7..d5a2cbd7 100644 --- a/dao/src/test/java/se/fortnox/reactivewizard/db/ImmutableDbTestObj.java +++ b/dao/src/test/java/se/fortnox/reactivewizard/db/ImmutableDbTestObj.java @@ -1,6 +1,6 @@ package se.fortnox.reactivewizard.db; -import se.fortnox.reactivewizard.db.DbResultSetDeserializerTest.TestEnum; +import se.fortnox.reactivewizard.db.DbResultSetDeserializerImplTest.TestEnum; import java.util.List; From 73631f623c9c120458d793811b380edb35fff5f3 Mon Sep 17 00:00:00 2001 From: Pavlo Liapota Date: Sat, 10 Feb 2024 17:52:01 +0100 Subject: [PATCH 5/7] Add DbResultSetDeserializer interface and generic types --- .../deserializing/DbResultSetDeserializer.java | 8 ++++++++ .../DbResultSetDeserializerImpl.java | 14 +++++++------- .../deserializing/JacksonObjectDeserializer.java | 16 ++++++++-------- .../deserializing/SimpleObjectDeserializer.java | 14 +++++++------- 4 files changed, 30 insertions(+), 22 deletions(-) create mode 100644 dao/src/main/java/se/fortnox/reactivewizard/db/deserializing/DbResultSetDeserializer.java diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/deserializing/DbResultSetDeserializer.java b/dao/src/main/java/se/fortnox/reactivewizard/db/deserializing/DbResultSetDeserializer.java new file mode 100644 index 00000000..bbf5a7ad --- /dev/null +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/deserializing/DbResultSetDeserializer.java @@ -0,0 +1,8 @@ +package se.fortnox.reactivewizard.db.deserializing; + +import java.sql.ResultSet; + +public interface DbResultSetDeserializer { + + T deserialize(ResultSet rs); +} diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/deserializing/DbResultSetDeserializerImpl.java b/dao/src/main/java/se/fortnox/reactivewizard/db/deserializing/DbResultSetDeserializerImpl.java index ef3c13c8..966de996 100644 --- a/dao/src/main/java/se/fortnox/reactivewizard/db/deserializing/DbResultSetDeserializerImpl.java +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/deserializing/DbResultSetDeserializerImpl.java @@ -6,11 +6,11 @@ import static se.fortnox.reactivewizard.db.deserializing.MutabilityDetector.isImmutable; -public class DbResultSetDeserializerImpl { - private final Class cls; - private Deserializer deserializer; +public class DbResultSetDeserializerImpl implements DbResultSetDeserializer { + private final Class cls; + private Deserializer deserializer; - public DbResultSetDeserializerImpl(Class cls) { + public DbResultSetDeserializerImpl(Class cls) { this.cls = cls; } @@ -19,7 +19,7 @@ public DbResultSetDeserializerImpl(Class cls) { * @param rs the ResultSet * @return the result */ - public Object deserialize(ResultSet rs) { + public T deserialize(ResultSet rs) { try { if (deserializer == null) { deserializer = createDeserializer(cls, rs); @@ -30,7 +30,7 @@ public Object deserialize(ResultSet rs) { } } - private Deserializer createDeserializer(Class cls, ResultSet recordSet) throws SQLException { + private Deserializer createDeserializer(Class cls, ResultSet recordSet) throws SQLException { ResultSetMetaData metaData = recordSet.getMetaData(); if (deserializer == null) { deserializer = ColumnDeserializerFactory.getColumnDeserializer(cls, recordSet.getMetaData().getColumnType(1), 1); @@ -41,7 +41,7 @@ private Deserializer createDeserializer(Class cls, ResultSet recordSet) throw return deserializer; } - private Deserializer createDeserializer(Class cls, ResultSetMetaData metaData) throws SQLException { + private Deserializer createDeserializer(Class cls, ResultSetMetaData metaData) throws SQLException { return isImmutable(cls) ? JacksonObjectDeserializer.create(cls, metaData) : SimpleObjectDeserializer.create(cls, metaData); diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/deserializing/JacksonObjectDeserializer.java b/dao/src/main/java/se/fortnox/reactivewizard/db/deserializing/JacksonObjectDeserializer.java index 998c4d5b..414ec546 100644 --- a/dao/src/main/java/se/fortnox/reactivewizard/db/deserializing/JacksonObjectDeserializer.java +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/deserializing/JacksonObjectDeserializer.java @@ -18,27 +18,27 @@ * The values from the ResultSet is put into a property map (instead of for example JSON) which is then used to create * the POJO. */ -public class JacksonObjectDeserializer implements Deserializer { +public class JacksonObjectDeserializer implements Deserializer { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() .findAndRegisterModules() .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) .setDateFormat(new StdDateFormat()); - private final Class targetClass; - private final Map propertyDeserializers; + private final Class targetClass; + private final Map> propertyDeserializers; - private JacksonObjectDeserializer(Class targetClass, Map propertyDeserializers) { + private JacksonObjectDeserializer(Class targetClass, Map> propertyDeserializers) { this.targetClass = targetClass; this.propertyDeserializers = propertyDeserializers; } - public static Deserializer create(Class cls, ResultSetMetaData metaData) throws SQLException { - return new JacksonObjectDeserializer(cls, + public static Deserializer create(Class cls, ResultSetMetaData metaData) throws SQLException { + return new JacksonObjectDeserializer(cls, DeserializerUtil.createPropertyDeserializers(cls, metaData, (propertyResolver, deserializer) -> deserializer)); } @Override - public Optional deserialize(ResultSet rs) + public Optional deserialize(ResultSet rs) throws SQLException, InvocationTargetException, IllegalAccessException, InstantiationException { Map propertyMap = createPropertyMap(rs); @@ -50,7 +50,7 @@ private Map createPropertyMap(ResultSet rs) throws SQLException, InvocationTargetException, IllegalAccessException, InstantiationException { Map root = new HashMap<>(); - for (Map.Entry entry : propertyDeserializers.entrySet()) { + for (Map.Entry> entry : propertyDeserializers.entrySet()) { String[] path = entry.getKey(); Deserializer deserializer = entry.getValue(); diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/deserializing/SimpleObjectDeserializer.java b/dao/src/main/java/se/fortnox/reactivewizard/db/deserializing/SimpleObjectDeserializer.java index f5e366b6..d23c4797 100644 --- a/dao/src/main/java/se/fortnox/reactivewizard/db/deserializing/SimpleObjectDeserializer.java +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/deserializing/SimpleObjectDeserializer.java @@ -25,16 +25,16 @@ public class SimpleObjectDeserializer { * @return the deserializer * @throws SQLException on error */ - public static Deserializer create(Class cls, ResultSetMetaData metaData) throws SQLException { + public static Deserializer create(Class cls, ResultSetMetaData metaData) throws SQLException { Map deserializers = DeserializerUtil.createPropertyDeserializers( - cls, - metaData, - SimpleObjectDeserializer::createRecordPropertyDeserializer); + cls, + metaData, + SimpleObjectDeserializer::createRecordPropertyDeserializer); Supplier instantiator = ReflectionUtil.instantiator(cls); return (rs) -> { - Object object = instantiator.get(); + I object = instantiator.get(); for (PropertyDeserializer propertyDeserializer : deserializers.values()) { propertyDeserializer.deserialize(rs, object); } @@ -43,8 +43,8 @@ public static Deserializer create(Class cls, ResultSetMetaData metaData) } private static PropertyDeserializer createRecordPropertyDeserializer( - PropertyResolver propertyResolver, - Deserializer deserializer) { + PropertyResolver propertyResolver, + Deserializer deserializer) { BiConsumer setter = propertyResolver.setter(); return (rs, obj) -> setter.accept(obj, deserializer.deserialize(rs).orElse(null)); } From f04ec445dd3f0ab132b7122cfaac94e420a76982 Mon Sep 17 00:00:00 2001 From: Pavlo Liapota Date: Sat, 10 Feb 2024 18:26:27 +0100 Subject: [PATCH 6/7] Add Statement implementation that is independent from ParameterizedQuery --- .../reactivewizard/db/query/ParamSetter.java | 7 ++ .../db/query/ParamSetterFactory.java | 48 ++++++++++ .../db/query/PreparedStatementParameters.java | 34 ++++++- .../db/statement/SelectSqlStatement.java | 93 +++++++++++++++++++ .../db/statement/SqlStatement.java | 77 +++++++++++++++ .../UpdateBatchableSqlStatement.java | 52 +++++++++++ .../UpdateReturningCountSqlStatement.java | 27 ++++++ ...dateReturningGeneratedKeySqlStatement.java | 64 +++++++++++++ .../UpdateReturningVoidSqlStatement.java | 25 +++++ .../db/statement/UpdateSqlStatement.java | 25 +++++ 10 files changed, 450 insertions(+), 2 deletions(-) create mode 100644 dao/src/main/java/se/fortnox/reactivewizard/db/query/ParamSetter.java create mode 100644 dao/src/main/java/se/fortnox/reactivewizard/db/query/ParamSetterFactory.java create mode 100644 dao/src/main/java/se/fortnox/reactivewizard/db/statement/SelectSqlStatement.java create mode 100644 dao/src/main/java/se/fortnox/reactivewizard/db/statement/SqlStatement.java create mode 100644 dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateBatchableSqlStatement.java create mode 100644 dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateReturningCountSqlStatement.java create mode 100644 dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateReturningGeneratedKeySqlStatement.java create mode 100644 dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateReturningVoidSqlStatement.java create mode 100644 dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateSqlStatement.java diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/query/ParamSetter.java b/dao/src/main/java/se/fortnox/reactivewizard/db/query/ParamSetter.java new file mode 100644 index 00000000..79ee6410 --- /dev/null +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/query/ParamSetter.java @@ -0,0 +1,7 @@ +package se.fortnox.reactivewizard.db.query; + +import java.sql.SQLException; + +public interface ParamSetter { + void call(PreparedStatementParameters parameters) throws SQLException; +} diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/query/ParamSetterFactory.java b/dao/src/main/java/se/fortnox/reactivewizard/db/query/ParamSetterFactory.java new file mode 100644 index 00000000..b8c2b0aa --- /dev/null +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/query/ParamSetterFactory.java @@ -0,0 +1,48 @@ +package se.fortnox.reactivewizard.db.query; + +import se.fortnox.reactivewizard.json.JsonSerializerFactory; +import se.fortnox.reactivewizard.util.ReflectionUtil; + +import java.lang.reflect.Type; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.YearMonth; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Supplier; + +public interface ParamSetterFactory { + ParamSetter create(Object value); + + JsonSerializerFactory JSON_SERIALIZER_FACTORY = new JsonSerializerFactory(); + + static ParamSetterFactory forType(Type type, Supplier> listElementTypeSelector) { + Class rawType = ReflectionUtil.getRawType(type); + if (LocalDate.class.isAssignableFrom(rawType)) { + return (value) -> (parameters) -> parameters.addLocalDate((LocalDate)value); + } else if (LocalTime.class.isAssignableFrom(rawType)) { + return (value) -> (parameters) -> parameters.addLocalTime((LocalTime)value); + } else if (LocalDateTime.class.isAssignableFrom(rawType)) { + return (value) -> (parameters) -> parameters.addLocalDateTime((LocalDateTime)value); + } else if (YearMonth.class.isAssignableFrom(rawType)) { + return (value) -> (parameters) -> parameters.addYearMonth((YearMonth) value); + } else if (List.class.isAssignableFrom(rawType)) { + Optional listElementType = listElementTypeSelector.get(); + if (!listElementType.isPresent()) { + var jsonSerializer = JSON_SERIALIZER_FACTORY.createStringSerializer(type); + return (value) -> (parameters) -> parameters.addSerializable(value, jsonSerializer); + } + String elementType = listElementType.get(); + return (value) -> (parameters) -> parameters.addArray(elementType, (List)value); + } else if (rawType.isEnum()) { + return (value) -> (parameters) -> parameters.addEnum((Enum)value); + } else if (Map.class.isAssignableFrom(rawType)) { + var jsonSerializer = JSON_SERIALIZER_FACTORY.createStringSerializer(Map.class); + return (value) -> (parameters) -> parameters.addSerializable((Map)value, jsonSerializer); + } else { + return (value) -> (parameters) -> parameters.addObject(value); + } + } +} diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/query/PreparedStatementParameters.java b/dao/src/main/java/se/fortnox/reactivewizard/db/query/PreparedStatementParameters.java index 8d708bcf..9c27841a 100644 --- a/dao/src/main/java/se/fortnox/reactivewizard/db/query/PreparedStatementParameters.java +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/query/PreparedStatementParameters.java @@ -6,18 +6,48 @@ import java.sql.SQLException; import java.sql.Time; import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.YearMonth; import java.util.Calendar; import java.sql.Date; import java.util.List; +import java.util.function.Function; public class PreparedStatementParameters { - private PreparedStatement preparedStatement; - private int parameterIndex = 1; + + private final PreparedStatement preparedStatement; + private int parameterIndex = 1; public PreparedStatementParameters(PreparedStatement preparedStatement) { this.preparedStatement = preparedStatement; } + public void addLocalDate(LocalDate ld) throws SQLException { + addDate(Date.valueOf(ld)); + } + + public void addLocalTime(LocalTime lt) throws SQLException { + addTime(Time.valueOf(lt)); + } + + public void addLocalDateTime(LocalDateTime ldt) throws SQLException { + addTimestamp(Timestamp.valueOf(ldt)); + } + + public void addYearMonth(YearMonth ym) throws SQLException { + addObject(ym.getYear() * 100 + ym.getMonthValue()); + } + + public void addEnum(Enum en) throws SQLException { + addObject(en.name()); + } + + public void addSerializable(T value, Function jsonSerializer) throws SQLException { + addObject(jsonSerializer.apply(value)); + } + public void addNull() throws SQLException { preparedStatement.setNull(parameterIndex++, java.sql.Types.NULL); } diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/SelectSqlStatement.java b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/SelectSqlStatement.java new file mode 100644 index 00000000..8127403a --- /dev/null +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/SelectSqlStatement.java @@ -0,0 +1,93 @@ +package se.fortnox.reactivewizard.db.statement; + +import se.fortnox.reactivewizard.db.deserializing.DbResultSetDeserializer; +import se.fortnox.reactivewizard.db.query.ParamSetter; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +import static java.lang.String.format; + +public class SelectSqlStatement extends SqlStatement { + + private static final String NULL_VALUE_ERROR = """ + One or more of the values returned in the resultset of the following query was null: + %s + + Project Reactor does not allow emitting null values in a stream. Wrap the return value from the dao interface + in a 'wrapper' to solve the issue. + Example: + record Wrapper(String nullableValue) {}; + """; + + private static final String MONO_NEXT_ERROR = "%s returning a Mono received more than one result from the database"; + + private final DbResultSetDeserializer deserializer; + + private final String methodName; + private final String rawSql; + + public SelectSqlStatement(String sql, List paramSetters, + DbResultSetDeserializer deserializer) { + this(sql, paramSetters, deserializer, "Query\n" + sql + "\n", sql); + } + + public SelectSqlStatement(String sql, List paramSetters, + DbResultSetDeserializer deserializer, String methodName, String rawSql) { + super(sql, paramSetters); + this.deserializer = deserializer; + this.methodName = methodName; + this.rawSql = rawSql; + } + + @Override + public void execute(Connection connection) throws SQLException { + try (var preparedStatement = createPreparedStatement(connection)) { + addParameters(preparedStatement); + try (var resultSet = preparedStatement.executeQuery()) { + if (fluxSink != null) { + while (resultSet.next()) { + fluxSink.next(deserialize(resultSet)); + } + } else if (monoSink != null) { + if (resultSet.next()) { + var object = deserialize(resultSet); + if (resultSet.next()) { + throw new RuntimeException(format(MONO_NEXT_ERROR, methodName)); + } + monoSink.success(object); + } else { + monoSink.success(); + } + } + } + StatementDebug.log(preparedStatement); + } + } + + private T deserialize(ResultSet resultSet) { + var value = deserializer.deserialize(resultSet); + if (value == null) { + throw new NullPointerException(format(NULL_VALUE_ERROR, rawSql)); + } + return value; + } + + @Override + public PreparedStatement batch(Connection connection, PreparedStatement preparedStatement) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void batchExecuted(int count) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean sameBatch(Statement statement) { + return false; + } +} diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/SqlStatement.java b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/SqlStatement.java new file mode 100644 index 00000000..c7b535c3 --- /dev/null +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/SqlStatement.java @@ -0,0 +1,77 @@ +package se.fortnox.reactivewizard.db.statement; + +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.MonoSink; +import se.fortnox.reactivewizard.db.query.ParamSetter; +import se.fortnox.reactivewizard.db.query.PreparedStatementParameters; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.List; + +import static java.sql.Statement.RETURN_GENERATED_KEYS; + +public abstract class SqlStatement implements Statement { + + protected final String sql; + + private final List paramSetters; + + protected FluxSink fluxSink; + + protected MonoSink monoSink; + + protected SqlStatement(String sql, List paramSetters) { + this.sql = sql; + this.paramSetters = paramSetters; + } + + protected PreparedStatement createPreparedStatement(Connection connection) throws SQLException { + return connection.prepareStatement(sql); + } + + protected PreparedStatement createPreparedStatementWithGeneratedKeys(Connection connection) throws SQLException { + return connection.prepareStatement(sql, RETURN_GENERATED_KEYS); + } + + protected void addParameters(PreparedStatement preparedStatement) throws SQLException { + var parameters = new PreparedStatementParameters(preparedStatement); + for (var paramSetter : paramSetters) { + paramSetter.call(parameters); + } + } + + @Override + public void onCompleted() { + if (fluxSink != null) { + fluxSink.complete(); + } + } + + @Override + public void onError(Throwable throwable) { + if (fluxSink != null) { + fluxSink.error(throwable); + } + if (monoSink != null) { + monoSink.error(throwable); + } + } + + @Override + public void setFluxSink(FluxSink fluxSink) { + if (monoSink != null) { + throw new UnsupportedOperationException("Cannot set FluxSink if MonoSink is set"); + } + this.fluxSink = (FluxSink) fluxSink; + } + + @Override + public void setMonoSink(MonoSink monoSink) { + if (fluxSink != null) { + throw new UnsupportedOperationException("Cannot set MonoSink if FluxSink is set"); + } + this.monoSink = (MonoSink) monoSink; + } +} diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateBatchableSqlStatement.java b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateBatchableSqlStatement.java new file mode 100644 index 00000000..2e8b05f4 --- /dev/null +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateBatchableSqlStatement.java @@ -0,0 +1,52 @@ +package se.fortnox.reactivewizard.db.statement; + +import se.fortnox.reactivewizard.db.query.ParamSetter; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.List; + +public abstract class UpdateBatchableSqlStatement extends UpdateSqlStatement { + + protected UpdateBatchableSqlStatement(String sql, List paramSetters, + int minimumAffected, String rawSql) { + super(sql, paramSetters, minimumAffected, rawSql); + } + + @Override + public void execute(Connection connection) throws SQLException { + try (var preparedStatement = createPreparedStatement(connection)) { + addParameters(preparedStatement); + int count = preparedStatement.executeUpdate(); + ensureMinimumReached(count); + executed(count); + StatementDebug.log(preparedStatement); + } + } + + protected abstract void executed(int count); + + @Override + public PreparedStatement batch(Connection connection, PreparedStatement preparedStatement) throws SQLException { + if (preparedStatement == null) { + preparedStatement = createPreparedStatement(connection); + } + addParameters(preparedStatement); + preparedStatement.addBatch(); + return preparedStatement; + } + + @Override + public void batchExecuted(int count) throws SQLException { + ensureMinimumReached(count); + } + + @Override + public boolean sameBatch(Statement statement) { + if (!(statement instanceof UpdateBatchableSqlStatement batchableStatement)) { + return false; + } + return batchableStatement.sql.equals(this.sql); + } +} diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateReturningCountSqlStatement.java b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateReturningCountSqlStatement.java new file mode 100644 index 00000000..e6aa833f --- /dev/null +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateReturningCountSqlStatement.java @@ -0,0 +1,27 @@ +package se.fortnox.reactivewizard.db.statement; + +import se.fortnox.reactivewizard.db.query.ParamSetter; + +import java.util.List; + +public class UpdateReturningCountSqlStatement extends UpdateBatchableSqlStatement { + + public UpdateReturningCountSqlStatement(String sql, List paramSetters, + int minimumAffected) { + this(sql, paramSetters, minimumAffected, sql); + } + + public UpdateReturningCountSqlStatement(String sql, List paramSetters, + int minimumAffected, String rawSql) { + super(sql, paramSetters, minimumAffected, rawSql); + } + + @Override + protected void executed(int count) { + if (fluxSink != null) { + fluxSink.next(count); + } else if (monoSink != null) { + monoSink.success(count); + } + } +} diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateReturningGeneratedKeySqlStatement.java b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateReturningGeneratedKeySqlStatement.java new file mode 100644 index 00000000..3b114b07 --- /dev/null +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateReturningGeneratedKeySqlStatement.java @@ -0,0 +1,64 @@ +package se.fortnox.reactivewizard.db.statement; + +import se.fortnox.reactivewizard.db.GeneratedKey; +import se.fortnox.reactivewizard.db.deserializing.DbResultSetDeserializer; +import se.fortnox.reactivewizard.db.query.ParamSetter; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +public class UpdateReturningGeneratedKeySqlStatement extends UpdateSqlStatement> { + + private final DbResultSetDeserializer deserializer; + + public UpdateReturningGeneratedKeySqlStatement(String sql, List paramSetters, + int minimumAffected, DbResultSetDeserializer deserializer) { + this(sql, paramSetters, minimumAffected, sql, deserializer); + } + + public UpdateReturningGeneratedKeySqlStatement(String sql, List paramSetters, + int minimumAffected, String rawSql, DbResultSetDeserializer deserializer) { + super(sql, paramSetters, minimumAffected, rawSql); + this.deserializer = deserializer; + } + + @Override + public void execute(Connection connection) throws SQLException { + try (var preparedStatement = createPreparedStatementWithGeneratedKeys(connection)) { + addParameters(preparedStatement); + ensureMinimumReached(preparedStatement.executeUpdate()); + try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) { + if (fluxSink != null) { + while (resultSet.next()) { + fluxSink.next(() -> deserializer.deserialize(resultSet)); + } + } else if (monoSink != null) { + if (resultSet.next()) { + monoSink.success(() -> deserializer.deserialize(resultSet)); + } else { + monoSink.success(); + } + } + } + StatementDebug.log(preparedStatement); + } + } + + @Override + public PreparedStatement batch(Connection connection, PreparedStatement preparedStatement) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void batchExecuted(int count) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean sameBatch(Statement statement) { + return false; + } +} diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateReturningVoidSqlStatement.java b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateReturningVoidSqlStatement.java new file mode 100644 index 00000000..2b22763b --- /dev/null +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateReturningVoidSqlStatement.java @@ -0,0 +1,25 @@ +package se.fortnox.reactivewizard.db.statement; + +import se.fortnox.reactivewizard.db.query.ParamSetter; + +import java.util.List; + +public class UpdateReturningVoidSqlStatement extends UpdateBatchableSqlStatement { + + public UpdateReturningVoidSqlStatement(String sql, List paramSetters, + int minimumAffected) { + this(sql, paramSetters, minimumAffected, sql); + } + + public UpdateReturningVoidSqlStatement(String sql, List paramSetters, + int minimumAffected, String rawSql) { + super(sql, paramSetters, minimumAffected, rawSql); + } + + @Override + protected void executed(int count) { + if (monoSink != null) { + monoSink.success(); + } + } +} diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateSqlStatement.java b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateSqlStatement.java new file mode 100644 index 00000000..47c55f9e --- /dev/null +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateSqlStatement.java @@ -0,0 +1,25 @@ +package se.fortnox.reactivewizard.db.statement; + +import se.fortnox.reactivewizard.db.query.ParamSetter; + +import java.sql.SQLException; +import java.util.List; + +public abstract class UpdateSqlStatement extends SqlStatement { + + private final int minimumAffected; + private final String rawSql; + + protected UpdateSqlStatement(String sql, List paramSetters, + int minimumAffected, String rawSql) { + super(sql, paramSetters); + this.minimumAffected = minimumAffected; + this.rawSql = rawSql; + } + + protected void ensureMinimumReached(int updateCount) throws SQLException { + if (updateCount < minimumAffected) { + throw new MinimumAffectedRowsException(minimumAffected, updateCount, rawSql); + } + } +} From a499e3a3984608f706015305b4a1d4df11726354 Mon Sep 17 00:00:00 2001 From: Pavlo Liapota Date: Sat, 10 Feb 2024 19:22:13 +0100 Subject: [PATCH 7/7] Make StatementFactory create new Statement implementation --- .../db/query/ParameterizedQuery.java | 40 ++---- .../parts/CollectionOptionsQueryPart.java | 6 +- .../db/query/parts/ParamQueryPart.java | 84 +++--------- .../db/query/parts/QueryPart.java | 6 +- .../db/query/parts/StaticQueryPart.java | 7 +- .../statement/AbstractDbStatementFactory.java | 123 ------------------ .../AbstractUpdateStatementFactory.java | 25 ---- .../statement/DbStatementFactoryFactory.java | 46 ++++++- .../statement/DbStatementFactoryWrapper.java | 23 ++++ .../db/statement/SelectStatementFactory.java | 91 ------------- ...tatementExecutorReturningCountFactory.java | 68 ---------- ...StatementReturningGeneratedKeyFactory.java | 60 --------- .../UpdateStatementReturningVoidFactory.java | 27 ---- 13 files changed, 103 insertions(+), 503 deletions(-) delete mode 100644 dao/src/main/java/se/fortnox/reactivewizard/db/statement/AbstractDbStatementFactory.java delete mode 100644 dao/src/main/java/se/fortnox/reactivewizard/db/statement/AbstractUpdateStatementFactory.java create mode 100644 dao/src/main/java/se/fortnox/reactivewizard/db/statement/DbStatementFactoryWrapper.java delete mode 100644 dao/src/main/java/se/fortnox/reactivewizard/db/statement/SelectStatementFactory.java delete mode 100644 dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateStatementExecutorReturningCountFactory.java delete mode 100644 dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateStatementReturningGeneratedKeyFactory.java delete mode 100644 dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateStatementReturningVoidFactory.java diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/query/ParameterizedQuery.java b/dao/src/main/java/se/fortnox/reactivewizard/db/query/ParameterizedQuery.java index 9ddae799..129d53da 100644 --- a/dao/src/main/java/se/fortnox/reactivewizard/db/query/ParameterizedQuery.java +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/query/ParameterizedQuery.java @@ -9,8 +9,6 @@ import java.lang.reflect.Method; import java.lang.reflect.Parameter; import java.lang.reflect.Type; -import java.sql.Connection; -import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; @@ -93,46 +91,32 @@ protected DynamicQueryPart getDynamicQueryPart(String paramName, Map buildParamSetters(Object[] args) { + var list = new ArrayList(); for (QueryPart part : queryParts) { - part.addParams(parameters, args); - } - } - - private PreparedStatement createPreparedStatement(Connection connection, Integer options, String sql) throws SQLException { - if (options == null) { - return connection.prepareStatement(sql); + part.addParamSetter(list, args); } - return connection.prepareStatement(sql, options); + return list; } protected String sqlPreProcess(String sqlInp) { diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/query/parts/CollectionOptionsQueryPart.java b/dao/src/main/java/se/fortnox/reactivewizard/db/query/parts/CollectionOptionsQueryPart.java index 45afef38..3a325bdd 100644 --- a/dao/src/main/java/se/fortnox/reactivewizard/db/query/parts/CollectionOptionsQueryPart.java +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/query/parts/CollectionOptionsQueryPart.java @@ -2,12 +2,12 @@ import se.fortnox.reactivewizard.CollectionOptions; import se.fortnox.reactivewizard.db.Query; -import se.fortnox.reactivewizard.db.query.PreparedStatementParameters; +import se.fortnox.reactivewizard.db.query.ParamSetter; import se.fortnox.reactivewizard.util.CamelSnakeConverter; import se.fortnox.reactivewizard.util.FluxRxConverter; import java.lang.reflect.Method; -import java.sql.SQLException; +import java.util.List; import static com.google.common.collect.Iterables.indexOf; import static java.util.Arrays.asList; @@ -100,6 +100,6 @@ private void addOrderBy(StringBuilder sql, String orderBy) { } @Override - public void addParams(PreparedStatementParameters preparedStatement, Object[] args) throws SQLException { + public void addParamSetter(List paramSetters, Object[] args) { } } diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/query/parts/ParamQueryPart.java b/dao/src/main/java/se/fortnox/reactivewizard/db/query/parts/ParamQueryPart.java index 7d5e282e..9f83e4c2 100644 --- a/dao/src/main/java/se/fortnox/reactivewizard/db/query/parts/ParamQueryPart.java +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/query/parts/ParamQueryPart.java @@ -1,44 +1,38 @@ package se.fortnox.reactivewizard.db.query.parts; -import se.fortnox.reactivewizard.db.query.PreparedStatementParameters; -import se.fortnox.reactivewizard.json.JsonSerializerFactory; +import se.fortnox.reactivewizard.db.query.ParamSetter; +import se.fortnox.reactivewizard.db.query.ParamSetterFactory; import se.fortnox.reactivewizard.util.PropertyResolver; import se.fortnox.reactivewizard.util.ReflectionUtil; import java.lang.reflect.Type; import java.sql.SQLException; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.time.YearMonth; import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.function.Function; public class ParamQueryPart implements DynamicQueryPart { - private static final JsonSerializerFactory JSON_SERIALIZER_FACTORY = new JsonSerializerFactory(); - protected final PropertyResolver argResolver; - protected final int argIndex; - private final PreparedStatementParamSetter paramSetter; - private final Function getter; + protected final PropertyResolver argResolver; + protected final int argIndex; + private final ParamSetterFactory paramSetterFactory; + private final Function getter; - public ParamQueryPart(int argIndex, Type cls) throws SQLException { + public ParamQueryPart(int argIndex, Type cls) { this(argIndex, ReflectionUtil.getPropertyResolver(cls, new String[0]).get()); } - protected ParamQueryPart(ParamQueryPart paramQueryPart) throws SQLException { + protected ParamQueryPart(ParamQueryPart paramQueryPart) { this(paramQueryPart.argIndex, paramQueryPart.argResolver); } - protected ParamQueryPart(int argIndex, PropertyResolver argResolver) throws SQLException { + protected ParamQueryPart(int argIndex, PropertyResolver argResolver) { this.argIndex = argIndex; this.argResolver = argResolver; this.getter = argResolver.getter(); - paramSetter = createParamSetter(argResolver.getPropertyGenericType()); + paramSetterFactory = createParamSetterFactory(argResolver.getPropertyGenericType()); } @Override @@ -47,12 +41,12 @@ public void visit(StringBuilder sql, Object[] args) { } @Override - public void addParams(PreparedStatementParameters parameters, Object[] args) throws SQLException { - Object val = getValue(args); + public void addParamSetter(List paramSetters, Object[] args) { + var val = getValue(args); if (val == null) { - parameters.addNull(); + paramSetters.add((parameters) -> parameters.addNull()); } else { - paramSetter.call(parameters, val); + paramSetters.add(paramSetterFactory.create(val)); } } @@ -70,50 +64,8 @@ public DynamicQueryPart subPath(String[] subPath) throws SQLException { return new ParamQueryPart(argIndex, propertyResolver.get()); } - protected PreparedStatementParamSetter createParamSetter(Type type) throws SQLException { - Class rawType = ReflectionUtil.getRawType(type); - if (LocalDate.class.isAssignableFrom(rawType)) { - return (parameters, value) -> { - java.sql.Date sqlDate = java.sql.Date.valueOf((LocalDate)value); - parameters.addDate(sqlDate); - }; - } else if (LocalTime.class.isAssignableFrom(rawType)) { - return (parameters, value) -> { - java.sql.Time sqlTime = java.sql.Time.valueOf((LocalTime)value); - parameters.addTime(sqlTime); - }; - } else if (LocalDateTime.class.isAssignableFrom(rawType)) { - return (parameters, value) -> { - java.sql.Timestamp sqlTimestamp = java.sql.Timestamp.valueOf((LocalDateTime)value); - parameters.addTimestamp(sqlTimestamp); - }; - } else if (YearMonth.class.isAssignableFrom(rawType)) { - return (parameters, value) -> { - YearMonth yearMonth = (YearMonth) value; - parameters.addObject(yearMonth.getYear() * 100 + yearMonth.getMonthValue()); - }; - } else if (List.class.isAssignableFrom(rawType)) { - Optional listElementType = getListElementType(type); - if (!listElementType.isPresent()) { - Function jsonSerializer = JSON_SERIALIZER_FACTORY.createStringSerializer(type); - return (parameters, value) -> parameters.addObject(jsonSerializer.apply(value)); - } - String elementType = listElementType.get(); - return (parameters, value) -> { - List list = (List)value; - parameters.addArray(elementType, list); - }; - } else if (rawType.isEnum()) { - return (parameters, value) -> { - Enum enumValue = (Enum)value; - parameters.addObject(enumValue.name()); - }; - } else if (Map.class.isAssignableFrom(rawType)) { - Function jsonSerializer = JSON_SERIALIZER_FACTORY.createStringSerializer(Map.class); - return (parameters, value) -> parameters.addObject(jsonSerializer.apply((Map)value)); - } else { - return (parameters, value) -> parameters.addObject(value); - } + protected ParamSetterFactory createParamSetterFactory(Type type) { + return ParamSetterFactory.forType(type, () -> getListElementType(type)); } private Optional getListElementType(Type type) { @@ -130,8 +82,4 @@ private Optional getListElementType(Type type) { return Optional.empty(); } } - - protected interface PreparedStatementParamSetter { - void call(PreparedStatementParameters parameters, Object value) throws SQLException; - } } diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/query/parts/QueryPart.java b/dao/src/main/java/se/fortnox/reactivewizard/db/query/parts/QueryPart.java index b185754b..d6f5c9fb 100644 --- a/dao/src/main/java/se/fortnox/reactivewizard/db/query/parts/QueryPart.java +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/query/parts/QueryPart.java @@ -1,12 +1,12 @@ package se.fortnox.reactivewizard.db.query.parts; -import se.fortnox.reactivewizard.db.query.PreparedStatementParameters; +import se.fortnox.reactivewizard.db.query.ParamSetter; -import java.sql.SQLException; +import java.util.List; public interface QueryPart { void visit(StringBuilder sql, Object[] args); - void addParams(PreparedStatementParameters preparedStatement, Object[] args) throws SQLException; + void addParamSetter(List paramSetters, Object[] args); } diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/query/parts/StaticQueryPart.java b/dao/src/main/java/se/fortnox/reactivewizard/db/query/parts/StaticQueryPart.java index 645c5a19..9f324c0c 100644 --- a/dao/src/main/java/se/fortnox/reactivewizard/db/query/parts/StaticQueryPart.java +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/query/parts/StaticQueryPart.java @@ -1,6 +1,8 @@ package se.fortnox.reactivewizard.db.query.parts; -import se.fortnox.reactivewizard.db.query.PreparedStatementParameters; +import se.fortnox.reactivewizard.db.query.ParamSetter; + +import java.util.List; public class StaticQueryPart implements QueryPart { @@ -16,7 +18,6 @@ public void visit(StringBuilder sql, Object[] args) { } @Override - public void addParams(PreparedStatementParameters preparedStatement, Object[] args) { - + public void addParamSetter(List paramSetters, Object[] args) { } } diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/AbstractDbStatementFactory.java b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/AbstractDbStatementFactory.java deleted file mode 100644 index 9850380e..00000000 --- a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/AbstractDbStatementFactory.java +++ /dev/null @@ -1,123 +0,0 @@ -package se.fortnox.reactivewizard.db.statement; - -import reactor.core.publisher.FluxSink; -import reactor.core.publisher.MonoSink; -import se.fortnox.reactivewizard.db.query.ParameterizedQuery; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; - -public abstract class AbstractDbStatementFactory implements DbStatementFactory { - protected final ParameterizedQuery parameterizedQuery; - - protected AbstractDbStatementFactory(ParameterizedQuery parameterizedQuery) { - this.parameterizedQuery = parameterizedQuery; - } - - protected abstract void executeStatement(Connection connection, Object[] args, FluxSink fluxSink) - throws SQLException; - - protected abstract void executeStatement(Connection connection, Object[] args, MonoSink monoSink) - throws SQLException; - - protected PreparedStatement batch(Connection connection, PreparedStatement preparedStatement, Object[] args) throws SQLException { - throw new UnsupportedOperationException(); - } - - protected void batchExecuted(int count, FluxSink fluxSink) throws SQLException { - throw new UnsupportedOperationException(); - } - - protected void batchExecuted(int count, MonoSink monoSink) { - throw new UnsupportedOperationException(); - } - - protected boolean sameBatch(ParameterizedQuery parameterizedQuery) { - return false; - } - - @Override - public Statement create(Object[] args) { - return new StatementImpl(args, parameterizedQuery); - } - - private class StatementImpl implements Statement { - - private final Object[] args; - private final ParameterizedQuery parameterizedQuery; - private FluxSink fluxSink; - private MonoSink monoSink; - - private StatementImpl(Object[] args, ParameterizedQuery parameterizedQuery) { - this.args = args; - this.parameterizedQuery = parameterizedQuery; - } - - @Override - public void execute(Connection connection) throws SQLException { - if (fluxSink != null) { - executeStatement(connection, args, fluxSink); - } else { - executeStatement(connection, args, monoSink); - } - } - - @Override - public PreparedStatement batch(Connection connection, PreparedStatement preparedStatement) throws SQLException { - return AbstractDbStatementFactory.this.batch(connection, preparedStatement, args); - } - - @Override - public void batchExecuted(int count) throws SQLException { - if (monoSink != null) { - AbstractDbStatementFactory.this.batchExecuted(count, monoSink); - } else { - AbstractDbStatementFactory.this.batchExecuted(count, fluxSink); - } - } - - @Override - public boolean sameBatch(Statement statement) { - if (!(statement instanceof StatementImpl statementImpl)) { - return false; - } - return AbstractDbStatementFactory.this.sameBatch(statementImpl.parameterizedQuery); - } - - @Override - public void onCompleted() { - if (fluxSink != null) { - fluxSink.complete(); - } - } - - @Override - public void onError(Throwable throwable) { - if (fluxSink != null) { - fluxSink.error(throwable); - } - if (monoSink != null) { - monoSink.error(throwable); - } - - } - - @Override - public void setFluxSink(FluxSink fluxSink) { - if (monoSink != null) { - throw new UnsupportedOperationException("Cannot set FluxSink if MonoSink is set"); - } - this.fluxSink = fluxSink; - } - - @Override - public void setMonoSink(MonoSink monoSink) { - if (fluxSink != null) { - throw new UnsupportedOperationException("Cannot set MonoSink if FluxSink is set"); - } - this.monoSink = monoSink; - } - } - -} diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/AbstractUpdateStatementFactory.java b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/AbstractUpdateStatementFactory.java deleted file mode 100644 index 6b856ab7..00000000 --- a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/AbstractUpdateStatementFactory.java +++ /dev/null @@ -1,25 +0,0 @@ -package se.fortnox.reactivewizard.db.statement; - -import se.fortnox.reactivewizard.db.query.ParameterizedQuery; - -import java.sql.SQLException; - -public abstract class AbstractUpdateStatementFactory extends AbstractDbStatementFactory { - private final int minimumAffected; - - public AbstractUpdateStatementFactory(int minimumAffected, ParameterizedQuery parameterizedQuery) { - super(parameterizedQuery); - this.minimumAffected = minimumAffected; - } - - protected void ensureMinimumReached(int updateCount) throws SQLException { - if (updateCount < minimumAffected) { - throw new MinimumAffectedRowsException(minimumAffected, updateCount, toString()); - } - } - - @Override - public String toString() { - return parameterizedQuery.toString(); - } -} diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/DbStatementFactoryFactory.java b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/DbStatementFactoryFactory.java index cb895afe..b481c642 100644 --- a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/DbStatementFactoryFactory.java +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/DbStatementFactoryFactory.java @@ -4,6 +4,7 @@ import se.fortnox.reactivewizard.db.GeneratedKey; import se.fortnox.reactivewizard.db.Query; import se.fortnox.reactivewizard.db.Update; +import se.fortnox.reactivewizard.db.deserializing.DbResultSetDeserializerImpl; import se.fortnox.reactivewizard.db.query.ParameterizedQuery; import se.fortnox.reactivewizard.util.ReflectionUtil; @@ -31,20 +32,57 @@ public DbStatementFactory createStatementFactory(Method method) throws SQLExcept Class cls = ReflectionUtil.getRawType(returnType); for (Annotation annotation : method.getAnnotations()) { if (annotation instanceof Query queryAnnotation) { - return new SelectStatementFactory(createParameterizedQuery(queryAnnotation.value(), method), cls); + ParameterizedQuery parameterizedQuery = createParameterizedQuery(queryAnnotation.value(), method); + var deserializer = new DbResultSetDeserializerImpl<>(cls); + return new DbStatementFactoryWrapper( + parameterizedQuery, + (args) -> new SelectSqlStatement<>( + parameterizedQuery.buildSql(args), + parameterizedQuery.buildParamSetters(args), + deserializer, + parameterizedQuery.getMethodName(), + parameterizedQuery.toString() + ) + ); } else if (annotation instanceof Update updateAnnotation) { ParameterizedQuery parameterizedQuery = createParameterizedQuery(updateAnnotation.value(), method); if (GeneratedKey.class.isAssignableFrom(cls)) { Class keyType = (Class)((ParameterizedType)returnType).getActualTypeArguments()[0]; - return new UpdateStatementReturningGeneratedKeyFactory(parameterizedQuery, keyType, updateAnnotation.minimumAffected()); + var deserializer = new DbResultSetDeserializerImpl<>(keyType); + return new DbStatementFactoryWrapper( + parameterizedQuery, + (args) -> new UpdateReturningGeneratedKeySqlStatement<>( + parameterizedQuery.buildSql(args), + parameterizedQuery.buildParamSetters(args), + updateAnnotation.minimumAffected(), + parameterizedQuery.toString(), + deserializer + ) + ); } else if (Integer.class.isAssignableFrom(cls)) { - return new UpdateStatementExecutorReturningCountFactory(parameterizedQuery, updateAnnotation.minimumAffected()); + return new DbStatementFactoryWrapper( + parameterizedQuery, + (args) -> new UpdateReturningCountSqlStatement( + parameterizedQuery.buildSql(args), + parameterizedQuery.buildParamSetters(args), + updateAnnotation.minimumAffected(), + parameterizedQuery.toString() + ) + ); } else if (Void.class.isAssignableFrom(cls)) { - return new UpdateStatementReturningVoidFactory(parameterizedQuery, updateAnnotation.minimumAffected()); + return new DbStatementFactoryWrapper( + parameterizedQuery, + (args) -> new UpdateReturningVoidSqlStatement( + parameterizedQuery.buildSql(args), + parameterizedQuery.buildParamSetters(args), + updateAnnotation.minimumAffected(), + parameterizedQuery.toString() + ) + ); } else { throw new RuntimeException("Unsupported return type for Update"); diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/DbStatementFactoryWrapper.java b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/DbStatementFactoryWrapper.java new file mode 100644 index 00000000..491070cb --- /dev/null +++ b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/DbStatementFactoryWrapper.java @@ -0,0 +1,23 @@ +package se.fortnox.reactivewizard.db.statement; + +import se.fortnox.reactivewizard.db.query.ParameterizedQuery; + +public class DbStatementFactoryWrapper implements DbStatementFactory { + private final ParameterizedQuery parameterizedQuery; + private final DbStatementFactory statementFactory; + + public DbStatementFactoryWrapper(ParameterizedQuery parameterizedQuery, DbStatementFactory statementFactory) { + this.parameterizedQuery = parameterizedQuery; + this.statementFactory = statementFactory; + } + + @Override + public Statement create(Object[] args) { + return statementFactory.create(args); + } + + @Override + public String toString() { + return parameterizedQuery.toString(); + } +} diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/SelectStatementFactory.java b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/SelectStatementFactory.java deleted file mode 100644 index 23160e67..00000000 --- a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/SelectStatementFactory.java +++ /dev/null @@ -1,91 +0,0 @@ -package se.fortnox.reactivewizard.db.statement; - -import reactor.core.publisher.FluxSink; -import reactor.core.publisher.MonoSink; -import se.fortnox.reactivewizard.db.deserializing.DbResultSetDeserializerImpl; -import se.fortnox.reactivewizard.db.query.ParameterizedQuery; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; - -import static java.lang.String.format; - -public class SelectStatementFactory extends AbstractDbStatementFactory { - private static final String NULL_VALUE_ERROR = """ - One or more of the values returned in the resultset of the following query was null: - {{sqlQuery}} - - Project Reactor does not allow emitting null values in a stream. Wrap the return value from the dao interface - in a 'wrapper' to solve the issue. - Example: - record Wrapper(String nullableValue) {}; - """; - private static final String QUERY_PLACEHOLDER = "{{sqlQuery}}"; - private static final String MONO_NEXT_ERROR = "%s returning a Mono received more than one result from the database"; - private final DbResultSetDeserializerImpl deserializer; - - private final String methodName; - - public SelectStatementFactory(ParameterizedQuery parameterizedQuery, Class returnType) { - super(parameterizedQuery); - this.deserializer = new DbResultSetDeserializerImpl(returnType); - this.methodName = parameterizedQuery.getMethodName(); - } - - @Override - protected void executeStatement(Connection connection, Object[] args, FluxSink fluxSink) throws SQLException { - try (PreparedStatement statement = parameterizedQuery.createStatement(connection, args)) { - parameterizedQuery.addParameters(args, statement); - try (ResultSet resultSet = statement.executeQuery()) { - while (resultSet.next()) { - if (fluxSink != null) { - fluxSink.next(deserialize(resultSet)); - } - } - } - } - } - - @Override - protected void executeStatement(Connection connection, Object[] args, MonoSink monoSink) throws SQLException { - try (PreparedStatement statement = parameterizedQuery.createStatement(connection, args)) { - parameterizedQuery.addParameters(args, statement); - try (ResultSet resultSet = statement.executeQuery()) { - if (monoSink == null) { - return; - } - - Object object = null; - - if (resultSet.next()) { - object = deserialize(resultSet); - } else { - monoSink.success(); - } - - if (resultSet.next()) { - throw new RuntimeException(format(MONO_NEXT_ERROR, methodName)); - } - - monoSink.success(object); - } - StatementDebug.log(statement); - } - } - - @Override - public String toString() { - return parameterizedQuery.toString(); - } - - private Object deserialize(ResultSet resultSet) { - var value = deserializer.deserialize(resultSet); - if (value == null) { - var sqlQuery = parameterizedQuery.toString(); - throw new NullPointerException(NULL_VALUE_ERROR.replace(QUERY_PLACEHOLDER, sqlQuery)); - } - return value; - } -} diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateStatementExecutorReturningCountFactory.java b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateStatementExecutorReturningCountFactory.java deleted file mode 100644 index b100b7be..00000000 --- a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateStatementExecutorReturningCountFactory.java +++ /dev/null @@ -1,68 +0,0 @@ -package se.fortnox.reactivewizard.db.statement; - -import reactor.core.publisher.FluxSink; -import reactor.core.publisher.MonoSink; -import se.fortnox.reactivewizard.db.query.ParameterizedQuery; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; - -public class UpdateStatementExecutorReturningCountFactory extends AbstractUpdateStatementFactory { - - public UpdateStatementExecutorReturningCountFactory(ParameterizedQuery parameterizedQuery, int minimumAffected) { - super(minimumAffected, parameterizedQuery); - } - - @Override - protected void executeStatement(Connection connection, Object[] args, FluxSink fluxSink) throws SQLException { - try (PreparedStatement statement = parameterizedQuery.createStatement(connection, args)) { - parameterizedQuery.addParameters(args, statement); - executed(statement.executeUpdate(), fluxSink); - StatementDebug.log(statement); - } - } - - @Override - protected void executeStatement(Connection connection, Object[] args, MonoSink monoSink) throws SQLException { - try (PreparedStatement statement = parameterizedQuery.createStatement(connection, args)) { - parameterizedQuery.addParameters(args, statement); - executed(statement.executeUpdate(), monoSink); - } - } - - @Override - protected PreparedStatement batch(Connection connection, PreparedStatement preparedStatement, Object[] args) throws SQLException { - if (preparedStatement == null) { - preparedStatement = parameterizedQuery.createStatement(connection, args); - } - - parameterizedQuery.addParameters(args, preparedStatement); - preparedStatement.addBatch(); - return preparedStatement; - } - - @Override - protected void batchExecuted(int count, FluxSink fluxSink) throws SQLException { - executed(count, fluxSink); - } - - protected void executed(int count, FluxSink fluxSink) throws SQLException { - ensureMinimumReached(count); - if (fluxSink != null) { - fluxSink.next(count); - } - } - - protected void executed(int count, MonoSink monoSink) throws SQLException { - ensureMinimumReached(count); - if (monoSink != null) { - monoSink.success(count); - } - } - - @Override - protected boolean sameBatch(ParameterizedQuery parameterizedQuery) { - return parameterizedQuery.toString().equals(this.parameterizedQuery.toString()); - } -} diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateStatementReturningGeneratedKeyFactory.java b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateStatementReturningGeneratedKeyFactory.java deleted file mode 100644 index 11883fb9..00000000 --- a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateStatementReturningGeneratedKeyFactory.java +++ /dev/null @@ -1,60 +0,0 @@ -package se.fortnox.reactivewizard.db.statement; - -import reactor.core.publisher.FluxSink; -import reactor.core.publisher.MonoSink; -import se.fortnox.reactivewizard.db.GeneratedKey; -import se.fortnox.reactivewizard.db.deserializing.DbResultSetDeserializerImpl; -import se.fortnox.reactivewizard.db.query.ParameterizedQuery; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; - -import static java.sql.Statement.RETURN_GENERATED_KEYS; - -public class UpdateStatementReturningGeneratedKeyFactory extends AbstractUpdateStatementFactory { - - private final DbResultSetDeserializerImpl deserializer; - - public UpdateStatementReturningGeneratedKeyFactory(ParameterizedQuery parameterizedQuery, - Class keyType, int minimumAffected - ) { - super(minimumAffected, parameterizedQuery); - this.deserializer = new DbResultSetDeserializerImpl(keyType); - } - - @Override - protected void executeStatement(Connection connection, Object[] args, FluxSink fluxSink) throws SQLException { - try (PreparedStatement statement = parameterizedQuery.createStatement(connection, args, RETURN_GENERATED_KEYS)) { - parameterizedQuery.addParameters(args, statement); - ensureMinimumReached(statement.executeUpdate()); - try (ResultSet resultSet = statement.getGeneratedKeys()) { - while (resultSet.next()) { - if (fluxSink != null) { - fluxSink.next((GeneratedKey) () -> deserializer.deserialize(resultSet)); - } - } - } - StatementDebug.log(statement); - } - } - - @Override - protected void executeStatement(Connection connection, Object[] args, MonoSink monoSink) throws SQLException { - try (PreparedStatement statement = parameterizedQuery.createStatement(connection, args, RETURN_GENERATED_KEYS)) { - parameterizedQuery.addParameters(args, statement); - ensureMinimumReached(statement.executeUpdate()); - try (ResultSet resultSet = statement.getGeneratedKeys()) { - if (monoSink == null) { - return; - } - if (resultSet.next()) { - monoSink.success((GeneratedKey)() -> deserializer.deserialize(resultSet)); - } else { - monoSink.success(); - } - } - } - } -} diff --git a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateStatementReturningVoidFactory.java b/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateStatementReturningVoidFactory.java deleted file mode 100644 index 84e6d2fa..00000000 --- a/dao/src/main/java/se/fortnox/reactivewizard/db/statement/UpdateStatementReturningVoidFactory.java +++ /dev/null @@ -1,27 +0,0 @@ -package se.fortnox.reactivewizard.db.statement; - -import reactor.core.publisher.FluxSink; -import reactor.core.publisher.MonoSink; -import se.fortnox.reactivewizard.db.query.ParameterizedQuery; - -import java.sql.SQLException; - -public class UpdateStatementReturningVoidFactory extends UpdateStatementExecutorReturningCountFactory { - - public UpdateStatementReturningVoidFactory(ParameterizedQuery parameterizedQuery, int minimumAffected) { - super(parameterizedQuery, minimumAffected); - } - - @Override - protected void executed(int count, FluxSink fluxSink) throws SQLException { - ensureMinimumReached(count); - } - - @Override - protected void executed(int count, MonoSink monoSink) throws SQLException { - ensureMinimumReached(count); - if (monoSink != null) { - monoSink.success(); - } - } -}