From 9052520ff9966206e1fe3b0e00d98545de42a87f Mon Sep 17 00:00:00 2001 From: HARPER Jon Date: Thu, 15 Jan 2026 11:10:26 +0100 Subject: [PATCH] Fix hangs in the finished computation thread when writer threads had exited early After the computation is finished (so nothing else will be added to the queues), the computation thread waits for queues to be empty to proceed (and even a little bit more that the last writes are done with the isWorking status but this not relevant here). At this point the queues are guaranteed to be emptied by the writer threads, but only if they are still running. The writer threads cannot exit early, unless they throw an exception. They did clear their queue to make it empty if they threw an exception, but only once and then exited. If this happened before the computation thread was finished adding things to the queue, then the computation thread readds elements to the queue and nothing clears/drains because the writer thread has exited and so it would never be emptied and consequently the computation thread hangs. Solution: => Don't use empty queue as the criteria for progress in computation thread, use writer thread finish status => In writer threads, if the computation thread is finished adding values, exit when the queue is empty (exiting will trigger progress in the computation thread) instead of polling for new elements. Note: when the computation thread is not finished producing, we still poll for new elements when the queue is empty to avoid exiting early. Fix the tests: missing delays to make them independant of thread scheduling Also The following line does not do any delay await().atLeast(500, TimeUnit.MILLISECONDS) we must use the good old Thread.Sleep() instead NOTE: the ephemeral "isWorking" status that tracks true/false exactly if the writer runnable is executing or not is not needed anymore so it is removed. We can readd it if we ever need it. NOTE: this is code is dangerous and should be refactor with safer idioms. Ideas: - only use interrupts in exceptional cases, not on the nominal code path after producer and writers are finished normaly - don't use threads manually (instead use executor service) - don't poll (while loops to detect progress) - move all necessary statements in finally/whencomplete blocks to correctly handle any exceptions - rewrite tests to not depend on real time durations - ... more ? --- .../SensitivityAnalysisWorkerService.java | 3 +- .../SensitivityResultWriterPersisted.java | 43 ++++++++++-------- .../SensitivityResultWriterPersistedTest.java | 44 +++++++++++-------- 3 files changed, 52 insertions(+), 38 deletions(-) diff --git a/src/main/java/org/gridsuite/sensitivityanalysis/server/service/SensitivityAnalysisWorkerService.java b/src/main/java/org/gridsuite/sensitivityanalysis/server/service/SensitivityAnalysisWorkerService.java index 814d3711..a466719e 100644 --- a/src/main/java/org/gridsuite/sensitivityanalysis/server/service/SensitivityAnalysisWorkerService.java +++ b/src/main/java/org/gridsuite/sensitivityanalysis/server/service/SensitivityAnalysisWorkerService.java @@ -147,8 +147,9 @@ protected CompletableFuture getCompletableFuture(SensitivityAnalysisRun sensitivityAnalysisParameters, executionService.getComputationManager(), runContext.getReportNode()) + .whenComplete((unused1, unused2) -> writer.setQueueProducerFinished()) .thenApply(unused -> { - while (writer.isWorking()) { + while (!writer.isConsumerFinished()) { // Nothing to do } writer.interrupt(); diff --git a/src/main/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultWriterPersisted.java b/src/main/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultWriterPersisted.java index 89c27224..39daf0c3 100644 --- a/src/main/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultWriterPersisted.java +++ b/src/main/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultWriterPersisted.java @@ -38,9 +38,11 @@ public class SensitivityResultWriterPersisted implements SensitivityResultWriter private final Thread contingencyResultsThread; - private final AtomicBoolean sensitivityValuesWorking; + private final AtomicBoolean sensitivityValuesConsumerFinished; - private final AtomicBoolean contingencyResultsWorking; + private final AtomicBoolean contingencyResultsConsumerFinished; + + private final AtomicBoolean queueProducerFinished; private UUID resultUuid; @@ -50,8 +52,9 @@ public SensitivityResultWriterPersisted(UUID resultUuid, SensitivityAnalysisResu contingencyResultsQueue = new LinkedBlockingQueue<>(); sensitivityValuesThread = new Thread(sensitivityValuesBatchedHandling(), "sensitivityWriterThread"); contingencyResultsThread = new Thread(contingencyResultsBatchedHandling(), "contingencyWriterThread"); - sensitivityValuesWorking = new AtomicBoolean(false); - contingencyResultsWorking = new AtomicBoolean(false); + sensitivityValuesConsumerFinished = new AtomicBoolean(false); + contingencyResultsConsumerFinished = new AtomicBoolean(false); + queueProducerFinished = new AtomicBoolean(false); this.resultUuid = resultUuid; } @@ -65,11 +68,12 @@ public void interrupt() { contingencyResultsThread.interrupt(); } - public boolean isWorking() { - return !sensitivityValuesQueue.isEmpty() - || !contingencyResultsQueue.isEmpty() - || sensitivityValuesWorking.get() - || contingencyResultsWorking.get(); + public boolean isConsumerFinished() { + return sensitivityValuesConsumerFinished.get() && contingencyResultsConsumerFinished.get(); + } + + public void setQueueProducerFinished() { + queueProducerFinished.set(true); } @Override @@ -88,7 +92,7 @@ public void writeContingencyStatus(int contingencyIndex, SensitivityAnalysisResu private Runnable sensitivityValuesBatchedHandling() { return () -> run( sensitivityValuesThread, - sensitivityValuesWorking, + sensitivityValuesConsumerFinished, sensitivityValuesQueue, sensitivityAnalysisResultService::writeSensitivityValues ); @@ -97,7 +101,7 @@ private Runnable sensitivityValuesBatchedHandling() { private Runnable contingencyResultsBatchedHandling() { return () -> run( contingencyResultsThread, - contingencyResultsWorking, + contingencyResultsConsumerFinished, contingencyResultsQueue, sensitivityAnalysisResultService::writeContingenciesStatus ); @@ -107,19 +111,22 @@ private interface BatchedRunnable { void run(UUID resultUuid, List tasks); } - private void run(Thread thread, AtomicBoolean isWorking, BlockingQueue queue, BatchedRunnable runnable) { + private void run(Thread thread, AtomicBoolean isFinished, BlockingQueue queue, BatchedRunnable runnable) { try { - while (!thread.isInterrupted()) { + // Note: checking isInterrupted here is a bit redundant with Thread.sleep below which + // also checks it and throws to exit the loop, but it has the advantage of making the + // code safer if we ever remove such a blocking call (ie ones throwing when interrupted). + // Also a minor advantage is that we stop the loop one iteration earlier (drain + run) + // with the current code that only blocks if the queue was empty (drained 0 elements) + while (!(thread.isInterrupted() || queueProducerFinished.get() && queue.isEmpty())) { List tasks = new ArrayList<>(BUFFER_SIZE); - while (queue.drainTo(tasks, BUFFER_SIZE) == 0) { + while (!(queue.drainTo(tasks, BUFFER_SIZE) > 0 || queueProducerFinished.get() && queue.isEmpty())) { Thread.sleep(100); } LOGGER.debug("{} - Remaining {} elements in the queue", thread.getName(), queue.size()); if (!tasks.isEmpty()) { LOGGER.debug("{} - Treating {} elements in the batch", thread.getName(), tasks.size()); - isWorking.set(true); runnable.run(resultUuid, tasks); - isWorking.set(false); } } } catch (InterruptedException e) { @@ -127,8 +134,8 @@ private void run(Thread thread, AtomicBoolean isWorking, BlockingQueue qu thread.interrupt(); } catch (Exception e) { LOGGER.error("Unexpected error occurred during persisting results", e); - queue.clear(); - isWorking.set(false); + } finally { + isFinished.set(true); } } } diff --git a/src/test/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultWriterPersistedTest.java b/src/test/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultWriterPersistedTest.java index 6223a5fe..4d827e7a 100644 --- a/src/test/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultWriterPersistedTest.java +++ b/src/test/java/org/gridsuite/sensitivityanalysis/server/util/SensitivityResultWriterPersistedTest.java @@ -69,10 +69,13 @@ void tearDown() { } @Test - void testNotOperatingIfNotStarted() { + void testNotOperatingIfNotStarted() throws InterruptedException { verify(analysisResultService, times(0)).writeSensitivityValues(any(), anyList()); resultWriterPersisted.writeSensitivityValue(0, 0, 0., 0.); - assertTrue(resultWriterPersisted.isWorking()); + assertFalse(resultWriterPersisted.isConsumerFinished()); + resultWriterPersisted.setQueueProducerFinished(); + Thread.sleep(500); + assertFalse(resultWriterPersisted.isConsumerFinished()); verify(analysisResultService, times(0)).writeSensitivityValues(any(), anyList()); } @@ -80,11 +83,11 @@ void testNotOperatingIfNotStarted() { void testWritingOneSensitivityValue() { verify(analysisResultService, times(0)).writeSensitivityValues(any(), anyList()); resultWriterPersisted.start(); - assertFalse(resultWriterPersisted.isWorking()); + assertFalse(resultWriterPersisted.isConsumerFinished()); resultWriterPersisted.writeSensitivityValue(0, 0, 0., 0.); - await().atMost(500, TimeUnit.MILLISECONDS).until(resultWriterPersisted::isWorking); - await().atMost(1, TimeUnit.SECONDS).until(() -> !resultWriterPersisted.isWorking()); + resultWriterPersisted.setQueueProducerFinished(); + await().atMost(1, TimeUnit.SECONDS).until(() -> resultWriterPersisted.isConsumerFinished()); verify(analysisResultService, atLeast(1)).writeSensitivityValues(any(), anyList()); } @@ -92,12 +95,12 @@ void testWritingOneSensitivityValue() { void testWritingSeveralSensitivityValuesIsBatched() { verify(analysisResultService, times(0)).writeSensitivityValues(any(), anyList()); resultWriterPersisted.start(); - assertFalse(resultWriterPersisted.isWorking()); + assertFalse(resultWriterPersisted.isConsumerFinished()); IntStream.range(0, 1000).forEach(i -> resultWriterPersisted.writeSensitivityValue(0, 0, 0., 0.)); - assertTrue(resultWriterPersisted.isWorking()); + resultWriterPersisted.setQueueProducerFinished(); - await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> !resultWriterPersisted.isWorking()); + await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> resultWriterPersisted.isConsumerFinished()); verify(analysisResultService, atLeast(2)).writeSensitivityValues(any(), anyList()); } @@ -105,12 +108,12 @@ void testWritingSeveralSensitivityValuesIsBatched() { void testWritingOneContingencyStatus() { verify(analysisResultService, times(0)).writeSensitivityValues(any(), anyList()); resultWriterPersisted.start(); - assertFalse(resultWriterPersisted.isWorking()); + assertFalse(resultWriterPersisted.isConsumerFinished()); resultWriterPersisted.writeContingencyStatus(0, SensitivityAnalysisResult.Status.SUCCESS); - assertTrue(resultWriterPersisted.isWorking()); + resultWriterPersisted.setQueueProducerFinished(); - await().atMost(500, TimeUnit.MILLISECONDS).until(() -> !resultWriterPersisted.isWorking()); + await().atMost(500, TimeUnit.MILLISECONDS).until(() -> resultWriterPersisted.isConsumerFinished()); verify(analysisResultService, atLeast(1)).writeContingenciesStatus(any(), anyList()); } @@ -118,22 +121,23 @@ void testWritingOneContingencyStatus() { void testWritingSeveralContingencyStatusesIsBatched() { verify(analysisResultService, times(0)).writeSensitivityValues(any(), anyList()); resultWriterPersisted.start(); - assertFalse(resultWriterPersisted.isWorking()); + assertFalse(resultWriterPersisted.isConsumerFinished()); IntStream.range(0, 1000).forEach(i -> resultWriterPersisted.writeContingencyStatus(0, SensitivityAnalysisResult.Status.SUCCESS)); - assertTrue(resultWriterPersisted.isWorking()); + resultWriterPersisted.setQueueProducerFinished(); - await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> !resultWriterPersisted.isWorking()); + await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> resultWriterPersisted.isConsumerFinished()); verify(analysisResultService, atLeast(2)).writeContingenciesStatus(any(), anyList()); } @Test - void testNotOperatingAfterInterruption() { + void testNotOperatingAfterInterruption() throws InterruptedException { resultWriterPersisted.start(); resultWriterPersisted.interrupt(); - + Thread.sleep(500); resultWriterPersisted.writeSensitivityValue(0, 0, 0., 0.); - await().atLeast(500, TimeUnit.MILLISECONDS); + Thread.sleep(500); + assertTrue(resultWriterPersisted.isConsumerFinished()); verify(analysisResultService, times(0)).writeSensitivityValues(any(), anyList()); } @@ -144,7 +148,8 @@ void testIsEndingIfErrorOccursPersistingSensitivityValues() { .writeSensitivityValues(any(), anyList()); resultWriterPersisted.start(); IntStream.range(0, 1000).forEach(i -> resultWriterPersisted.writeSensitivityValue(0, 0, 0., 0.)); - await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> !resultWriterPersisted.isWorking()); + resultWriterPersisted.setQueueProducerFinished(); + await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> resultWriterPersisted.isConsumerFinished()); } @Test @@ -154,6 +159,7 @@ void testIsEndingIfErrorOccursPersistingContingencyStatuses() { .writeContingenciesStatus(any(), anyList()); resultWriterPersisted.start(); IntStream.range(0, 1000).forEach(i -> resultWriterPersisted.writeContingencyStatus(0, SensitivityAnalysisResult.Status.SUCCESS)); - await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> !resultWriterPersisted.isWorking()); + resultWriterPersisted.setQueueProducerFinished(); + await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> resultWriterPersisted.isConsumerFinished()); } }