Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions helix-core/src/main/java/org/apache/helix/HelixAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,19 @@ void autoEnableMaintenanceMode(String clusterName, boolean enabled, String reaso
void manuallyEnableMaintenanceMode(String clusterName, boolean enabled, String reason,
Map<String, String> customFields);

/**
* Manually enable maintenance mode with timeout. To be called by the REST client.
* The cluster will automatically exit maintenance mode after the specified timeout.
* @param clusterName the cluster name
* @param enabled if true, enter maintenance mode; if false, exit maintenance mode
* @param reason reason to enter maintenance mode
* @param timeout time in milliseconds after which maintenance mode should be exited automatically.
* Only applicable when enabled is true. Set to -1 for no automatic exit.
* @param customFields user-specified KV mappings to be stored in the ZNode
*/
void manuallyEnableMaintenanceModeWithTimeout(String clusterName, boolean enabled, String reason,
long timeout, Map<String, String> customFields);

/**
* Check specific cluster is in maintenance mode or not
* @param clusterName the cluster name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,33 @@ public void execute(final ClusterEvent event) throws Exception {
// Check for the maintenance signal
// If it was entered manually or the signal is null (which shouldn't happen), skip this stage
MaintenanceSignal maintenanceSignal = cache.getMaintenanceSignal();
if (maintenanceSignal == null || maintenanceSignal
.getTriggeringEntity() != MaintenanceSignal.TriggeringEntity.CONTROLLER) {
if (maintenanceSignal == null) {
return;
}

HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
if (manager == null || !manager.isConnected()) {
LogUtil.logInfo(LOG, _eventId,
"MaintenanceRecoveryStage failed due to HelixManager being null or not connected!");
return;
}
// Check if this is a user-triggered maintenance mode with an end time
if (maintenanceSignal.getTriggeringEntity() == MaintenanceSignal.TriggeringEntity.USER) {
long endTime = maintenanceSignal.getEndTime();
// If endTime is set and the current time has passed the end time, exit maintenance mode
if (endTime > 0 && System.currentTimeMillis() >= endTime) {
String reason = String.format(
"Timeout-based exit from maintenance mode for cluster %s; End time %d has passed.",
event.getClusterName(), endTime);

manager.getClusterManagmentTool().manuallyEnableMaintenanceMode(manager.getClusterName(), false,
reason, null);
cache.setMaintenanceSignalChanged(); // Set the flag so we do not double enable/disable
LogUtil.logInfo(LOG, _eventId, reason);
return;
}
// Not yet time to exit, or no end time set
return;
}

// At this point, the cluster entered maintenance mode automatically. Retrieve the
// auto-triggering reason
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1179,7 +1179,15 @@ public void manuallyEnableMaintenanceMode(String clusterName, boolean enabled, S
Map<String, String> customFields) {
processMaintenanceMode(clusterName, enabled, reason,
MaintenanceSignal.AutoTriggerReason.NOT_APPLICABLE, customFields,
MaintenanceSignal.TriggeringEntity.USER);
MaintenanceSignal.TriggeringEntity.USER, -1);
}

@Override
public void manuallyEnableMaintenanceModeWithTimeout(String clusterName, boolean enabled,
String reason, long timeout, Map<String, String> customFields) {
processMaintenanceMode(clusterName, enabled, reason,
MaintenanceSignal.AutoTriggerReason.NOT_APPLICABLE, customFields,
MaintenanceSignal.TriggeringEntity.USER, timeout);
}

/**
Expand All @@ -1195,6 +1203,26 @@ private void processMaintenanceMode(String clusterName, final boolean enabled,
final String reason, final MaintenanceSignal.AutoTriggerReason internalReason,
final Map<String, String> customFields,
final MaintenanceSignal.TriggeringEntity triggeringEntity) {
processMaintenanceMode(clusterName, enabled, reason, internalReason, customFields,
triggeringEntity, -1);
}

/**
* Helper method for enabling/disabling maintenance mode.
* @param clusterName
* @param enabled
* @param reason
* @param internalReason
* @param customFields
* @param triggeringEntity
* @param timeout time in milliseconds after which maintenance mode should be exited automatically.
* Only applicable when enabled is true. Set to -1 for no automatic exit.
*/
private void processMaintenanceMode(String clusterName, final boolean enabled,
final String reason, final MaintenanceSignal.AutoTriggerReason internalReason,
final Map<String, String> customFields,
final MaintenanceSignal.TriggeringEntity triggeringEntity,
final long timeout) {
HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
logger.info("Cluster {} {} {} maintenance mode for reason {}.", clusterName,
Expand All @@ -1212,6 +1240,7 @@ private void processMaintenanceMode(String clusterName, final boolean enabled,
}
maintenanceSignal.setTimestamp(currentTime);
maintenanceSignal.setTriggeringEntity(triggeringEntity);

switch (triggeringEntity) {
case CONTROLLER:
// autoEnable
Expand All @@ -1231,6 +1260,29 @@ private void processMaintenanceMode(String clusterName, final boolean enabled,
}
break;
}

if (timeout > 0) {
// Set end time if timeout is provided
long endTime = currentTime + timeout;
maintenanceSignal.setEndTime(endTime);

// If timeout is provided, create a /CONTROLLER/MAINTENANCE_TTL that is a PERSISTENT_WITH_TTL znode
try {
String maintenanceTTLPath = "/" + clusterName + "/CONTROLLER/MAINTENANCE_TTL";
ZNRecord record = new ZNRecord("MAINTENANCE_TTL");
record.setLongField("TIMEOUT_MS", timeout);
record.setLongField("END_TIME", currentTime + timeout);
boolean success = ((ZkBaseDataAccessor<ZNRecord>) accessor.getBaseDataAccessor())
.create(maintenanceTTLPath, record, AccessOption.PERSISTENT_WITH_TTL, timeout);

if (!success) {
logger.warn("Failed to create TTL znode for maintenance mode. Auto exit may not work.");
}
} catch (Exception e) {
logger.warn("Failed to create TTL znode for maintenance mode. Auto exit may not work. ", e);
}
}

if (!accessor.createMaintenance(maintenanceSignal)) {
throw new HelixException("Failed to create maintenance signal!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public class MaintenanceSignal extends PauseSignal {
public enum MaintenanceSignalProperty {
TRIGGERED_BY,
TIMESTAMP,
AUTO_TRIGGER_REASON
AUTO_TRIGGER_REASON,
END_TIME
}

/**
Expand Down Expand Up @@ -112,4 +113,21 @@ public void setTimestamp(long timestamp) {
public long getTimestamp() {
return _record.getLongField(MaintenanceSignalProperty.TIMESTAMP.name(), -1);
}

/**
* Sets the end time for maintenance mode.
* @param endTime the time (in milliseconds) when maintenance mode should end. A value of -1 means
* no automatic exit.
*/
public void setEndTime(long endTime) {
_record.setLongField(MaintenanceSignalProperty.END_TIME.name(), endTime);
}

/**
* Returns the end time for maintenance mode.
* @return the time (in milliseconds) when maintenance mode should end. Returns -1 if no end time is set.
*/
public long getEndTime() {
return _record.getLongField(MaintenanceSignalProperty.END_TIME.name(), -1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,11 @@ public void beforeSuite() throws Exception {
// TODO: use logging.properties file to config java.util.logging.Logger levels
java.util.logging.Logger topJavaLogger = java.util.logging.Logger.getLogger("");
topJavaLogger.setLevel(Level.WARNING);

// Due to ZOOKEEPER-2693 fix, we need to specify whitelist for execute zk commends
System.setProperty("zookeeper.4lw.commands.whitelist", "*");

// Enable extended types for TTL support
System.setProperty("zookeeper.extendedTypesEnabled", "true");
System.setProperty(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, "3000");

// Start in-memory ZooKeepers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
Expand All @@ -32,6 +33,7 @@
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.task.TaskTestBase;
import org.apache.helix.integration.task.WorkflowGenerator;
Expand All @@ -41,13 +43,28 @@
import org.apache.helix.model.IdealState;
import org.apache.helix.model.MaintenanceSignal;
import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.ZkClient;
import org.apache.zookeeper.data.Stat;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.apache.helix.monitoring.mbeans.ClusterStatusMonitor.CLUSTER_DN_KEY;

import java.util.concurrent.atomic.AtomicLong;
import org.apache.zookeeper.server.DataNode;
import org.apache.zookeeper.server.ContainerManager;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.helix.zookeeper.zkclient.ZkServer;
import org.apache.helix.zookeeper.zkclient.ZkConnection;
import java.lang.reflect.Field;
import org.apache.zookeeper.ZooKeeper;
import org.apache.helix.AccessOption;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.NIOServerCnxnFactory;

public class TestClusterMaintenanceMode extends TaskTestBase {
private static final long TIMEOUT = 180 * 1000L;
Expand Down Expand Up @@ -430,6 +447,59 @@ public void testMaintenanceHistory() throws Exception {
Assert.assertNull(lastHistoryEntry.get("AUTO_TRIGGER_REASON"));
}

@Test
public void testMaintenanceModeWithTimeout() throws Exception {
// Get ZooKeeper server instance from _zkServerMap
ZkServer zkServer = _zkServerMap.get(ZK_ADDR);
Assert.assertNotNull(zkServer, "ZooKeeper server not found");

// Get the ZooKeeper server instance
ZooKeeperServer zooKeeperServer = zkServer.getZooKeeperServer();
Assert.assertNotNull(zooKeeperServer, "ZooKeeper server instance not found");

// Get first processor from ZooKeeperServer using reflection
Field firstProcessorField = ZooKeeperServer.class.getDeclaredField("firstProcessor");
firstProcessorField.setAccessible(true);
RequestProcessor firstProcessor = (RequestProcessor) firstProcessorField.get(zooKeeperServer);
Assert.assertNotNull(firstProcessor, "First processor not found");

// Create a custom ContainerManager with a fake elapsed time
AtomicLong fakeElapsed = new AtomicLong(0);
ContainerManager containerManager = new ContainerManager(
zooKeeperServer.getZKDatabase(),
firstProcessor,
100, // Check interval in milliseconds
100 // Max containers to check per interval
) {
@Override
protected long getElapsed(DataNode node) {
return fakeElapsed.get();
}
};

// Enable maintenance mode with TTL
long timeout = 1000; // 1 second TTL
Map<String, String> customFields = new HashMap<>();
_gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceModeWithTimeout(CLUSTER_NAME, true, TestHelper.getTestMethodName(), timeout, customFields);

// Verify maintenance mode is enabled
String maintenanceTTLPath = "/" + CLUSTER_NAME + "/CONTROLLER/MAINTENANCE_TTL";
Assert.assertTrue(_manager.getHelixDataAccessor().getBaseDataAccessor().exists(maintenanceTTLPath, 0));
Assert.assertTrue(_gSetupTool.getClusterManagementTool().isInMaintenanceMode(CLUSTER_NAME), "Cluster should be in maintenance mode");

// Set elapsed time to trigger TTL purge
fakeElapsed.set(System.currentTimeMillis() + 10000); // Set to future time to force purge

// Force container manager to check now
Thread.sleep(1000);
containerManager.checkContainers();

// Verify maintenance mode is disabled
Assert.assertFalse(_manager.getHelixDataAccessor().getBaseDataAccessor().exists(maintenanceTTLPath, 0));
Assert.assertFalse(_gSetupTool.getClusterManagementTool().isInMaintenanceMode(CLUSTER_NAME), "Cluster should not be in maintenance mode");

}

/**
* Convert a String representation of a Map into a Map object for verification purposes.
* @param value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,12 @@ public void manuallyEnableMaintenanceMode(String clusterName, boolean enabled, S

}

@Override
public void manuallyEnableMaintenanceModeWithTimeout(String clusterName, boolean enabled, String reason,
long timeout, Map<String, String> customFields) {

}

@Override
public boolean isInMaintenanceMode(String clusterName) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,27 @@ public Response updateCluster(@PathParam("clusterId") String clusterId,
} catch (Exception e) {
// NOP
}
helixAdmin
.manuallyEnableMaintenanceMode(clusterId, command == Command.enableMaintenanceMode,
content, customFieldsMap);

// Check if a timeout is specified
long timeout = -1;
if (customFieldsMap != null) {
try {
String timeoutStr = customFieldsMap.get("timeout");
if (timeoutStr != null && !timeoutStr.isEmpty()) {
timeout = Long.parseLong(timeoutStr);
}
} catch (NumberFormatException nfe) {
LOG.warn("Invalid timeout value specified", nfe);
}
}

if (timeout > 0 && command == Command.enableMaintenanceMode) {
helixAdmin.manuallyEnableMaintenanceModeWithTimeout(clusterId, true, content, timeout,
customFieldsMap);
} else {
helixAdmin.manuallyEnableMaintenanceMode(clusterId, command == Command.enableMaintenanceMode,
content, customFieldsMap);
}
break;
case enableWagedRebalanceForAllResources:
// Enable WAGED rebalance for all resources in the cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
Expand Down Expand Up @@ -70,6 +71,8 @@
import org.apache.helix.rest.server.util.JerseyUriRequestBuilder;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.zookeeper.server.ContainerManager;
import org.apache.zookeeper.server.DataNode;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -591,10 +594,37 @@ public void testGetClusterConfig() throws IOException {
System.out.println("End test :" + TestHelper.getTestMethodName());
}

@Test(dependsOnMethods = "testGetClusterConfig")
// @Test(dependsOnMethods = "testGetClusterConfig")
@Test
public void testEnableDisableMaintenanceModeWithTimeout() throws IOException {
System.setProperty("zookeeper.extendedTypesEnabled", "true");
System.out.println("Start test :" + TestHelper.getTestMethodName());
String cluster = _clusters.iterator().next();
System.out.println("cluster: " + cluster);
String reasonValue = "Test reason";
String reasonJSONString = "{\"reason\":\"" + reasonValue + "\", \"timeout\":10000}";
// enable maintenance mode
post("clusters/" + cluster, ImmutableMap.of("command", "enableMaintenanceMode"),
Entity.entity(reasonJSONString, MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode());

// verify is in maintenance mode
Assert.assertTrue(isMaintenanceModeEnabled(cluster));

// Check that we could retrieve maintenance signal correctly
Map<String, Object> maintenanceSignalMap =
getMapResponseFromRest("clusters/" + cluster + "/controller/maintenanceSignal");
Assert.assertEquals(maintenanceSignalMap.get("TRIGGERED_BY"), "USER");
Assert.assertEquals(maintenanceSignalMap.get("REASON"), reasonValue);
Assert.assertNotNull(maintenanceSignalMap.get("TIMESTAMP"));
Assert.assertEquals(maintenanceSignalMap.get("clusterName"), cluster);
}


@Test
public void testEnableDisableMaintenanceMode() throws IOException {
System.out.println("Start test :" + TestHelper.getTestMethodName());
String cluster = _clusters.iterator().next();
System.out.println("cluster: " + cluster);
String reasonValue = "Test reason";
String reasonJSONString = "{\"reason\":\"" + reasonValue + "\"}";
// enable maintenance mode
Expand Down
Loading