Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,9 @@ protected CompletableFuture<Boolean> getCompletableFuture(SensitivityAnalysisRun
sensitivityAnalysisParameters,
executionService.getComputationManager(),
runContext.getReportNode())
.whenComplete((unused1, unused2) -> writer.setQueueProducerFinished())
.thenApply(unused -> {
while (writer.isWorking()) {
while (!writer.isConsumerFinished()) {
// Nothing to do
Copy link

Copilot AI Jan 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a busy-wait loop that continuously polls the consumer status, consuming CPU cycles unnecessarily. Consider adding a small sleep interval (e.g., Thread.sleep(100)) inside the loop to reduce CPU usage, or better yet, use a proper synchronization mechanism like CountDownLatch or CompletableFuture to await completion.

Suggested change
// Nothing to do
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}

Copilot uses AI. Check for mistakes.
}
writer.interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ public class SensitivityResultWriterPersisted implements SensitivityResultWriter

private final Thread contingencyResultsThread;

private final AtomicBoolean sensitivityValuesWorking;
private final AtomicBoolean sensitivityValuesConsumerFinished;

private final AtomicBoolean contingencyResultsWorking;
private final AtomicBoolean contingencyResultsConsumerFinished;

private final AtomicBoolean queueProducerFinished;

private UUID resultUuid;

Expand All @@ -50,8 +52,9 @@ public SensitivityResultWriterPersisted(UUID resultUuid, SensitivityAnalysisResu
contingencyResultsQueue = new LinkedBlockingQueue<>();
sensitivityValuesThread = new Thread(sensitivityValuesBatchedHandling(), "sensitivityWriterThread");
contingencyResultsThread = new Thread(contingencyResultsBatchedHandling(), "contingencyWriterThread");
sensitivityValuesWorking = new AtomicBoolean(false);
contingencyResultsWorking = new AtomicBoolean(false);
sensitivityValuesConsumerFinished = new AtomicBoolean(false);
contingencyResultsConsumerFinished = new AtomicBoolean(false);
queueProducerFinished = new AtomicBoolean(false);
this.resultUuid = resultUuid;
}

Expand All @@ -65,11 +68,12 @@ public void interrupt() {
contingencyResultsThread.interrupt();
}

public boolean isWorking() {
return !sensitivityValuesQueue.isEmpty()
|| !contingencyResultsQueue.isEmpty()
|| sensitivityValuesWorking.get()
|| contingencyResultsWorking.get();
public boolean isConsumerFinished() {
return sensitivityValuesConsumerFinished.get() && contingencyResultsConsumerFinished.get();
}

public void setQueueProducerFinished() {
queueProducerFinished.set(true);
}

@Override
Expand All @@ -88,7 +92,7 @@ public void writeContingencyStatus(int contingencyIndex, SensitivityAnalysisResu
private Runnable sensitivityValuesBatchedHandling() {
return () -> run(
sensitivityValuesThread,
sensitivityValuesWorking,
sensitivityValuesConsumerFinished,
sensitivityValuesQueue,
sensitivityAnalysisResultService::writeSensitivityValues
);
Expand All @@ -97,7 +101,7 @@ private Runnable sensitivityValuesBatchedHandling() {
private Runnable contingencyResultsBatchedHandling() {
return () -> run(
contingencyResultsThread,
contingencyResultsWorking,
contingencyResultsConsumerFinished,
contingencyResultsQueue,
sensitivityAnalysisResultService::writeContingenciesStatus
);
Expand All @@ -107,28 +111,31 @@ private interface BatchedRunnable<T> {
void run(UUID resultUuid, List<T> tasks);
}

private <T> void run(Thread thread, AtomicBoolean isWorking, BlockingQueue<T> queue, BatchedRunnable<T> runnable) {
private <T> void run(Thread thread, AtomicBoolean isFinished, BlockingQueue<T> queue, BatchedRunnable<T> runnable) {
try {
while (!thread.isInterrupted()) {
// Note: checking isInterrupted here is a bit redundant with Thread.sleep below which
// also checks it and throws to exit the loop, but it has the advantage of making the
// code safer if we ever remove such a blocking call (ie ones throwing when interrupted).
// Also a minor advantage is that we stop the loop one iteration earlier (drain + run)
// with the current code that only blocks if the queue was empty (drained 0 elements)
while (!(thread.isInterrupted() || queueProducerFinished.get() && queue.isEmpty())) {
Copy link

Copilot AI Jan 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The complex boolean expression lacks parentheses to clarify operator precedence. While the && operator has higher precedence than ||, adding explicit parentheses would improve readability: while (!(thread.isInterrupted() || (queueProducerFinished.get() && queue.isEmpty())))

Copilot uses AI. Check for mistakes.
List<T> tasks = new ArrayList<>(BUFFER_SIZE);
while (queue.drainTo(tasks, BUFFER_SIZE) == 0) {
while (!(queue.drainTo(tasks, BUFFER_SIZE) > 0 || queueProducerFinished.get() && queue.isEmpty())) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using queue::poll + queue::drainTo will remove the Thread::sleep.
Ok for now, will be fixed in the future ; #153 (review)

Copy link

Copilot AI Jan 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This complex boolean expression also lacks parentheses for clarity. Consider adding explicit parentheses: while (!((queue.drainTo(tasks, BUFFER_SIZE) > 0) || (queueProducerFinished.get() && queue.isEmpty())))

Copilot uses AI. Check for mistakes.
Thread.sleep(100);
}
LOGGER.debug("{} - Remaining {} elements in the queue", thread.getName(), queue.size());
if (!tasks.isEmpty()) {
LOGGER.debug("{} - Treating {} elements in the batch", thread.getName(), tasks.size());
isWorking.set(true);
runnable.run(resultUuid, tasks);
isWorking.set(false);
}
}
} catch (InterruptedException e) {
LOGGER.debug("Thread {} has been interrupted", thread.getName());
thread.interrupt();
} catch (Exception e) {
LOGGER.error("Unexpected error occurred during persisting results", e);
queue.clear();
isWorking.set(false);
} finally {
isFinished.set(true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,71 +69,75 @@ void tearDown() {
}

@Test
void testNotOperatingIfNotStarted() {
void testNotOperatingIfNotStarted() throws InterruptedException {
verify(analysisResultService, times(0)).writeSensitivityValues(any(), anyList());
resultWriterPersisted.writeSensitivityValue(0, 0, 0., 0.);
assertTrue(resultWriterPersisted.isWorking());
assertFalse(resultWriterPersisted.isConsumerFinished());
resultWriterPersisted.setQueueProducerFinished();
Thread.sleep(500);
assertFalse(resultWriterPersisted.isConsumerFinished());
verify(analysisResultService, times(0)).writeSensitivityValues(any(), anyList());
}

@Test
void testWritingOneSensitivityValue() {
verify(analysisResultService, times(0)).writeSensitivityValues(any(), anyList());
resultWriterPersisted.start();
assertFalse(resultWriterPersisted.isWorking());
assertFalse(resultWriterPersisted.isConsumerFinished());

resultWriterPersisted.writeSensitivityValue(0, 0, 0., 0.);
await().atMost(500, TimeUnit.MILLISECONDS).until(resultWriterPersisted::isWorking);
await().atMost(1, TimeUnit.SECONDS).until(() -> !resultWriterPersisted.isWorking());
resultWriterPersisted.setQueueProducerFinished();
await().atMost(1, TimeUnit.SECONDS).until(() -> resultWriterPersisted.isConsumerFinished());
verify(analysisResultService, atLeast(1)).writeSensitivityValues(any(), anyList());
}

@Test
void testWritingSeveralSensitivityValuesIsBatched() {
verify(analysisResultService, times(0)).writeSensitivityValues(any(), anyList());
resultWriterPersisted.start();
assertFalse(resultWriterPersisted.isWorking());
assertFalse(resultWriterPersisted.isConsumerFinished());

IntStream.range(0, 1000).forEach(i -> resultWriterPersisted.writeSensitivityValue(0, 0, 0., 0.));
assertTrue(resultWriterPersisted.isWorking());
resultWriterPersisted.setQueueProducerFinished();

await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> !resultWriterPersisted.isWorking());
await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> resultWriterPersisted.isConsumerFinished());
verify(analysisResultService, atLeast(2)).writeSensitivityValues(any(), anyList());
}

@Test
void testWritingOneContingencyStatus() {
verify(analysisResultService, times(0)).writeSensitivityValues(any(), anyList());
resultWriterPersisted.start();
assertFalse(resultWriterPersisted.isWorking());
assertFalse(resultWriterPersisted.isConsumerFinished());

resultWriterPersisted.writeContingencyStatus(0, SensitivityAnalysisResult.Status.SUCCESS);
assertTrue(resultWriterPersisted.isWorking());
resultWriterPersisted.setQueueProducerFinished();

await().atMost(500, TimeUnit.MILLISECONDS).until(() -> !resultWriterPersisted.isWorking());
await().atMost(500, TimeUnit.MILLISECONDS).until(() -> resultWriterPersisted.isConsumerFinished());
verify(analysisResultService, atLeast(1)).writeContingenciesStatus(any(), anyList());
}

@Test
void testWritingSeveralContingencyStatusesIsBatched() {
verify(analysisResultService, times(0)).writeSensitivityValues(any(), anyList());
resultWriterPersisted.start();
assertFalse(resultWriterPersisted.isWorking());
assertFalse(resultWriterPersisted.isConsumerFinished());

IntStream.range(0, 1000).forEach(i -> resultWriterPersisted.writeContingencyStatus(0, SensitivityAnalysisResult.Status.SUCCESS));
assertTrue(resultWriterPersisted.isWorking());
resultWriterPersisted.setQueueProducerFinished();

await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> !resultWriterPersisted.isWorking());
await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> resultWriterPersisted.isConsumerFinished());
verify(analysisResultService, atLeast(2)).writeContingenciesStatus(any(), anyList());
}

@Test
void testNotOperatingAfterInterruption() {
void testNotOperatingAfterInterruption() throws InterruptedException {
resultWriterPersisted.start();
resultWriterPersisted.interrupt();

Thread.sleep(500);
resultWriterPersisted.writeSensitivityValue(0, 0, 0., 0.);
await().atLeast(500, TimeUnit.MILLISECONDS);
Thread.sleep(500);
assertTrue(resultWriterPersisted.isConsumerFinished());
verify(analysisResultService, times(0)).writeSensitivityValues(any(), anyList());
}

Expand All @@ -144,7 +148,8 @@ void testIsEndingIfErrorOccursPersistingSensitivityValues() {
.writeSensitivityValues(any(), anyList());
resultWriterPersisted.start();
IntStream.range(0, 1000).forEach(i -> resultWriterPersisted.writeSensitivityValue(0, 0, 0., 0.));
await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> !resultWriterPersisted.isWorking());
resultWriterPersisted.setQueueProducerFinished();
await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> resultWriterPersisted.isConsumerFinished());
}

@Test
Expand All @@ -154,6 +159,7 @@ void testIsEndingIfErrorOccursPersistingContingencyStatuses() {
.writeContingenciesStatus(any(), anyList());
resultWriterPersisted.start();
IntStream.range(0, 1000).forEach(i -> resultWriterPersisted.writeContingencyStatus(0, SensitivityAnalysisResult.Status.SUCCESS));
await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> !resultWriterPersisted.isWorking());
resultWriterPersisted.setQueueProducerFinished();
await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> resultWriterPersisted.isConsumerFinished());
}
}