Before this change, on main, to reproduce the problem in unit tests, duplicate the last two lines of testIsEndingIfErrorOccursPersistingSensitivityValues and testIsEndingIfErrorOccursPersistingContingencyStatuses in SensitivityResultWriterPersistedTest:
IntStream.range(0, 1000).forEach(i -> resultWriterPersisted.writeContingencyStatus(0, SensitivityAnalysisResult.Status.SUCCESS));
await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> !resultWriterPersisted.isWorking());
IntStream.range(0, 1000).forEach(i -> resultWriterPersisted.writeContingencyStatus(0, SensitivityAnalysisResult.Status.SUCCESS));
await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> !resultWriterPersisted.isWorking());
On main, with this code, the first await lets the test wait until the writer thread gets the exception and clears the remaining elements of the queue. The test then re-adds elements to the queue, and nothing ever clears or drains it again. The key point is that this is actually the behavior of SensitivityAnalysisWorkerService:
CompletableFuture<Boolean> future =
sensitivityAnalysisRunner.runAsync(...) // produces all values, writer threads start consuming
.thenApply(unused -> { // all values produced, now wait for consumers. Infinite if they died.
while (writer.isWorking()) { ... }
}
On this branch, there's no need to modify the test: a single batch write is enough to reproduce, because there is no behavior difference (apart from logging) between exiting the threads normally (queue empty and producer finished) and exiting exceptionally (interrupt or exception in saves). A better test would use both SensitivityAnalysisWorkerService and SensitivityResultWriterPersisted together, but that may be overkill.
…exited early
After the computation is finished (so nothing else will be added to the
queues), the computation thread waits for the queues to be empty before
proceeding (and, via the isWorking status, a little longer until the last
writes are done, but this is not relevant here). At this point the queues are
guaranteed to be emptied by the writer threads, but only if those threads are
still running. The writer threads cannot exit early unless they throw an
exception. When they threw an exception they did clear their queue, but only
once, and then exited. If this happened before the computation thread had
finished adding things to the queue, the computation thread re-added elements
and nothing cleared or drained them because the writer thread had exited. The
queue was therefore never emptied and the computation thread hung.
Solution:
=> Don't use an empty queue as the criterion for progress in the computation thread;
use the writer thread finish status
=> In writer threads, if the computation thread is finished adding values, exit
when the queue is empty (exiting will trigger progress in the computation thread)
instead of polling for new elements. Note: when the computation thread is
not finished producing, we still poll for new elements when the queue is empty to avoid
exiting early.
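The two rules above can be sketched as follows. This is a minimal illustration under stated assumptions, not the project's code: DrainingWriterSketch, its fields, and runScenario are hypothetical names, and the failOnFirstBatch flag only simulates an exception during a save.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the writer; names are illustrative only.
class DrainingWriterSketch {
    private static final int BUFFER_SIZE = 256;

    private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
    private final AtomicBoolean producerFinished = new AtomicBoolean(false);
    private final AtomicBoolean consumerFinished = new AtomicBoolean(false);
    private final List<Integer> persisted = new ArrayList<>();
    private final boolean failOnFirstBatch;
    private final Thread consumer = new Thread(this::consume);

    DrainingWriterSketch(boolean failOnFirstBatch) {
        this.failOnFirstBatch = failOnFirstBatch;
    }

    void start() { consumer.start(); }
    void write(int value) { queue.add(value); }
    void setProducerFinished() { producerFinished.set(true); }
    boolean isConsumerFinished() { return consumerFinished.get(); }
    int persistedCount() { synchronized (persisted) { return persisted.size(); } }

    private void consume() {
        try {
            // Exit when interrupted, or when the producer is done and the queue is empty.
            while (!Thread.currentThread().isInterrupted()
                    && !(producerFinished.get() && queue.isEmpty())) {
                List<Integer> batch = new ArrayList<>(BUFFER_SIZE);
                if (queue.drainTo(batch, BUFFER_SIZE) == 0) {
                    Thread.sleep(10); // queue empty but producer may still add: keep polling
                    continue;
                }
                if (failOnFirstBatch) {
                    throw new IllegalStateException("simulated persistence failure");
                }
                synchronized (persisted) { persisted.addAll(batch); }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {
            // Simulated failure: a real writer would log here.
        } finally {
            // Set on every exit path, so the producer can never wait forever.
            consumerFinished.set(true);
        }
    }

    // Producer side: waits on the consumer finish status, not on queue emptiness.
    static int runScenario(boolean fail, int count) {
        DrainingWriterSketch writer = new DrainingWriterSketch(fail);
        writer.start();
        for (int i = 0; i < count; i++) {
            writer.write(i);
        }
        writer.setProducerFinished();
        while (!writer.isConsumerFinished()) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return -1;
            }
        }
        return writer.persistedCount();
    }
}
```

In this sketch the producer returns even when the consumer dies on its first batch, because the finish flag is set in the finally block regardless of how the consumer loop ends.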
Fix the tests: they were missing delays needed to make them independent of thread scheduling.
Also, the following line does not introduce any delay:
await().atLeast(500, TimeUnit.MILLISECONDS)
so we must use the good old Thread.sleep() instead.
NOTE: the ephemeral "isWorking" status, which tracks exactly whether the
writer runnable is executing or not, is no longer needed, so it is removed. We
can re-add it if we ever need it.
NOTE: this code is dangerous and should be refactored with safer idioms. Ideas:
- only use interrupts in exceptional cases, not on the nominal code path after
producer and writers are finished normally
- don't use threads manually (instead use executor service)
- don't poll (while loops to detect progress)
- move all necessary statements in finally/whenComplete blocks to correctly
handle any exceptions
- rewrite tests to not depend on real time durations
- ... more ?
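Some of these ideas could look like the sketch below, which runs a writer task on an ExecutorService, puts its cleanup in whenComplete, and blocks on the future rather than polling. The class ExecutorWriterSketch, the runWriter method, and the cleanedUp flag are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; not the project's classes.
class ExecutorWriterSketch {
    // Runs writeTask on an executor and returns true if it completed without an exception.
    static boolean runWriter(Runnable writeTask, AtomicBoolean cleanedUp) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture.runAsync(writeTask, executor)
                    // Runs on both normal and exceptional completion.
                    .whenComplete((unused, error) -> cleanedUp.set(true))
                    // Converts the outcome into a success flag instead of rethrowing.
                    .handle((unused, error) -> error == null)
                    // Blocks until the task is done; no polling loop.
                    .join();
        } finally {
            executor.shutdown();
        }
    }
}
```

The caller learns about a failed writer from the return value, and the cleanup step runs in both cases.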
  while (!(thread.isInterrupted() || queueProducerFinished.get() && queue.isEmpty())) {
  List<T> tasks = new ArrayList<>(BUFFER_SIZE);
- while (queue.drainTo(tasks, BUFFER_SIZE) == 0) {
+ while (!(queue.drainTo(tasks, BUFFER_SIZE) > 0 || queueProducerFinished.get() && queue.isEmpty())) {
using queue::poll + queue::drainTo will remove the Thread::sleep.
OK for now; this will be fixed in the future: #153 (review)
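A minimal sketch of the poll-then-drain idea mentioned in this comment, assuming a BlockingQueue: a timed poll blocks for the first element, then drainTo collects whatever else is already queued. PollThenDrainSketch and nextBatch are hypothetical names, and the timeout value is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; not the project's code.
class PollThenDrainSketch {
    // Returns up to maxSize elements, or an empty list if nothing arrives within the timeout.
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxSize, long timeoutMillis) {
        List<T> batch = new ArrayList<>(maxSize);
        try {
            // Blocks until an element is available or the timeout elapses; replaces sleep-and-retry.
            T first = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (first != null) {
                batch.add(first);
                // Non-blocking: takes only what is already in the queue.
                queue.drainTo(batch, maxSize - 1);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller's loop
        }
        return batch;
    }
}
```

An empty result lets the calling loop re-check its exit condition (for example, producer finished and queue empty) without an explicit Thread.sleep.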
antoinebhs
left a comment
Tested regression OK
Would be nice to keep track of suggestions to improve robustness in a separate issue
Would also be nice to understand the root cause of the exception that triggers the infinite wait
Pull request overview
This PR fixes a deadlock issue where the computation thread could hang indefinitely when writer threads exited early due to exceptions. The core problem was that the computation thread waited for queues to be empty as a signal that writers finished, but if writers threw exceptions and exited before all items were queued, the computation thread would hang waiting for queues that would never be emptied.
Changes:
- Replaced queue-emptiness checks with explicit consumer finish status tracking
- Added producer-finished signaling to allow consumers to exit gracefully when queues are empty
- Removed the ephemeral isWorking status tracking mechanism
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| SensitivityResultWriterPersisted.java | Implemented new finish-status based synchronization replacing queue-emptiness checks and isWorking flags |
| SensitivityAnalysisWorkerService.java | Updated computation thread to signal producer completion and check consumer finish status instead of isWorking |
| SensitivityResultWriterPersistedTest.java | Updated all tests to use new isConsumerFinished() method and setQueueProducerFinished() signaling |
  .thenApply(unused -> {
- while (writer.isWorking()) {
+ while (!writer.isConsumerFinished()) {
  // Nothing to do
This is a busy-wait loop that continuously polls the consumer status, consuming CPU cycles unnecessarily. Consider adding a small sleep interval (e.g., Thread.sleep(100)) inside the loop to reduce CPU usage, or better yet, use a proper synchronization mechanism like CountDownLatch or CompletableFuture to await completion.
- // Nothing to do
+ try {
+     Thread.sleep(100);
+ } catch (InterruptedException e) {
+     Thread.currentThread().interrupt();
+     break;
+ }
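The CountDownLatch alternative mentioned in this comment could be sketched as below: the writer counts down in a finally block, and the waiting thread blocks in await instead of spinning. LatchAwaitSketch and runAndAwait are hypothetical names, and the timeout is an assumption added so that the waiter cannot block forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; not the project's code.
class LatchAwaitSketch {
    // Starts writerBody on its own thread and returns true if it finished within the timeout.
    static boolean runAndAwait(Runnable writerBody, long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(1);
        Thread writer = new Thread(() -> {
            try {
                writerBody.run();
            } catch (RuntimeException e) {
                // A real writer would log the failure here.
            } finally {
                done.countDown(); // signalled on normal and exceptional exit alike
            }
        });
        writer.start();
        try {
            // Blocks without consuming CPU until the writer signals or the timeout elapses.
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because countDown sits in the finally block, a writer that throws still releases the waiting thread.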
// code safer if we ever remove such a blocking call (ie ones throwing when interrupted).
// Also a minor advantage is that we stop the loop one iteration earlier (drain + run)
// with the current code that only blocks if the queue was empty (drained 0 elements)
while (!(thread.isInterrupted() || queueProducerFinished.get() && queue.isEmpty())) {
The complex boolean expression lacks parentheses to clarify operator precedence. While the && operator has higher precedence than ||, adding explicit parentheses would improve readability: while (!(thread.isInterrupted() || (queueProducerFinished.get() && queue.isEmpty())))
  while (!(thread.isInterrupted() || queueProducerFinished.get() && queue.isEmpty())) {
  List<T> tasks = new ArrayList<>(BUFFER_SIZE);
- while (queue.drainTo(tasks, BUFFER_SIZE) == 0) {
+ while (!(queue.drainTo(tasks, BUFFER_SIZE) > 0 || queueProducerFinished.get() && queue.isEmpty())) {
This complex boolean expression also lacks parentheses for clarity. Consider adding explicit parentheses: while (!((queue.drainTo(tasks, BUFFER_SIZE) > 0) || (queueProducerFinished.get() && queue.isEmpty())))