diff --git a/src/main/java/org/gridsuite/sensitivityanalysis/server/service/SensitivityAnalysisWorkerService.java b/src/main/java/org/gridsuite/sensitivityanalysis/server/service/SensitivityAnalysisWorkerService.java index 814d3711..a466719e 100644 --- a/src/main/java/org/gridsuite/sensitivityanalysis/server/service/SensitivityAnalysisWorkerService.java +++ b/src/main/java/org/gridsuite/sensitivityanalysis/server/service/SensitivityAnalysisWorkerService.java @@ -147,8 +147,9 @@ protected CompletableFuture getCompletableFuture(SensitivityAnalysisRun sensitivityAnalysisParameters, executionService.getComputationManager(), runContext.getReportNode()) + .whenComplete((unused1, unused2) -> writer.setQueueProducerFinished()) .thenApply(unused -> { - while (writer.isWorking()) { + while (!writer.isConsumerFinished()) { // Nothing to do } writer.interrupt(); diff --git a/src/main/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultWriterPersisted.java b/src/main/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultWriterPersisted.java index 89c27224..39daf0c3 100644 --- a/src/main/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultWriterPersisted.java +++ b/src/main/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultWriterPersisted.java @@ -38,9 +38,11 @@ public class SensitivityResultWriterPersisted implements SensitivityResultWriter private final Thread contingencyResultsThread; - private final AtomicBoolean sensitivityValuesWorking; + private final AtomicBoolean sensitivityValuesConsumerFinished; - private final AtomicBoolean contingencyResultsWorking; + private final AtomicBoolean contingencyResultsConsumerFinished; + + private final AtomicBoolean queueProducerFinished; private UUID resultUuid; @@ -50,8 +52,9 @@ public SensitivityResultWriterPersisted(UUID resultUuid, SensitivityAnalysisResu contingencyResultsQueue = new LinkedBlockingQueue<>(); sensitivityValuesThread = new Thread(sensitivityValuesBatchedHandling(), "sensitivityWriterThread"); contingencyResultsThread = new Thread(contingencyResultsBatchedHandling(), "contingencyWriterThread"); - sensitivityValuesWorking = new AtomicBoolean(false); - contingencyResultsWorking = new AtomicBoolean(false); + sensitivityValuesConsumerFinished = new AtomicBoolean(false); + contingencyResultsConsumerFinished = new AtomicBoolean(false); + queueProducerFinished = new AtomicBoolean(false); this.resultUuid = resultUuid; } @@ -65,11 +68,12 @@ public void interrupt() { contingencyResultsThread.interrupt(); } - public boolean isWorking() { - return !sensitivityValuesQueue.isEmpty() - || !contingencyResultsQueue.isEmpty() - || sensitivityValuesWorking.get() - || contingencyResultsWorking.get(); + public boolean isConsumerFinished() { + return sensitivityValuesConsumerFinished.get() && contingencyResultsConsumerFinished.get(); + } + + public void setQueueProducerFinished() { + queueProducerFinished.set(true); } @Override @@ -88,7 +92,7 @@ public void writeContingencyStatus(int contingencyIndex, SensitivityAnalysisResu private Runnable sensitivityValuesBatchedHandling() { return () -> run( sensitivityValuesThread, - sensitivityValuesWorking, + sensitivityValuesConsumerFinished, sensitivityValuesQueue, sensitivityAnalysisResultService::writeSensitivityValues ); @@ -97,7 +101,7 @@ private Runnable sensitivityValuesBatchedHandling() { private Runnable contingencyResultsBatchedHandling() { return () -> run( contingencyResultsThread, - contingencyResultsWorking, + contingencyResultsConsumerFinished, contingencyResultsQueue, sensitivityAnalysisResultService::writeContingenciesStatus ); @@ -107,19 +111,22 @@ private interface BatchedRunnable { void run(UUID resultUuid, List tasks); } - private void run(Thread thread, AtomicBoolean isWorking, BlockingQueue queue, BatchedRunnable runnable) { + private void run(Thread thread, AtomicBoolean isFinished, BlockingQueue queue, BatchedRunnable runnable) { try { - while (!thread.isInterrupted()) { + // Note: checking isInterrupted here is a bit redundant with Thread.sleep below which + // also checks it and throws to exit the loop, but it has the advantage of making the + // code safer if we ever remove such a blocking call (ie ones throwing when interrupted). + // Also a minor advantage is that we stop the loop one iteration earlier (drain + run) + // with the current code that only blocks if the queue was empty (drained 0 elements) + while (!(thread.isInterrupted() || queueProducerFinished.get() && queue.isEmpty())) { List tasks = new ArrayList<>(BUFFER_SIZE); - while (queue.drainTo(tasks, BUFFER_SIZE) == 0) { + while (!(queue.drainTo(tasks, BUFFER_SIZE) > 0 || queueProducerFinished.get() && queue.isEmpty())) { Thread.sleep(100); } LOGGER.debug("{} - Remaining {} elements in the queue", thread.getName(), queue.size()); if (!tasks.isEmpty()) { LOGGER.debug("{} - Treating {} elements in the batch", thread.getName(), tasks.size()); - isWorking.set(true); runnable.run(resultUuid, tasks); - isWorking.set(false); } } } catch (InterruptedException e) { @@ -127,8 +134,8 @@ private void run(Thread thread, AtomicBoolean isWorking, BlockingQueue qu thread.interrupt(); } catch (Exception e) { LOGGER.error("Unexpected error occurred during persisting results", e); - queue.clear(); - isWorking.set(false); + } finally { + isFinished.set(true); } } } diff --git a/src/test/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultWriterPersistedTest.java b/src/test/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultWriterPersistedTest.java index 6223a5fe..4d827e7a 100644 --- a/src/test/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultWriterPersistedTest.java +++ b/src/test/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultWriterPersistedTest.java @@ -69,10 +69,13 @@ void tearDown() { } @Test - void testNotOperatingIfNotStarted() { + void testNotOperatingIfNotStarted() throws InterruptedException { verify(analysisResultService, times(0)).writeSensitivityValues(any(), anyList()); resultWriterPersisted.writeSensitivityValue(0, 0, 0., 0.); - assertTrue(resultWriterPersisted.isWorking()); + assertFalse(resultWriterPersisted.isConsumerFinished()); + resultWriterPersisted.setQueueProducerFinished(); + Thread.sleep(500); + assertFalse(resultWriterPersisted.isConsumerFinished()); verify(analysisResultService, times(0)).writeSensitivityValues(any(), anyList()); } @@ -80,11 +83,11 @@ void testNotOperatingIfNotStarted() { void testWritingOneSensitivityValue() { verify(analysisResultService, times(0)).writeSensitivityValues(any(), anyList()); resultWriterPersisted.start(); - assertFalse(resultWriterPersisted.isWorking()); + assertFalse(resultWriterPersisted.isConsumerFinished()); resultWriterPersisted.writeSensitivityValue(0, 0, 0., 0.); - await().atMost(500, TimeUnit.MILLISECONDS).until(resultWriterPersisted::isWorking); - await().atMost(1, TimeUnit.SECONDS).until(() -> !resultWriterPersisted.isWorking()); + resultWriterPersisted.setQueueProducerFinished(); + await().atMost(1, TimeUnit.SECONDS).until(() -> resultWriterPersisted.isConsumerFinished()); verify(analysisResultService, atLeast(1)).writeSensitivityValues(any(), anyList()); } @@ -92,12 +95,12 @@ void testWritingOneSensitivityValue() { void testWritingSeveralSensitivityValuesIsBatched() { verify(analysisResultService, times(0)).writeSensitivityValues(any(), anyList()); resultWriterPersisted.start(); - assertFalse(resultWriterPersisted.isWorking()); + assertFalse(resultWriterPersisted.isConsumerFinished()); IntStream.range(0, 1000).forEach(i -> resultWriterPersisted.writeSensitivityValue(0, 0, 0., 0.)); - assertTrue(resultWriterPersisted.isWorking()); + resultWriterPersisted.setQueueProducerFinished(); - await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> !resultWriterPersisted.isWorking()); + await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> resultWriterPersisted.isConsumerFinished()); verify(analysisResultService, atLeast(2)).writeSensitivityValues(any(), anyList()); } @@ -105,12 +108,12 @@ void testWritingSeveralSensitivityValuesIsBatched() { void testWritingOneContingencyStatus() { verify(analysisResultService, times(0)).writeSensitivityValues(any(), anyList()); resultWriterPersisted.start(); - assertFalse(resultWriterPersisted.isWorking()); + assertFalse(resultWriterPersisted.isConsumerFinished()); resultWriterPersisted.writeContingencyStatus(0, SensitivityAnalysisResult.Status.SUCCESS); - assertTrue(resultWriterPersisted.isWorking()); + resultWriterPersisted.setQueueProducerFinished(); - await().atMost(500, TimeUnit.MILLISECONDS).until(() -> !resultWriterPersisted.isWorking()); + await().atMost(500, TimeUnit.MILLISECONDS).until(() -> resultWriterPersisted.isConsumerFinished()); verify(analysisResultService, atLeast(1)).writeContingenciesStatus(any(), anyList()); } @@ -118,22 +121,23 @@ void testWritingOneContingencyStatus() { void testWritingSeveralContingencyStatusesIsBatched() { verify(analysisResultService, times(0)).writeSensitivityValues(any(), anyList()); resultWriterPersisted.start(); - assertFalse(resultWriterPersisted.isWorking()); + assertFalse(resultWriterPersisted.isConsumerFinished()); IntStream.range(0, 1000).forEach(i -> resultWriterPersisted.writeContingencyStatus(0, SensitivityAnalysisResult.Status.SUCCESS)); - assertTrue(resultWriterPersisted.isWorking()); + resultWriterPersisted.setQueueProducerFinished(); - await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> !resultWriterPersisted.isWorking()); + await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> resultWriterPersisted.isConsumerFinished()); verify(analysisResultService, atLeast(2)).writeContingenciesStatus(any(), anyList()); } @Test - void testNotOperatingAfterInterruption() { + void testNotOperatingAfterInterruption() throws InterruptedException { resultWriterPersisted.start(); resultWriterPersisted.interrupt(); - + Thread.sleep(500); resultWriterPersisted.writeSensitivityValue(0, 0, 0., 0.); - await().atLeast(500, TimeUnit.MILLISECONDS); + Thread.sleep(500); + assertTrue(resultWriterPersisted.isConsumerFinished()); verify(analysisResultService, times(0)).writeSensitivityValues(any(), anyList()); } @@ -144,7 +148,8 @@ void testIsEndingIfErrorOccursPersistingSensitivityValues() { .writeSensitivityValues(any(), anyList()); resultWriterPersisted.start(); IntStream.range(0, 1000).forEach(i -> resultWriterPersisted.writeSensitivityValue(0, 0, 0., 0.)); - await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> !resultWriterPersisted.isWorking()); + resultWriterPersisted.setQueueProducerFinished(); + await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> resultWriterPersisted.isConsumerFinished()); } @Test @@ -154,6 +159,7 @@ void testIsEndingIfErrorOccursPersistingContingencyStatuses() { .writeContingenciesStatus(any(), anyList()); resultWriterPersisted.start(); IntStream.range(0, 1000).forEach(i -> resultWriterPersisted.writeContingencyStatus(0, SensitivityAnalysisResult.Status.SUCCESS)); - await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> !resultWriterPersisted.isWorking()); + resultWriterPersisted.setQueueProducerFinished(); + await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> resultWriterPersisted.isConsumerFinished()); } }