diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java index 8cb089cb9e..b409a7f0a2 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java @@ -55,7 +55,7 @@ public Map getBaselineAssignment(AssignmentMetadataS if (assignmentMetadataStore != null) { try { _stateReadLatency.startMeasuringLatency(); - currentBaseline = new HashMap<>(assignmentMetadataStore.getBaseline()); + currentBaseline = assignmentMetadataStore.getBaseline(); _stateReadLatency.endMeasuringLatency(); } catch (Exception ex) { throw new HelixRebalanceException( @@ -88,10 +88,7 @@ public Map getBestPossibleAssignment( if (assignmentMetadataStore != null) { try { _stateReadLatency.startMeasuringLatency(); - currentBestAssignment = - assignmentMetadataStore.getBestPossibleAssignment().entrySet().stream().collect( - Collectors.toMap(Map.Entry::getKey, - entry -> new ResourceAssignment(entry.getValue().getRecord()))); + currentBestAssignment = assignmentMetadataStore.getBestPossibleAssignment(); ; _stateReadLatency.endMeasuringLatency(); } catch (Exception ex) { diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java index 157bb0ae4c..b793778d5a 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java @@ -24,7 +24,7 @@ import java.util.HashMap; import java.util.Map; -import java.util.stream.Collectors; +import java.util.Objects; import org.apache.helix.BucketDataAccessor; import org.apache.helix.HelixException; import org.apache.helix.HelixProperty; @@ -50,8 +50,8 @@ public class AssignmentMetadataStore { private final String _baselinePath; private final String _bestPossiblePath; // volatile for double-checked locking - protected volatile Map _globalBaseline; - protected volatile Map _bestPossibleAssignment; + protected volatile Map _globalBaseline = null; + protected volatile Map _bestPossibleAssignment = null; protected volatile int _bestPossibleVersion = 0; protected volatile int _lastPersistedBestPossibleVersion = 0; @@ -65,6 +65,9 @@ protected AssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String _bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY); } + /** + * @return a deep copy of the best possible assignment that is safe for modification. + */ public Map getBaseline() { // Return the in-memory baseline. If null, read from ZK. This is to minimize reads from ZK if (_globalBaseline == null) { @@ -75,7 +78,11 @@ public Map getBaseline() { } } } - return _globalBaseline; + Map result = new HashMap<>(_globalBaseline.size()); + for (Map.Entry entry : _globalBaseline.entrySet()) { + result.put(entry.getKey(), new ResourceAssignment(entry.getValue().getRecord())); + } + return result; } /** @@ -88,6 +95,9 @@ protected boolean hasPersistedLatestBestPossibleAssignment() { return _lastPersistedBestPossibleVersion == _bestPossibleVersion; } + /** + * @return a deep copy of the best possible assignment that is safe for modification. + */ public Map getBestPossibleAssignment() { // Return the in-memory baseline. If null, read from ZK. This is to minimize reads from ZK if (_bestPossibleAssignment == null) { @@ -98,7 +108,12 @@ public Map getBestPossibleAssignment() { } } } - return _bestPossibleAssignment; + // Return defensive copy so that the in-memory assignment is not modified by callers + Map result = new HashMap<>(_bestPossibleAssignment); + for (Map.Entry entry : _bestPossibleAssignment.entrySet()) { + result.put(entry.getKey(), new ResourceAssignment(entry.getValue().getRecord())); + } + return result; } private Map fetchAssignmentOrDefault(String path) { @@ -135,14 +150,15 @@ private void persistAssignmentToMetadataStore(Map ne */ public synchronized void persistBaseline(Map globalBaseline) { // Create defensive copy so that the in-memory assignment is not modified after it is persisted - Map baselineCopy = globalBaseline.entrySet().stream().collect( - Collectors.toMap(Map.Entry::getKey, - entry -> new ResourceAssignment(entry.getValue().getRecord()))); + Map baselineCopy = new HashMap<>(globalBaseline.size()); + for (Map.Entry entry : globalBaseline.entrySet()) { + baselineCopy.put(entry.getKey(), new ResourceAssignment(entry.getValue().getRecord())); + } + // write to metadata store persistAssignmentToMetadataStore(baselineCopy, _baselinePath, BASELINE_KEY); // write to memory - getBaseline().clear(); - getBaseline().putAll(baselineCopy); + _globalBaseline = baselineCopy; } /** @@ -152,14 +168,14 @@ public synchronized void persistBaseline(Map globalB */ public synchronized void persistBestPossibleAssignment(Map bestPossibleAssignment) { // Create defensive copy so that the in-memory assignment is not modified after it is persisted - Map bestPossibleAssignmentCopy = bestPossibleAssignment.entrySet().stream().collect( - Collectors.toMap(Map.Entry::getKey, - entry -> new ResourceAssignment(entry.getValue().getRecord()))); + Map bestPossibleAssignmentCopy = new HashMap<>(bestPossibleAssignment.size()); + for (Map.Entry entry : bestPossibleAssignment.entrySet()) { + bestPossibleAssignmentCopy.put(entry.getKey(), new ResourceAssignment(entry.getValue().getRecord())); + } // write to metadata store persistAssignmentToMetadataStore(bestPossibleAssignmentCopy, _bestPossiblePath, BEST_POSSIBLE_KEY); // write to memory - getBestPossibleAssignment().clear(); - getBestPossibleAssignment().putAll(bestPossibleAssignmentCopy); + _bestPossibleAssignment = bestPossibleAssignmentCopy; _bestPossibleVersion++; _lastPersistedBestPossibleVersion = _bestPossibleVersion; } @@ -173,10 +189,13 @@ public synchronized void persistBestPossibleAssignment(Map bestPossibleAssignment, int newVersion) { + Map bestPossibleAssignmentCopy = new HashMap<>(bestPossibleAssignment.size()); + for (Map.Entry entry : bestPossibleAssignment.entrySet()) { + bestPossibleAssignmentCopy.put(entry.getKey(), new ResourceAssignment(entry.getValue().getRecord())); + } // Check if the version is stale by this point if (newVersion > _bestPossibleVersion) { - getBestPossibleAssignment().clear(); - getBestPossibleAssignment().putAll(bestPossibleAssignment); + _bestPossibleAssignment = bestPossibleAssignmentCopy; _bestPossibleVersion = newVersion; return true; } @@ -191,19 +210,13 @@ public int getBestPossibleVersion() { public synchronized void clearAssignmentMetadata() { persistAssignmentToMetadataStore(Collections.emptyMap(), _baselinePath, BASELINE_KEY); persistAssignmentToMetadataStore(Collections.emptyMap(), _bestPossiblePath, BEST_POSSIBLE_KEY); - getBaseline().clear(); - getBestPossibleAssignment().clear(); + _globalBaseline = new HashMap<>(); + _bestPossibleAssignment = new HashMap<>(); } protected synchronized void reset() { - if (_bestPossibleAssignment != null) { - _bestPossibleAssignment.clear(); - _bestPossibleAssignment = null; - } - if (_globalBaseline != null) { - _globalBaseline.clear(); - _globalBaseline = null; - } + _bestPossibleAssignment = null; + _globalBaseline = null; } protected void finalize() { @@ -248,10 +261,10 @@ private Map splitAssignments(HelixProperty property) } protected boolean isBaselineChanged(Map newBaseline) { - return !getBaseline().equals(newBaseline); + return !Objects.equals(_globalBaseline, newBaseline); } protected boolean isBestPossibleChanged(Map newBestPossible) { - return !getBestPossibleAssignment().equals(newBestPossible); + return !Objects.equals(_bestPossibleAssignment, newBestPossible); } } diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java index fb59b81465..997ec23ab6 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java @@ -19,7 +19,7 @@ * under the License. */ -import java.util.Collections; +import java.util.HashMap; import java.util.Map; import org.apache.helix.BucketDataAccessor; @@ -36,7 +36,7 @@ public class MockAssignmentMetadataStore extends AssignmentMetadataStore { } public Map getBaseline() { - return _globalBaseline == null ? Collections.emptyMap() : _globalBaseline; + return _globalBaseline == null ? new HashMap<>() : _globalBaseline; } public void persistBaseline(Map globalBaseline) { @@ -44,7 +44,7 @@ public void persistBaseline(Map globalBaseline) { } public Map getBestPossibleAssignment() { - return _bestPossibleAssignment == null ? Collections.emptyMap() : _bestPossibleAssignment; + return _bestPossibleAssignment == null ? new HashMap<>() : _bestPossibleAssignment; } public void persistBestPossibleAssignment(