diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java index 57be558fb492..aa1e667837d4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java @@ -318,6 +318,10 @@ public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer pe builder.setSerial(peer.getSerial()); } + if (peer.hasSleepForRetries()) { + builder.setSleepForRetries(peer.getSleepForRetries()); + } + Map> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList() .toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()])); if (excludeTableCFsMap != null) { @@ -373,6 +377,7 @@ public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig pe builder.setBandwidth(peerConfig.getBandwidth()); builder.setReplicateAll(peerConfig.replicateAllUserTables()); builder.setSerial(peerConfig.isSerial()); + builder.setSleepForRetries(peerConfig.getSleepForRetries()); ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap()); if (excludeTableCFs != null) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index 58c3a2e59cdf..d860913865eb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -51,6 +51,7 @@ public class ReplicationPeerConfig { private final boolean serial; // Used by synchronous replication private String remoteWALDir; + private long sleepForRetries = 0; private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) { this.clusterKey = builder.clusterKey; @@ -71,6 +72,7 @@ private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) { this.bandwidth = builder.bandwidth; this.serial = builder.serial; this.remoteWALDir = builder.remoteWALDir; + this.sleepForRetries = builder.sleepForRetries; } private Map> @@ -140,6 +142,10 @@ public boolean isSerial() { return serial; } + public long getSleepForRetries() { + return this.sleepForRetries; + } + public static ReplicationPeerConfigBuilder newBuilder(ReplicationPeerConfig peerConfig) { ReplicationPeerConfigBuilderImpl builder = new ReplicationPeerConfigBuilderImpl(); builder.setClusterKey(peerConfig.getClusterKey()) @@ -150,7 +156,8 @@ public static ReplicationPeerConfigBuilder newBuilder(ReplicationPeerConfig peer .setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap()) .setExcludeNamespaces(peerConfig.getExcludeNamespaces()) .setBandwidth(peerConfig.getBandwidth()).setSerial(peerConfig.isSerial()) - .setRemoteWALDir(peerConfig.getRemoteWALDir()); + .setRemoteWALDir(peerConfig.getRemoteWALDir()) + .setSleepForRetries(peerConfig.getSleepForRetries()); return builder; } @@ -181,6 +188,8 @@ static class ReplicationPeerConfigBuilderImpl implements ReplicationPeerConfigBu private String remoteWALDir = null; + private long sleepForRetries = 0; + @Override public ReplicationPeerConfigBuilder setClusterKey(String clusterKey) { this.clusterKey = clusterKey != null ? clusterKey.trim() : null; @@ -260,6 +269,12 @@ public ReplicationPeerConfigBuilder setRemoteWALDir(String dir) { return this; } + @Override + public ReplicationPeerConfigBuilder setSleepForRetries(long sleepForRetries) { + this.sleepForRetries = sleepForRetries; + return this; + } + @Override public ReplicationPeerConfig build() { // It would be nice to validate the configuration, but we have to work with "old" data @@ -293,6 +308,7 @@ public String toString() { if (this.remoteWALDir != null) { builder.append(",remoteWALDir=").append(remoteWALDir); } + builder.append(",sleepForRetries=").append(sleepForRetries); return builder.toString(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java index 95256d128b46..89d55d911c34 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java @@ -158,6 +158,13 @@ default ReplicationPeerConfigBuilder putAllPeerData(Map peerData */ ReplicationPeerConfigBuilder setRemoteWALDir(String dir); + /** + * Sets the sleep time between retries for this peer's replication source. + * @param sleepForRetries Sleep time in milliseconds + * @return {@code this}. + */ + ReplicationPeerConfigBuilder setSleepForRetries(long sleepForRetries); + /** * Builds the configuration object from the current state of {@code this}. * @return A {@link ReplicationPeerConfig} instance. diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigTestUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigTestUtil.java index ea2cb536a053..4f44c8682a17 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigTestUtil.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigTestUtil.java @@ -67,7 +67,7 @@ public static ReplicationPeerConfig getConfig(int seed) { .setRemoteWALDir(Long.toHexString(RNG.nextLong())).setNamespaces(randNamespaces(RNG)) .setExcludeNamespaces(randNamespaces(RNG)).setTableCFsMap(randTableCFs(RNG)) .setExcludeTableCFsMap(randTableCFs(RNG)).setReplicateAllUserTables(RNG.nextBoolean()) - .setBandwidth(RNG.nextInt(1000)).build(); + .setBandwidth(RNG.nextInt(1000)).setSleepForRetries(RNG.nextInt(5000)).build(); } private static void assertSetEquals(Set expected, Set actual) { @@ -112,5 +112,6 @@ public static void assertConfigEquals(ReplicationPeerConfig expected, assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap()); assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables()); assertEquals(expected.getBandwidth(), actual.getBandwidth()); + assertEquals(expected.getSleepForRetries(), actual.getSleepForRetries()); } } diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Replication.proto index 262a26985587..a01d87b36158 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/Replication.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Replication.proto @@ -51,6 +51,7 @@ message ReplicationPeer { repeated bytes exclude_namespaces = 10; optional bool serial = 11; optional string remoteWALDir = 12; + optional int64 sleep_for_retries = 13; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index dc17ed12ff0a..84e7219859c1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -99,8 +99,6 @@ public class ReplicationSource implements ReplicationSourceInterface { protected ReplicationSourceManager manager; // Should we stop everything? protected Server server; - // How long should we sleep for each retry - private long sleepForRetries; protected FileSystem fs; // id of this cluster private UUID clusterId; @@ -204,8 +202,6 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man this.waitOnEndpointSeconds = this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS); decorateConf(); - // 1 second - this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 5 minutes @ 1 sec per this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32); @@ -526,6 +522,19 @@ private long getCurrentBandwidth() { return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth; } + /** + * Get the sleep time for retries. Check peer config first, if set use it, otherwise fall back to + * global configuration. + * @return sleep time in milliseconds + */ + public long getSleepForRetries() { + long peerSleepForRetries = replicationPeer.getPeerConfig().getSleepForRetries(); + if (peerSleepForRetries > 0) { + return peerSleepForRetries; + } + return this.conf.getLong("replication.source.sleepforretries", 1000); + } + /** * Do the sleeping logic * @param msg Why we sleep @@ -534,11 +543,12 @@ private long getCurrentBandwidth() { */ private boolean sleepForRetries(String msg, int sleepMultiplier) { try { + long sleepForRetries = getSleepForRetries(); if (LOG.isTraceEnabled()) { LOG.trace("{} {}, sleeping {} times {}", logPeerId(), msg, sleepForRetries, sleepMultiplier); } - Thread.sleep(this.sleepForRetries * sleepMultiplier); + Thread.sleep(sleepForRetries * sleepMultiplier); } catch (InterruptedException e) { if (LOG.isDebugEnabled()) { LOG.debug("{} Interrupted while sleeping between retries", logPeerId()); @@ -594,7 +604,7 @@ private void initialize() { if (this.isSourceActive() && peerClusterId == null) { if (LOG.isDebugEnabled()) { LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(), - (this.sleepForRetries * sleepMultiplier)); + (getSleepForRetries() * sleepMultiplier)); } if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) { sleepMultiplier++; @@ -696,7 +706,7 @@ private void terminate(String reason, Exception cause, boolean clearMetrics, boo // And notice that we may call terminate directly from the initThread so here we need to // avoid join on ourselves. initThread.interrupt(); - Threads.shutdown(initThread, this.sleepForRetries); + Threads.shutdown(initThread, getSleepForRetries()); } Collection workers = workerThreads.values(); @@ -713,7 +723,7 @@ private void terminate(String reason, Exception cause, boolean clearMetrics, boo if (worker.isAlive() || worker.entryReader.isAlive()) { try { // Wait worker to stop - Thread.sleep(this.sleepForRetries); + Thread.sleep(getSleepForRetries()); } catch (InterruptedException e) { LOG.info("{} Interrupted while waiting {} to stop", logPeerId(), worker.getName()); Thread.currentThread().interrupt(); @@ -736,12 +746,12 @@ private void terminate(String reason, Exception cause, boolean clearMetrics, boo if (join) { for (ReplicationSourceShipper worker : workers) { - Threads.shutdown(worker, this.sleepForRetries); + Threads.shutdown(worker, getSleepForRetries()); LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName()); } if (this.replicationEndpoint != null) { try { - this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier, + this.replicationEndpoint.awaitTerminated(getSleepForRetries() * maxRetriesMultiplier, TimeUnit.MILLISECONDS); } catch (TimeoutException te) { LOG.warn("{} Got exception while waiting for endpoint to shutdown " diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 69ad2887064a..406143161163 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -148,6 +148,13 @@ default boolean isSyncReplication() { return getPeer().getPeerConfig().isSyncReplication(); } + /** + * Get the sleep time for retries. Check peer config first, if set use it, otherwise fall back to + * global configuration. + * @return sleep time in milliseconds + */ + long getSleepForRetries(); + /** Returns active or not */ boolean isSourceActive(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index ffaabe7e3399..eb34b57a4b93 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -175,9 +175,6 @@ public class ReplicationSourceManager { private AtomicLong totalBufferUsed = new AtomicLong(); - // How long should we sleep for each retry when deleting remote wal files for sync replication - // peer. - private final long sleepForRetries; // Maximum number of retries before taking bold actions when deleting remote wal files for sync // replication peer. private final int maxRetriesMultiplier; @@ -227,7 +224,6 @@ public ReplicationSourceManager(ReplicationQueueStorage queueStorage, tfb.setDaemon(true); this.executor.setThreadFactory(tfb.build()); this.latestPaths = new HashMap<>(); - this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000); this.maxRetriesMultiplier = this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60); this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, @@ -747,8 +743,8 @@ private void cleanOldLogs(NavigableSet wals, ReplicationSourceInterface return; } if ( - ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries, - sleepMultiplier, maxRetriesMultiplier) + ReplicationUtils.sleepForRetries("Failed to delete remote wals", + source.getSleepForRetries(), sleepMultiplier, maxRetriesMultiplier) ) { sleepMultiplier++; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 4709e607fc70..40791ffedc84 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -67,8 +67,6 @@ public enum WorkerState { private volatile WorkerState state; final ReplicationSourceWALReader entryReader; - // How long should we sleep for each retry - private final long sleepForRetries; // Maximum number of retries before taking bold actions private final int maxRetriesMultiplier; private final int DEFAULT_TIMEOUT = 20000; @@ -81,8 +79,6 @@ public ReplicationSourceShipper(Configuration conf, String walGroupId, Replicati this.walGroupId = walGroupId; this.source = source; this.entryReader = walReader; - // 1 second - this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 5 minutes @ 1 sec per this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 20 seconds @@ -102,7 +98,8 @@ public final void run() { if (!source.isPeerEnabled()) { // The peer enabled check is in memory, not expensive, so do not need to increase the // sleep interval as it may cause a long lag when we enable the peer. - sleepForRetries("Replication is disabled", sleepForRetries, 1, maxRetriesMultiplier); + sleepForRetries("Replication is disabled", source.getSleepForRetries(), 1, + maxRetriesMultiplier); continue; } try { @@ -220,8 +217,8 @@ private void shipEdits(WALEntryBatch entryBatch) { LOG.warn("{} threw unknown exception:", source.getReplicationEndpoint().getClass().getName(), ex); if ( - sleepForRetries("ReplicationEndpoint threw exception", sleepForRetries, sleepMultiplier, - maxRetriesMultiplier) + sleepForRetries("ReplicationEndpoint threw exception", source.getSleepForRetries(), + sleepMultiplier, maxRetriesMultiplier) ) { sleepMultiplier++; } @@ -334,7 +331,7 @@ void clearWALEntryBatch() { return; } else { // Wait both shipper and reader threads to stop - Thread.sleep(this.sleepForRetries); + Thread.sleep(source.getSleepForRetries()); } } catch (InterruptedException e) { LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch. " diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index fe983c9f3ae6..a6c8b4b6ed5f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -70,7 +70,6 @@ class ReplicationSourceWALReader extends Thread { private final int replicationBatchCountCapacity; // position in the WAL to start reading at private long currentPosition; - private final long sleepForRetries; private final int maxRetriesMultiplier; // Indicates whether this particular worker is running @@ -102,8 +101,6 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf, // memory used will be batchSizeCapacity * (nb.batches + 1) // the +1 is for the current thread reading before placing onto the queue int batchCount = conf.getInt("replication.source.nb.batches", 1); - // 1 second - this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 5 minutes @ 1 sec per this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount); @@ -127,7 +124,7 @@ protected final int sleep(int sleepMultiplier) { if (sleepMultiplier < maxRetriesMultiplier) { sleepMultiplier++; } - Threads.sleep(sleepForRetries * sleepMultiplier); + Threads.sleep(source.getSleepForRetries() * sleepMultiplier); return sleepMultiplier; } @@ -139,7 +136,7 @@ public void run() { source.getWALFileLengthProvider(), source.getSourceMetrics(), walGroupId)) { while (isReaderRunning()) { // loop here to keep reusing stream while we can if (!source.isPeerEnabled()) { - Threads.sleep(sleepForRetries); + Threads.sleep(source.getSleepForRetries()); continue; } if (!checkBufferQuota()) { @@ -272,7 +269,7 @@ public Path getCurrentPath() { private boolean checkBufferQuota() { // try not to go over total quota if (!this.getSourceManager().checkBufferQuota(this.source.getPeerId())) { - Threads.sleep(sleepForRetries); + Threads.sleep(source.getSleepForRetries()); return false; } return true; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index da0868be885f..b45b8d2d02a3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -130,6 +130,11 @@ public boolean isSourceActive() { return true; } + @Override + public long getSleepForRetries() { + return 1000; // 1 second default for test + } + @Override public MetricsSource getSourceMetrics() { return metrics; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 37af52eb93b9..e6a36a3da28c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -133,6 +133,7 @@ public void testDefaultSkipsMetaWAL() throws IOException { ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); Mockito.when(peerConfig.getReplicationEndpointImpl()) .thenReturn(DoNothingReplicationEndpoint.class.getName()); + Mockito.when(peerConfig.getSleepForRetries()).thenReturn(0L); Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); Mockito.when(manager.getGlobalMetrics()) @@ -174,6 +175,7 @@ public void testWALEntryFilter() throws IOException { ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); Mockito.when(peerConfig.getReplicationEndpointImpl()) .thenReturn(DoNothingReplicationEndpoint.class.getName()); + Mockito.when(peerConfig.getSleepForRetries()).thenReturn(0L); Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); RegionServerServices rss = @@ -260,7 +262,10 @@ public void testTerminateTimeout() throws Exception { try { replicationEndpoint.start(); ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); + ReplicationPeerConfig mockPeerConfig = Mockito.mock(ReplicationPeerConfig.class); Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); + Mockito.when(mockPeer.getPeerConfig()).thenReturn(mockPeerConfig); + Mockito.when(mockPeerConfig.getSleepForRetries()).thenReturn(0L); Configuration testConf = HBaseConfiguration.create(); testConf.setInt("replication.source.maxretriesmultiplier", 1); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); @@ -284,7 +289,10 @@ public void testTerminateClearsBuffer() throws Exception { ReplicationSourceManager mockManager = new ReplicationSourceManager(null, null, conf, null, null, null, null, null, null, null, mock(MetricsReplicationGlobalSourceSource.class)); ReplicationPeer mockPeer = mock(ReplicationPeer.class); + ReplicationPeerConfig mockPeerConfig = mock(ReplicationPeerConfig.class); Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); + Mockito.when(mockPeer.getPeerConfig()).thenReturn(mockPeerConfig); + Mockito.when(mockPeerConfig.getSleepForRetries()).thenReturn(0L); Configuration testConf = HBaseConfiguration.create(); ReplicationQueueId queueId = new ReplicationQueueId(ServerName.valueOf("test,123,123"), "testPeer"); @@ -501,6 +509,7 @@ private RegionServerServices setupForAbortTests(ReplicationSource rs, Configurat ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); FaultyReplicationEndpoint.count = 0; Mockito.when(peerConfig.getReplicationEndpointImpl()).thenReturn(endpointName); + Mockito.when(peerConfig.getSleepForRetries()).thenReturn(0L); Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); Mockito.when(manager.getGlobalMetrics()) diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index f2f277008de1..5387a780b79a 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -318,6 +318,14 @@ def set_peer_bandwidth(id, bandwidth) @admin.updateReplicationPeerConfig(id, rpc) end + # Set new sleep_for_retries config for the specified peer + def set_peer_sleep_for_retries(id, sleep_for_retries) + rpc = get_peer_config(id) + return if rpc.nil? + rpc = ReplicationPeerConfig.newBuilder(rpc).setSleepForRetries(sleep_for_retries).build + @admin.updateReplicationPeerConfig(id, rpc) + end + # Append exclude namespaces config for the specified peer def append_peer_exclude_namespaces(id, namespaces) unless namespaces.nil? diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index c408bf06b227..454289430482 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -526,6 +526,7 @@ def self.exception_handler(hide_traceback) append_peer_exclude_tableCFs remove_peer_exclude_tableCFs set_peer_bandwidth + set_peer_sleep_for_retries list_replicated_tables append_peer_tableCFs remove_peer_tableCFs diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_sleep_for_retries.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_sleep_for_retries.rb new file mode 100644 index 000000000000..c314feebeeb3 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_sleep_for_retries.rb @@ -0,0 +1,42 @@ +# +# Copyright The Apache Software Foundation +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class SetPeerSleepForRetries < Command + def help + <<-EOF +Set the replication source sleep time between retries for the specified peer. +Examples: + + # set sleep time to 2 seconds (2000ms) between retries for a peer + hbase> set_peer_sleep_for_retries '1', 2000 + # unset sleep time for a peer to use the global default configured in server-side + hbase> set_peer_sleep_for_retries '1', 0 + +EOF + end + + def command(id, sleep_for_retries = 0) + replication_admin.set_peer_sleep_for_retries(id, sleep_for_retries) + end + end + end +end diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb index 2b19ecb59a40..640a7dbf57d0 100644 --- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb @@ -652,6 +652,21 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) command(:remove_peer, @peer_id) end + define_test 'set_peer_sleep_for_retries: works with peer sleep for retries' do + cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test" + args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} + command(:add_peer, @peer_id, args) + + peer_config = command(:get_peer_config, @peer_id) + assert_equal(0, peer_config.getSleepForRetries) + command(:set_peer_sleep_for_retries, @peer_id, 2000) + peer_config = command(:get_peer_config, @peer_id) + assert_equal(2000, peer_config.getSleepForRetries) + + #cleanup + command(:remove_peer, @peer_id) + end + define_test 'get_peer_config: works with simple clusterKey peer' do cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test" args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}