From 86f7b9f2fd7fb132eeb2553aada20d37ab72c1e1 Mon Sep 17 00:00:00 2001 From: LZD-PratyushBhatt Date: Wed, 30 Apr 2025 00:02:24 +0530 Subject: [PATCH 1/6] Preserve input instance order in zone_based stoppable check --- .../MaintenanceManagementService.java | 26 +++++-- .../StoppableInstancesSelector.java | 67 ++++++++++++++----- .../resources/helix/InstancesAccessor.java | 7 ++ .../rest/server/TestInstancesAccessor.java | 67 +++++++++++++++++++ 4 files changed, 144 insertions(+), 23 deletions(-) diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java index 8d5d025b3c..f5520c70f7 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -374,16 +375,23 @@ public StoppableCheck getInstanceStoppableCheck(String clusterId, String instanc public Map batchGetInstancesStoppableChecks(String clusterId, List instances, String jsonContent) throws IOException { return batchGetInstancesStoppableChecks(clusterId, instances, jsonContent, - Collections.emptySet()); + Collections.emptySet(), false); } public Map batchGetInstancesStoppableChecks(String clusterId, List instances, String jsonContent, Set toBeStoppedInstances) throws IOException { - Map finalStoppableChecks = new HashMap<>(); + return batchGetInstancesStoppableChecks(clusterId, instances, jsonContent, toBeStoppedInstances, + false); + } + + public Map batchGetInstancesStoppableChecks(String clusterId, + List instances, String jsonContent, Set toBeStoppedInstances, + boolean preserveOrder) throws IOException { + Map finalStoppableChecks = new LinkedHashMap<>(); // helix instance check. List instancesForCustomInstanceLevelChecks = batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks, - toBeStoppedInstances); + toBeStoppedInstances, preserveOrder); // custom check, includes partition check. batchCustomInstanceStoppableCheck(clusterId, instancesForCustomInstanceLevelChecks, toBeStoppedInstances, finalStoppableChecks, getMapFromJsonPayload(jsonContent)); @@ -476,12 +484,16 @@ private MaintenanceManagementInstanceInfo takeFreeSingleInstanceHelper(String cl private List batchHelixInstanceStoppableCheck(String clusterId, Collection instances, Map finalStoppableChecks, - Set toBeStoppedInstances) { + Set toBeStoppedInstances, boolean preserveOrder) { // Perform all but min_active replicas check in parallel Map> helixInstanceChecks = instances.stream().collect( - Collectors.toMap(Function.identity(), instance -> POOL.submit( - () -> performHelixOwnInstanceCheck(clusterId, instance, toBeStoppedInstances)))); + Collectors.toMap( + Function.identity(), + instance -> POOL.submit(() -> performHelixOwnInstanceCheck(clusterId, instance, toBeStoppedInstances)), + (existing, replacement) -> existing, + preserveOrder ? LinkedHashMap::new : HashMap::new + )); // Perform min_active replicas check sequentially addMinActiveReplicaChecks(clusterId, helixInstanceChecks, toBeStoppedInstances); @@ -618,7 +630,7 @@ private Map batchInstanceHealthCheck( // this is helix own check instancesForNext = batchHelixInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks, - Collections.emptySet()); + Collections.emptySet(), false); } else if (healthCheck.equals(HELIX_CUSTOM_STOPPABLE_CHECK)) { // custom check, includes custom Instance check and partition check. instancesForNext = diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java index 8916991008..13f94e140e 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java @@ -42,6 +42,16 @@ import org.apache.helix.rest.server.json.instance.StoppableCheck; import org.apache.helix.rest.server.resources.helix.InstancesAccessor; +/** + * This class is used to select stoppable instances based on different selection criteria. + * Selection criteria include: + * 1. Zone-based selection - Select instances from a single zone + * 2. Cross-zone selection - Select instances across multiple zones + * 3. Non-zone-based selection - Select instances regardless of zone + * + * For zone-based selection, instances can be ordered either lexicographically (default) or + * by preserving the original input order when preserveOrder is set to true. + */ public class StoppableInstancesSelector { // This type does not belong to real HealthCheck failed reason. Also, if we add this type // to HealthCheck enum, it could introduce more unnecessary check step since the InstanceServiceImpl @@ -53,16 +63,19 @@ public class StoppableInstancesSelector { private final MaintenanceManagementService _maintenanceService; private final ClusterTopology _clusterTopology; private final ZKHelixDataAccessor _dataAccessor; + private final boolean _preserveOrder; private StoppableInstancesSelector(String clusterId, List orderOfZone, String customizedInput, MaintenanceManagementService maintenanceService, - ClusterTopology clusterTopology, ZKHelixDataAccessor dataAccessor) { + ClusterTopology clusterTopology, ZKHelixDataAccessor dataAccessor, + boolean preserveOrder) { _clusterId = clusterId; _orderOfZone = orderOfZone; _customizedInput = customizedInput; _maintenanceService = maintenanceService; _clusterTopology = clusterTopology; _dataAccessor = dataAccessor; + _preserveOrder = preserveOrder; } /** @@ -76,7 +89,7 @@ private StoppableInstancesSelector(String clusterId, List orderOfZone, * @return An ObjectNode containing: * - 'stoppableNode': List of instances that can be stopped. * - 'instance_not_stoppable_with_reasons': A map with the instance name as the key and - * a list of reasons for non-stoppability as the value. + * a list of getZoneBasedInstancesreasons for non-stoppability as the value. * @throws IOException */ public ObjectNode getStoppableInstancesInSingleZone(List instances, @@ -91,7 +104,7 @@ public ObjectNode getStoppableInstancesInSingleZone(List instances, List zoneBasedInstance = getZoneBasedInstances(instances, _clusterTopology.toZoneMapping()); populateStoppableInstances(zoneBasedInstance, toBeStoppedInstancesSet, stoppableInstances, - failedStoppableInstances); + failedStoppableInstances, _preserveOrder); processNonexistentInstances(instances, failedStoppableInstances); return result; @@ -128,7 +141,7 @@ public ObjectNode getStoppableInstancesCrossZones(List instances, continue; } populateStoppableInstances(new ArrayList<>(instanceSet), toBeStoppedInstancesSet, stoppableInstances, - failedStoppableInstances); + failedStoppableInstances, false); } processNonexistentInstances(instances, failedStoppableInstances); return result; @@ -162,17 +175,22 @@ public ObjectNode getStoppableInstancesNonZoneBased(List instances, List instancesToCheck = new ArrayList<>(instances); instancesToCheck.removeAll(nonExistingInstances); populateStoppableInstances(instancesToCheck, toBeStoppedInstancesSet, stoppableInstances, - failedStoppableInstances); + failedStoppableInstances, false); return result; } private void populateStoppableInstances(List instances, Set toBeStoppedInstances, - ArrayNode stoppableInstances, ObjectNode failedStoppableInstances) throws IOException { + ArrayNode stoppableInstances, ObjectNode failedStoppableInstances, boolean preserveOrder) throws IOException { Map instancesStoppableChecks = _maintenanceService.batchGetInstancesStoppableChecks(_clusterId, instances, - _customizedInput, toBeStoppedInstances); + _customizedInput, toBeStoppedInstances, preserveOrder); + // Print each instance and its isStoppable status + instancesStoppableChecks.forEach((instance, check) -> + System.out.println(instance + " -> isStoppable: " + check.isStoppable()) + ); + System.out.println("Just a BP"); for (Map.Entry instanceStoppableCheck : instancesStoppableChecks.entrySet()) { String instance = instanceStoppableCheck.getKey(); StoppableCheck stoppableCheck = instanceStoppableCheck.getValue(); @@ -251,9 +269,10 @@ public void calculateOrderOfZone(List instances, boolean random) { * The order of zones can directly come from user input. If user did not specify it, Helix will order * zones by the number of associated instances in descending order. * - * @param instances - * @param zoneMapping - * @return + * @param instances List of instances to be considered + * @param zoneMapping Mapping from zone to instances + * @return List of instances in the first non-empty zone. If preserveOrder is true, the original order + * of instances is maintained. If preserveOrder is false (default), instances are sorted lexicographically. */ private List getZoneBasedInstances(List instances, Map> zoneMapping) { @@ -263,11 +282,21 @@ private List getZoneBasedInstances(List instances, Set instanceSet = null; for (String zone : _orderOfZone) { - instanceSet = new TreeSet<>(instances); - Set currentZoneInstanceSet = new HashSet<>(zoneMapping.get(zone)); - instanceSet.retainAll(currentZoneInstanceSet); - if (instanceSet.size() > 0) { - return new ArrayList<>(instanceSet); + if (_preserveOrder) { + List filteredInstances = new ArrayList<>(instances); + Set currentZoneInstanceSet = new HashSet<>(zoneMapping.get(zone)); + filteredInstances.removeIf(instance -> !currentZoneInstanceSet.contains(instance)); + if (!filteredInstances.isEmpty()) { + return filteredInstances; + } + } else { + // Original behavior - lexicographical ordering via TreeSet + instanceSet = new TreeSet<>(instances); + Set currentZoneInstanceSet = new HashSet<>(zoneMapping.get(zone)); + instanceSet.retainAll(currentZoneInstanceSet); + if (!instanceSet.isEmpty()) { + return new ArrayList<>(instanceSet); + } } } @@ -319,6 +348,7 @@ public static class StoppableInstancesSelectorBuilder { private MaintenanceManagementService _maintenanceService; private ClusterTopology _clusterTopology; private ZKHelixDataAccessor _dataAccessor; + private boolean _preserveOrder = false; // Default to false for backward compatibility public StoppableInstancesSelectorBuilder setClusterId(String clusterId) { _clusterId = clusterId; @@ -351,9 +381,14 @@ public StoppableInstancesSelectorBuilder setDataAccessor(ZKHelixDataAccessor dat return this; } + public StoppableInstancesSelectorBuilder setPreserveOrder(boolean preserveOrder) { + _preserveOrder = preserveOrder; + return this; + } + public StoppableInstancesSelector build() { return new StoppableInstancesSelector(_clusterId, _orderOfZone, _customizedInput, - _maintenanceService, _clusterTopology, _dataAccessor); + _maintenanceService, _clusterTopology, _dataAccessor, _preserveOrder); } } } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java index 1d6e249db9..830e3bfb08 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java @@ -78,6 +78,7 @@ public enum InstancesProperties { to_be_stopped_instances, skip_stoppable_check_list, customized_values, + preserve_order, instance_stoppable_parallel, instance_not_stoppable_with_reasons } @@ -304,6 +305,11 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo .asBoolean(); } + boolean preserveOrder = false; + if (node.get(InstancesProperties.preserve_order.name()) != null) { + preserveOrder = node.get(InstancesProperties.preserve_order.name()).asBoolean(); + } + ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId); if (selectionBase != InstanceHealthSelectionBase.non_zone_based) { if (!clusterService.isClusterTopologyAware(clusterId)) { @@ -354,6 +360,7 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo .setMaintenanceService(maintenanceService) .setClusterTopology(clusterTopology) .setDataAccessor((ZKHelixDataAccessor) getDataAccssor(clusterId)) + .setPreserveOrder(preserveOrder) .build(); ObjectNode result; diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java index 7fe4bae288..0b4d28c752 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java @@ -624,6 +624,73 @@ public void testMultipleReplicasInSameMZ() throws Exception { System.out.println("End test :" + TestHelper.getTestMethodName()); } + @DataProvider(name = "preserveOrderProvider") + public Object[][] preserveOrderProvider() { + return new Object[][] { + { true }, + { false } + }; + } + + @Test(dataProvider = "preserveOrderProvider", + dependsOnMethods = "testMultipleReplicasInSameMZ") + public void testMultipleReplicasInSameMZWithPreserveOrder(boolean preserveOrder) throws Exception { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + // Create SemiAuto DB so that we can control assignment + String testDb = TestHelper.getTestMethodName() + "_resource_" + preserveOrder; + _gSetupTool.getClusterManagementTool().addResource(STOPPABLE_CLUSTER2, testDb, 3, "MasterSlave", + IdealState.RebalanceMode.SEMI_AUTO.toString()); + _gSetupTool.getClusterManagementTool().rebalance(STOPPABLE_CLUSTER2, testDb, 3); + + // Manually set ideal state to have the 3 replcias assigned to 3 instances all in the same zone + List preferenceList = Arrays.asList("instance0", "instance1", "instance2"); + IdealState is = _gSetupTool.getClusterManagementTool().getResourceIdealState(STOPPABLE_CLUSTER2, testDb); + for (String p : is.getPartitionSet()) { + is.setPreferenceList(p, preferenceList); + } + is.setMinActiveReplicas(2); + _gSetupTool.getClusterManagementTool().setResourceIdealState(STOPPABLE_CLUSTER2, testDb, is); + + // Wait for assignments to take place + BestPossibleExternalViewVerifier verifier = + new BestPossibleExternalViewVerifier.Builder(STOPPABLE_CLUSTER2).setZkAddr(ZK_ADDR).build(); + Assert.assertTrue(verifier.verifyByPolling()); + + // Run stoppable check against the 3 instances where SemiAuto DB was assigned + String content = + String.format("{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\"],\"%s\":%s}", + InstancesAccessor.InstancesProperties.selection_base.name(), + InstancesAccessor.InstanceHealthSelectionBase.zone_based.name(), + InstancesAccessor.InstancesProperties.instances.name(), "instance1", "instance2", "instance0", + InstancesAccessor.InstancesProperties.preserve_order.name(), + preserveOrder); + Response response = + new JerseyUriRequestBuilder("clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format( + STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE)); + JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class)); + + String stoppableNode = "instance0"; + List nonStoppableNodes = Arrays.asList("instance1", "instance2"); + if (preserveOrder) { + stoppableNode = "instance1"; + nonStoppableNodes = Arrays.asList("instance0", "instance2"); + } + Set stoppableSet = getStringSet(jsonNode, + InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + Assert.assertTrue(Collections.singleton(stoppableNode).equals(stoppableSet)); + + // Next 2 instances should fail stoppable due to MIN_ACTIVE_REPLICA_CHECK_FAILED + JsonNode nonStoppableInstances = jsonNode.get( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + Assert.assertFalse(getStringSet(nonStoppableInstances, stoppableNode) + .contains("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + Assert.assertTrue(getStringSet(nonStoppableInstances, nonStoppableNodes.get(0)) + .contains("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + Assert.assertTrue(getStringSet(nonStoppableInstances, nonStoppableNodes.get(1)) + .contains("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + @Test(dependsOnMethods = "testMultipleReplicasInSameMZ") public void testSkipClusterLevelHealthCheck() throws IOException { System.out.println("Start test :" + TestHelper.getTestMethodName()); From 9a88499fc008066517001df7195cec16ee087b14 Mon Sep 17 00:00:00 2001 From: LZD-PratyushBhatt Date: Wed, 30 Apr 2025 10:19:10 +0530 Subject: [PATCH 2/6] Remove loggers --- .../MaintenanceManagementService.java | 2 +- .../StoppableInstancesSelector.java | 5 ----- .../org/apache/helix/rest/server/TestInstancesAccessor.java | 3 ++- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java index f5520c70f7..f08f191f45 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java @@ -387,7 +387,7 @@ public Map batchGetInstancesStoppableChecks(String clust public Map batchGetInstancesStoppableChecks(String clusterId, List instances, String jsonContent, Set toBeStoppedInstances, boolean preserveOrder) throws IOException { - Map finalStoppableChecks = new LinkedHashMap<>(); + Map finalStoppableChecks = new HashMap<>(); // helix instance check. List instancesForCustomInstanceLevelChecks = batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks, diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java index 13f94e140e..a4a0789fa2 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java @@ -185,12 +185,7 @@ private void populateStoppableInstances(List instances, Set toBe Map instancesStoppableChecks = _maintenanceService.batchGetInstancesStoppableChecks(_clusterId, instances, _customizedInput, toBeStoppedInstances, preserveOrder); - // Print each instance and its isStoppable status - instancesStoppableChecks.forEach((instance, check) -> - System.out.println(instance + " -> isStoppable: " + check.isStoppable()) - ); - System.out.println("Just a BP"); for (Map.Entry instanceStoppableCheck : instancesStoppableChecks.entrySet()) { String instance = instanceStoppableCheck.getKey(); StoppableCheck stoppableCheck = instanceStoppableCheck.getValue(); diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java index 0b4d28c752..4a77ebb13d 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java @@ -633,7 +633,8 @@ public Object[][] preserveOrderProvider() { } @Test(dataProvider = "preserveOrderProvider", - dependsOnMethods = "testMultipleReplicasInSameMZ") + dependsOnMethods = "testMultipleReplicasInSameMZ" + ) public void testMultipleReplicasInSameMZWithPreserveOrder(boolean preserveOrder) throws Exception { System.out.println("Start test :" + TestHelper.getTestMethodName()); // Create SemiAuto DB so that we can control assignment From 8b27bdddf33e9ac59b2366746be78056a1049d07 Mon Sep 17 00:00:00 2001 From: LZD-PratyushBhatt Date: Wed, 30 Apr 2025 11:06:48 +0530 Subject: [PATCH 3/6] Nit --- .../clusterMaintenanceService/MaintenanceManagementService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java index f08f191f45..01df1ff20e 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java @@ -375,7 +375,7 @@ public StoppableCheck getInstanceStoppableCheck(String clusterId, String instanc public Map batchGetInstancesStoppableChecks(String clusterId, List instances, String jsonContent) throws IOException { return batchGetInstancesStoppableChecks(clusterId, instances, jsonContent, - Collections.emptySet(), false); + Collections.emptySet()); } public Map batchGetInstancesStoppableChecks(String clusterId, From 888dbb0d9d1836082b91080647715129e6c4bef8 Mon Sep 17 00:00:00 2001 From: LZD-PratyushBhatt Date: Thu, 1 May 2025 03:26:33 +0530 Subject: [PATCH 4/6] Incorported Review comments --- .../StoppableInstancesSelector.java | 61 +++++++++---------- .../resources/helix/InstancesAccessor.java | 17 ++---- .../rest/server/TestInstancesAccessor.java | 15 ++--- 3 files changed, 42 insertions(+), 51 deletions(-) diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java index a4a0789fa2..1c02a0bbc2 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java @@ -63,19 +63,16 @@ public class StoppableInstancesSelector { private final MaintenanceManagementService _maintenanceService; private final ClusterTopology _clusterTopology; private final ZKHelixDataAccessor _dataAccessor; - private final boolean _preserveOrder; private StoppableInstancesSelector(String clusterId, List orderOfZone, String customizedInput, MaintenanceManagementService maintenanceService, - ClusterTopology clusterTopology, ZKHelixDataAccessor dataAccessor, - boolean preserveOrder) { + ClusterTopology clusterTopology, ZKHelixDataAccessor dataAccessor) { _clusterId = clusterId; _orderOfZone = orderOfZone; _customizedInput = customizedInput; _maintenanceService = maintenanceService; _clusterTopology = clusterTopology; _dataAccessor = dataAccessor; - _preserveOrder = preserveOrder; } /** @@ -86,14 +83,15 @@ private StoppableInstancesSelector(String clusterId, List orderOfZone, * * @param instances A list of instance to be evaluated. * @param toBeStoppedInstances A list of instances presumed to be already stopped + * @param preserveOrder Indicates whether to preserve the original order of instances * @return An ObjectNode containing: * - 'stoppableNode': List of instances that can be stopped. * - 'instance_not_stoppable_with_reasons': A map with the instance name as the key and - * a list of getZoneBasedInstancesreasons for non-stoppability as the value. + * a list of reasons for non-stoppability as the value. * @throws IOException */ public ObjectNode getStoppableInstancesInSingleZone(List instances, - List toBeStoppedInstances) throws IOException { + List toBeStoppedInstances, boolean preserveOrder) throws IOException { ObjectNode result = JsonNodeFactory.instance.objectNode(); ArrayNode stoppableInstances = result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); @@ -102,9 +100,9 @@ public ObjectNode getStoppableInstancesInSingleZone(List instances, Set toBeStoppedInstancesSet = findToBeStoppedInstances(toBeStoppedInstances); List zoneBasedInstance = - getZoneBasedInstances(instances, _clusterTopology.toZoneMapping()); + getZoneBasedInstances(instances, _clusterTopology.toZoneMapping(), preserveOrder); populateStoppableInstances(zoneBasedInstance, toBeStoppedInstancesSet, stoppableInstances, - failedStoppableInstances, _preserveOrder); + failedStoppableInstances, preserveOrder); processNonexistentInstances(instances, failedStoppableInstances); return result; @@ -117,6 +115,7 @@ public ObjectNode getStoppableInstancesInSingleZone(List instances, * * @param instances A list of instance to be evaluated. * @param toBeStoppedInstances A list of instances presumed to be already stopped + * @param preserveOrder Indicates whether to preserve the original order of instances * @return An ObjectNode containing: * - 'stoppableNode': List of instances that can be stopped. * - 'instance_not_stoppable_with_reasons': A map with the instance name as the key and @@ -154,6 +153,7 @@ public ObjectNode getStoppableInstancesCrossZones(List instances, * * @param instances A list of instance to be evaluated. * @param toBeStoppedInstances A list of instances presumed to be already stopped + * @param preserveOrder Indicates whether to preserve the original order of instances * @return An ObjectNode containing: * - 'stoppableNode': List of instances that can be stopped. * - 'instance_not_stoppable_with_reasons': A map with the instance name as the key and @@ -266,36 +266,37 @@ public void calculateOrderOfZone(List instances, boolean random) { * * @param instances List of instances to be considered * @param zoneMapping Mapping from zone to instances + * @param preserveOrder Indicates whether to preserve the original order of instances * @return List of instances in the first non-empty zone. If preserveOrder is true, the original order * of instances is maintained. If preserveOrder is false (default), instances are sorted lexicographically. */ private List getZoneBasedInstances(List instances, - Map> zoneMapping) { + Map> zoneMapping, boolean preserveOrder) { if (_orderOfZone.isEmpty()) { - return _orderOfZone; + return Collections.emptyList(); } - Set instanceSet = null; for (String zone : _orderOfZone) { - if (_preserveOrder) { - List filteredInstances = new ArrayList<>(instances); - Set currentZoneInstanceSet = new HashSet<>(zoneMapping.get(zone)); - filteredInstances.removeIf(instance -> !currentZoneInstanceSet.contains(instance)); - if (!filteredInstances.isEmpty()) { - return filteredInstances; - } - } else { - // Original behavior - lexicographical ordering via TreeSet - instanceSet = new TreeSet<>(instances); - Set currentZoneInstanceSet = new HashSet<>(zoneMapping.get(zone)); - instanceSet.retainAll(currentZoneInstanceSet); - if (!instanceSet.isEmpty()) { - return new ArrayList<>(instanceSet); + Set currentZoneInstanceSet = zoneMapping.get(zone); + if (currentZoneInstanceSet == null || currentZoneInstanceSet.isEmpty()) { + continue; + } + + // Filter instances based on current zone + List filteredInstances = instances.stream() + .filter(currentZoneInstanceSet::contains) + .collect(Collectors.toList()); + + if (!filteredInstances.isEmpty()) { + // If preserve order is not required, return sorted list + if (!preserveOrder) { + Collections.sort(filteredInstances); // Lexicographical order } + return filteredInstances; } } - return Collections.EMPTY_LIST; + return Collections.emptyList(); } /** @@ -343,7 +344,6 @@ public static class StoppableInstancesSelectorBuilder { private MaintenanceManagementService _maintenanceService; private ClusterTopology _clusterTopology; private ZKHelixDataAccessor _dataAccessor; - private boolean _preserveOrder = false; // Default to false for backward compatibility public StoppableInstancesSelectorBuilder setClusterId(String clusterId) { _clusterId = clusterId; @@ -376,14 +376,9 @@ public StoppableInstancesSelectorBuilder setDataAccessor(ZKHelixDataAccessor dat return this; } - public StoppableInstancesSelectorBuilder setPreserveOrder(boolean preserveOrder) { - _preserveOrder = preserveOrder; - return this; - } - public StoppableInstancesSelector build() { return new StoppableInstancesSelector(_clusterId, _orderOfZone, _customizedInput, - _maintenanceService, _clusterTopology, _dataAccessor, _preserveOrder); + _maintenanceService, _clusterTopology, _dataAccessor); } } } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java index 830e3bfb08..812f8d349a 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java @@ -78,7 +78,6 @@ public enum InstancesProperties { to_be_stopped_instances, skip_stoppable_check_list, customized_values, - preserve_order, instance_stoppable_parallel, instance_not_stoppable_with_reasons } @@ -160,7 +159,9 @@ public Response instancesOperations(@PathParam("clusterId") String clusterId, @QueryParam("continueOnFailures") boolean continueOnFailures, @QueryParam("skipZKRead") boolean skipZKRead, @QueryParam("skipHealthCheckCategories") String skipHealthCheckCategories, - @DefaultValue("false") @QueryParam("random") boolean random, String content) { + @DefaultValue("false") @QueryParam("random") boolean random, + @DefaultValue("false") @QueryParam("preserveOrder") boolean preserveOrder, + String content) { Command cmd; try { cmd = Command.valueOf(command); @@ -205,7 +206,7 @@ public Response instancesOperations(@PathParam("clusterId") String clusterId, break; case stoppable: return batchGetStoppableInstances(clusterId, node, skipZKRead, continueOnFailures, - skipHealthCheckCategorySet, random); + skipHealthCheckCategorySet, random, preserveOrder); default: _logger.error("Unsupported command :" + command); return badRequest("Unsupported command :" + command); @@ -223,7 +224,7 @@ public Response instancesOperations(@PathParam("clusterId") String clusterId, private Response batchGetStoppableInstances(String clusterId, JsonNode node, boolean skipZKRead, boolean continueOnFailures, Set skipHealthCheckCategories, - boolean random) throws IOException { + boolean random, boolean preserveOrder) throws IOException { try { // TODO: Process input data from the content // TODO: Implement the logic to automatically detect the selection base. https://github.com/apache/helix/issues/2968#issue-2691677799 @@ -305,11 +306,6 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo .asBoolean(); } - boolean preserveOrder = false; - if (node.get(InstancesProperties.preserve_order.name()) != null) { - preserveOrder = node.get(InstancesProperties.preserve_order.name()).asBoolean(); - } - ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId); if (selectionBase != InstanceHealthSelectionBase.non_zone_based) { if (!clusterService.isClusterTopologyAware(clusterId)) { @@ -360,14 +356,13 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo .setMaintenanceService(maintenanceService) .setClusterTopology(clusterTopology) .setDataAccessor((ZKHelixDataAccessor) getDataAccssor(clusterId)) - .setPreserveOrder(preserveOrder) .build(); ObjectNode result; switch (selectionBase) { case zone_based: stoppableInstancesSelector.calculateOrderOfZone(instances, random); - result = stoppableInstancesSelector.getStoppableInstancesInSingleZone(instances, toBeStoppedInstances); + result = stoppableInstancesSelector.getStoppableInstancesInSingleZone(instances, toBeStoppedInstances, preserveOrder); break; case cross_zone_based: stoppableInstancesSelector.calculateOrderOfZone(instances, random); diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java index 4a77ebb13d..5c06aef32a 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java @@ -659,15 +659,16 @@ public void testMultipleReplicasInSameMZWithPreserveOrder(boolean preserveOrder) // Run stoppable check against the 3 instances where SemiAuto DB was assigned String content = - String.format("{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\"],\"%s\":%s}", + String.format("{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\"]}", InstancesAccessor.InstancesProperties.selection_base.name(), InstancesAccessor.InstanceHealthSelectionBase.zone_based.name(), - InstancesAccessor.InstancesProperties.instances.name(), "instance1", "instance2", "instance0", - InstancesAccessor.InstancesProperties.preserve_order.name(), - preserveOrder); - Response response = - new JerseyUriRequestBuilder("clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format( - STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE)); + InstancesAccessor.InstancesProperties.instances.name(), "instance1", "instance2", "instance0"); + Response response = new JerseyUriRequestBuilder(String.format( + "clusters/%s/instances?command=stoppable&skipHealthCheckCategories=%s&preserveOrder=%s", + STOPPABLE_CLUSTER2, + "CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK", + preserveOrder)) + .post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE)); JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class)); String stoppableNode = "instance0"; From 8cc47cd140d4ce2433f981d82b37cdb14ab2fbf6 Mon Sep 17 00:00:00 2001 From: LZD-PratyushBhatt Date: Wed, 7 May 2025 22:10:32 +0530 Subject: [PATCH 5/6] Add comment for explaining LinkedHashMap usage --- .../MaintenanceManagementService.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java index 01df1ff20e..de084ea447 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java @@ -492,6 +492,9 @@ private List batchHelixInstanceStoppableCheck(String clusterId, Function.identity(), instance -> POOL.submit(() -> performHelixOwnInstanceCheck(clusterId, instance, toBeStoppedInstances)), (existing, replacement) -> existing, + // Use LinkedHashMap when preserveOrder is true as we need to preserve the order of instances. + // This is important for addMinActiveReplicaChecks which processes instances sequentially, + // and the order of processing can affect which instances pass the min active replica check preserveOrder ? LinkedHashMap::new : HashMap::new )); From 791c08f15c8a151b7359b5eaaea2814642e8e780 Mon Sep 17 00:00:00 2001 From: LZD-PratyushBhatt Date: Mon, 12 May 2025 11:58:46 +0530 Subject: [PATCH 6/6] Add preserveOrder check for CustomChecks --- .../MaintenanceManagementService.java | 32 ++++++++++++------- .../StoppableInstancesSelector.java | 21 ++++++++++-- 2 files changed, 40 insertions(+), 13 deletions(-) diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java index de084ea447..18d1abd60c 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java @@ -394,7 +394,7 @@ public Map batchGetInstancesStoppableChecks(String clust toBeStoppedInstances, preserveOrder); // custom check, includes partition check. batchCustomInstanceStoppableCheck(clusterId, instancesForCustomInstanceLevelChecks, - toBeStoppedInstances, finalStoppableChecks, getMapFromJsonPayload(jsonContent)); + toBeStoppedInstances, finalStoppableChecks, getMapFromJsonPayload(jsonContent), preserveOrder); return finalStoppableChecks; } @@ -507,7 +507,7 @@ private List batchHelixInstanceStoppableCheck(String clusterId, private List batchCustomInstanceStoppableCheck(String clusterId, List instances, Set toBeStoppedInstances, Map finalStoppableChecks, - Map customPayLoads) { + Map customPayLoads, boolean preserveOrder) { if (instances.isEmpty()) { // if all instances failed at previous checks, then all following checks are not required. return instances; @@ -536,7 +536,7 @@ private List batchCustomInstanceStoppableCheck(String clusterId, List clusterLevelCustomCheckResult = performAggregatedCustomCheck(clusterId, instanceIdsForCustomCheck, restConfig.getCompleteConfiguredHealthUrl().get(), customPayLoads, - toBeStoppedInstances); + toBeStoppedInstances, preserveOrder); List instancesForNextCheck = new ArrayList<>(); clusterLevelCustomCheckResult.forEach((instance, stoppableCheck) -> { addStoppableCheck(finalStoppableChecks, instance, stoppableCheck); @@ -553,9 +553,13 @@ private List batchCustomInstanceStoppableCheck(String clusterId, List instancesForCustomPartitionLevelChecks = instanceIdsForCustomCheck; if (!_skipHealthCheckCategories.contains(StoppableCheck.Category.CUSTOM_INSTANCE_CHECK)) { Map> customInstanceLevelChecks = instances.stream().collect( - Collectors.toMap(Function.identity(), instance -> POOL.submit( - () -> performCustomInstanceCheck(clusterId, instance, restConfig.getBaseUrl(instance), - customPayLoads)))); + Collectors.toMap( + Function.identity(), + instance -> POOL.submit(() -> performCustomInstanceCheck(clusterId, instance, restConfig.getBaseUrl(instance), customPayLoads)), + (existing, replacement) -> existing, + // Use LinkedHashMap when preserveOrder is true to maintain the original order of instances + preserveOrder ? LinkedHashMap::new : HashMap::new + )); instancesForCustomPartitionLevelChecks = filterInstancesForNextCheck(customInstanceLevelChecks, finalStoppableChecks); } @@ -638,7 +642,7 @@ private Map batchInstanceHealthCheck( // custom check, includes custom Instance check and partition check. instancesForNext = batchCustomInstanceStoppableCheck(clusterId, instancesForNext, Collections.emptySet(), - finalStoppableChecks, healthCheckConfig); + finalStoppableChecks, healthCheckConfig, false); } else { throw new UnsupportedOperationException(healthCheck + " is not supported yet!"); } @@ -785,8 +789,10 @@ private Map performPartitionsCheck(List instance private Map performAggregatedCustomCheck(String clusterId, List instances, String url, Map customPayLoads, - Set toBeStoppedInstances) { - Map aggregatedStoppableChecks = new HashMap<>(); + Set toBeStoppedInstances, boolean preserveOrder) { + // Use LinkedHashMap when preserveOrder is true to maintain the original order of instances + Map aggregatedStoppableChecks = preserveOrder ? + new LinkedHashMap<>() : new HashMap<>(); try { Map> customCheckResult = _customRestClient.getAggregatedStoppableCheck(url, instances, toBeStoppedInstances, @@ -799,9 +805,13 @@ private Map performAggregatedCustomCheck(String clusterI } } catch (IOException ex) { LOG.error("Custom client side aggregated health check for {} failed.", clusterId, ex); - return instances.stream().collect(Collectors.toMap(Function.identity(), + return instances.stream().collect(Collectors.toMap( + Function.identity(), instance -> new StoppableCheck(false, Collections.singletonList(instance), - StoppableCheck.Category.CUSTOM_AGGREGATED_CHECK))); + StoppableCheck.Category.CUSTOM_AGGREGATED_CHECK), + (existing, replacement) -> existing, + // Use LinkedHashMap when preserveOrder is true to maintain the original order of instances + preserveOrder ? LinkedHashMap::new : HashMap::new)); } return aggregatedStoppableChecks; } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java index 1c02a0bbc2..8e36b371db 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java @@ -75,6 +75,25 @@ private StoppableInstancesSelector(String clusterId, List orderOfZone, _dataAccessor = dataAccessor; } + /** + * Evaluates and collects stoppable instances within a specified or determined zone based on the order of zones. + * If _orderOfZone is specified, the method targets the first non-empty zone; otherwise, it targets the zone with + * the highest instance count. The method iterates through instances, performing stoppable checks, and records + * reasons for non-stoppability. + * + * @param instances A list of instance to be evaluated. + * @param toBeStoppedInstances A list of instances presumed to be already stopped + * @return An ObjectNode containing: + * - 'stoppableNode': List of instances that can be stopped. + * - 'instance_not_stoppable_with_reasons': A map with the instance name as the key and + * a list of reasons for non-stoppability as the value. + * @throws IOException + */ + public ObjectNode getStoppableInstancesInSingleZone(List instances, + List toBeStoppedInstances) throws IOException { + return getStoppableInstancesInSingleZone(instances, toBeStoppedInstances, false); + } + /** * Evaluates and collects stoppable instances within a specified or determined zone based on the order of zones. * If _orderOfZone is specified, the method targets the first non-empty zone; otherwise, it targets the zone with @@ -115,7 +134,6 @@ public ObjectNode getStoppableInstancesInSingleZone(List instances, * * @param instances A list of instance to be evaluated. * @param toBeStoppedInstances A list of instances presumed to be already stopped - * @param preserveOrder Indicates whether to preserve the original order of instances * @return An ObjectNode containing: * - 'stoppableNode': List of instances that can be stopped. * - 'instance_not_stoppable_with_reasons': A map with the instance name as the key and @@ -153,7 +171,6 @@ public ObjectNode getStoppableInstancesCrossZones(List instances, * * @param instances A list of instance to be evaluated. * @param toBeStoppedInstances A list of instances presumed to be already stopped - * @param preserveOrder Indicates whether to preserve the original order of instances * @return An ObjectNode containing: * - 'stoppableNode': List of instances that can be stopped. * - 'instance_not_stoppable_with_reasons': A map with the instance name as the key and