diff --git a/src/main/java/org/gridsuite/sensitivityanalysis/server/service/SensitivityAnalysisWorkerService.java b/src/main/java/org/gridsuite/sensitivityanalysis/server/service/SensitivityAnalysisWorkerService.java index 827c41b5..794ed05e 100644 --- a/src/main/java/org/gridsuite/sensitivityanalysis/server/service/SensitivityAnalysisWorkerService.java +++ b/src/main/java/org/gridsuite/sensitivityanalysis/server/service/SensitivityAnalysisWorkerService.java @@ -19,6 +19,7 @@ import com.powsybl.loadflow.LoadFlowProvider; import com.powsybl.network.store.client.NetworkStoreService; import com.powsybl.sensitivity.*; +import lombok.extern.slf4j.Slf4j; import org.gridsuite.computation.dto.ReportInfos; import org.gridsuite.computation.service.*; import org.apache.commons.lang3.tuple.Pair; @@ -29,10 +30,10 @@ import org.gridsuite.sensitivityanalysis.server.entities.AnalysisResultEntity; import org.gridsuite.sensitivityanalysis.server.entities.ContingencyResultEntity; import org.gridsuite.sensitivityanalysis.server.entities.SensitivityResultEntity; +import org.gridsuite.sensitivityanalysis.server.util.BatchAsyncPollerFactory; +import org.gridsuite.sensitivityanalysis.server.util.ExecutorProviderService; import org.gridsuite.sensitivityanalysis.server.util.SensitivityAnalysisRunnerSupplier; -import org.gridsuite.sensitivityanalysis.server.util.SensitivityResultWriterPersisted; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.gridsuite.sensitivityanalysis.server.util.SensitivityResultPersistedWriter; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.stereotype.Service; @@ -50,22 +51,20 @@ /** * @author Franck Lecuyer */ +@Slf4j @Service public class SensitivityAnalysisWorkerService extends AbstractWorkerService { - private static final Logger LOGGER = LoggerFactory.getLogger(SensitivityAnalysisWorkerService.class); - public static final String COMPUTATION_TYPE = "Sensitivity analysis"; + public static final String COMPUTATION_TYPE = "Sensitivity analysis"; public static final int CONTINGENCY_RESULTS_BUFFER_SIZE = 128; - public static final int MAX_RESULTS_BUFFER_SIZE = 128; + protected final SensitivityAnalysisInMemoryObserver inMemoryObserver; private final SensitivityAnalysisInputBuilderService sensitivityAnalysisInputBuilderService; - private final SensitivityAnalysisParametersService parametersService; - private final Function sensitivityAnalysisFactorySupplier; - - protected final SensitivityAnalysisInMemoryObserver inMemoryObserver; + private final ExecutorProviderService executorProviderService; + private final BatchAsyncPollerFactory batchAsyncPollerFactory; public SensitivityAnalysisWorkerService(NetworkStoreService networkStoreService, ReportService reportService, @@ -78,12 +77,15 @@ public SensitivityAnalysisWorkerService(NetworkStoreService networkStoreService, SensitivityAnalysisRunnerSupplier sensitivityAnalysisRunnerSupplier, SensitivityAnalysisObserver observer, SensitivityAnalysisInMemoryObserver inMemoryObserver, - PropertyServerNameProvider propertyServerNameProvider) { + PropertyServerNameProvider propertyServerNameProvider, + ExecutorProviderService executorProviderService) { super(networkStoreService, notificationService, reportService, resultService, executionService, observer, objectMapper, propertyServerNameProvider); this.sensitivityAnalysisInputBuilderService = sensitivityAnalysisInputBuilderService; this.parametersService = parametersService; this.sensitivityAnalysisFactorySupplier = sensitivityAnalysisRunnerSupplier::getRunner; this.inMemoryObserver = inMemoryObserver; + this.executorProviderService = executorProviderService; + this.batchAsyncPollerFactory = new BatchAsyncPollerFactory(); // TODO this should be a spring service } @Override @@ -132,50 +134,21 @@ protected CompletableFuture getCompletableFuture(SensitivityAnalysisRun saveSensitivityResults(groupedFactors, resultUuid, contingencies); - SensitivityResultWriterPersisted writer = new SensitivityResultWriterPersisted(resultUuid, resultService); - writer.start(); - List factors = groupedFactors.stream().flatMap(Collection::stream).toList(); SensitivityFactorReader sensitivityFactorReader = new SensitivityFactorModelReader(factors, runContext.getNetwork()); - CompletableFuture future = sensitivityAnalysisRunner.runAsync( - runContext.getNetwork(), - variantId, - sensitivityFactorReader, - writer, - contingencies, - runContext.getSensitivityAnalysisInputs().getVariablesSets(), - sensitivityAnalysisParameters, - executionService.getComputationManager(), - runContext.getReportNode()) - .whenComplete((unused1, unused2) -> writer.setQueueProducerFinished()) - .thenApply(unused -> { - try { - while (!writer.isConsumerFinished()) { - Thread.sleep(100); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - writer.interrupt(); - // used to check if result is not null - return true; - }) - .exceptionally(e -> { - LOGGER.error("Error occurred during computation", e); - runContext.getReportNode() - .newReportNode() - .withMessageTemplate("sensitivity.analysis.server.sensitivityComputationFailed") - .withUntypedValue("exception", e.getMessage()) - .withSeverity(TypedValue.ERROR_SEVERITY) - .add(); - writer.interrupt(); - // null means it failed - return false; - }); - if (resultUuid != null) { - futures.put(resultUuid, future); - } - return future; + SensitivityResultPersistedWriter sensitivityResultPersistedWriter = new SensitivityResultPersistedWriter(resultUuid, resultService, executorProviderService, batchAsyncPollerFactory); + + SensitivityAnalysisRunParameters runParameters = new SensitivityAnalysisRunParameters() + .setContingencies(contingencies) + .setVariableSets(runContext.getSensitivityAnalysisInputs().getVariablesSets()) + .setParameters(sensitivityAnalysisParameters) + .setComputationManager(executionService.getComputationManager()) + .setReportNode(runContext.getReportNode()); + + return sensitivityAnalysisRunner.runAsync(runContext.getNetwork(), variantId, sensitivityFactorReader, sensitivityResultPersistedWriter, runParameters) + .thenApply(unused -> Boolean.TRUE) + .whenComplete((result, throwable) -> syncWriterCompletion(throwable, sensitivityResultPersistedWriter)) + .exceptionally(throwable -> handleAsyncError(throwable, runContext)); } private void saveSensitivityResults(List> groupedFactors, UUID resultUuid, List contingencies) { @@ -236,7 +209,7 @@ public SensitivityAnalysisResult run(UUID networkUuid, String variantId, ReportI Thread.currentThread().interrupt(); return null; } catch (Exception e) { - LOGGER.error(getFailedMessage(getComputationType()), e); + log.error(getFailedMessage(getComputationType()), e); return null; } } @@ -244,7 +217,7 @@ public SensitivityAnalysisResult run(UUID networkUuid, String variantId, ReportI private SensitivityAnalysisResult runInMemory(SensitivityAnalysisRunContext runContext) throws Exception { Objects.requireNonNull(runContext); - LOGGER.info("Run sensitivity analysis"); + log.info("Run sensitivity analysis"); SensitivityAnalysis.Runner sensitivityAnalysisRunner = sensitivityAnalysisFactorySupplier.apply(runContext.getProvider()); @@ -313,4 +286,31 @@ private CompletableFuture runAsyncInMemory(Sensitivit reporter); return future.thenApply(r -> new SensitivityAnalysisResult(factors, writer.getContingencyStatuses(), writer.getValues())); } + + private void syncWriterCompletion(Throwable throwable, SensitivityResultPersistedWriter persistedWriter) { + // force free resources + try (persistedWriter) { + persistedWriter.notifyCompletion(); + + // success, so we wait for the writer to properly finish its job + if (throwable == null) { + persistedWriter.waitForCompletion(); + } + } catch (Exception e) { + // writer can throw an exception if one or more results could not be persisted + throw new RuntimeException(e); + } + } + + private Boolean handleAsyncError(Throwable throwable, SensitivityAnalysisRunContext runContext) { + log.error("Error occurred during computation", throwable); + runContext.getReportNode() + .newReportNode() + .withMessageTemplate("sensitivity.analysis.server.sensitivityComputationFailed") + .withUntypedValue("exception", throwable.getMessage()) + .withSeverity(TypedValue.ERROR_SEVERITY) + .add(); + // false since the computation failed + return false; + } } diff --git a/src/main/java/org/gridsuite/sensitivityanalysis/server/util/BatchAsyncPoller.java b/src/main/java/org/gridsuite/sensitivityanalysis/server/util/BatchAsyncPoller.java new file mode 100644 index 00000000..39d86ce5 --- /dev/null +++ b/src/main/java/org/gridsuite/sensitivityanalysis/server/util/BatchAsyncPoller.java @@ -0,0 +1,101 @@ +/** + * Copyright (c) 2026, RTE (http://www.rte-france.com) + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ +package org.gridsuite.sensitivityanalysis.server.util; + +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; + +/** + * @author Ghiles Abdellah {@literal } + */ +@Slf4j +public class BatchAsyncPoller { + + protected static final int BUFFER_SIZE = 512; + private static final int TASK_INITIAL_DELAY = 0; + private static final int TASK_DELAY = 100; + + private final UUID resultUuid; + private final String taskName; + private final AtomicBoolean isProducerFinished; + private final BiConsumer> batchHandlingFunction; + + private final BlockingQueue blockingQueue; + private final ScheduledFuture pollingFuture; + + public BatchAsyncPoller(ScheduledExecutorService scheduledExecutorService, UUID resultUuid, + String taskName, BiConsumer> batchHandlingFunction) { + this.resultUuid = resultUuid; + this.taskName = taskName; + this.batchHandlingFunction = batchHandlingFunction; + this.isProducerFinished = new AtomicBoolean(false); + + this.blockingQueue = new LinkedBlockingQueue<>(); + this.pollingFuture = scheduledExecutorService.scheduleWithFixedDelay(this::drainQueue, TASK_INITIAL_DELAY, TASK_DELAY, TimeUnit.MILLISECONDS); + } + + public void add(T data) { + if (pollingFuture.isDone()) { + throw new IllegalStateException("Cannot add data to a finished Poller"); + } + + blockingQueue.add(data); + } + + public void notifyCompletion() { + isProducerFinished.set(true); + } + + /** + * @throws ExecutionException - if one scheduled iteration failed + */ + public void waitForCompletion() throws ExecutionException { + try { + pollingFuture.get(); + } catch (CancellationException e) { + // pass, this is the nominal case + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + /** + * This method makes exceptions bubble if the `batchHandlingFunction` throws one. + * The goal is to stop the unnecessary consumption and make the calling code know that the process failed at one point. + * The scheduler will stop it and mark the future with an exception -> a call to `notifyCompletion` will then throw an `ExecutionException` + */ + private void drainQueue() { + List buffer = new ArrayList<>(BUFFER_SIZE); + + while (!shouldStop() && hasDrainedData(buffer)) { + log.debug("{} - Treating {} elements in the batch, {} elements remaining in the queue", taskName, buffer.size(), blockingQueue.size()); + batchHandlingFunction.accept(resultUuid, new ArrayList<>(buffer)); + buffer.clear(); + } + + if (shouldStop()) { + pollingFuture.cancel(false); + } + } + + private boolean shouldStop() { + // Thread.currentThread().isInterrupted() check is mandatory for the loop since it doesn't have method calls that checks the flag + // isProducerFinished.get() && blockingQueue.isEmpty() is also mandatory given the logic inside the calling method + // it allows to consume all data before leaving the calling loop + return Thread.currentThread().isInterrupted() || isProducerFinished.get() && blockingQueue.isEmpty(); + } + + private boolean hasDrainedData(List buffer) { + return blockingQueue.drainTo(buffer, BUFFER_SIZE) > 0; + } +} diff --git a/src/main/java/org/gridsuite/sensitivityanalysis/server/util/BatchAsyncPollerFactory.java b/src/main/java/org/gridsuite/sensitivityanalysis/server/util/BatchAsyncPollerFactory.java new file mode 100644 index 00000000..8af09983 --- /dev/null +++ b/src/main/java/org/gridsuite/sensitivityanalysis/server/util/BatchAsyncPollerFactory.java @@ -0,0 +1,23 @@ +/** + * Copyright (c) 2026, RTE (http://www.rte-france.com) + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ +package org.gridsuite.sensitivityanalysis.server.util; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.BiConsumer; + +/** + * @author Ghiles Abdellah {@literal } + */ +public class BatchAsyncPollerFactory { + + public BatchAsyncPoller create(ScheduledExecutorService scheduledExecutorService, UUID resultUuid, + String taskName, BiConsumer> batchHandlingFunction) { + return new BatchAsyncPoller<>(scheduledExecutorService, resultUuid, taskName, batchHandlingFunction); + } +} diff --git a/src/main/java/org/gridsuite/sensitivityanalysis/server/util/ExecutorProviderService.java b/src/main/java/org/gridsuite/sensitivityanalysis/server/util/ExecutorProviderService.java new file mode 100644 index 00000000..8a310373 --- /dev/null +++ b/src/main/java/org/gridsuite/sensitivityanalysis/server/util/ExecutorProviderService.java @@ -0,0 +1,32 @@ +/** + * Copyright (c) 2026, RTE (http://www.rte-france.com) + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ +package org.gridsuite.sensitivityanalysis.server.util; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.springframework.stereotype.Service; + +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; + +/** + * @author Ghiles Abdellah {@literal } + */ +@Service +public class ExecutorProviderService { + public ScheduledExecutorService newScheduledThreadPool(int size, UUID threadPrefix) { + Objects.requireNonNull(threadPrefix); + + ThreadFactory factory = new ThreadFactoryBuilder() + .setNameFormat(threadPrefix + "-%d") + .setDaemon(false) + .build(); + return Executors.newScheduledThreadPool(size, factory); + } +} diff --git a/src/main/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultPersistedWriter.java b/src/main/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultPersistedWriter.java new file mode 100644 index 00000000..450b5b1c --- /dev/null +++ b/src/main/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultPersistedWriter.java @@ -0,0 +1,74 @@ +/** + * Copyright (c) 2026, RTE (http://www.rte-france.com) + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ +package org.gridsuite.sensitivityanalysis.server.util; + +import com.powsybl.sensitivity.SensitivityAnalysisResult; +import com.powsybl.sensitivity.SensitivityResultWriter; +import com.powsybl.sensitivity.SensitivityValue; +import org.gridsuite.sensitivityanalysis.server.service.SensitivityAnalysisResultService; + +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; + +/** + * @author Ghiles Abdellah {@literal } + */ +public class SensitivityResultPersistedWriter implements SensitivityResultWriter, AutoCloseable { + + protected static final String SENSITIVITY_WRITER_THREAD = "sensitivityWriterThread"; + protected static final String CONTINGENCY_WRITER_THREAD = "contingencyWriterThread"; + + private final ScheduledExecutorService scheduledExecutorService; + private final BatchAsyncPoller sensitivityBatchAsyncPoller; + private final BatchAsyncPoller contingencyBatchAsyncPoller; + + public SensitivityResultPersistedWriter(UUID resultUuid, SensitivityAnalysisResultService sensitivityAnalysisResultService, + ExecutorProviderService executorProviderService, BatchAsyncPollerFactory batchAsyncPollerFactory) { + this.scheduledExecutorService = executorProviderService.newScheduledThreadPool(2, resultUuid); + this.sensitivityBatchAsyncPoller = batchAsyncPollerFactory.create(this.scheduledExecutorService, resultUuid, SENSITIVITY_WRITER_THREAD, sensitivityAnalysisResultService::writeSensitivityValues); + this.contingencyBatchAsyncPoller = batchAsyncPollerFactory.create(this.scheduledExecutorService, resultUuid, CONTINGENCY_WRITER_THREAD, sensitivityAnalysisResultService::writeContingenciesStatus); + } + + @Override + public void writeSensitivityValue(int factorIndex, int contingencyIndex, double value, double functionReference) { + throwOnExecutorShutdown(); + + if (Double.isNaN(functionReference) || Double.isNaN(value)) { + return; + } + sensitivityBatchAsyncPoller.add(new SensitivityValue(factorIndex, contingencyIndex, value, functionReference)); + } + + @Override + public void writeContingencyStatus(int contingencyIndex, SensitivityAnalysisResult.Status status) { + throwOnExecutorShutdown(); + + contingencyBatchAsyncPoller.add(new ContingencyResult(contingencyIndex, status)); + } + + @Override + public void close() { + scheduledExecutorService.shutdownNow(); + } + + public void notifyCompletion() { + sensitivityBatchAsyncPoller.notifyCompletion(); + contingencyBatchAsyncPoller.notifyCompletion(); + } + + public void waitForCompletion() throws ExecutionException { + sensitivityBatchAsyncPoller.waitForCompletion(); + contingencyBatchAsyncPoller.waitForCompletion(); + } + + private void throwOnExecutorShutdown() { + if (scheduledExecutorService.isShutdown()) { + throw new IllegalStateException("Cannot add data to a finished Writer"); + } + } +} diff --git a/src/main/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultWriterPersisted.java b/src/main/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultWriterPersisted.java deleted file mode 100644 index 39daf0c3..00000000 --- a/src/main/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultWriterPersisted.java +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Copyright (c) 2024, RTE (http://www.rte-france.com) - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ -package org.gridsuite.sensitivityanalysis.server.util; - -import com.powsybl.sensitivity.SensitivityAnalysisResult; -import com.powsybl.sensitivity.SensitivityResultWriter; -import com.powsybl.sensitivity.SensitivityValue; -import org.gridsuite.sensitivityanalysis.server.service.SensitivityAnalysisResultService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * @author Joris Mancini - */ -public class SensitivityResultWriterPersisted implements SensitivityResultWriter { - public static final Logger LOGGER = LoggerFactory.getLogger(SensitivityResultWriterPersisted.class); - - public static final int BUFFER_SIZE = 512; - - private final SensitivityAnalysisResultService sensitivityAnalysisResultService; - - private final BlockingQueue sensitivityValuesQueue; - - private final BlockingQueue contingencyResultsQueue; - - private final Thread sensitivityValuesThread; - - private final Thread contingencyResultsThread; - - private final AtomicBoolean sensitivityValuesConsumerFinished; - - private final AtomicBoolean contingencyResultsConsumerFinished; - - private final AtomicBoolean queueProducerFinished; - - private UUID resultUuid; - - public SensitivityResultWriterPersisted(UUID resultUuid, SensitivityAnalysisResultService sensitivityAnalysisResultService) { - this.sensitivityAnalysisResultService = sensitivityAnalysisResultService; - sensitivityValuesQueue = new LinkedBlockingQueue<>(); - contingencyResultsQueue = new LinkedBlockingQueue<>(); - sensitivityValuesThread = new Thread(sensitivityValuesBatchedHandling(), "sensitivityWriterThread"); - contingencyResultsThread = new Thread(contingencyResultsBatchedHandling(), "contingencyWriterThread"); - sensitivityValuesConsumerFinished = new AtomicBoolean(false); - contingencyResultsConsumerFinished = new AtomicBoolean(false); - queueProducerFinished = new AtomicBoolean(false); - this.resultUuid = resultUuid; - } - - public void start() { - sensitivityValuesThread.start(); - contingencyResultsThread.start(); - } - - public void interrupt() { - sensitivityValuesThread.interrupt(); - contingencyResultsThread.interrupt(); - } - - public boolean isConsumerFinished() { - return sensitivityValuesConsumerFinished.get() && contingencyResultsConsumerFinished.get(); - } - - public void setQueueProducerFinished() { - queueProducerFinished.set(true); - } - - @Override - public void writeSensitivityValue(int factorIndex, int contingencyIndex, double value, double functionReference) { - if (Double.isNaN(functionReference) || Double.isNaN(value)) { - return; - } - sensitivityValuesQueue.add(new SensitivityValue(factorIndex, contingencyIndex, value, functionReference)); - } - - @Override - public void writeContingencyStatus(int contingencyIndex, SensitivityAnalysisResult.Status status) { - contingencyResultsQueue.add(new ContingencyResult(contingencyIndex, status)); - } - - private Runnable sensitivityValuesBatchedHandling() { - return () -> run( - sensitivityValuesThread, - sensitivityValuesConsumerFinished, - sensitivityValuesQueue, - sensitivityAnalysisResultService::writeSensitivityValues - ); - } - - private Runnable contingencyResultsBatchedHandling() { - return () -> run( - contingencyResultsThread, - contingencyResultsConsumerFinished, - contingencyResultsQueue, - sensitivityAnalysisResultService::writeContingenciesStatus - ); - } - - private interface BatchedRunnable { - void run(UUID resultUuid, List tasks); - } - - private void run(Thread thread, AtomicBoolean isFinished, BlockingQueue queue, BatchedRunnable runnable) { - try { - // Note: checking isInterrupted here is a bit redundant with Thread.sleep below which - // also checks it and throws to exit the loop, but it has the advantage of making the - // code safer if we ever remove such a blocking call (ie ones throwing when interrupted). - // Also a minor advantage is that we stop the loop one iteration earlier (drain + run) - // with the current code that only blocks if the queue was empty (drained 0 elements) - while (!(thread.isInterrupted() || queueProducerFinished.get() && queue.isEmpty())) { - List tasks = new ArrayList<>(BUFFER_SIZE); - while (!(queue.drainTo(tasks, BUFFER_SIZE) > 0 || queueProducerFinished.get() && queue.isEmpty())) { - Thread.sleep(100); - } - LOGGER.debug("{} - Remaining {} elements in the queue", thread.getName(), queue.size()); - if (!tasks.isEmpty()) { - LOGGER.debug("{} - Treating {} elements in the batch", thread.getName(), tasks.size()); - runnable.run(resultUuid, tasks); - } - } - } catch (InterruptedException e) { - LOGGER.debug("Thread {} has been interrupted", thread.getName()); - thread.interrupt(); - } catch (Exception e) { - LOGGER.error("Unexpected error occurred during persisting results", e); - } finally { - isFinished.set(true); - } - } -} diff --git a/src/test/java/org/gridsuite/sensitivityanalysis/server/util/BatchAsyncPollerTest.java b/src/test/java/org/gridsuite/sensitivityanalysis/server/util/BatchAsyncPollerTest.java new file mode 100644 index 00000000..33e22c7e --- /dev/null +++ b/src/test/java/org/gridsuite/sensitivityanalysis/server/util/BatchAsyncPollerTest.java @@ -0,0 +1,253 @@ +/** + * Copyright (c) 2026, RTE (http://www.rte-france.com) + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ +package org.gridsuite.sensitivityanalysis.server.util; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; +import org.mockito.stubbing.OngoingStubbing; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.*; +import java.util.function.BiConsumer; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +/** + * @author Ghiles Abdellah {@literal } + */ +class BatchAsyncPollerTest { + + private ScheduledExecutorService scheduledExecutorServiceMock; + private ScheduledFuture scheduledFutureMock; + private BiConsumer> handlerMock; + private ArgumentCaptor runnableCaptor; + private Runnable actualRunnable; + + private long expectedInitialDelay; + private long expectedDelay; + private int expectedBufferSize; + private UUID randomUUID; + private String taskName; + private Object data; + + public static Stream provideFutureState() { + return Stream.of( + Arguments.of(false, false, false), + Arguments.of(true, false, false), + Arguments.of(false, true, false), + Arguments.of(false, false, true) + ); + } + + @BeforeEach + void setUp() { + scheduledExecutorServiceMock = mock(ScheduledExecutorService.class); + scheduledFutureMock = mock(ScheduledFuture.class); + runnableCaptor = ArgumentCaptor.forClass(Runnable.class); + handlerMock = mock(BiConsumer.class); + + randomUUID = UUID.randomUUID(); + taskName = "TestTask"; + data = new Object(); + + expectedInitialDelay = 0L; + expectedDelay = 100L; + expectedBufferSize = 512; + + when(scheduledExecutorServiceMock.scheduleWithFixedDelay(runnableCaptor.capture(), eq(expectedInitialDelay), eq(expectedDelay), eq(TimeUnit.MILLISECONDS))) + .thenReturn(scheduledFutureMock); + } + + @Test + void whenPollerIsCreatedThenFutureShouldBeInitialized() { + createBatchAsyncPoller(); + + verify(scheduledExecutorServiceMock).scheduleWithFixedDelay(any(Runnable.class), eq(expectedInitialDelay), eq(expectedDelay), eq(TimeUnit.MILLISECONDS)); + assertNotNull(actualRunnable); + verifyNoInteractions(handlerMock); + } + + @Test + void whenFutureCalledOnceAndHasSingleValueThenHandlerShouldBeCalledOnce() { + List expectedDataList = List.of(data); + BatchAsyncPoller batchAsyncPoller = createBatchAsyncPoller(); + + batchAsyncPoller.add(data); + actualRunnable.run(); + + verify(handlerMock).accept(randomUUID, expectedDataList); + } + + @Test + void whenFutureCalledTwiceAndHasSingleValueThenHandlerShouldBeCalledOnce() { + BatchAsyncPoller batchAsyncPoller = createBatchAsyncPoller(); + batchAsyncPoller.add(data); + actualRunnable.run(); + actualRunnable.run(); + + verify(handlerMock).accept(any(), anyList()); + } + + @Test + void whenFutureCalledAndHasValueToBufferThenHandlerShouldBeCalledMultipleTimes() { + int expectedNumberOfBatch = 2; + BatchAsyncPoller batchAsyncPoller = createBatchAsyncPoller(); + + for (int i = 0; i < expectedBufferSize * expectedNumberOfBatch; i++) { + batchAsyncPoller.add(data); + } + actualRunnable.run(); + + verify(handlerMock, times(expectedNumberOfBatch)).accept(any(), anyList()); + } + + @Test + void whenDoneAndAddDataThenShouldThrowException() { + when(scheduledFutureMock.isDone()).thenReturn(true); + + BatchAsyncPoller batchAsyncPoller = createBatchAsyncPoller(); + + assertThrows(IllegalStateException.class, () -> batchAsyncPoller.add(data)); + } + + @Test + void whenCompleteThenFutureShouldBeCancelled() { + BatchAsyncPoller batchAsyncPoller = createBatchAsyncPoller(); + + batchAsyncPoller.notifyCompletion(); + actualRunnable.run(); + + verify(scheduledFutureMock).cancel(false); + } + + @Test + void whenCompleteAndHasDataThenFutureShouldBeCancelledAfterFullPull() { + BatchAsyncPoller batchAsyncPoller = createBatchAsyncPoller(); + batchAsyncPoller.add(data); + + batchAsyncPoller.notifyCompletion(); + actualRunnable.run(); + + verify(scheduledFutureMock).cancel(false); + verify(handlerMock).accept(any(), anyList()); + } + + @Test + void whenInterruptedAndHasDataThenFutureShouldBeCancelledWithoutPull() { + BatchAsyncPoller batchAsyncPoller = createBatchAsyncPoller(); + batchAsyncPoller.add(data); + + Thread.currentThread().interrupt(); + actualRunnable.run(); + + verify(scheduledFutureMock).cancel(false); + verifyNoInteractions(handlerMock); + } + + @Test + void whenInterruptedMidComputationAndHasDataThenFutureShouldBeCancelledAfterFirstPull() { + BatchAsyncPoller batchAsyncPoller = createBatchAsyncPoller(); + for (int i = 0; i < expectedBufferSize * 2; i++) { + batchAsyncPoller.add(data); + } + + doAnswer(unused -> { + Thread.currentThread().interrupt(); + return null; + }).when(handlerMock).accept(any(), anyList()); + actualRunnable.run(); + + verify(scheduledFutureMock).cancel(false); + verify(handlerMock).accept(any(), anyList()); + } + + /** + * the aim is to check if an exception in the handler is propagated to the caller, ie the scheduler that will stop it and mark the future with an exception + */ + @Test + void whenExceptionInHandlerThenExceptionShouldBePropagatedToCaller() { + BatchAsyncPoller batchAsyncPoller = createBatchAsyncPoller(); + batchAsyncPoller.add(data); + + doThrow(new RuntimeException("TestException")) + .when(handlerMock).accept(any(), anyList()); + + assertThrows(RuntimeException.class, () -> actualRunnable.run()); + } + + @ParameterizedTest + @MethodSource("provideFutureState") + void whenWaitForCompletionThenShouldWaitForFuture(boolean isCanceled, boolean isInterrupted, boolean hasException) throws Exception { + BatchAsyncPoller batchAsyncPoller = createBatchAsyncPoller(); + CountDownLatch waitForEndOFComputation = new CountDownLatch(1); + + // default case + OngoingStubbing waitForEndOfComputationStubbing = when(scheduledFutureMock.get()).thenAnswer(invocationOnMock -> { + waitForEndOFComputation.await(); + return null; + }); + // handle multiple exceptions creation + if (isCanceled) { + waitForEndOfComputationStubbing.thenThrow(CancellationException.class); + } else if (isInterrupted) { + waitForEndOfComputationStubbing.thenThrow(InterruptedException.class); + } else if (hasException) { + waitForEndOfComputationStubbing.thenThrow(ExecutionException.class); + } + + CompletableFuture completableFuture = getCompletableFuture(batchAsyncPoller); + // should be waiting for the computation end + assertFalse(completableFuture.isDone()); + // simulate the end of computation + waitForEndOFComputation.countDown(); + // small wait with timeout + waitForFuture(completableFuture, hasException); + assertTrue(completableFuture.isDone()); + + verify(scheduledFutureMock).get(); + } + + private CompletableFuture getCompletableFuture(BatchAsyncPoller batchAsyncPoller) { + ExecutorService executorService = Executors.newSingleThreadExecutor(); + + return CompletableFuture.runAsync(() -> { + try { + batchAsyncPoller.waitForCompletion(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, executorService); + } + + private BatchAsyncPoller createBatchAsyncPoller() { + BatchAsyncPoller batchAsyncPoller = new BatchAsyncPoller<>(scheduledExecutorServiceMock, randomUUID, taskName, handlerMock); + actualRunnable = runnableCaptor.getValue(); + + return batchAsyncPoller; + } + + private void waitForFuture(CompletableFuture completableFuture, boolean shouldHaveExecutionException) { + try { + completableFuture.get(100, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + fail("Should not timeout"); + } catch (Exception e) { + if (shouldHaveExecutionException) { + assertEquals(ExecutionException.class, e.getClass()); + } else { + fail("Should not throw exception"); + } + } + } +} diff --git a/src/test/java/org/gridsuite/sensitivityanalysis/server/util/ExecutorProviderServiceTest.java b/src/test/java/org/gridsuite/sensitivityanalysis/server/util/ExecutorProviderServiceTest.java new file mode 100644 index 00000000..d88071f2 --- /dev/null +++ b/src/test/java/org/gridsuite/sensitivityanalysis/server/util/ExecutorProviderServiceTest.java @@ -0,0 +1,68 @@ +/** + * Copyright (c) 2026, RTE (http://www.rte-france.com) + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ +package org.gridsuite.sensitivityanalysis.server.util; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * @author Ghiles Abdellah {@literal } + */ +class ExecutorProviderServiceTest { + + private ExecutorProviderService executorProviderService; + + @BeforeEach + void setUp() { + executorProviderService = new ExecutorProviderService(); + } + + @Test + void whenNewScheduledThreadPoolWithValidInputThenShouldReturnInitializedThreadPool() { + int threadPoolSize = 4; + UUID threadPrefix = UUID.randomUUID(); + + ScheduledExecutorService executorService = executorProviderService.newScheduledThreadPool(threadPoolSize, threadPrefix); + + assertNotNull(executorService); + assertFalse(executorService.isShutdown()); + assertFalse(executorService.isTerminated()); + + ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) executorService; + int poolSize = scheduledThreadPoolExecutor.getPoolSize(); + int coreSize = scheduledThreadPoolExecutor.getCorePoolSize(); + assertEquals(0, poolSize); + assertEquals(threadPoolSize, coreSize); + } + + @Test + void whenNewScheduledThreadPoolWithNegativeSizeThenShouldThrowException() { + int threadPoolSize = -1; + UUID threadPrefix = UUID.randomUUID(); + + assertThrows(IllegalArgumentException.class, + () -> executorProviderService.newScheduledThreadPool(threadPoolSize, threadPrefix)); + } + + @Test + void whenNewScheduledThreadPoolWithNoPrefixThenShouldThrowException() { + int threadPoolSize = 1; + UUID threadPrefix = null; + + assertThrows(NullPointerException.class, + () -> executorProviderService.newScheduledThreadPool(threadPoolSize, threadPrefix)); + } +} diff --git a/src/test/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultPersistedWriterTest.java b/src/test/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultPersistedWriterTest.java new file mode 100644 index 00000000..f53eecb5 --- /dev/null +++ b/src/test/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultPersistedWriterTest.java @@ -0,0 +1,178 @@ +/** + * Copyright (c) 2026, RTE (http://www.rte-france.com) + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ +package org.gridsuite.sensitivityanalysis.server.util; + +import com.powsybl.sensitivity.SensitivityAnalysisResult; +import com.powsybl.sensitivity.SensitivityValue; +import org.gridsuite.sensitivityanalysis.server.service.SensitivityAnalysisResultService; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; + +import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.BiConsumer; +import java.util.stream.Stream; + +import static org.gridsuite.sensitivityanalysis.server.util.SensitivityResultPersistedWriter.CONTINGENCY_WRITER_THREAD; +import static org.gridsuite.sensitivityanalysis.server.util.SensitivityResultPersistedWriter.SENSITIVITY_WRITER_THREAD; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Answers.RETURNS_DEEP_STUBS; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; + +/** + * @author Ghiles Abdellah {@literal } + */ +class SensitivityResultPersistedWriterTest { + + private SensitivityResultPersistedWriter sensitivityResultPersistedWriter; + + private SensitivityAnalysisResultService sensitivityAnalysisResultServiceMock; + private ExecutorProviderService executorProviderServiceMock; + private ScheduledExecutorService scheduledExecutorServiceMock; + private BatchAsyncPollerFactory batchAsyncPollerFactoryMock; + private BatchAsyncPoller sensitivityPollerMock; + private BatchAsyncPoller contingencyPollerMock; + + private UUID resultUuid; + + public static Stream provideInvalidSensitivityValue() { + return Stream.of( + Arguments.of(1, 2, 0.5, Double.NaN), + Arguments.of(3, 4, Double.NaN, 0.8) + ); + } + + @BeforeEach + void setUp() { + sensitivityAnalysisResultServiceMock = mock(SensitivityAnalysisResultService.class); + executorProviderServiceMock = mock(ExecutorProviderService.class); + scheduledExecutorServiceMock = mock(ScheduledExecutorService.class); + batchAsyncPollerFactoryMock = mock(BatchAsyncPollerFactory.class, RETURNS_DEEP_STUBS); + sensitivityPollerMock = mock(BatchAsyncPoller.class); + contingencyPollerMock = mock(BatchAsyncPoller.class); + + resultUuid = UUID.randomUUID(); + + when(executorProviderServiceMock.newScheduledThreadPool(2, resultUuid)).thenReturn(scheduledExecutorServiceMock); + when(batchAsyncPollerFactoryMock.create(eq(scheduledExecutorServiceMock), eq(resultUuid), eq(SENSITIVITY_WRITER_THREAD), any(BiConsumer.class))).thenReturn(sensitivityPollerMock); + when(batchAsyncPollerFactoryMock.create(eq(scheduledExecutorServiceMock), eq(resultUuid), eq(CONTINGENCY_WRITER_THREAD), any(BiConsumer.class))).thenReturn(contingencyPollerMock); + + sensitivityResultPersistedWriter = new SensitivityResultPersistedWriter(resultUuid, sensitivityAnalysisResultServiceMock, executorProviderServiceMock, batchAsyncPollerFactoryMock); + } + + @Test + void whenWriterIsCreatedThenTwoPollerAreCreatedWithExpectedParam() { + verify(batchAsyncPollerFactoryMock).create(eq(scheduledExecutorServiceMock), eq(resultUuid), eq(SENSITIVITY_WRITER_THREAD), any(BiConsumer.class)); + verify(batchAsyncPollerFactoryMock).create(eq(scheduledExecutorServiceMock), eq(resultUuid), eq(CONTINGENCY_WRITER_THREAD), any(BiConsumer.class)); + } + + @Test + void whenWriteSensitivityValueCalledWithValidValuesThenValueIsAddedToPoller() { + int expectedFactorIndex = 1; + int expectedContingencyIndex = 2; + double expectedValue = 0.5; + double expectedFunctionReference = 1.0; + + sensitivityResultPersistedWriter.writeSensitivityValue(expectedFactorIndex, expectedContingencyIndex, expectedValue, expectedFunctionReference); + + ArgumentCaptor sensitivityValueCaptor = ArgumentCaptor.forClass(SensitivityValue.class); + verify(sensitivityPollerMock).add(sensitivityValueCaptor.capture()); + SensitivityValue actualSensitivityValue = sensitivityValueCaptor.getValue(); + + // TODO missing equals in powsybl, should we add it ? + assertEquals(expectedFactorIndex, actualSensitivityValue.getFactorIndex()); + assertEquals(expectedContingencyIndex, actualSensitivityValue.getContingencyIndex()); + assertEquals(expectedValue, actualSensitivityValue.getValue(), 0.0); + assertEquals(expectedFunctionReference, actualSensitivityValue.getFunctionReference(), 0.0); + } + + @ParameterizedTest + @MethodSource("provideInvalidSensitivityValue") + void whenWriteSensitivityValueCalledWithInvalidValuesThenShouldDoNothing(int expectedFactorIndex, int expectedContingencyIndex, double expectedValue, double expectedFunctionReference) { + sensitivityResultPersistedWriter.writeSensitivityValue(expectedFactorIndex, expectedContingencyIndex, expectedValue, expectedFunctionReference); + + verifyNoInteractions(sensitivityPollerMock); + } + + @Test + void whenWriteContingencyStatusCalledWithValidValuesThenValueIsAddedToPoller() { + int expectedContingencyIndex = 2; + SensitivityAnalysisResult.Status expectedStatus = SensitivityAnalysisResult.Status.SUCCESS; + ContingencyResult expectedContingencyResult = new ContingencyResult(expectedContingencyIndex, expectedStatus); + + sensitivityResultPersistedWriter.writeContingencyStatus(expectedContingencyIndex, expectedStatus); + + verify(contingencyPollerMock).add(expectedContingencyResult); + } + + @Test + void whenWriterWasClosedThenShouldThrowExceptionOnSensitivityWrite() { + int expectedFactorIndex = 1; + int expectedContingencyIndex = 2; + double expectedValue = 0.5; + double expectedFunctionReference = 1.0; + + when(scheduledExecutorServiceMock.isShutdown()).thenReturn(true); + assertThrows(IllegalStateException.class, () -> sensitivityResultPersistedWriter.writeSensitivityValue(expectedFactorIndex, expectedContingencyIndex, expectedValue, expectedFunctionReference)); + } + + @Test + void whenWriterWasClosedThenShouldThrowExceptionOnContingencyWrite() { + int expectedContingencyIndex = 2; + SensitivityAnalysisResult.Status expectedStatus = SensitivityAnalysisResult.Status.SUCCESS; + + when(scheduledExecutorServiceMock.isShutdown()).thenReturn(true); + assertThrows(IllegalStateException.class, () -> sensitivityResultPersistedWriter.writeContingencyStatus(expectedContingencyIndex, expectedStatus)); + } + + @Test + void whenWriterCloseThenShouldShutdownExecutorService() { + sensitivityResultPersistedWriter.close(); + + verify(scheduledExecutorServiceMock).shutdownNow(); + } + + @Test + void whenNotifCompletionThenShouldNotifyAllPoller() { + sensitivityResultPersistedWriter.notifyCompletion(); + + verify(sensitivityPollerMock).notifyCompletion(); + verify(contingencyPollerMock).notifyCompletion(); + } + + @Test + void whenWaitForCompletionThenShouldWaitAllPoller() throws Exception { + sensitivityResultPersistedWriter.waitForCompletion(); + + verify(sensitivityPollerMock).waitForCompletion(); + verify(contingencyPollerMock).waitForCompletion(); + } + + @Test + void whenWaitForCompletionAndFirstPollerThrowsThenShouldThrow() throws Exception { + doThrow(new RuntimeException("Mocked exception")).when(sensitivityPollerMock).waitForCompletion(); + + assertThrows(RuntimeException.class, () -> sensitivityResultPersistedWriter.waitForCompletion()); + + verify(sensitivityPollerMock).waitForCompletion(); + } + + @Test + void whenWaitForCompletionAndSecondPollerThrowsThenShouldThrow() throws Exception { + doThrow(new RuntimeException("Mocked exception")).when(contingencyPollerMock).waitForCompletion(); + + assertThrows(RuntimeException.class, () -> sensitivityResultPersistedWriter.waitForCompletion()); + + verify(contingencyPollerMock).waitForCompletion(); + } +} diff --git a/src/test/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultWriterPersistedTest.java b/src/test/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultWriterPersistedTest.java deleted file mode 100644 index 437d8a23..00000000 --- a/src/test/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultWriterPersistedTest.java +++ /dev/null @@ -1,191 +0,0 @@ -/** - * Copyright (c) 2024, RTE (http://www.rte-france.com) - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ -package org.gridsuite.sensitivityanalysis.server.util; - -import com.powsybl.sensitivity.SensitivityAnalysisResult; -import org.gridsuite.sensitivityanalysis.server.service.SensitivityAnalysisResultService; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.platform.commons.logging.Logger; -import org.junit.platform.commons.logging.LoggerFactory; -import org.mockito.Mockito; - -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; -import java.util.stream.IntStream; - -import static org.awaitility.Awaitility.await; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.Mockito.*; - -/** - * @author Joris Mancini - * @author Jon Schuhmacher - */ -class SensitivityResultWriterPersistedTest { - private static final Logger LOGGER = LoggerFactory.getLogger(SensitivityResultWriterPersistedTest.class); - - private final SensitivityAnalysisResultService analysisResultService = Mockito.mock(SensitivityAnalysisResultService.class); - - private SensitivityResultWriterPersisted resultWriterPersisted; - - @BeforeEach - void setUp() { - Mockito.reset(analysisResultService); - - doAnswer(answer -> { - try { - LOGGER.info(() -> "writing sensitivity results"); - Thread.sleep(200); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - return null; - }).when(analysisResultService).writeSensitivityValues(any(), anyList()); - - doAnswer(answer -> { - try { - LOGGER.info(() -> "writing contingency statuses"); - Thread.sleep(200); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - return null; - }).when(analysisResultService).writeContingenciesStatus(any(), anyList()); - - resultWriterPersisted = new SensitivityResultWriterPersisted(UUID.randomUUID(), analysisResultService); - } - - @AfterEach - void tearDown() { - resultWriterPersisted.interrupt(); - } - - private void testOperating(boolean started, boolean finished, boolean interrupted) throws InterruptedException { - if (started) { - resultWriterPersisted.start(); - } - if (interrupted) { - resultWriterPersisted.interrupt(); - } - resultWriterPersisted.writeSensitivityValue(0, 0, 0., 0.); - resultWriterPersisted.writeContingencyStatus(0, SensitivityAnalysisResult.Status.SUCCESS); - assertFalse(resultWriterPersisted.isConsumerFinished()); - if (finished) { - resultWriterPersisted.setQueueProducerFinished(); - } - // This test always uses Thread.sleep, even though when - // we start and either finish or interrupt we could just wait for isConsumerFinished() - // This is done to ensure that sleeping correctly exhibit the tested behavior in the cases - // where we can't wait for isConsumerFinished() - // This makes the test slow so don't overuse it. - // When we rewrite this test class to avoid real delays, we can improve this (for example - // using a countdownlatch to guarantee execution order around SensitivityAnalysisResultService - // method calls) - Thread.sleep(500); - assertEquals(started && (finished || interrupted), resultWriterPersisted.isConsumerFinished()); - int times = started && !interrupted ? 1 : 0; - verify(analysisResultService, times(times)).writeSensitivityValues(any(), anyList()); - verify(analysisResultService, times(times)).writeContingenciesStatus(any(), anyList()); - } - - @Test - void testNotOperatingIfNotStarted() throws InterruptedException { - testOperating(false, false, false); - } - - @Test - void testOperatingIfStartedNotFinished() throws InterruptedException { - testOperating(true, false, false); - } - - @Test - void testOperatingIfStartedInterrupted() throws InterruptedException { - testOperating(true, false, true); - } - - @Test - void testOperatingIfStartedFinished() throws InterruptedException { - testOperating(true, true, false); - } - - private void testWritingValue(Runnable writeOne, Consumer verify, boolean batched, boolean throwing) { - if (throwing) { - doThrow(new RuntimeException("Error persisting sensitivity values")) - .when(analysisResultService) - .writeSensitivityValues(any(), anyList()); - doThrow(new RuntimeException("Error persisting contingency statuses")) - .when(analysisResultService) - .writeContingenciesStatus(any(), anyList()); - } - resultWriterPersisted.start(); - if (batched) { - IntStream.range(0, SensitivityResultWriterPersisted.BUFFER_SIZE * 3 / 2).forEach(i -> writeOne.run()); - } else { - writeOne.run(); - } - resultWriterPersisted.setQueueProducerFinished(); - await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> resultWriterPersisted.isConsumerFinished()); - if (batched && !throwing) { - verify.accept(verify(analysisResultService, atLeast(2))); - verify.accept(verify(analysisResultService, atMost(3))); - } else { - verify.accept(verify(analysisResultService, times(1))); - } - } - - private void testWritingSensitivityValue(boolean batched, boolean throwing) { - testWritingValue( - () -> resultWriterPersisted.writeSensitivityValue(0, 0, 0., 0.), - verify -> verify.writeSensitivityValues(any(), anyList()), - batched, throwing - ); - } - - @Test - void testWritingOneSensitivityValue() { - testWritingSensitivityValue(false, false); - } - - @Test - void testWritingSeveralSensitivityValuesIsBatched() { - testWritingSensitivityValue(true, false); - } - - private void testWritingContingencyValue(boolean batched, boolean throwing) { - testWritingValue( - () -> resultWriterPersisted.writeContingencyStatus(0, SensitivityAnalysisResult.Status.SUCCESS), - verify -> verify.writeContingenciesStatus(any(), anyList()), - batched, throwing - ); - } - - @Test - void testWritingOneContingencyStatus() { - testWritingContingencyValue(false, false); - } - - @Test - void testWritingSeveralContingencyStatusesIsBatched() { - testWritingContingencyValue(true, false); - } - - @Test - void testIsEndingIfErrorOccursPersistingSensitivityValues() { - testWritingSensitivityValue(false, true); - } - - @Test - void testIsEndingIfErrorOccursPersistingContingencyStatuses() { - testWritingContingencyValue(false, true); - } -}