Skip to content

Commit 3938781

Browse files
committed
Reworked AM registry service lifecycle
1 parent 6bf4f1f commit 3938781

File tree

5 files changed

+50
-44
lines changed

5 files changed

+50
-44
lines changed

tez-api/src/main/java/org/apache/tez/client/registry/AMRegistry.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.tez.client.registry;
2020

2121

22-
import org.apache.hadoop.service.AbstractService;
2322
import org.apache.hadoop.yarn.api.records.ApplicationId;
2423

2524

@@ -38,18 +37,16 @@
3837
* </ul>
3938
* </p>
4039
*/
41-
public abstract class AMRegistry extends AbstractService {
42-
/* Implementations should provide a public no-arg constructor */
43-
protected AMRegistry(String name) {
44-
super(name);
45-
}
40+
public interface AMRegistry extends AutoCloseable {
4641

47-
public abstract void add(AMRecord server) throws Exception;
42+
void add(AMRecord server) throws Exception;
4843

49-
public abstract void remove(AMRecord server) throws Exception;
44+
void remove(AMRecord server) throws Exception;
5045

51-
public abstract ApplicationId generateNewId() throws Exception;
46+
ApplicationId generateNewId() throws Exception;
5247

53-
public abstract AMRecord createAmRecord(ApplicationId appId, String hostName, String hostIp, int port,
48+
AMRecord createAmRecord(ApplicationId appId, String hostName, String hostIp, int port,
5449
String computeName);
50+
51+
void close();
5552
}

tez-dag/src/main/java/org/apache/tez/dag/api/client/registry/zookeeper/ZkAMRegistry.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
* Clients should use org.apache.tez.dag.api.client.registry.zookeeper.ZkAMRegistryClient instead.
4646
*/
4747
@InterfaceAudience.Private
48-
public class ZkAMRegistry extends AMRegistry {
48+
public class ZkAMRegistry implements AMRegistry {
4949

5050
private static final Logger LOG = LoggerFactory.getLogger(ZkAMRegistry.class);
5151

@@ -57,20 +57,17 @@ public class ZkAMRegistry extends AMRegistry {
5757
private ZkConfig zkConfig = null;
5858

5959
public ZkAMRegistry(String externalId) {
60-
super("ZkAMRegistry");
6160
this.externalId = externalId;
6261
}
6362

64-
@Override
65-
public void serviceInit(Configuration conf) {
63+
public void init(Configuration conf) {
6664
zkConfig = new ZkConfig(conf);
6765
this.client = zkConfig.createCuratorFramework();
6866
this.namespace = zkConfig.getZkNamespace();
6967
LOG.info("ZkAMRegistry initialized");
7068
}
7169

72-
@Override
73-
public void serviceStart() throws Exception {
70+
public void start() throws Exception {
7471
client.start();
7572
LOG.info("ZkAMRegistry started");
7673
}
@@ -81,11 +78,8 @@ public void serviceStart() throws Exception {
8178
*
8279
* <p>After all removal attempts, the ZooKeeper client is closed and the shutdown
8380
* is logged.</p>
84-
*
85-
* @throws Exception if a failure occurs while closing the ZooKeeper client
8681
*/
87-
@Override
88-
public void serviceStop() throws Exception {
82+
public void close() {
8983
for (AMRecord amRecord : new ArrayList<>(amRecords)) {
9084
try {
9185
remove(amRecord);

tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.io.FileNotFoundException;
2626
import java.io.IOException;
2727
import java.io.PrintWriter;
28+
import java.net.InetSocketAddress;
2829
import java.net.URI;
2930
import java.net.URISyntaxException;
3031
import java.net.URL;
@@ -45,6 +46,7 @@
4546
import java.util.Map;
4647
import java.util.Map.Entry;
4748
import java.util.Objects;
49+
import java.util.Optional;
4850
import java.util.Random;
4951
import java.util.Set;
5052
import java.util.Timer;
@@ -618,11 +620,6 @@ protected void serviceInit(final Configuration conf) throws Exception {
618620
Executors.newFixedThreadPool(threadCount, new ThreadFactoryBuilder()
619621
.setDaemon(true).setNameFormat("App Shared Pool - #%d").build());
620622
execService = MoreExecutors.listeningDecorator(rawExecutor);
621-
AMRegistry amRegistry = frameworkService.getAMRegistry(conf);
622-
if (amRegistry != null) {
623-
initAmRegistry(appAttemptID.getApplicationId(), amRegistry, clientRpcServer);
624-
addIfService(amRegistry, false);
625-
}
626623

627624
initServices(conf);
628625
super.serviceInit(conf);
@@ -650,16 +647,35 @@ private static ServerFrameworkService getFrameworkService(Configuration conf) {
650647
return FrameworkUtils.get(ServerFrameworkService.class, conf, YarnServerFrameworkService.class);
651648
}
652649

653-
@VisibleForTesting
654-
public void initAmRegistry(ApplicationId appId, AMRegistry amRegistry, DAGClientServer dagClientServer) {
655-
dagClientServer.registerServiceListener((service) -> {
650+
protected void initClientRpcServer() {
651+
clientRpcServer = new DAGClientServer(clientHandler, appAttemptID, recoveryFS);
652+
addIfService(clientRpcServer, true);
653+
654+
initAmRegistryCallbackForRecordAdd();
655+
}
656+
657+
/**
658+
* Initializes an AM registry callback when the clientRpcServer is already initialized
659+
*/
660+
private void initAmRegistryCallbackForRecordAdd() {
661+
AMRegistry amRegistry = frameworkService.getAMRegistry(this.amConf);
662+
if (amRegistry == null) {
663+
return;
664+
}
665+
if (clientRpcServer == null){
666+
throw new IllegalStateException(
667+
"Client RPC Server has not been initialized before attempting to initialize an AM registry");
668+
}
669+
670+
ApplicationId appId = appAttemptID.getApplicationId();
671+
672+
clientRpcServer.registerServiceListener((service) -> {
656673
if (service.isInState(STATE.STARTED)) {
674+
InetSocketAddress rpcServerAddress = clientRpcServer.getBindAddress();
675+
657676
final String computeName = System.getenv(ZkConfig.COMPUTE_GROUP_NAME_ENV);
658-
AMRecord amRecord = amRegistry.createAmRecord(
659-
appId, dagClientServer.getBindAddress().getHostName(),
660-
dagClientServer.getBindAddress().getAddress().getHostAddress(),
661-
dagClientServer.getBindAddress().getPort(), computeName
662-
);
677+
AMRecord amRecord = amRegistry.createAmRecord(appId, rpcServerAddress.getHostName(),
678+
rpcServerAddress.getAddress().getHostAddress(), rpcServerAddress.getPort(), computeName);
663679
try {
664680
amRegistry.add(amRecord);
665681
LOG.info("Added AMRecord: {} to registry..", amRecord);
@@ -670,11 +686,6 @@ public void initAmRegistry(ApplicationId appId, AMRegistry amRegistry, DAGClient
670686
});
671687
}
672688

673-
protected void initClientRpcServer() {
674-
clientRpcServer = new DAGClientServer(clientHandler, appAttemptID, recoveryFS);
675-
addIfService(clientRpcServer, true);
676-
}
677-
678689
@VisibleForTesting
679690
protected DAGAppMasterShutdownHandler createShutdownHandler() {
680691
return new DAGAppMasterShutdownHandler();
@@ -1967,6 +1978,9 @@ void stopServices() {
19671978
firstException = ex;
19681979
}
19691980
}
1981+
1982+
Optional.ofNullable(frameworkService.getAMRegistry(this.amConf)).ifPresent(AMRegistry::close);
1983+
19701984
//after stopping all services, rethrow the first exception raised
19711985
if (firstException != null) {
19721986
throw ServiceStateException.convert(firstException);

tez-dag/src/main/java/org/apache/tez/frameworkplugins/zookeeper/ZkStandaloneServerFrameworkService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public synchronized AMRegistry getAMRegistry(Configuration conf) {
5959
try {
6060
amRegistry = new ZkAMRegistry(externalId);
6161
amRegistry.init(conf);
62+
amRegistry.start();
6263
LOG.info("Created Zookeeper based AM Registry with externalId: {}", externalId);
6364
} catch (Exception e) {
6465
throw new RuntimeException(e);

tez-dag/src/test/java/org/apache/tez/dag/api/client/registry/zookeeper/TestZkAMRegistry.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ public void teardown() throws Exception {
7878
public void testGenerateNewIdProducesUniqueIds() throws Exception {
7979
TezConfiguration conf = createTezConf();
8080
try (ZkAMRegistry registry = new ZkAMRegistry("external-id")) {
81-
registry.serviceInit(conf);
82-
registry.serviceStart();
81+
registry.init(conf);
82+
registry.start();
8383

8484
ApplicationId first = registry.generateNewId();
8585
ApplicationId second = registry.generateNewId();
@@ -100,8 +100,8 @@ public void testGenerateNewIdFromParallelThreads() throws Exception {
100100
conf.setInt(TezConfiguration.TEZ_AM_CURATOR_MAX_RETRIES, 29);
101101

102102
try (ZkAMRegistry registry = new ZkAMRegistry("external-id")) {
103-
registry.serviceInit(conf);
104-
registry.serviceStart();
103+
registry.init(conf);
104+
registry.start();
105105

106106
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
107107
CountDownLatch startLatch = new CountDownLatch(1);
@@ -157,8 +157,8 @@ public void testAddAndRemoveAmRecordUpdatesZooKeeper() throws Exception {
157157

158158
try (ZkAMRegistry registry = new ZkAMRegistry("external-id");
159159
CuratorFramework checkClient = zkConfig.createCuratorFramework()) {
160-
registry.serviceInit(conf);
161-
registry.serviceStart();
160+
registry.init(conf);
161+
registry.start();
162162

163163
checkClient.start();
164164

0 commit comments

Comments
 (0)