From 8578107512210a826c0b3c89b71e01dedd38b800 Mon Sep 17 00:00:00 2001 From: LZD-PratyushBhatt Date: Sat, 28 Jun 2025 19:00:57 +0530 Subject: [PATCH 1/3] Fix global lock contention in DistClusterControllerStateModel caused by Optional.empty() singleton --- .../DistClusterControllerStateModel.java | 7 +- .../TestDistControllerStateModel.java | 73 +++++++++++++++++++ 2 files changed, 78 insertions(+), 2 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java index 42f0e4b416..8710119d6d 100644 --- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java +++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java @@ -40,6 +40,9 @@ public class DistClusterControllerStateModel extends AbstractHelixLeaderStandbyS protected Optional _controllerOpt = Optional.empty(); private final Set _enabledPipelineTypes; + // dedicated lock object to avoid cross-instance contention from Optional.empty() singleton + private final Object _controllerLock = new Object(); + public DistClusterControllerStateModel(String zkAddr) { this(zkAddr, Sets.newHashSet(Pipeline.Type.DEFAULT, Pipeline.Type.TASK)); } @@ -62,7 +65,7 @@ public void onBecomeLeaderFromStandby(Message message, NotificationContext conte logger.info(controllerName + " becoming leader from standby for " + clusterName); - synchronized (_controllerOpt) { + synchronized (_controllerLock) { if (!_controllerOpt.isPresent()) { HelixManager newController = HelixManagerFactory .getZKHelixManager(clusterName, controllerName, InstanceType.CONTROLLER, _zkAddr); @@ -112,7 +115,7 @@ public String getStateModeInstanceDescription(String partitionName, String insta @Override public void reset() { - synchronized (_controllerOpt) { + synchronized (_controllerLock) { if (_controllerOpt.isPresent()) { logger.info("Disconnecting controller: " + _controllerOpt.get().getInstanceName() + " for " + _controllerOpt.get().getClusterName()); diff --git a/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModel.java b/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModel.java index d07aab6b0b..3e61bd8541 100644 --- a/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModel.java +++ b/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModel.java @@ -27,10 +27,18 @@ import org.apache.helix.model.Message.MessageType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.lang.reflect.Field; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + public class TestDistControllerStateModel extends ZkUnitTestBase { private static Logger LOG = LoggerFactory.getLogger(TestDistControllerStateModel.class); @@ -124,4 +132,69 @@ public void testReset() { stateModel.reset(); } + /** + * Test to verify that different DistClusterControllerStateModel instances + * use separate lock objects, ensuring no cross-instance blocking. + */ + @Test() + public void testNoSharedLockAcrossInstances() throws Exception { + LOG.info("Testing that lock objects are not shared across DistClusterControllerStateModel instances"); + + // Verify different instances have different lock objects + DistClusterControllerStateModel instance1 = new DistClusterControllerStateModel(ZK_ADDR); + DistClusterControllerStateModel instance2 = new DistClusterControllerStateModel(ZK_ADDR); + + Field lockField = DistClusterControllerStateModel.class.getDeclaredField("_controllerLock"); + lockField.setAccessible(true); + + Object lock1 = lockField.get(instance1); + Object lock2 = lockField.get(instance2); + + Assert.assertNotNull(lock1, "First instance should have a lock object"); + Assert.assertNotNull(lock2, "Second instance should have a lock object"); + Assert.assertNotSame(lock1, lock2, "Different instances must have different lock objects"); + + // Verify concurrent access doesn't block across instances + final int NUM_INSTANCES = 10; + ExecutorService executor = Executors.newFixedThreadPool(NUM_INSTANCES); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch completionLatch = new CountDownLatch(NUM_INSTANCES); + AtomicInteger completedInstances = new AtomicInteger(0); + + for (int i = 0; i < NUM_INSTANCES; i++) { + final int instanceId = i; + final DistClusterControllerStateModel instance = new DistClusterControllerStateModel(ZK_ADDR); + + executor.submit(() -> { + try { + startLatch.await(); // wait for all threads to be ready + + // Simulate state transition operations that would use the lock + synchronized (lockField.get(instance)) { + // hold the lock here briefly to simulate real state transition work + Thread.sleep(100); + completedInstances.incrementAndGet(); + } + + } catch (Exception e) { + LOG.error("Instance {} failed during concurrent test", instanceId, e); + } finally { + completionLatch.countDown(); + } + }); + } + + // start all threads simultaneously + startLatch.countDown(); + + // All instances should complete within reasonable time since they don't block each other + boolean allCompleted = completionLatch.await(2, TimeUnit.SECONDS); + + executor.shutdown(); + executor.awaitTermination(2, TimeUnit.SECONDS); + + Assert.assertTrue(allCompleted, "All instances should complete without blocking each other"); + Assert.assertEquals(completedInstances.get(), NUM_INSTANCES, + "All instances should successfully complete their synchronized work"); + } } From 8f36be13a5959dfafc0f3945d6f19decf0ac2f9d Mon Sep 17 00:00:00 2001 From: LZD-PratyushBhatt Date: Mon, 30 Jun 2025 14:57:59 +0530 Subject: [PATCH 2/3] Address review comments --- .../helix/participant/DistClusterControllerStateModel.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java index 8710119d6d..56120feead 100644 --- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java +++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java @@ -37,7 +37,7 @@ @StateModelInfo(initialState = "OFFLINE", states = {"LEADER", "STANDBY"}) public class DistClusterControllerStateModel extends AbstractHelixLeaderStandbyStateModel { private static Logger logger = LoggerFactory.getLogger(DistClusterControllerStateModel.class); - protected Optional _controllerOpt = Optional.empty(); + protected volatile Optional _controllerOpt = Optional.empty(); private final Set _enabledPipelineTypes; // dedicated lock object to avoid cross-instance contention from Optional.empty() singleton From 95e0a9f25eb46f1948a72a8ebb7f15dd1b1ac3b7 Mon Sep 17 00:00:00 2001 From: LZD-PratyushBhatt Date: Mon, 7 Jul 2025 22:20:40 +0530 Subject: [PATCH 3/3] Address review comments and add a new test for explictly testing two locks --- .../TestDistControllerStateModel.java | 70 ++++++++++++++++++- 1 file changed, 69 insertions(+), 1 deletion(-) diff --git a/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModel.java b/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModel.java index 3e61bd8541..7abe363b19 100644 --- a/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModel.java +++ b/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerStateModel.java @@ -37,6 +37,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; public class TestDistControllerStateModel extends ZkUnitTestBase { @@ -188,7 +189,7 @@ public void testNoSharedLockAcrossInstances() throws Exception { startLatch.countDown(); // All instances should complete within reasonable time since they don't block each other - boolean allCompleted = completionLatch.await(2, TimeUnit.SECONDS); + boolean allCompleted = completionLatch.await(500, TimeUnit.MILLISECONDS); executor.shutdown(); executor.awaitTermination(2, TimeUnit.SECONDS); @@ -197,4 +198,71 @@ public void testNoSharedLockAcrossInstances() throws Exception { Assert.assertEquals(completedInstances.get(), NUM_INSTANCES, "All instances should successfully complete their synchronized work"); } + + /** + * Explicit test to verify that while one instance holds its lock indefinitely, + * another instance with a different lock can complete immediately. + */ + @Test() + public void testExplicitLockIndependence() throws Exception { + LOG.info("Testing explicit lock independence - one blocked, other should complete"); + + DistClusterControllerStateModel instance1 = new DistClusterControllerStateModel(ZK_ADDR); + DistClusterControllerStateModel instance2 = new DistClusterControllerStateModel(ZK_ADDR); + + Field lockField = DistClusterControllerStateModel.class.getDeclaredField("_controllerLock"); + lockField.setAccessible(true); + + Object lock1 = lockField.get(instance1); + Object lock2 = lockField.get(instance2); + + Assert.assertNotSame(lock1, lock2, "Different instances must have different lock objects"); + + CountDownLatch instance1Started = new CountDownLatch(1); + CountDownLatch instance2Completed = new CountDownLatch(1); + AtomicBoolean instance1Interrupted = new AtomicBoolean(false); + + // Thread 1: Hold lock1 for 5 seconds + Thread thread1 = new Thread(() -> { + try { + synchronized (lock1) { + instance1Started.countDown(); + Thread.sleep(5000); // Hold much longer than test timeout + } + } catch (InterruptedException e) { + instance1Interrupted.set(true); + Thread.currentThread().interrupt(); + } + }, "BlockingThread"); + + // Thread 2: Should complete immediately since it uses lock2 + Thread thread2 = new Thread(() -> { + try { + instance1Started.await(1000, TimeUnit.MILLISECONDS); // Wait for thread1 to acquire lock1 + synchronized (lock2) { + // Should acquire immediately since lock2 != lock1 + Thread.sleep(50); + instance2Completed.countDown(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }, "NonBlockingThread"); + + thread1.start(); + thread2.start(); + + // Instance2 should complete immediately even though instance1 is blocked + boolean instance2CompletedQuickly = instance2Completed.await(200, TimeUnit.MILLISECONDS); + + // Clean up + thread1.interrupt(); + thread1.join(1000); + thread2.join(1000); + + Assert.assertTrue(instance2CompletedQuickly, + "Instance2 should complete immediately, proving locks are not shared"); + Assert.assertTrue(instance1Interrupted.get(), + "Instance1 should have been interrupted while holding its lock"); + } }