Skip to content

Commit fe6ea7d

Browse files
[Testing] Soft delete the instance, and add Orphan deletion
1 parent 1fdcd73 commit fe6ea7d

File tree

6 files changed

+145
-4
lines changed

6 files changed

+145
-4
lines changed

helix-core/src/main/java/org/apache/helix/HelixAdmin.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,17 @@ void addResource(String clusterName, String resourceName, int numPartitions, Str
235235
*/
236236
void dropInstance(String clusterName, InstanceConfig instanceConfig);
237237

238+
/**
239+
* Soft drop an instance from a cluster. Deletes InstanceConfig immediately to make the instance
240+
* non-assignable, then relies on async cleanup to remove the instance runtime path.
241+
* Use this for instances with large numbers of pending messages to avoid jute.maxbuffer issues.
242+
* @param clusterName
243+
* @param instanceConfig
244+
*/
245+
default void softDropInstance(String clusterName, InstanceConfig instanceConfig) {
246+
throw new UnsupportedOperationException("softDropInstance is not implemented.");
247+
}
248+
238249
/**
239250
* Purge offline instances that have been offline for longer than the offline duration time
240251
* from a cluster

helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
import org.apache.helix.controller.stages.MessageSelectionStage;
9090
import org.apache.helix.controller.stages.MessageThrottleStage;
9191
import org.apache.helix.controller.stages.ParticipantDeregistrationStage;
92+
import org.apache.helix.controller.stages.OrphanedInstanceCleanupStage;
9293
import org.apache.helix.controller.stages.PersistAssignmentStage;
9394
import org.apache.helix.controller.stages.ReadClusterDataStage;
9495
import org.apache.helix.controller.stages.ResourceComputationStage;
@@ -537,6 +538,7 @@ private static PipelineRegistry createDefaultRegistry(String pipelineName) {
537538
rebalancePipeline.addStage(new PersistAssignmentStage());
538539
rebalancePipeline.addStage(new TargetExteralViewCalcStage());
539540
rebalancePipeline.addStage(new ParticipantDeregistrationStage());
541+
rebalancePipeline.addStage(new OrphanedInstanceCleanupStage());
540542

541543
// external view generation
542544
Pipeline externalViewPipeline = new Pipeline(pipelineName);

helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,6 @@ public enum AsyncWorkerType {
3434
MaintenanceRecoveryWorker,
3535
TaskJobPurgeWorker,
3636
CustomizedStateViewComputeWorker,
37-
ParticipantDeregistrationWorker
37+
ParticipantDeregistrationWorker,
38+
OrphanedInstanceCleanupWorker
3839
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package org.apache.helix.controller.stages;
2+
3+
/*
4+
* Licensed to the Apache Software Foundation (ASF) under one
5+
* or more contributor license agreements. See the NOTICE file
6+
* distributed with this work for additional information
7+
* regarding copyright ownership. The ASF licenses this file
8+
* to you under the Apache License, Version 2.0 (the
9+
* "License"); you may not use this file except in compliance
10+
* with the License. You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing,
15+
* software distributed under the License is distributed on an
16+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17+
* KIND, either express or implied. See the License for the
18+
* specific language governing permissions and limitations
19+
* under the License.
20+
*/
21+
22+
import java.util.List;
23+
24+
import org.apache.helix.AccessOption;
25+
import org.apache.helix.HelixManager;
26+
import org.apache.helix.PropertyPathBuilder;
27+
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
28+
import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
29+
import org.apache.helix.controller.pipeline.AsyncWorkerType;
30+
import org.apache.helix.model.ClusterConfig;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
/**
35+
* Cleans up orphaned instance runtime paths when InstanceConfig is deleted but the instance
36+
* subtree under INSTANCES remains.
37+
*/
38+
public class OrphanedInstanceCleanupStage extends AbstractAsyncBaseStage {
39+
private static final Logger LOG = LoggerFactory.getLogger(OrphanedInstanceCleanupStage.class);
40+
41+
@Override
42+
public AsyncWorkerType getAsyncWorkerType() {
43+
return AsyncWorkerType.OrphanedInstanceCleanupWorker;
44+
}
45+
46+
@Override
47+
public void execute(final ClusterEvent event) {
48+
HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
49+
ResourceControllerDataProvider cache =
50+
event.getAttribute(AttributeName.ControllerDataProvider.name());
51+
if (manager == null || cache == null) {
52+
return;
53+
}
54+
55+
ClusterConfig clusterConfig = cache.getClusterConfig();
56+
if (clusterConfig == null || !clusterConfig.isAsyncInstanceDropEnabled()) {
57+
return;
58+
}
59+
60+
String clusterName = cache.getClusterName();
61+
62+
try {
63+
List<String> instanceNames = manager.getHelixDataAccessor().getBaseDataAccessor()
64+
.getChildNames(PropertyPathBuilder.instance(clusterName), AccessOption.PERSISTENT);
65+
if (instanceNames == null || instanceNames.isEmpty()) {
66+
return;
67+
}
68+
69+
for (String instanceName : instanceNames) {
70+
String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
71+
if (manager.getHelixDataAccessor().getBaseDataAccessor()
72+
.exists(instanceConfigPath, AccessOption.PERSISTENT)) {
73+
continue;
74+
}
75+
76+
String liveInstancePath = PropertyPathBuilder.liveInstance(clusterName, instanceName);
77+
if (manager.getHelixDataAccessor().getBaseDataAccessor()
78+
.exists(liveInstancePath, AccessOption.PERSISTENT)) {
79+
LOG.warn("Found live instance {} without InstanceConfig in cluster {}. Skip orphan cleanup.",
80+
instanceName, clusterName);
81+
continue;
82+
}
83+
84+
String instancePath = PropertyPathBuilder.instance(clusterName, instanceName);
85+
LOG.info("Cleaning up orphaned instance path {}", instancePath);
86+
boolean removed = manager.getHelixDataAccessor().getBaseDataAccessor()
87+
.remove(instancePath, AccessOption.PERSISTENT);
88+
if (removed) {
89+
LOG.info("Removed orphaned instance path {} in cluster {}.", instancePath, clusterName);
90+
} else {
91+
LOG.warn("Failed to remove orphaned instance path {} in cluster {}. Will retry.", instancePath, clusterName);
92+
}
93+
}
94+
} catch (Exception e) {
95+
LOG.warn("OrphanedInstanceCleanupStage failed for cluster {}.", clusterName, e);
96+
}
97+
}
98+
}

helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -292,9 +292,6 @@ private void dropInstancePathsRecursively(String clusterName, String instanceNam
292292
} catch (ZkClientException e) {
293293
if (retryCnt < 3 && e.getCause() instanceof ZkException && e.getCause()
294294
.getCause() instanceof KeeperException.NotEmptyException) {
295-
// Racing condition with controller's persisting node history, retryable.
296-
// We don't need to backoff here as this racing condition only happens once (controller
297-
// does not repeatedly write instance history)
298295
logger.warn("Retrying dropping instance {} with exception {}", instanceName,
299296
e.getCause().getMessage());
300297
retryCnt++;
@@ -308,6 +305,27 @@ private void dropInstancePathsRecursively(String clusterName, String instanceNam
308305
}
309306
}
310307

308+
@Override
309+
public void softDropInstance(String clusterName, InstanceConfig instanceConfig) {
310+
logger.info("Soft drop instance {} from cluster {}.", instanceConfig.getInstanceName(), clusterName);
311+
String instanceName = instanceConfig.getInstanceName();
312+
313+
String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
314+
if (!_zkClient.exists(instanceConfigPath)) {
315+
throw new HelixException(
316+
"Node " + instanceName + " does not exist in config for cluster " + clusterName);
317+
}
318+
319+
String liveInstancePath = PropertyPathBuilder.liveInstance(clusterName, instanceName);
320+
if (_zkClient.exists(liveInstancePath)) {
321+
throw new HelixException(
322+
"Node " + instanceName + " is still alive for cluster " + clusterName + ", can't drop.");
323+
}
324+
325+
_zkClient.delete(instanceConfigPath);
326+
logger.info("Deleted InstanceConfig for {}. OrphanedInstanceCleanupStage will clean up INSTANCES path asynchronously.", instanceName);
327+
}
328+
311329
/**
312330
* Please note that the purge function should only be called when there is no new instance
313331
* joining happening in the cluster. The reason is that current implementation is not thread safe,

helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,9 @@ public enum ClusterConfigProperty {
170170
// How long offline nodes will stay in the cluster before they are automatically purged, in milliseconds
171171
PARTICIPANT_DEREGISTRATION_TIMEOUT,
172172

173+
// Enables async instance drop cleanup to avoid jute.maxbuffer failures
174+
ASYNC_INSTANCE_DROP_ENABLED,
175+
173176
// Allow disabled partitions to remain OFFLINE instead of being reassigned in WAGED rebalancer
174177
RELAXED_DISABLED_PARTITION_CONSTRAINT,
175178
}
@@ -1329,4 +1332,12 @@ public void setParticipantDeregistrationTimeout(long timeout) {
13291332
public boolean isParticipantDeregistrationEnabled() {
13301333
return getParticipantDeregistrationTimeout() > -1;
13311334
}
1335+
1336+
public boolean isAsyncInstanceDropEnabled() {
1337+
return _record.getBooleanField(ClusterConfigProperty.ASYNC_INSTANCE_DROP_ENABLED.name(), false);
1338+
}
1339+
1340+
public void setAsyncInstanceDropEnabled(boolean enabled) {
1341+
_record.setBooleanField(ClusterConfigProperty.ASYNC_INSTANCE_DROP_ENABLED.name(), enabled);
1342+
}
13321343
}

0 commit comments

Comments
 (0)