From f405fb870dba83ff9fae43531ab38a2f2d29c5f3 Mon Sep 17 00:00:00 2001 From: skhillon Date: Fri, 26 Dec 2025 08:06:22 -0800 Subject: [PATCH 1/8] Allow peers to override sleep config This squashed commit combines 8 commits: - Allow peers to override sleep config - Dynamic config update - Always get value - Use protobuf instead of string - Add to test - Add shell command - Use builder instead - Update UI to include sleep --- .../ReplicationPeerConfigUtil.java | 5 ++ .../replication/ReplicationPeerConfig.java | 18 ++++++- .../ReplicationPeerConfigBuilder.java | 7 +++ .../ReplicationPeerConfigTestUtil.java | 3 +- .../protobuf/server/master/Replication.proto | 1 + .../RecoveredReplicationSourceShipper.java | 49 +++++++++++++++++++ .../regionserver/ReplicationSource.java | 30 ++++++++---- .../ReplicationSourceShipper.java | 24 +++++++-- .../ReplicationSourceWALReader.java | 10 ++-- .../src/main/ruby/hbase/replication_admin.rb | 8 +++ hbase-shell/src/main/ruby/shell.rb | 1 + .../commands/set_peer_sleep_for_retries.rb | 42 ++++++++++++++++ .../test/ruby/hbase/replication_admin_test.rb | 15 ++++++ 13 files changed, 190 insertions(+), 23 deletions(-) create mode 100644 hbase-shell/src/main/ruby/shell/commands/set_peer_sleep_for_retries.rb diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java index 57be558fb492..aa1e667837d4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java @@ -318,6 +318,10 @@ public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer pe builder.setSerial(peer.getSerial()); } + if (peer.hasSleepForRetries()) { + builder.setSleepForRetries(peer.getSleepForRetries()); + } + Map> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList() .toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()])); if (excludeTableCFsMap != null) { @@ -373,6 +377,7 @@ public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig pe builder.setBandwidth(peerConfig.getBandwidth()); builder.setReplicateAll(peerConfig.replicateAllUserTables()); builder.setSerial(peerConfig.isSerial()); + builder.setSleepForRetries(peerConfig.getSleepForRetries()); ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap()); if (excludeTableCFs != null) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index 58c3a2e59cdf..d860913865eb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -51,6 +51,7 @@ public class ReplicationPeerConfig { private final boolean serial; // Used by synchronous replication private String remoteWALDir; + private long sleepForRetries = 0; private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) { this.clusterKey = builder.clusterKey; @@ -71,6 +72,7 @@ private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) { this.bandwidth = builder.bandwidth; this.serial = builder.serial; this.remoteWALDir = builder.remoteWALDir; + this.sleepForRetries = builder.sleepForRetries; } private Map> @@ -140,6 +142,10 @@ public boolean isSerial() { return serial; } + public long getSleepForRetries() { + return this.sleepForRetries; + } + public static ReplicationPeerConfigBuilder newBuilder(ReplicationPeerConfig peerConfig) { ReplicationPeerConfigBuilderImpl builder = new ReplicationPeerConfigBuilderImpl(); builder.setClusterKey(peerConfig.getClusterKey()) @@ -150,7 +156,8 @@ public static ReplicationPeerConfigBuilder newBuilder(ReplicationPeerConfig peer .setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap()) .setExcludeNamespaces(peerConfig.getExcludeNamespaces()) .setBandwidth(peerConfig.getBandwidth()).setSerial(peerConfig.isSerial()) - .setRemoteWALDir(peerConfig.getRemoteWALDir()); + .setRemoteWALDir(peerConfig.getRemoteWALDir()) + .setSleepForRetries(peerConfig.getSleepForRetries()); return builder; } @@ -181,6 +188,8 @@ static class ReplicationPeerConfigBuilderImpl implements ReplicationPeerConfigBu private String remoteWALDir = null; + private long sleepForRetries = 0; + @Override public ReplicationPeerConfigBuilder setClusterKey(String clusterKey) { this.clusterKey = clusterKey != null ? clusterKey.trim() : null; @@ -260,6 +269,12 @@ public ReplicationPeerConfigBuilder setRemoteWALDir(String dir) { return this; } + @Override + public ReplicationPeerConfigBuilder setSleepForRetries(long sleepForRetries) { + this.sleepForRetries = sleepForRetries; + return this; + } + @Override public ReplicationPeerConfig build() { // It would be nice to validate the configuration, but we have to work with "old" data @@ -293,6 +308,7 @@ public String toString() { if (this.remoteWALDir != null) { builder.append(",remoteWALDir=").append(remoteWALDir); } + builder.append(",sleepForRetries=").append(sleepForRetries); return builder.toString(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java index 95256d128b46..89d55d911c34 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java @@ -158,6 +158,13 @@ default ReplicationPeerConfigBuilder putAllPeerData(Map peerData */ ReplicationPeerConfigBuilder setRemoteWALDir(String dir); + /** + * Sets the sleep time between retries for this peer's replication source. + * @param sleepForRetries Sleep time in milliseconds + * @return {@code this}. + */ + ReplicationPeerConfigBuilder setSleepForRetries(long sleepForRetries); + /** * Builds the configuration object from the current state of {@code this}. * @return A {@link ReplicationPeerConfig} instance. diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigTestUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigTestUtil.java index ea2cb536a053..4f44c8682a17 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigTestUtil.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigTestUtil.java @@ -67,7 +67,7 @@ public static ReplicationPeerConfig getConfig(int seed) { .setRemoteWALDir(Long.toHexString(RNG.nextLong())).setNamespaces(randNamespaces(RNG)) .setExcludeNamespaces(randNamespaces(RNG)).setTableCFsMap(randTableCFs(RNG)) .setExcludeTableCFsMap(randTableCFs(RNG)).setReplicateAllUserTables(RNG.nextBoolean()) - .setBandwidth(RNG.nextInt(1000)).build(); + .setBandwidth(RNG.nextInt(1000)).setSleepForRetries(RNG.nextInt(5000)).build(); } private static void assertSetEquals(Set expected, Set actual) { @@ -112,5 +112,6 @@ public static void assertConfigEquals(ReplicationPeerConfig expected, assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap()); assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables()); assertEquals(expected.getBandwidth(), actual.getBandwidth()); + assertEquals(expected.getSleepForRetries(), actual.getSleepForRetries()); } } diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Replication.proto index 262a26985587..a01d87b36158 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/Replication.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Replication.proto @@ -51,6 +51,7 @@ message ReplicationPeer { repeated bytes exclude_namespaces = 10; optional bool serial = 11; optional string remoteWALDir = 12; + optional int64 sleep_for_retries = 13; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java index ece566d96006..27793dfe6ddb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java @@ -40,4 +40,53 @@ public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId, protected void postFinish() { tryFinish.run(); } + + @Override + public long getStartPosition() { + long startPosition = getRecoveredQueueStartPos(); + int numRetries = 0; + while (numRetries <= maxRetriesMultiplier) { + try { + source.locateRecoveredPaths(walGroupId); + break; + } catch (IOException e) { + LOG.error("Error while locating recovered queue paths, attempt #" + numRetries, e); + numRetries++; + } + } + return startPosition; + } + + // If this is a recovered queue, the queue is already full and the first log + // normally has a position (unless the RS failed between 2 logs) + private long getRecoveredQueueStartPos() { + long startPosition = 0; + String peerClusterZNode = source.getQueueId(); + try { + startPosition = this.replicationQueues.getWALPosition(source.getServer().getServerName(), + peerClusterZNode, this.logQueue.getQueue(walGroupId).peek().getName()); + LOG.trace("Recovered queue started with log {} at position {}", + this.logQueue.getQueue(walGroupId).peek(), startPosition); + } catch (ReplicationException e) { + terminate("Couldn't get the position of this recovered queue " + peerClusterZNode, e); + } + return startPosition; + } + + private void terminate(String reason, Exception cause) { + if (cause == null) { + LOG.info("Closing worker for wal group {} because: {}", this.walGroupId, reason); + } else { + LOG.error( + "Closing worker for wal group " + this.walGroupId + " because an error occurred: " + reason, + cause); + } + if (entryReader != null) { + entryReader.interrupt(); + Threads.shutdown(entryReader, source.getSleepForRetries()); + } + this.interrupt(); + Threads.shutdown(this, source.getSleepForRetries()); + LOG.info("ReplicationSourceWorker {} terminated", this.getName()); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index dc17ed12ff0a..302987cc9b2a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -99,8 +99,6 @@ public class ReplicationSource implements ReplicationSourceInterface { protected ReplicationSourceManager manager; // Should we stop everything? protected Server server; - // How long should we sleep for each retry - private long sleepForRetries; protected FileSystem fs; // id of this cluster private UUID clusterId; @@ -204,8 +202,6 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man this.waitOnEndpointSeconds = this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS); decorateConf(); - // 1 second - this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 5 minutes @ 1 sec per this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32); @@ -526,6 +522,19 @@ private long getCurrentBandwidth() { return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth; } + /** + * Get the sleep time for retries. Check peer config first, if set use it, otherwise fall back to + * global configuration. + * @return sleep time in milliseconds + */ + protected long getSleepForRetries() { + long peerSleepForRetries = replicationPeer.getPeerConfig().getSleepForRetries(); + if (peerSleepForRetries > 0) { + return peerSleepForRetries; + } + return this.conf.getLong("replication.source.sleepforretries", 1000); + } + /** * Do the sleeping logic * @param msg Why we sleep @@ -534,11 +543,12 @@ private long getCurrentBandwidth() { */ private boolean sleepForRetries(String msg, int sleepMultiplier) { try { + long sleepForRetries = getSleepForRetries(); if (LOG.isTraceEnabled()) { LOG.trace("{} {}, sleeping {} times {}", logPeerId(), msg, sleepForRetries, sleepMultiplier); } - Thread.sleep(this.sleepForRetries * sleepMultiplier); + Thread.sleep(sleepForRetries * sleepMultiplier); } catch (InterruptedException e) { if (LOG.isDebugEnabled()) { LOG.debug("{} Interrupted while sleeping between retries", logPeerId()); @@ -594,7 +604,7 @@ private void initialize() { if (this.isSourceActive() && peerClusterId == null) { if (LOG.isDebugEnabled()) { LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(), - (this.sleepForRetries * sleepMultiplier)); + (getSleepForRetries() * sleepMultiplier)); } if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) { sleepMultiplier++; @@ -696,7 +706,7 @@ private void terminate(String reason, Exception cause, boolean clearMetrics, boo // And notice that we may call terminate directly from the initThread so here we need to // avoid join on ourselves. initThread.interrupt(); - Threads.shutdown(initThread, this.sleepForRetries); + Threads.shutdown(initThread, getSleepForRetries()); } Collection workers = workerThreads.values(); @@ -713,7 +723,7 @@ private void terminate(String reason, Exception cause, boolean clearMetrics, boo if (worker.isAlive() || worker.entryReader.isAlive()) { try { // Wait worker to stop - Thread.sleep(this.sleepForRetries); + Thread.sleep(getSleepForRetries()); } catch (InterruptedException e) { LOG.info("{} Interrupted while waiting {} to stop", logPeerId(), worker.getName()); Thread.currentThread().interrupt(); @@ -736,12 +746,12 @@ private void terminate(String reason, Exception cause, boolean clearMetrics, boo if (join) { for (ReplicationSourceShipper worker : workers) { - Threads.shutdown(worker, this.sleepForRetries); + Threads.shutdown(worker, getSleepForRetries()); LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName()); } if (this.replicationEndpoint != null) { try { - this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier, + this.replicationEndpoint.awaitTerminated(getSleepForRetries() * maxRetriesMultiplier, TimeUnit.MILLISECONDS); } catch (TimeoutException te) { LOG.warn("{} Got exception while waiting for endpoint to shutdown " diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 4709e607fc70..284dfe695e13 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -67,8 +67,6 @@ public enum WorkerState { private volatile WorkerState state; final ReplicationSourceWALReader entryReader; - // How long should we sleep for each retry - private final long sleepForRetries; // Maximum number of retries before taking bold actions private final int maxRetriesMultiplier; private final int DEFAULT_TIMEOUT = 20000; @@ -81,8 +79,6 @@ public ReplicationSourceShipper(Configuration conf, String walGroupId, Replicati this.walGroupId = walGroupId; this.source = source; this.entryReader = walReader; - // 1 second - this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 5 minutes @ 1 sec per this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 20 seconds @@ -310,6 +306,24 @@ public boolean isFinished() { return state == WorkerState.FINISHED; } + /** + * Do the sleeping logic + * @param msg Why we sleep + * @param sleepMultiplier by how many times the default sleeping time is augmented + * @return True if sleepMultiplier is < maxRetriesMultiplier + */ + public boolean sleepForRetries(String msg, int sleepMultiplier) { + try { + long sleepForRetries = source.getSleepForRetries(); + LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier); + Thread.sleep(sleepForRetries * sleepMultiplier); + } catch (InterruptedException e) { + LOG.debug("Interrupted while sleeping between retries"); + Thread.currentThread().interrupt(); + } + return sleepMultiplier < maxRetriesMultiplier; + } + /** * Attempts to properly update ReplicationSourceManager.totalBufferUser, in case * there were unprocessed entries batched by the reader to the shipper, but the shipper didn't @@ -334,7 +348,7 @@ void clearWALEntryBatch() { return; } else { // Wait both shipper and reader threads to stop - Thread.sleep(this.sleepForRetries); + Thread.sleep(source.getSleepForRetries()); } } catch (InterruptedException e) { LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch. " diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index fe983c9f3ae6..1f035e09e99e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -70,7 +70,6 @@ class ReplicationSourceWALReader extends Thread { private final int replicationBatchCountCapacity; // position in the WAL to start reading at private long currentPosition; - private final long sleepForRetries; private final int maxRetriesMultiplier; // Indicates whether this particular worker is running @@ -102,8 +101,6 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf, // memory used will be batchSizeCapacity * (nb.batches + 1) // the +1 is for the current thread reading before placing onto the queue int batchCount = conf.getInt("replication.source.nb.batches", 1); - // 1 second - this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 5 minutes @ 1 sec per this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount); @@ -127,7 +124,7 @@ protected final int sleep(int sleepMultiplier) { if (sleepMultiplier < maxRetriesMultiplier) { sleepMultiplier++; } - Threads.sleep(sleepForRetries * sleepMultiplier); + Threads.sleep(source.getSleepForRetries() * sleepMultiplier); return sleepMultiplier; } @@ -139,7 +136,8 @@ public void run() { source.getWALFileLengthProvider(), source.getSourceMetrics(), walGroupId)) { while (isReaderRunning()) { // loop here to keep reusing stream while we can if (!source.isPeerEnabled()) { - Threads.sleep(sleepForRetries); + waitingPeerEnabled.set(true); + Threads.sleep(source.getSleepForRetries()); continue; } if (!checkBufferQuota()) { @@ -272,7 +270,7 @@ public Path getCurrentPath() { private boolean checkBufferQuota() { // try not to go over total quota if (!this.getSourceManager().checkBufferQuota(this.source.getPeerId())) { - Threads.sleep(sleepForRetries); + Threads.sleep(source.getSleepForRetries()); return false; } return true; diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index f2f277008de1..5387a780b79a 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -318,6 +318,14 @@ def set_peer_bandwidth(id, bandwidth) @admin.updateReplicationPeerConfig(id, rpc) end + # Set new sleep_for_retries config for the specified peer + def set_peer_sleep_for_retries(id, sleep_for_retries) + rpc = get_peer_config(id) + return if rpc.nil? + rpc = ReplicationPeerConfig.newBuilder(rpc).setSleepForRetries(sleep_for_retries).build + @admin.updateReplicationPeerConfig(id, rpc) + end + # Append exclude namespaces config for the specified peer def append_peer_exclude_namespaces(id, namespaces) unless namespaces.nil? diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index c408bf06b227..454289430482 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -526,6 +526,7 @@ def self.exception_handler(hide_traceback) append_peer_exclude_tableCFs remove_peer_exclude_tableCFs set_peer_bandwidth + set_peer_sleep_for_retries list_replicated_tables append_peer_tableCFs remove_peer_tableCFs diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_sleep_for_retries.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_sleep_for_retries.rb new file mode 100644 index 000000000000..c314feebeeb3 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_sleep_for_retries.rb @@ -0,0 +1,42 @@ +# +# Copyright The Apache Software Foundation +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class SetPeerSleepForRetries < Command + def help + <<-EOF +Set the replication source sleep time between retries for the specified peer. +Examples: + + # set sleep time to 2 seconds (2000ms) between retries for a peer + hbase> set_peer_sleep_for_retries '1', 2000 + # unset sleep time for a peer to use the global default configured in server-side + hbase> set_peer_sleep_for_retries '1', 0 + +EOF + end + + def command(id, sleep_for_retries = 0) + replication_admin.set_peer_sleep_for_retries(id, sleep_for_retries) + end + end + end +end diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb index 2b19ecb59a40..640a7dbf57d0 100644 --- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb @@ -652,6 +652,21 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) command(:remove_peer, @peer_id) end + define_test 'set_peer_sleep_for_retries: works with peer sleep for retries' do + cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test" + args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} + command(:add_peer, @peer_id, args) + + peer_config = command(:get_peer_config, @peer_id) + assert_equal(0, peer_config.getSleepForRetries) + command(:set_peer_sleep_for_retries, @peer_id, 2000) + peer_config = command(:get_peer_config, @peer_id) + assert_equal(2000, peer_config.getSleepForRetries) + + #cleanup + command(:remove_peer, @peer_id) + end + define_test 'get_peer_config: works with simple clusterKey peer' do cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test" args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} From d97bf18446163781b736486b4974a0c85e5bb21b Mon Sep 17 00:00:00 2001 From: skhillon Date: Fri, 26 Dec 2025 08:30:26 -0800 Subject: [PATCH 2/8] Revert erroneous changes to RecoveredReplicationSourceShipper The previous commit incorrectly added methods (getStartPosition, getRecoveredQueueStartPos, terminate) that don't exist in upstream master. These were from the old branch base and should not be included. --- .../RecoveredReplicationSourceShipper.java | 49 ------------------- 1 file changed, 49 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java index 27793dfe6ddb..ece566d96006 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java @@ -40,53 +40,4 @@ public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId, protected void postFinish() { tryFinish.run(); } - - @Override - public long getStartPosition() { - long startPosition = getRecoveredQueueStartPos(); - int numRetries = 0; - while (numRetries <= maxRetriesMultiplier) { - try { - source.locateRecoveredPaths(walGroupId); - break; - } catch (IOException e) { - LOG.error("Error while locating recovered queue paths, attempt #" + numRetries, e); - numRetries++; - } - } - return startPosition; - } - - // If this is a recovered queue, the queue is already full and the first log - // normally has a position (unless the RS failed between 2 logs) - private long getRecoveredQueueStartPos() { - long startPosition = 0; - String peerClusterZNode = source.getQueueId(); - try { - startPosition = this.replicationQueues.getWALPosition(source.getServer().getServerName(), - peerClusterZNode, this.logQueue.getQueue(walGroupId).peek().getName()); - LOG.trace("Recovered queue started with log {} at position {}", - this.logQueue.getQueue(walGroupId).peek(), startPosition); - } catch (ReplicationException e) { - terminate("Couldn't get the position of this recovered queue " + peerClusterZNode, e); - } - return startPosition; - } - - private void terminate(String reason, Exception cause) { - if (cause == null) { - LOG.info("Closing worker for wal group {} because: {}", this.walGroupId, reason); - } else { - LOG.error( - "Closing worker for wal group " + this.walGroupId + " because an error occurred: " + reason, - cause); - } - if (entryReader != null) { - entryReader.interrupt(); - Threads.shutdown(entryReader, source.getSleepForRetries()); - } - this.interrupt(); - Threads.shutdown(this, source.getSleepForRetries()); - LOG.info("ReplicationSourceWorker {} terminated", this.getName()); - } } From ff202062a7a5a0b327a2b527216772c6428926b3 Mon Sep 17 00:00:00 2001 From: skhillon Date: Fri, 26 Dec 2025 08:51:13 -0800 Subject: [PATCH 3/8] Fix cherry pick and unchanged usages --- .../ReplicationSourceInterface.java | 7 ++++++ .../ReplicationSourceManager.java | 8 ++---- .../ReplicationSourceShipper.java | 25 +++---------------- 3 files changed, 13 insertions(+), 27 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 69ad2887064a..406143161163 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -148,6 +148,13 @@ default boolean isSyncReplication() { return getPeer().getPeerConfig().isSyncReplication(); } + /** + * Get the sleep time for retries. Check peer config first, if set use it, otherwise fall back to + * global configuration. + * @return sleep time in milliseconds + */ + long getSleepForRetries(); + /** Returns active or not */ boolean isSourceActive(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index ffaabe7e3399..eb34b57a4b93 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -175,9 +175,6 @@ public class ReplicationSourceManager { private AtomicLong totalBufferUsed = new AtomicLong(); - // How long should we sleep for each retry when deleting remote wal files for sync replication - // peer. - private final long sleepForRetries; // Maximum number of retries before taking bold actions when deleting remote wal files for sync // replication peer. private final int maxRetriesMultiplier; @@ -227,7 +224,6 @@ public ReplicationSourceManager(ReplicationQueueStorage queueStorage, tfb.setDaemon(true); this.executor.setThreadFactory(tfb.build()); this.latestPaths = new HashMap<>(); - this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000); this.maxRetriesMultiplier = this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60); this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, @@ -747,8 +743,8 @@ private void cleanOldLogs(NavigableSet wals, ReplicationSourceInterface return; } if ( - ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries, - sleepMultiplier, maxRetriesMultiplier) + ReplicationUtils.sleepForRetries("Failed to delete remote wals", + source.getSleepForRetries(), sleepMultiplier, maxRetriesMultiplier) ) { sleepMultiplier++; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 284dfe695e13..40791ffedc84 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -98,7 +98,8 @@ public final void run() { if (!source.isPeerEnabled()) { // The peer enabled check is in memory, not expensive, so do not need to increase the // sleep interval as it may cause a long lag when we enable the peer. - sleepForRetries("Replication is disabled", sleepForRetries, 1, maxRetriesMultiplier); + sleepForRetries("Replication is disabled", source.getSleepForRetries(), 1, + maxRetriesMultiplier); continue; } try { @@ -216,8 +217,8 @@ private void shipEdits(WALEntryBatch entryBatch) { LOG.warn("{} threw unknown exception:", source.getReplicationEndpoint().getClass().getName(), ex); if ( - sleepForRetries("ReplicationEndpoint threw exception", sleepForRetries, sleepMultiplier, - maxRetriesMultiplier) + sleepForRetries("ReplicationEndpoint threw exception", source.getSleepForRetries(), + sleepMultiplier, maxRetriesMultiplier) ) { sleepMultiplier++; } @@ -306,24 +307,6 @@ public boolean isFinished() { return state == WorkerState.FINISHED; } - /** - * Do the sleeping logic - * @param msg Why we sleep - * @param sleepMultiplier by how many times the default sleeping time is augmented - * @return True if sleepMultiplier is < maxRetriesMultiplier - */ - public boolean sleepForRetries(String msg, int sleepMultiplier) { - try { - long sleepForRetries = source.getSleepForRetries(); - LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier); - Thread.sleep(sleepForRetries * sleepMultiplier); - } catch (InterruptedException e) { - LOG.debug("Interrupted while sleeping between retries"); - Thread.currentThread().interrupt(); - } - return sleepMultiplier < maxRetriesMultiplier; - } - /** * Attempts to properly update ReplicationSourceManager.totalBufferUser, in case * there were unprocessed entries batched by the reader to the shipper, but the shipper didn't From 207dacd6d68746af66a4977fae4cedc300245944 Mon Sep 17 00:00:00 2001 From: skhillon Date: Fri, 26 Dec 2025 08:56:57 -0800 Subject: [PATCH 4/8] Accidental line in cherry pick --- .../replication/regionserver/ReplicationSourceWALReader.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 1f035e09e99e..7e74c2f6430c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -40,9 +40,7 @@ import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; - import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; @@ -136,7 +134,6 @@ public void run() { source.getWALFileLengthProvider(), source.getSourceMetrics(), walGroupId)) { while (isReaderRunning()) { // loop here to keep reusing stream while we can if (!source.isPeerEnabled()) { - waitingPeerEnabled.set(true); Threads.sleep(source.getSleepForRetries()); continue; } From e40230a65da2e89a910a023c9241c4b3a636ec74 Mon Sep 17 00:00:00 2001 From: skhillon Date: Fri, 26 Dec 2025 08:59:54 -0800 Subject: [PATCH 5/8] Spotless --- .../replication/regionserver/ReplicationSourceWALReader.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 7e74c2f6430c..a6c8b4b6ed5f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -40,7 +40,9 @@ import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; + import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; From 0487429b48e86e5607049fd22f272ebc04d3414b Mon Sep 17 00:00:00 2001 From: skhillon Date: Fri, 26 Dec 2025 10:03:15 -0800 Subject: [PATCH 6/8] Fix weaker access --- .../hbase/replication/regionserver/ReplicationSource.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 302987cc9b2a..dcd9fbbd92e1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.findArchivedLog; - import com.google.errorprone.annotations.RestrictedApi; import java.io.FileNotFoundException; import java.io.IOException; @@ -69,7 +68,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; @@ -527,7 +525,7 @@ private long getCurrentBandwidth() { * global configuration. * @return sleep time in milliseconds */ - protected long getSleepForRetries() { + public long getSleepForRetries() { long peerSleepForRetries = replicationPeer.getPeerConfig().getSleepForRetries(); if (peerSleepForRetries > 0) { return peerSleepForRetries; From 251d4f08c3a85474b75f0bf9619dbd928da1deed Mon Sep 17 00:00:00 2001 From: skhillon Date: Fri, 26 Dec 2025 10:09:42 -0800 Subject: [PATCH 7/8] Fix test --- .../hbase/replication/regionserver/ReplicationSource.java | 2 ++ .../hadoop/hbase/replication/ReplicationSourceDummy.java | 5 +++++ 2 files changed, 7 insertions(+) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index dcd9fbbd92e1..84e7219859c1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.findArchivedLog; + import com.google.errorprone.annotations.RestrictedApi; import java.io.FileNotFoundException; import java.io.IOException; @@ -68,6 +69,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index da0868be885f..b45b8d2d02a3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -130,6 +130,11 @@ public boolean isSourceActive() { return true; } + @Override + public long getSleepForRetries() { + return 1000; // 1 second default for test + } + @Override public MetricsSource getSourceMetrics() { return metrics; From cae2efb8069d6bb6a5cdf96d0656c4d72bfb290c Mon Sep 17 00:00:00 2001 From: skhillon Date: Fri, 26 Dec 2025 16:20:20 -0800 Subject: [PATCH 8/8] Fix unit test --- .../replication/regionserver/TestReplicationSource.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 37af52eb93b9..e6a36a3da28c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -133,6 +133,7 @@ public void testDefaultSkipsMetaWAL() throws IOException { ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); Mockito.when(peerConfig.getReplicationEndpointImpl()) .thenReturn(DoNothingReplicationEndpoint.class.getName()); + Mockito.when(peerConfig.getSleepForRetries()).thenReturn(0L); Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); Mockito.when(manager.getGlobalMetrics()) @@ -174,6 +175,7 @@ public void testWALEntryFilter() throws IOException { ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); Mockito.when(peerConfig.getReplicationEndpointImpl()) .thenReturn(DoNothingReplicationEndpoint.class.getName()); + Mockito.when(peerConfig.getSleepForRetries()).thenReturn(0L); Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); RegionServerServices rss = @@ -260,7 +262,10 @@ public void testTerminateTimeout() throws Exception { try { replicationEndpoint.start(); ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); + ReplicationPeerConfig mockPeerConfig = Mockito.mock(ReplicationPeerConfig.class); Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); + Mockito.when(mockPeer.getPeerConfig()).thenReturn(mockPeerConfig); + Mockito.when(mockPeerConfig.getSleepForRetries()).thenReturn(0L); Configuration testConf = HBaseConfiguration.create(); testConf.setInt("replication.source.maxretriesmultiplier", 1); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); @@ -284,7 +289,10 @@ public void testTerminateClearsBuffer() throws Exception { ReplicationSourceManager mockManager = new ReplicationSourceManager(null, null, conf, null, null, null, null, null, null, null, mock(MetricsReplicationGlobalSourceSource.class)); ReplicationPeer mockPeer = mock(ReplicationPeer.class); + ReplicationPeerConfig mockPeerConfig = mock(ReplicationPeerConfig.class); Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); + Mockito.when(mockPeer.getPeerConfig()).thenReturn(mockPeerConfig); + Mockito.when(mockPeerConfig.getSleepForRetries()).thenReturn(0L); Configuration testConf = HBaseConfiguration.create(); ReplicationQueueId queueId = new ReplicationQueueId(ServerName.valueOf("test,123,123"), "testPeer"); @@ -501,6 +509,7 @@ private RegionServerServices setupForAbortTests(ReplicationSource rs, Configurat ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); FaultyReplicationEndpoint.count = 0; Mockito.when(peerConfig.getReplicationEndpointImpl()).thenReturn(endpointName); + Mockito.when(peerConfig.getSleepForRetries()).thenReturn(0L); Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); Mockito.when(manager.getGlobalMetrics())