Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,10 @@ public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer pe
builder.setSerial(peer.getSerial());
}

if (peer.hasSleepForRetries()) {
builder.setSleepForRetries(peer.getSleepForRetries());
}

Map<TableName, List<String>> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList()
.toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()]));
if (excludeTableCFsMap != null) {
Expand Down Expand Up @@ -368,6 +372,7 @@ public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig pe
builder.setBandwidth(peerConfig.getBandwidth());
builder.setReplicateAll(peerConfig.replicateAllUserTables());
builder.setSerial(peerConfig.isSerial());
builder.setSleepForRetries(peerConfig.getSleepForRetries());

ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap());
if (excludeTableCFs != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class ReplicationPeerConfig {
private Set<String> excludeNamespaces = null;
private long bandwidth = 0;
private final boolean serial;
private long sleepForRetries = 0;

private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) {
this.clusterKey = builder.clusterKey;
Expand All @@ -67,6 +68,7 @@ private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) {
: null;
this.bandwidth = builder.bandwidth;
this.serial = builder.serial;
this.sleepForRetries = builder.sleepForRetries;
}

private Map<TableName, List<String>>
Expand Down Expand Up @@ -222,6 +224,10 @@ public boolean isSerial() {
return serial;
}

public long getSleepForRetries() {
return this.sleepForRetries;
}

public static ReplicationPeerConfigBuilder newBuilder(ReplicationPeerConfig peerConfig) {
ReplicationPeerConfigBuilderImpl builder = new ReplicationPeerConfigBuilderImpl();
builder.setClusterKey(peerConfig.getClusterKey())
Expand All @@ -231,7 +237,8 @@ public static ReplicationPeerConfigBuilder newBuilder(ReplicationPeerConfig peer
.setReplicateAllUserTables(peerConfig.replicateAllUserTables())
.setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap())
.setExcludeNamespaces(peerConfig.getExcludeNamespaces())
.setBandwidth(peerConfig.getBandwidth()).setSerial(peerConfig.isSerial());
.setBandwidth(peerConfig.getBandwidth()).setSerial(peerConfig.isSerial())
.setSleepForRetries(peerConfig.getSleepForRetries());
return builder;
}

Expand Down Expand Up @@ -260,6 +267,8 @@ static class ReplicationPeerConfigBuilderImpl implements ReplicationPeerConfigBu

private boolean serial = false;

private long sleepForRetries = 0;

@Override
public ReplicationPeerConfigBuilder setClusterKey(String clusterKey) {
this.clusterKey = clusterKey != null ? clusterKey.trim() : null;
Expand Down Expand Up @@ -333,6 +342,12 @@ public ReplicationPeerConfigBuilder setSerial(boolean serial) {
return this;
}

@Override
public ReplicationPeerConfigBuilder setSleepForRetries(long sleepForRetries) {
this.sleepForRetries = sleepForRetries;
return this;
}

@Override
public ReplicationPeerConfig build() {
// It would be nice to validate the configuration, but we have to work with "old" data
Expand Down Expand Up @@ -362,7 +377,8 @@ public String toString() {
}
}
builder.append("bandwidth=").append(bandwidth).append(",");
builder.append("serial=").append(serial);
builder.append("serial=").append(serial).append(",");
builder.append("sleepForRetries=").append(sleepForRetries);
return builder.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ default ReplicationPeerConfigBuilder putAllPeerData(Map<byte[], byte[]> peerData
*/
ReplicationPeerConfigBuilder setSerial(boolean serial);

/**
* Sets the sleep time between retries for this peer's replication source.
* @param sleepForRetries Sleep time in milliseconds
* @return {@code this}.
*/
ReplicationPeerConfigBuilder setSleepForRetries(long sleepForRetries);

/**
* Builds the configuration object from the current state of {@code this}.
* @return A {@link ReplicationPeerConfig} instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public static ReplicationPeerConfig getConfig(int seed) {
.setReplicationEndpointImpl(Long.toHexString(RNG.nextLong()))
.setNamespaces(randNamespaces(RNG)).setExcludeNamespaces(randNamespaces(RNG))
.setTableCFsMap(randTableCFs(RNG)).setExcludeTableCFsMap(randTableCFs(RNG))
.setReplicateAllUserTables(RNG.nextBoolean()).setBandwidth(RNG.nextInt(1000)).build();
.setReplicateAllUserTables(RNG.nextBoolean()).setBandwidth(RNG.nextInt(1000))
.setSleepForRetries(RNG.nextInt(5000)).build();
}

private static void assertSetEquals(Set<String> expected, Set<String> actual) {
Expand Down Expand Up @@ -111,5 +112,6 @@ public static void assertConfigEquals(ReplicationPeerConfig expected,
assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap());
assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables());
assertEquals(expected.getBandwidth(), actual.getBandwidth());
assertEquals(expected.getSleepForRetries(), actual.getSleepForRetries());
}
}
2 changes: 2 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/Replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ message ReplicationPeer {
repeated TableCF exclude_table_cfs = 9;
repeated bytes exclude_namespaces = 10;
optional bool serial = 11;
optional string remoteWALDir = 12;
optional int64 sleep_for_retries = 13;
Comment on lines +53 to +54
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am unsure of the intended approach here. In #7577, we add sleep_for_retries as field 13. However, remoteWALDir does not exist in branch-2. I have added it here as a dummy value so that upgrades from branch-2 to branch-3 can be done seamlessly. If that is not the right approach, I'm happy to change it.

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ private void terminate(String reason, Exception cause) {
}
if (entryReader != null) {
entryReader.interrupt();
Threads.shutdown(entryReader, sleepForRetries);
Threads.shutdown(entryReader, source.getSleepForRetries());
}
this.interrupt();
Threads.shutdown(this, sleepForRetries);
Threads.shutdown(this, source.getSleepForRetries());
LOG.info("ReplicationSourceWorker {} terminated", this.getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
protected ReplicationSourceManager manager;
// Should we stop everything?
protected Server server;
// How long should we sleep for each retry
private long sleepForRetries;
protected FileSystem fs;
// id of this cluster
private UUID clusterId;
Expand Down Expand Up @@ -200,8 +198,6 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man
this.waitOnEndpointSeconds =
this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
decorateConf();
// 1 second
this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
// 5 minutes @ 1 sec per
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
Expand Down Expand Up @@ -491,6 +487,19 @@ private long getCurrentBandwidth() {
return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
}

/**
* Get the sleep time for retries. Check peer config first, if set use it, otherwise fall back to
* global configuration.
* @return sleep time in milliseconds
*/
protected long getSleepForRetries() {
long peerSleepForRetries = replicationPeer.getPeerConfig().getSleepForRetries();
if (peerSleepForRetries > 0) {
return peerSleepForRetries;
}
return this.conf.getLong("replication.source.sleepforretries", 1000);
}

/**
* Do the sleeping logic
* @param msg Why we sleep
Expand All @@ -499,11 +508,12 @@ private long getCurrentBandwidth() {
*/
protected boolean sleepForRetries(String msg, int sleepMultiplier) {
try {
long sleepForRetries = getSleepForRetries();
if (LOG.isTraceEnabled()) {
LOG.trace("{} {}, sleeping {} times {}", logPeerId(), msg, sleepForRetries,
sleepMultiplier);
}
Thread.sleep(this.sleepForRetries * sleepMultiplier);
Thread.sleep(sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
Expand Down Expand Up @@ -568,7 +578,7 @@ private void initialize() {
if (this.isSourceActive() && peerClusterId == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(),
(this.sleepForRetries * sleepMultiplier));
(getSleepForRetries() * sleepMultiplier));
}
if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
sleepMultiplier++;
Expand Down Expand Up @@ -666,7 +676,7 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool
// And notice that we may call terminate directly from the initThread so here we need to
// avoid join on ourselves.
initThread.interrupt();
Threads.shutdown(initThread, this.sleepForRetries);
Threads.shutdown(initThread, getSleepForRetries());
}
Collection<ReplicationSourceShipper> workers = workerThreads.values();

Expand All @@ -685,7 +695,7 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool
if (worker.isAlive() || worker.entryReader.isAlive()) {
try {
// Wait worker to stop
Thread.sleep(this.sleepForRetries);
Thread.sleep(getSleepForRetries());
} catch (InterruptedException e) {
LOG.info("{} Interrupted while waiting {} to stop", logPeerId(), worker.getName());
Thread.currentThread().interrupt();
Expand All @@ -708,12 +718,12 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool

if (join) {
for (ReplicationSourceShipper worker : workers) {
Threads.shutdown(worker, this.sleepForRetries);
Threads.shutdown(worker, getSleepForRetries());
LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName());
}
if (this.replicationEndpoint != null) {
try {
this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,
this.replicationEndpoint.awaitTerminated(getSleepForRetries() * maxRetriesMultiplier,
TimeUnit.MILLISECONDS);
} catch (TimeoutException te) {
LOG.warn("{} Got exception while waiting for endpoint to shutdown "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ public enum WorkerState {
private volatile WorkerState state;
protected ReplicationSourceWALReader entryReader;

// How long should we sleep for each retry
protected final long sleepForRetries;
// Maximum number of retries before taking bold actions
protected final int maxRetriesMultiplier;
private final int DEFAULT_TIMEOUT = 20000;
Expand All @@ -81,8 +79,6 @@ public ReplicationSourceShipper(Configuration conf, String walGroupId,
this.walGroupId = walGroupId;
this.logQueue = logQueue;
this.source = source;
// 1 second
this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
// 5 minutes @ 1 sec per
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
// 20 seconds
Expand Down Expand Up @@ -316,8 +312,9 @@ public boolean isFinished() {
*/
public boolean sleepForRetries(String msg, int sleepMultiplier) {
try {
long sleepForRetries = source.getSleepForRetries();
LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier);
Thread.sleep(this.sleepForRetries * sleepMultiplier);
Thread.sleep(sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
LOG.debug("Interrupted while sleeping between retries");
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -349,7 +346,7 @@ void clearWALEntryBatch() {
return;
} else {
// Wait both shipper and reader threads to stop
Thread.sleep(this.sleepForRetries);
Thread.sleep(source.getSleepForRetries());
}
} catch (InterruptedException e) {
LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ class ReplicationSourceWALReader extends Thread {
private final int replicationBatchCountCapacity;
// position in the WAL to start reading at
private long currentPosition;
private final long sleepForRetries;
private final int maxRetriesMultiplier;

// Indicates whether this particular worker is running
Expand Down Expand Up @@ -103,8 +102,6 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
// memory used will be batchSizeCapacity * (nb.batches + 1)
// the +1 is for the current thread reading before placing onto the queue
int batchCount = conf.getInt("replication.source.nb.batches", 1);
// 1 second
this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
// 5 minutes @ 1 sec per
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
Expand All @@ -128,7 +125,7 @@ protected final int sleep(int sleepMultiplier) {
if (sleepMultiplier < maxRetriesMultiplier) {
sleepMultiplier++;
}
Threads.sleep(sleepForRetries * sleepMultiplier);
Threads.sleep(source.getSleepForRetries() * sleepMultiplier);
return sleepMultiplier;
}

Expand All @@ -141,7 +138,7 @@ public void run() {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
if (!source.isPeerEnabled()) {
waitingPeerEnabled.set(true);
Threads.sleep(sleepForRetries);
Threads.sleep(source.getSleepForRetries());
continue;
} else {
waitingPeerEnabled.set(false);
Expand Down Expand Up @@ -276,7 +273,7 @@ public Path getCurrentPath() {
private boolean checkBufferQuota() {
// try not to go over total quota
if (!this.getSourceManager().checkBufferQuota(this.source.getPeerId())) {
Threads.sleep(sleepForRetries);
Threads.sleep(source.getSleepForRetries());
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public void testDefaultSkipsMetaWAL() throws IOException {
ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
when(peerConfig.getReplicationEndpointImpl())
.thenReturn(DoNothingReplicationEndpoint.class.getName());
when(peerConfig.getSleepForRetries()).thenReturn(0L);
when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
Mockito.when(manager.getGlobalMetrics())
Expand Down Expand Up @@ -169,6 +170,7 @@ public void testWALEntryFilter() throws IOException {
ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
when(peerConfig.getReplicationEndpointImpl())
.thenReturn(DoNothingReplicationEndpoint.class.getName());
when(peerConfig.getSleepForRetries()).thenReturn(0L);
when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
String queueId = "qid";
Expand Down Expand Up @@ -253,7 +255,10 @@ public void testTerminateTimeout() throws Exception {
try {
replicationEndpoint.start();
ReplicationPeer mockPeer = mock(ReplicationPeer.class);
ReplicationPeerConfig mockPeerConfig = mock(ReplicationPeerConfig.class);
when(mockPeer.getPeerBandwidth()).thenReturn(0L);
when(mockPeer.getPeerConfig()).thenReturn(mockPeerConfig);
when(mockPeerConfig.getSleepForRetries()).thenReturn(0L);
Configuration testConf = HBaseConfiguration.create();
testConf.setInt("replication.source.maxretriesmultiplier", 1);
ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
Expand All @@ -275,6 +280,9 @@ public void testTerminateClearsBuffer() throws Exception {
null, null, null, null, null, mock(MetricsReplicationGlobalSourceSource.class));
ReplicationPeer mockPeer = mock(ReplicationPeer.class);
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
when(peerConfig.getSleepForRetries()).thenReturn(0L);
when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
Configuration testConf = HBaseConfiguration.create();
source.init(testConf, null, mockManager, null, mockPeer, Mockito.mock(Server.class), "testPeer",
null, p -> OptionalLong.empty(), mock(MetricsSource.class));
Expand Down Expand Up @@ -518,6 +526,7 @@ private RegionServerServices setupForAbortTests(ReplicationSource rs, Configurat
ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
FaultyReplicationEndpoint.count = 0;
when(peerConfig.getReplicationEndpointImpl()).thenReturn(endpointName);
when(peerConfig.getSleepForRetries()).thenReturn(0L);
when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
Mockito.when(manager.getGlobalMetrics())
Expand Down Expand Up @@ -634,6 +643,7 @@ public void testAgeOfOldestWal() throws Exception {
ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
Mockito.when(peerConfig.getReplicationEndpointImpl())
.thenReturn(DoNothingReplicationEndpoint.class.getName());
Mockito.when(peerConfig.getSleepForRetries()).thenReturn(0L);
Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
Mockito.when(manager.getGlobalMetrics())
Expand Down
15 changes: 11 additions & 4 deletions hbase-shell/src/main/ruby/hbase/replication_admin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,17 @@ def show_peer_namespaces(peer_config)
# Set new bandwidth config for the specified peer
def set_peer_bandwidth(id, bandwidth)
rpc = get_peer_config(id)
unless rpc.nil?
rpc.setBandwidth(bandwidth)
@admin.updateReplicationPeerConfig(id, rpc)
end
return if rpc.nil?
rpc = ReplicationPeerConfig.newBuilder(rpc).setBandwidth(bandwidth).build
@admin.updateReplicationPeerConfig(id, rpc)
end

# Set new sleep_for_retries config for the specified peer
def set_peer_sleep_for_retries(id, sleep_for_retries)
rpc = get_peer_config(id)
return if rpc.nil?
rpc = ReplicationPeerConfig.newBuilder(rpc).setSleepForRetries(sleep_for_retries).build
@admin.updateReplicationPeerConfig(id, rpc)
end

# Append exclude namespaces config for the specified peer
Expand Down
1 change: 1 addition & 0 deletions hbase-shell/src/main/ruby/shell.rb
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ def self.exception_handler(hide_traceback)
append_peer_exclude_tableCFs
remove_peer_exclude_tableCFs
set_peer_bandwidth
set_peer_sleep_for_retries
list_replicated_tables
append_peer_tableCFs
remove_peer_tableCFs
Expand Down
Loading