diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java index 5618a4166afe..4e3689d43521 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java @@ -316,6 +316,10 @@ public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer pe builder.setSerial(peer.getSerial()); } + if (peer.hasSleepForRetries()) { + builder.setSleepForRetries(peer.getSleepForRetries()); + } + Map> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList() .toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()])); if (excludeTableCFsMap != null) { @@ -368,6 +372,7 @@ public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig pe builder.setBandwidth(peerConfig.getBandwidth()); builder.setReplicateAll(peerConfig.replicateAllUserTables()); builder.setSerial(peerConfig.isSerial()); + builder.setSleepForRetries(peerConfig.getSleepForRetries()); ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap()); if (excludeTableCFs != null) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index 940148c85b6b..49fd5e66448a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -48,6 +48,7 @@ public class ReplicationPeerConfig { private Set excludeNamespaces = null; private long bandwidth = 0; private final boolean serial; + private long sleepForRetries = 0; private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) { this.clusterKey = builder.clusterKey; @@ -67,6 +68,7 @@ private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) { : null; this.bandwidth = builder.bandwidth; this.serial = builder.serial; + this.sleepForRetries = builder.sleepForRetries; } private Map> @@ -222,6 +224,10 @@ public boolean isSerial() { return serial; } + public long getSleepForRetries() { + return this.sleepForRetries; + } + public static ReplicationPeerConfigBuilder newBuilder(ReplicationPeerConfig peerConfig) { ReplicationPeerConfigBuilderImpl builder = new ReplicationPeerConfigBuilderImpl(); builder.setClusterKey(peerConfig.getClusterKey()) @@ -231,7 +237,8 @@ public static ReplicationPeerConfigBuilder newBuilder(ReplicationPeerConfig peer .setReplicateAllUserTables(peerConfig.replicateAllUserTables()) .setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap()) .setExcludeNamespaces(peerConfig.getExcludeNamespaces()) - .setBandwidth(peerConfig.getBandwidth()).setSerial(peerConfig.isSerial()); + .setBandwidth(peerConfig.getBandwidth()).setSerial(peerConfig.isSerial()) + .setSleepForRetries(peerConfig.getSleepForRetries()); return builder; } @@ -260,6 +267,8 @@ static class ReplicationPeerConfigBuilderImpl implements ReplicationPeerConfigBu private boolean serial = false; + private long sleepForRetries = 0; + @Override public ReplicationPeerConfigBuilder setClusterKey(String clusterKey) { this.clusterKey = clusterKey != null ? clusterKey.trim() : null; @@ -333,6 +342,12 @@ public ReplicationPeerConfigBuilder setSerial(boolean serial) { return this; } + @Override + public ReplicationPeerConfigBuilder setSleepForRetries(long sleepForRetries) { + this.sleepForRetries = sleepForRetries; + return this; + } + @Override public ReplicationPeerConfig build() { // It would be nice to validate the configuration, but we have to work with "old" data @@ -362,7 +377,8 @@ public String toString() { } } builder.append("bandwidth=").append(bandwidth).append(","); - builder.append("serial=").append(serial); + builder.append("serial=").append(serial).append(","); + builder.append("sleepForRetries=").append(sleepForRetries); return builder.toString(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java index 63cae1f5b990..358971c4e22b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java @@ -151,6 +151,13 @@ default ReplicationPeerConfigBuilder putAllPeerData(Map peerData */ ReplicationPeerConfigBuilder setSerial(boolean serial); + /** + * Sets the sleep time between retries for this peer's replication source. + * @param sleepForRetries Sleep time in milliseconds + * @return {@code this}. + */ + ReplicationPeerConfigBuilder setSleepForRetries(long sleepForRetries); + /** * Builds the configuration object from the current state of {@code this}. * @return A {@link ReplicationPeerConfig} instance. diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigTestUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigTestUtil.java index 634f4626da52..890dce8a0ee5 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigTestUtil.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigTestUtil.java @@ -66,7 +66,8 @@ public static ReplicationPeerConfig getConfig(int seed) { .setReplicationEndpointImpl(Long.toHexString(RNG.nextLong())) .setNamespaces(randNamespaces(RNG)).setExcludeNamespaces(randNamespaces(RNG)) .setTableCFsMap(randTableCFs(RNG)).setExcludeTableCFsMap(randTableCFs(RNG)) - .setReplicateAllUserTables(RNG.nextBoolean()).setBandwidth(RNG.nextInt(1000)).build(); + .setReplicateAllUserTables(RNG.nextBoolean()).setBandwidth(RNG.nextInt(1000)) + .setSleepForRetries(RNG.nextInt(5000)).build(); } private static void assertSetEquals(Set expected, Set actual) { @@ -111,5 +112,6 @@ public static void assertConfigEquals(ReplicationPeerConfig expected, assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap()); assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables()); assertEquals(expected.getBandwidth(), actual.getBandwidth()); + assertEquals(expected.getSleepForRetries(), actual.getSleepForRetries()); } } diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto index 545e89eaaec7..8b64e492ed13 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto @@ -50,6 +50,8 @@ message ReplicationPeer { repeated TableCF exclude_table_cfs = 9; repeated bytes exclude_namespaces = 10; optional bool serial = 11; + optional string remoteWALDir = 12; + optional int64 sleep_for_retries = 13; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java index b01df7db8493..1fad705de9c5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java @@ -99,10 +99,10 @@ private void terminate(String reason, Exception cause) { } if (entryReader != null) { entryReader.interrupt(); - Threads.shutdown(entryReader, sleepForRetries); + Threads.shutdown(entryReader, source.getSleepForRetries()); } this.interrupt(); - Threads.shutdown(this, sleepForRetries); + Threads.shutdown(this, source.getSleepForRetries()); LOG.info("ReplicationSourceWorker {} terminated", this.getName()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 2ce155125540..a7ffba73a2a7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -98,8 +98,6 @@ public class ReplicationSource implements ReplicationSourceInterface { protected ReplicationSourceManager manager; // Should we stop everything? protected Server server; - // How long should we sleep for each retry - private long sleepForRetries; protected FileSystem fs; // id of this cluster private UUID clusterId; @@ -200,8 +198,6 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man this.waitOnEndpointSeconds = this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS); decorateConf(); - // 1 second - this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 5 minutes @ 1 sec per this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32); @@ -491,6 +487,19 @@ private long getCurrentBandwidth() { return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth; } + /** + * Get the sleep time for retries. Check peer config first, if set use it, otherwise fall back to + * global configuration. + * @return sleep time in milliseconds + */ + protected long getSleepForRetries() { + long peerSleepForRetries = replicationPeer.getPeerConfig().getSleepForRetries(); + if (peerSleepForRetries > 0) { + return peerSleepForRetries; + } + return this.conf.getLong("replication.source.sleepforretries", 1000); + } + /** * Do the sleeping logic * @param msg Why we sleep @@ -499,11 +508,12 @@ private long getCurrentBandwidth() { */ protected boolean sleepForRetries(String msg, int sleepMultiplier) { try { + long sleepForRetries = getSleepForRetries(); if (LOG.isTraceEnabled()) { LOG.trace("{} {}, sleeping {} times {}", logPeerId(), msg, sleepForRetries, sleepMultiplier); } - Thread.sleep(this.sleepForRetries * sleepMultiplier); + Thread.sleep(sleepForRetries * sleepMultiplier); } catch (InterruptedException e) { if (LOG.isDebugEnabled()) { LOG.debug("{} Interrupted while sleeping between retries", logPeerId()); @@ -568,7 +578,7 @@ private void initialize() { if (this.isSourceActive() && peerClusterId == null) { if (LOG.isDebugEnabled()) { LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(), - (this.sleepForRetries * sleepMultiplier)); + (getSleepForRetries() * sleepMultiplier)); } if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) { sleepMultiplier++; @@ -666,7 +676,7 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool // And notice that we may call terminate directly from the initThread so here we need to // avoid join on ourselves. initThread.interrupt(); - Threads.shutdown(initThread, this.sleepForRetries); + Threads.shutdown(initThread, getSleepForRetries()); } Collection workers = workerThreads.values(); @@ -685,7 +695,7 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool if (worker.isAlive() || worker.entryReader.isAlive()) { try { // Wait worker to stop - Thread.sleep(this.sleepForRetries); + Thread.sleep(getSleepForRetries()); } catch (InterruptedException e) { LOG.info("{} Interrupted while waiting {} to stop", logPeerId(), worker.getName()); Thread.currentThread().interrupt(); @@ -708,12 +718,12 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool if (join) { for (ReplicationSourceShipper worker : workers) { - Threads.shutdown(worker, this.sleepForRetries); + Threads.shutdown(worker, getSleepForRetries()); LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName()); } if (this.replicationEndpoint != null) { try { - this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier, + this.replicationEndpoint.awaitTerminated(getSleepForRetries() * maxRetriesMultiplier, TimeUnit.MILLISECONDS); } catch (TimeoutException te) { LOG.warn("{} Got exception while waiting for endpoint to shutdown " diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 746c845908f2..98ba242c83f0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -67,8 +67,6 @@ public enum WorkerState { private volatile WorkerState state; protected ReplicationSourceWALReader entryReader; - // How long should we sleep for each retry - protected final long sleepForRetries; // Maximum number of retries before taking bold actions protected final int maxRetriesMultiplier; private final int DEFAULT_TIMEOUT = 20000; @@ -81,8 +79,6 @@ public ReplicationSourceShipper(Configuration conf, String walGroupId, this.walGroupId = walGroupId; this.logQueue = logQueue; this.source = source; - // 1 second - this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 5 minutes @ 1 sec per this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 20 seconds @@ -316,8 +312,9 @@ public boolean isFinished() { */ public boolean sleepForRetries(String msg, int sleepMultiplier) { try { + long sleepForRetries = source.getSleepForRetries(); LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier); - Thread.sleep(this.sleepForRetries * sleepMultiplier); + Thread.sleep(sleepForRetries * sleepMultiplier); } catch (InterruptedException e) { LOG.debug("Interrupted while sleeping between retries"); Thread.currentThread().interrupt(); @@ -349,7 +346,7 @@ void clearWALEntryBatch() { return; } else { // Wait both shipper and reader threads to stop - Thread.sleep(this.sleepForRetries); + Thread.sleep(source.getSleepForRetries()); } } catch (InterruptedException e) { LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch. " diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 26360cbe3ea1..b3a28f1101a4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -69,7 +69,6 @@ class ReplicationSourceWALReader extends Thread { private final int replicationBatchCountCapacity; // position in the WAL to start reading at private long currentPosition; - private final long sleepForRetries; private final int maxRetriesMultiplier; // Indicates whether this particular worker is running @@ -103,8 +102,6 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf, // memory used will be batchSizeCapacity * (nb.batches + 1) // the +1 is for the current thread reading before placing onto the queue int batchCount = conf.getInt("replication.source.nb.batches", 1); - // 1 second - this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 5 minutes @ 1 sec per this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount); @@ -128,7 +125,7 @@ protected final int sleep(int sleepMultiplier) { if (sleepMultiplier < maxRetriesMultiplier) { sleepMultiplier++; } - Threads.sleep(sleepForRetries * sleepMultiplier); + Threads.sleep(source.getSleepForRetries() * sleepMultiplier); return sleepMultiplier; } @@ -141,7 +138,7 @@ public void run() { while (isReaderRunning()) { // loop here to keep reusing stream while we can if (!source.isPeerEnabled()) { waitingPeerEnabled.set(true); - Threads.sleep(sleepForRetries); + Threads.sleep(source.getSleepForRetries()); continue; } else { waitingPeerEnabled.set(false); @@ -276,7 +273,7 @@ public Path getCurrentPath() { private boolean checkBufferQuota() { // try not to go over total quota if (!this.getSourceManager().checkBufferQuota(this.source.getPeerId())) { - Threads.sleep(sleepForRetries); + Threads.sleep(source.getSleepForRetries()); return false; } return true; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index b12c8eb73890..6bde5238fc7a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -130,6 +130,7 @@ public void testDefaultSkipsMetaWAL() throws IOException { ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class); when(peerConfig.getReplicationEndpointImpl()) .thenReturn(DoNothingReplicationEndpoint.class.getName()); + when(peerConfig.getSleepForRetries()).thenReturn(0L); when(mockPeer.getPeerConfig()).thenReturn(peerConfig); ReplicationSourceManager manager = mock(ReplicationSourceManager.class); Mockito.when(manager.getGlobalMetrics()) @@ -169,6 +170,7 @@ public void testWALEntryFilter() throws IOException { ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class); when(peerConfig.getReplicationEndpointImpl()) .thenReturn(DoNothingReplicationEndpoint.class.getName()); + when(peerConfig.getSleepForRetries()).thenReturn(0L); when(mockPeer.getPeerConfig()).thenReturn(peerConfig); ReplicationSourceManager manager = mock(ReplicationSourceManager.class); String queueId = "qid"; @@ -253,7 +255,10 @@ public void testTerminateTimeout() throws Exception { try { replicationEndpoint.start(); ReplicationPeer mockPeer = mock(ReplicationPeer.class); + ReplicationPeerConfig mockPeerConfig = mock(ReplicationPeerConfig.class); when(mockPeer.getPeerBandwidth()).thenReturn(0L); + when(mockPeer.getPeerConfig()).thenReturn(mockPeerConfig); + when(mockPeerConfig.getSleepForRetries()).thenReturn(0L); Configuration testConf = HBaseConfiguration.create(); testConf.setInt("replication.source.maxretriesmultiplier", 1); ReplicationSourceManager manager = mock(ReplicationSourceManager.class); @@ -275,6 +280,9 @@ public void testTerminateClearsBuffer() throws Exception { null, null, null, null, null, mock(MetricsReplicationGlobalSourceSource.class)); ReplicationPeer mockPeer = mock(ReplicationPeer.class); Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); + ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class); + when(peerConfig.getSleepForRetries()).thenReturn(0L); + when(mockPeer.getPeerConfig()).thenReturn(peerConfig); Configuration testConf = HBaseConfiguration.create(); source.init(testConf, null, mockManager, null, mockPeer, Mockito.mock(Server.class), "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class)); @@ -518,6 +526,7 @@ private RegionServerServices setupForAbortTests(ReplicationSource rs, Configurat ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class); FaultyReplicationEndpoint.count = 0; when(peerConfig.getReplicationEndpointImpl()).thenReturn(endpointName); + when(peerConfig.getSleepForRetries()).thenReturn(0L); when(mockPeer.getPeerConfig()).thenReturn(peerConfig); ReplicationSourceManager manager = mock(ReplicationSourceManager.class); Mockito.when(manager.getGlobalMetrics()) @@ -634,6 +643,7 @@ public void testAgeOfOldestWal() throws Exception { ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); Mockito.when(peerConfig.getReplicationEndpointImpl()) .thenReturn(DoNothingReplicationEndpoint.class.getName()); + Mockito.when(peerConfig.getSleepForRetries()).thenReturn(0L); Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); Mockito.when(manager.getGlobalMetrics()) diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index dfe263bf7b6c..0a5b122392d9 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -308,10 +308,17 @@ def show_peer_namespaces(peer_config) # Set new bandwidth config for the specified peer def set_peer_bandwidth(id, bandwidth) rpc = get_peer_config(id) - unless rpc.nil? - rpc.setBandwidth(bandwidth) - @admin.updateReplicationPeerConfig(id, rpc) - end + return if rpc.nil? + rpc = ReplicationPeerConfig.newBuilder(rpc).setBandwidth(bandwidth).build + @admin.updateReplicationPeerConfig(id, rpc) + end + + # Set new sleep_for_retries config for the specified peer + def set_peer_sleep_for_retries(id, sleep_for_retries) + rpc = get_peer_config(id) + return if rpc.nil? + rpc = ReplicationPeerConfig.newBuilder(rpc).setSleepForRetries(sleep_for_retries).build + @admin.updateReplicationPeerConfig(id, rpc) end # Append exclude namespaces config for the specified peer diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index 70aa7b8ae619..94188a8c88bf 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -525,6 +525,7 @@ def self.exception_handler(hide_traceback) append_peer_exclude_tableCFs remove_peer_exclude_tableCFs set_peer_bandwidth + set_peer_sleep_for_retries list_replicated_tables append_peer_tableCFs remove_peer_tableCFs diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_sleep_for_retries.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_sleep_for_retries.rb new file mode 100644 index 000000000000..c314feebeeb3 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_sleep_for_retries.rb @@ -0,0 +1,42 @@ +# +# Copyright The Apache Software Foundation +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class SetPeerSleepForRetries < Command + def help + <<-EOF +Set the replication source sleep time between retries for the specified peer. +Examples: + + # set sleep time to 2 seconds (2000ms) between retries for a peer + hbase> set_peer_sleep_for_retries '1', 2000 + # unset sleep time for a peer to use the global default configured in server-side + hbase> set_peer_sleep_for_retries '1', 0 + +EOF + end + + def command(id, sleep_for_retries = 0) + replication_admin.set_peer_sleep_for_retries(id, sleep_for_retries) + end + end + end +end diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb index 94873de8d6be..68031ea407c1 100644 --- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb @@ -599,6 +599,21 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) command(:remove_peer, @peer_id) end + define_test 'set_peer_sleep_for_retries: works with peer sleep for retries' do + cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test" + args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} + command(:add_peer, @peer_id, args) + + peer_config = command(:get_peer_config, @peer_id) + assert_equal(0, peer_config.getSleepForRetries) + command(:set_peer_sleep_for_retries, @peer_id, 2000) + peer_config = command(:get_peer_config, @peer_id) + assert_equal(2000, peer_config.getSleepForRetries) + + #cleanup + command(:remove_peer, @peer_id) + end + define_test 'get_peer_config: works with simple clusterKey peer' do cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test" args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}