Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions twill-api/src/main/java/org/apache/twill/api/Configs.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ public static final class Keys {
*/
public static final String FILE_CONTEXT_CACHE_MAX_SIZE = "twill.file.context.cache.max.size";

/**
* The timeout seconds for zookeeper path service.
*/
public static final String ZOOKEEPER_PATH_SERVICE_TIMEOUT = "twill.zookeeper.path.service.timeout.secs";

private Keys() {
}
}
Expand Down Expand Up @@ -149,6 +154,11 @@ public static final class Defaults {
*/
public static final int FILE_CONTEXT_CACHE_MAX_SIZE = 100;

/**
* Default timeout value in seconds for zookeeper path service.
*/
public static final long ZOOKEEPER_PATH_SERVICE_TIMEOUT = 5L;


private Defaults() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.twill.api.Configs;
import org.apache.twill.api.RunId;
import org.apache.twill.filesystem.FileContextLocationFactory;
import org.apache.twill.filesystem.LocalLocationFactory;
Expand Down Expand Up @@ -248,30 +249,30 @@ public FilterReply decide(ILoggingEvent event) {
* A simple service for creating/remove ZK paths needed for {@link AbstractTwillService}.
*/
protected static class TwillZKPathService extends AbstractIdleService {

protected static final long TIMEOUT_SECONDS = 5L;

private static final Logger LOG = LoggerFactory.getLogger(TwillZKPathService.class);

private final ZKClient zkClient;
private final String path;
protected final long timeoutSecs;

public TwillZKPathService(ZKClient zkClient, RunId runId) {
public TwillZKPathService(ZKClient zkClient, RunId runId, Configuration conf) {
this.zkClient = zkClient;
this.path = "/" + runId.getId();
this.timeoutSecs = conf.getLong(Configs.Keys.ZOOKEEPER_PATH_SERVICE_TIMEOUT,
Configs.Defaults.ZOOKEEPER_PATH_SERVICE_TIMEOUT);
}

@Override
protected void startUp() throws Exception {
LOG.info("Creating container ZK path: {}{}", zkClient.getConnectString(), path);
ZKOperations.ignoreError(zkClient.create(path, null, CreateMode.PERSISTENT),
KeeperException.NodeExistsException.class, null).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
KeeperException.NodeExistsException.class, null).get(timeoutSecs, TimeUnit.SECONDS);
}

@Override
protected void shutDown() throws Exception {
LOG.info("Removing container ZK path: {}{}", zkClient.getConnectString(), path);
ZKOperations.recursiveDelete(zkClient, path).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
ZKOperations.recursiveDelete(zkClient, path).get(timeoutSecs, TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private void doMain() throws Exception {
List<Service> prerequisites = Lists.newArrayList(
new YarnAMClientService(amClient, trackerService),
zkClientService,
new AppMasterTwillZKPathService(zkClientService, runId)
new AppMasterTwillZKPathService(zkClientService, runId, conf)
);

if (twillRuntimeSpec.isLogCollectionEnabled()) {
Expand Down Expand Up @@ -256,8 +256,8 @@ private static final class AppMasterTwillZKPathService extends TwillZKPathServic
private static final Logger LOG = LoggerFactory.getLogger(AppMasterTwillZKPathService.class);
private final ZKClient zkClient;

AppMasterTwillZKPathService(ZKClient zkClient, RunId runId) {
super(zkClient, runId);
AppMasterTwillZKPathService(ZKClient zkClient, RunId runId, Configuration conf) {
super(zkClient, runId, conf);
this.zkClient = zkClient;
}

Expand Down Expand Up @@ -285,7 +285,7 @@ protected void shutDown() throws Exception {
LOG.info("Removing ZK path: {}{}", zkClient.getConnectString(), path);
deleteFutures.add(zkClient.delete(path));
}
Futures.successfulAsList(deleteFutures).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
Futures.successfulAsList(deleteFutures).get(timeoutSecs, TimeUnit.SECONDS);
for (OperationFuture<?> future : deleteFutures) {
try {
future.get();
Expand Down Expand Up @@ -324,7 +324,7 @@ protected void shutDown() throws Exception {
private boolean delete(String path) throws Exception {
try {
LOG.info("Removing ZK path: {}{}", zkClient.getConnectString(), path);
zkClient.delete(path).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
zkClient.delete(path).get(timeoutSecs, TimeUnit.SECONDS);
return true;
} catch (ExecutionException e) {
if (e.getCause() instanceof KeeperException.NotEmptyException) {
Expand All @@ -347,7 +347,7 @@ private boolean delete(String path) throws Exception {
*/
private List<String> getChildren(String path) throws Exception {
try {
return zkClient.getChildren(path).get(TIMEOUT_SECONDS, TimeUnit.SECONDS).getChildren();
return zkClient.getChildren(path).get(timeoutSecs, TimeUnit.SECONDS).getChildren();
} catch (ExecutionException e) {
if (e.getCause() instanceof KeeperException.NoNodeException) {
// If the node doesn't exists, return an empty list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ runId, runnableSpec, getClassLoader(), conf,
service,
zkClientService,
new LogFlushService(),
new TwillZKPathService(containerZKClient, runId),
new TwillZKPathService(containerZKClient, runId, conf),
new CloseableServiceWrapper(discoveryService)
);
}
Expand Down