Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,10 @@ public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer pe
builder.setSerial(peer.getSerial());
}

if (peer.hasSleepForRetries()) {
builder.setSleepForRetries(peer.getSleepForRetries());
}

Map<TableName, List<String>> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList()
.toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()]));
if (excludeTableCFsMap != null) {
Expand Down Expand Up @@ -373,6 +377,7 @@ public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig pe
builder.setBandwidth(peerConfig.getBandwidth());
builder.setReplicateAll(peerConfig.replicateAllUserTables());
builder.setSerial(peerConfig.isSerial());
builder.setSleepForRetries(peerConfig.getSleepForRetries());

ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap());
if (excludeTableCFs != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class ReplicationPeerConfig {
private final boolean serial;
// Used by synchronous replication
private String remoteWALDir;
private long sleepForRetries = 0;

private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) {
this.clusterKey = builder.clusterKey;
Expand All @@ -71,6 +72,7 @@ private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) {
this.bandwidth = builder.bandwidth;
this.serial = builder.serial;
this.remoteWALDir = builder.remoteWALDir;
this.sleepForRetries = builder.sleepForRetries;
}

private Map<TableName, List<String>>
Expand Down Expand Up @@ -140,6 +142,10 @@ public boolean isSerial() {
return serial;
}

/**
 * Returns the sleep time between replication retries configured on this peer, in milliseconds.
 * <p>
 * The value is copied from the builder when this config is constructed and is 0 unless it was
 * set explicitly.
 * @return the per-peer sleep time in milliseconds
 */
public long getSleepForRetries() {
  return sleepForRetries;
}

public static ReplicationPeerConfigBuilder newBuilder(ReplicationPeerConfig peerConfig) {
ReplicationPeerConfigBuilderImpl builder = new ReplicationPeerConfigBuilderImpl();
builder.setClusterKey(peerConfig.getClusterKey())
Expand All @@ -150,7 +156,8 @@ public static ReplicationPeerConfigBuilder newBuilder(ReplicationPeerConfig peer
.setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap())
.setExcludeNamespaces(peerConfig.getExcludeNamespaces())
.setBandwidth(peerConfig.getBandwidth()).setSerial(peerConfig.isSerial())
.setRemoteWALDir(peerConfig.getRemoteWALDir());
.setRemoteWALDir(peerConfig.getRemoteWALDir())
.setSleepForRetries(peerConfig.getSleepForRetries());
return builder;
}

Expand Down Expand Up @@ -181,6 +188,8 @@ static class ReplicationPeerConfigBuilderImpl implements ReplicationPeerConfigBu

private String remoteWALDir = null;

private long sleepForRetries = 0;

@Override
public ReplicationPeerConfigBuilder setClusterKey(String clusterKey) {
this.clusterKey = clusterKey != null ? clusterKey.trim() : null;
Expand Down Expand Up @@ -260,6 +269,12 @@ public ReplicationPeerConfigBuilder setRemoteWALDir(String dir) {
return this;
}

@Override
public ReplicationPeerConfigBuilder setSleepForRetries(long sleepForRetries) {
// Stored as-is, with no range check. ReplicationSource#getSleepForRetries only honors values
// greater than 0 and otherwise falls back to the global configuration.
this.sleepForRetries = sleepForRetries;
return this;
}

@Override
public ReplicationPeerConfig build() {
// It would be nice to validate the configuration, but we have to work with "old" data
Expand Down Expand Up @@ -293,6 +308,7 @@ public String toString() {
if (this.remoteWALDir != null) {
builder.append(",remoteWALDir=").append(remoteWALDir);
}
builder.append(",sleepForRetries=").append(sleepForRetries);
return builder.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ default ReplicationPeerConfigBuilder putAllPeerData(Map<byte[], byte[]> peerData
*/
ReplicationPeerConfigBuilder setRemoteWALDir(String dir);

/**
* Sets the sleep time between retries for this peer's replication source.
* <p>
* The builder default is 0. ReplicationSource only uses a peer value greater than 0; otherwise it
* falls back to the {@code replication.source.sleepforretries} configuration.
* @param sleepForRetries Sleep time in milliseconds
* @return {@code this}.
*/
ReplicationPeerConfigBuilder setSleepForRetries(long sleepForRetries);

/**
* Builds the configuration object from the current state of {@code this}.
* @return A {@link ReplicationPeerConfig} instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public static ReplicationPeerConfig getConfig(int seed) {
.setRemoteWALDir(Long.toHexString(RNG.nextLong())).setNamespaces(randNamespaces(RNG))
.setExcludeNamespaces(randNamespaces(RNG)).setTableCFsMap(randTableCFs(RNG))
.setExcludeTableCFsMap(randTableCFs(RNG)).setReplicateAllUserTables(RNG.nextBoolean())
.setBandwidth(RNG.nextInt(1000)).build();
.setBandwidth(RNG.nextInt(1000)).setSleepForRetries(RNG.nextInt(5000)).build();
}

private static void assertSetEquals(Set<String> expected, Set<String> actual) {
Expand Down Expand Up @@ -112,5 +112,6 @@ public static void assertConfigEquals(ReplicationPeerConfig expected,
assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap());
assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables());
assertEquals(expected.getBandwidth(), actual.getBandwidth());
assertEquals(expected.getSleepForRetries(), actual.getSleepForRetries());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ message ReplicationPeer {
repeated bytes exclude_namespaces = 10;
optional bool serial = 11;
optional string remoteWALDir = 12;
optional int64 sleep_for_retries = 13;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
protected ReplicationSourceManager manager;
// Should we stop everything?
protected Server server;
// How long should we sleep for each retry
private long sleepForRetries;
protected FileSystem fs;
// id of this cluster
private UUID clusterId;
Expand Down Expand Up @@ -204,8 +202,6 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man
this.waitOnEndpointSeconds =
this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
decorateConf();
// 1 second
this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
// 5 minutes @ 1 sec per
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
Expand Down Expand Up @@ -526,6 +522,19 @@ private long getCurrentBandwidth() {
return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
}

/**
 * Returns the base sleep time between retries for this source.
 * <p>
 * The peer config is checked first: a value greater than 0 there wins. Otherwise the global
 * {@code replication.source.sleepforretries} setting is used, defaulting to 1000 ms. Both are
 * read on every call rather than cached in a field.
 * <p>
 * NOTE(review): ReplicationSourceManager used to read
 * {@code replication.source.sync.sleepforretries} for remote WAL deletion retries and now calls
 * this method instead, so that key is no longer consulted. Confirm this is intended.
 * @return sleep time in milliseconds
 */
@Override
public long getSleepForRetries() {
  long peerSleepForRetries = replicationPeer.getPeerConfig().getSleepForRetries();
  if (peerSleepForRetries > 0) {
    // A positive per-peer value overrides the cluster-wide setting.
    return peerSleepForRetries;
  }
  // 0 (the builder default) or a negative value means "not set" for this peer.
  return this.conf.getLong("replication.source.sleepforretries", 1000);
}

/**
* Do the sleeping logic
* @param msg Why we sleep
Expand All @@ -534,11 +543,12 @@ private long getCurrentBandwidth() {
*/
private boolean sleepForRetries(String msg, int sleepMultiplier) {
try {
long sleepForRetries = getSleepForRetries();
if (LOG.isTraceEnabled()) {
LOG.trace("{} {}, sleeping {} times {}", logPeerId(), msg, sleepForRetries,
sleepMultiplier);
}
Thread.sleep(this.sleepForRetries * sleepMultiplier);
Thread.sleep(sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
Expand Down Expand Up @@ -594,7 +604,7 @@ private void initialize() {
if (this.isSourceActive() && peerClusterId == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(),
(this.sleepForRetries * sleepMultiplier));
(getSleepForRetries() * sleepMultiplier));
}
if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
sleepMultiplier++;
Expand Down Expand Up @@ -696,7 +706,7 @@ private void terminate(String reason, Exception cause, boolean clearMetrics, boo
// And notice that we may call terminate directly from the initThread so here we need to
// avoid join on ourselves.
initThread.interrupt();
Threads.shutdown(initThread, this.sleepForRetries);
Threads.shutdown(initThread, getSleepForRetries());
}
Collection<ReplicationSourceShipper> workers = workerThreads.values();

Expand All @@ -713,7 +723,7 @@ private void terminate(String reason, Exception cause, boolean clearMetrics, boo
if (worker.isAlive() || worker.entryReader.isAlive()) {
try {
// Wait worker to stop
Thread.sleep(this.sleepForRetries);
Thread.sleep(getSleepForRetries());
} catch (InterruptedException e) {
LOG.info("{} Interrupted while waiting {} to stop", logPeerId(), worker.getName());
Thread.currentThread().interrupt();
Expand All @@ -736,12 +746,12 @@ private void terminate(String reason, Exception cause, boolean clearMetrics, boo

if (join) {
for (ReplicationSourceShipper worker : workers) {
Threads.shutdown(worker, this.sleepForRetries);
Threads.shutdown(worker, getSleepForRetries());
LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName());
}
if (this.replicationEndpoint != null) {
try {
this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,
this.replicationEndpoint.awaitTerminated(getSleepForRetries() * maxRetriesMultiplier,
TimeUnit.MILLISECONDS);
} catch (TimeoutException te) {
LOG.warn("{} Got exception while waiting for endpoint to shutdown "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ default boolean isSyncReplication() {
return getPeer().getPeerConfig().isSyncReplication();
}

/**
* Get the base sleep time for retries. Callers such as the shipper and the WAL reader multiply
* this value by a retry multiplier when backing off.
* <p>
* ReplicationSource checks the peer config first and uses it if a positive value is set;
* otherwise it falls back to the global configuration.
* @return sleep time in milliseconds
*/
long getSleepForRetries();

/** Returns active or not */
boolean isSourceActive();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,6 @@ public class ReplicationSourceManager {

private AtomicLong totalBufferUsed = new AtomicLong();

// How long should we sleep for each retry when deleting remote wal files for sync replication
// peer.
private final long sleepForRetries;
// Maximum number of retries before taking bold actions when deleting remote wal files for sync
// replication peer.
private final int maxRetriesMultiplier;
Expand Down Expand Up @@ -227,7 +224,6 @@ public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
tfb.setDaemon(true);
this.executor.setThreadFactory(tfb.build());
this.latestPaths = new HashMap<>();
this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000);
this.maxRetriesMultiplier =
this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60);
this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
Expand Down Expand Up @@ -747,8 +743,8 @@ private void cleanOldLogs(NavigableSet<String> wals, ReplicationSourceInterface
return;
}
if (
ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries,
sleepMultiplier, maxRetriesMultiplier)
ReplicationUtils.sleepForRetries("Failed to delete remote wals",
source.getSleepForRetries(), sleepMultiplier, maxRetriesMultiplier)
) {
sleepMultiplier++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ public enum WorkerState {
private volatile WorkerState state;
final ReplicationSourceWALReader entryReader;

// How long should we sleep for each retry
private final long sleepForRetries;
// Maximum number of retries before taking bold actions
private final int maxRetriesMultiplier;
private final int DEFAULT_TIMEOUT = 20000;
Expand All @@ -81,8 +79,6 @@ public ReplicationSourceShipper(Configuration conf, String walGroupId, Replicati
this.walGroupId = walGroupId;
this.source = source;
this.entryReader = walReader;
// 1 second
this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
// 5 minutes @ 1 sec per
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
// 20 seconds
Expand All @@ -102,7 +98,8 @@ public final void run() {
if (!source.isPeerEnabled()) {
// The peer enabled check is in memory, not expensive, so do not need to increase the
// sleep interval as it may cause a long lag when we enable the peer.
sleepForRetries("Replication is disabled", sleepForRetries, 1, maxRetriesMultiplier);
sleepForRetries("Replication is disabled", source.getSleepForRetries(), 1,
maxRetriesMultiplier);
continue;
}
try {
Expand Down Expand Up @@ -220,8 +217,8 @@ private void shipEdits(WALEntryBatch entryBatch) {
LOG.warn("{} threw unknown exception:",
source.getReplicationEndpoint().getClass().getName(), ex);
if (
sleepForRetries("ReplicationEndpoint threw exception", sleepForRetries, sleepMultiplier,
maxRetriesMultiplier)
sleepForRetries("ReplicationEndpoint threw exception", source.getSleepForRetries(),
sleepMultiplier, maxRetriesMultiplier)
) {
sleepMultiplier++;
}
Expand Down Expand Up @@ -334,7 +331,7 @@ void clearWALEntryBatch() {
return;
} else {
// Wait both shipper and reader threads to stop
Thread.sleep(this.sleepForRetries);
Thread.sleep(source.getSleepForRetries());
}
} catch (InterruptedException e) {
LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ class ReplicationSourceWALReader extends Thread {
private final int replicationBatchCountCapacity;
// position in the WAL to start reading at
private long currentPosition;
private final long sleepForRetries;
private final int maxRetriesMultiplier;

// Indicates whether this particular worker is running
Expand Down Expand Up @@ -102,8 +101,6 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
// memory used will be batchSizeCapacity * (nb.batches + 1)
// the +1 is for the current thread reading before placing onto the queue
int batchCount = conf.getInt("replication.source.nb.batches", 1);
// 1 second
this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
// 5 minutes @ 1 sec per
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
Expand All @@ -127,7 +124,7 @@ protected final int sleep(int sleepMultiplier) {
if (sleepMultiplier < maxRetriesMultiplier) {
sleepMultiplier++;
}
Threads.sleep(sleepForRetries * sleepMultiplier);
Threads.sleep(source.getSleepForRetries() * sleepMultiplier);
return sleepMultiplier;
}

Expand All @@ -139,7 +136,7 @@ public void run() {
source.getWALFileLengthProvider(), source.getSourceMetrics(), walGroupId)) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
if (!source.isPeerEnabled()) {
Threads.sleep(sleepForRetries);
Threads.sleep(source.getSleepForRetries());
continue;
}
if (!checkBufferQuota()) {
Expand Down Expand Up @@ -272,7 +269,7 @@ public Path getCurrentPath() {
private boolean checkBufferQuota() {
// try not to go over total quota
if (!this.getSourceManager().checkBufferQuota(this.source.getPeerId())) {
Threads.sleep(sleepForRetries);
Threads.sleep(source.getSleepForRetries());
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ public boolean isSourceActive() {
return true;
}

@Override
public long getSleepForRetries() {
  // Fixed one-second value used by this test source.
  final long oneSecondMillis = 1000L;
  return oneSecondMillis;
}

@Override
public MetricsSource getSourceMetrics() {
return metrics;
Expand Down
Loading