Skip to content

Commit 50a3c90

Browse files
committed
TEZ-4007: Introduce AmExtensions and Zookeeper-based FrameworkServices - checkstyle, spotbugs, javadoc improvements, refactor, test fixes
1 parent 1392ea9 commit 50a3c90

File tree

77 files changed

+1661
-1199
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

77 files changed

+1661
-1199
lines changed

pom.xml

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
<commons-io.version>2.16.0</commons-io.version>
7070
<commons-lang.version>2.6</commons-lang.version>
7171
<clover.license>${user.home}/clover.license</clover.license>
72-
<curator.version>2.7.1</curator.version>
72+
<curator.version>5.9.0</curator.version>
7373
<dependency-check-maven.version>3.2.0</dependency-check-maven.version>
7474
<dependency-maven-plugin.version>3.8.1</dependency-maven-plugin.version>
7575
<spotbugs.version>4.9.3</spotbugs.version>
@@ -749,6 +749,18 @@
749749
<type>test-jar</type>
750750
<scope>test</scope>
751751
</dependency>
752+
<dependency>
753+
<groupId>org.apache.curator</groupId>
754+
<artifactId>curator-test</artifactId>
755+
<version>${curator.version}</version>
756+
<scope>test</scope>
757+
<exclusions>
758+
<exclusion>
759+
<groupId>org.junit.jupiter</groupId>
760+
<artifactId>junit-jupiter-api</artifactId>
761+
</exclusion>
762+
</exclusions>
763+
</dependency>
752764
<dependency>
753765
<groupId>org.mockito</groupId>
754766
<artifactId>mockito-core</artifactId>

tez-api/findbugs-exclude.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,4 +151,17 @@
151151
<Method name="getApplicationId" />
152152
<Bug pattern="EI_EXPOSE_REP" />
153153
</Match>
154+
155+
<!-- TEZ-4007 -->
156+
<Match>
157+
<Class name="org.apache.tez.client.registry.zookeeper.ZkFrameworkClient" />
158+
<Field name="amPort" />
159+
<Bug pattern="AT_STALE_THREAD_WRITE_OF_PRIMITIVE" />
160+
</Match>
161+
162+
<Match>
163+
<Class name="org.apache.tez.client.TezYarnClient" />
164+
<Method name="&lt;init&gt;" params="org.apache.hadoop.yarn.client.api.YarnClient" returns="void" />
165+
<Bug pattern="EI_EXPOSE_REP2" />
166+
</Match>
154167
</FindBugsFilter>

tez-api/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,6 @@
126126
<dependency>
127127
<groupId>org.apache.curator</groupId>
128128
<artifactId>curator-test</artifactId>
129-
<version>${curator.version}</version>
130129
<scope>test</scope>
131130
</dependency>
132131
</dependencies>

tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,13 @@
1919
package org.apache.tez.client;
2020

2121
import java.io.IOException;
22-
import java.util.Optional;
2322

2423
import org.apache.hadoop.classification.InterfaceAudience.Private;
2524
import org.apache.hadoop.conf.Configuration;
2625
import org.apache.hadoop.security.UserGroupInformation;
2726
import org.apache.hadoop.yarn.api.records.ApplicationId;
2827
import org.apache.hadoop.yarn.api.records.ApplicationReport;
2928
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
30-
import org.apache.hadoop.yarn.client.api.YarnClient;
3129
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
3230
import org.apache.hadoop.yarn.exceptions.YarnException;
3331
import org.apache.tez.common.RPCUtil;
@@ -49,6 +47,7 @@
4947
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGResponseProto;
5048
import org.apache.tez.frameworkplugins.ClientFrameworkService;
5149
import org.apache.tez.frameworkplugins.FrameworkUtils;
50+
import org.apache.tez.frameworkplugins.yarn.YarnClientFrameworkService;
5251

5352
import com.google.protobuf.ServiceException;
5453

@@ -60,21 +59,18 @@ public abstract class FrameworkClient {
6059
protected static final Logger LOG = LoggerFactory.getLogger(FrameworkClient.class);
6160

6261
public static FrameworkClient createFrameworkClient(TezConfiguration tezConf) {
63-
Optional<FrameworkClient> pluginClient =
64-
FrameworkUtils.get(ClientFrameworkService.class, tezConf)
65-
.flatMap(framework -> framework.createOrGetFrameworkClient(tezConf));
66-
6762
boolean isLocal = tezConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
6863
if (isLocal) {
6964
try {
7065
return ReflectionUtils.createClazzInstance("org.apache.tez.client.LocalClient");
7166
} catch (TezReflectionException e) {
7267
throw new TezUncheckedException("Fail to create LocalClient", e);
7368
}
74-
} else if (pluginClient.isPresent()) {
75-
return pluginClient.get();
69+
} else {
70+
ClientFrameworkService clientFrameworkService = FrameworkUtils.get(ClientFrameworkService.class, tezConf);
71+
return clientFrameworkService == null ? new YarnClientFrameworkService().newFrameworkClient()
72+
: clientFrameworkService.newFrameworkClient();
7673
}
77-
return new TezYarnClient(YarnClient.createYarnClient());
7874
}
7975

8076
/**

tez-api/src/main/java/org/apache/tez/client/TezClient.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -672,8 +672,6 @@ private DAGClient submitDAGSession(DAG dag) throws TezException, IOException {
672672
}
673673

674674
Map<String, LocalResource> tezJarResources = getTezJarResources(sessionCredentials);
675-
676-
677675
DAGPlan dagPlan = TezClientUtils.prepareAndCreateDAGPlan(dag, amConfig, tezJarResources,
678676
usingTezArchiveDeploy, sessionCredentials, servicePluginsDescriptor, javaOptsChecker);
679677

@@ -688,7 +686,6 @@ private DAGClient submitDAGSession(DAG dag) throws TezException, IOException {
688686

689687
// if request size exceeds maxSubmitDAGRequestSizeThroughIPC, we serialize them to HDFS
690688
SubmitDAGRequestProto request = requestBuilder.build();
691-
692689
if (request.getSerializedSize() > maxSubmitDAGRequestSizeThroughIPC) {
693690
Path dagPlanPath = new Path(TezCommonUtils.getTezSystemStagingPath(amConfig.getTezConfiguration(),
694691
sessionAppId.toString()), TezConstants.TEZ_PB_PLAN_BINARY_NAME +

tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,6 @@ private static Path getPath(String configUri) {
149149
}
150150
}
151151

152-
153-
154152
/**
155153
* Setup LocalResource map for Tez jars based on provided Configuration
156154
*
@@ -188,7 +186,7 @@ static boolean setupTezJarsLocalResources(TezConfiguration conf,
188186
+ conf.get(TezConfiguration.TEZ_LIB_URIS_CLASSPATH));
189187

190188
usingTezArchive = addLocalResources(conf, tezJarUris,
191-
tezJarResources, credentials);
189+
tezJarResources, credentials);
192190

193191
if (tezJarResources.isEmpty()) {
194192
throw new TezUncheckedException(
@@ -265,8 +263,8 @@ private static boolean addLocalResources(Configuration conf,
265263

266264
// Add URI fragment or just the filename
267265
Path name = new Path((null == u.getFragment())
268-
? p.getName()
269-
: u.getFragment());
266+
? p.getName()
267+
: u.getFragment());
270268
if (name.isAbsolute()) {
271269
throw new IllegalArgumentException("Resource name must be "
272270
+ "relative, not absolute: " + name
@@ -579,7 +577,7 @@ public static ApplicationSubmissionContext createApplicationSubmissionContext(
579577
// don't overwrite existing conf, needed for TezClient.getClient() so existing containers have stable resource fingerprints
580578
if(!binaryConfPath.getFileSystem(tezConf).exists(binaryConfPath)) {
581579
ConfigurationProto finalConfProto = createFinalConfProtoForApp(tezConf,
582-
servicePluginsDescriptor);
580+
servicePluginsDescriptor);
583581

584582
FSDataOutputStream amConfPBOutBinaryStream = null;
585583
try {
@@ -620,12 +618,12 @@ public static ApplicationSubmissionContext createApplicationSubmissionContext(
620618
}
621619

622620
LocalResource sessionJarsPBLRsrc =
623-
TezClientUtils.createLocalResource(fs,
624-
sessionJarsPath, LocalResourceType.FILE,
625-
LocalResourceVisibility.APPLICATION);
621+
TezClientUtils.createLocalResource(fs,
622+
sessionJarsPath, LocalResourceType.FILE,
623+
LocalResourceVisibility.APPLICATION);
626624
amLocalResources.put(
627-
TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME,
628-
sessionJarsPBLRsrc);
625+
TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME,
626+
sessionJarsPBLRsrc);
629627

630628
String user = UserGroupInformation.getCurrentUser().getShortUserName();
631629
ACLManager aclManager = new ACLManager(user, amConfig.getTezConfiguration());
@@ -657,9 +655,9 @@ public static ApplicationSubmissionContext createApplicationSubmissionContext(
657655
}
658656

659657
amLocalResources.put(TezConstants.TEZ_PB_PLAN_BINARY_NAME,
660-
TezClientUtils.createLocalResource(fs,
661-
binaryPath, LocalResourceType.FILE,
662-
LocalResourceVisibility.APPLICATION));
658+
TezClientUtils.createLocalResource(fs,
659+
binaryPath, LocalResourceType.FILE,
660+
LocalResourceVisibility.APPLICATION));
663661

664662
if (Level.DEBUG.isGreaterOrEqual(Level.toLevel(amLogLevel))) {
665663
Path textPath = localizeDagPlanAsText(dagPB, fs, amConfig, strAppId, tezSysStagingPath);
@@ -690,7 +688,7 @@ public static ApplicationSubmissionContext createApplicationSubmissionContext(
690688

691689
Collection<String> tagsFromConf =
692690
amConfig.getTezConfiguration().getTrimmedStringCollection(
693-
TezConfiguration.TEZ_APPLICATION_TAGS);
691+
TezConfiguration.TEZ_APPLICATION_TAGS);
694692

695693
appContext.setApplicationType(TezConstants.TEZ_APPLICATION_TYPE);
696694
if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
@@ -711,8 +709,8 @@ public static ApplicationSubmissionContext createApplicationSubmissionContext(
711709
appContext.setAMContainerSpec(amContainer);
712710

713711
appContext.setMaxAppAttempts(
714-
amConfig.getTezConfiguration().getInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS,
715-
TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS_DEFAULT));
712+
amConfig.getTezConfiguration().getInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS,
713+
TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS_DEFAULT));
716714

717715
return appContext;
718716

@@ -850,7 +848,7 @@ public static void addLog4jSystemProperties(String logLevel,
850848
}
851849

852850
public static ConfigurationProto createFinalConfProtoForApp(Configuration amConf,
853-
ServicePluginsDescriptor servicePluginsDescriptor) {
851+
ServicePluginsDescriptor servicePluginsDescriptor) {
854852
assert amConf != null;
855853
ConfigurationProto.Builder builder = ConfigurationProto.newBuilder();
856854
for (Entry<String, String> entry : amConf) {
@@ -951,7 +949,7 @@ static DAGClientAMProtocolBlockingPB getAMProxy(FrameworkClient frameworkClient,
951949
+ ", trackingUrl=" + appReport.getTrackingUrl()
952950
+ ", diagnostics="
953951
+ (appReport.getDiagnostics() != null ? appReport.getDiagnostics()
954-
: TezClient.NO_CLUSTER_DIAGNOSTICS_MSG);
952+
: TezClient.NO_CLUSTER_DIAGNOSTICS_MSG);
955953
LOG.info(msg);
956954
throw new SessionNotRunning(msg);
957955
}
@@ -1017,7 +1015,7 @@ static void createSessionToken(String tokenIdentifier,
10171015
public static String maybeAddDefaultMemoryJavaOpts(String javaOpts, Resource resource,
10181016
double maxHeapFactor) {
10191017
if ((javaOpts != null && !javaOpts.isEmpty()
1020-
&& (javaOpts.contains("-Xmx") || javaOpts.contains("-Xms")))
1018+
&& (javaOpts.contains("-Xmx") || javaOpts.contains("-Xms")))
10211019
|| (resource.getMemory() <= 0)) {
10221020
return javaOpts;
10231021
}
@@ -1028,8 +1026,8 @@ public static String maybeAddDefaultMemoryJavaOpts(String javaOpts, Resource res
10281026

10291027
if (Double.parseDouble("-1") == maxHeapFactor) {
10301028
maxHeapFactor = resource.getMemory() < TezConstants.TEZ_CONTAINER_SMALL_SLAB_BOUND_MB
1031-
? TezConstants.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION_SMALL_SLAB
1032-
: TezConstants.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION_LARGE_SLAB;
1029+
? TezConstants.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION_SMALL_SLAB
1030+
: TezConstants.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION_LARGE_SLAB;
10331031
}
10341032
int maxMemory = (int)(resource.getMemory() * maxHeapFactor);
10351033
maxMemory = maxMemory <= 0 ? 1 : maxMemory;
@@ -1039,7 +1037,7 @@ public static String maybeAddDefaultMemoryJavaOpts(String javaOpts, Resource res
10391037
}
10401038

10411039
private static boolean checkAncestorPermissionsForAllUsers(Configuration conf, Path pathComponent,
1042-
FsAction permission) throws IOException {
1040+
FsAction permission) throws IOException {
10431041
FileSystem fs = pathComponent.getFileSystem(conf);
10441042

10451043
if (Shell.WINDOWS && fs instanceof LocalFileSystem) {

tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class TezYarnClient extends FrameworkClient {
4343
private String amHost;
4444
private int amPort;
4545

46-
protected TezYarnClient(YarnClient yarnClient) {
46+
public TezYarnClient(YarnClient yarnClient) {
4747
this.yarnClient = yarnClient;
4848
}
4949

tez-api/src/main/java/org/apache/tez/client/registry/AMRecord.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,11 @@ public ServiceRecord toServiceRecord() {
156156
return serviceRecord;
157157
}
158158

159+
@Override
160+
public String toString() {
161+
return toServiceRecord().attributes().toString();
162+
}
163+
159164
@Override
160165
public int hashCode() {
161166
return Objects.hash(appId, host, port, externalId);

tez-api/src/main/java/org/apache/tez/client/registry/AMRegistry.java

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import java.util.ArrayList;
2222
import java.util.List;
23-
import java.util.Optional;
2423

2524
import org.apache.hadoop.conf.Configuration;
2625
import org.apache.hadoop.service.AbstractService;
@@ -31,19 +30,28 @@
3130
import org.slf4j.LoggerFactory;
3231

3332
/**
34-
* Base class for AMRegistry implementation
35-
* Implementation class is configured by tez.am.registry.class
36-
* Implementations should implement relevant service lifecycle operations:
37-
* init, serviceStart, serviceStop, etc..
33+
* Base class for {@code AMRegistry} implementations.
3834
*
39-
* init/serviceStart will be invoked during DAGAppMaster.serviceInit
35+
* <p>The specific implementation is configured via the
36+
* {@code tez.am.registry.class} property.</p>
4037
*
41-
* serviceStop will invoked on DAGAppMaster shutdown
38+
* <p>Implementations are expected to provide appropriate service lifecycle
39+
* behavior, including:
40+
* <ul>
41+
* <li>{@code init}</li>
42+
* <li>{@code serviceStart}</li>
43+
* <li>{@code serviceStop}</li>
44+
* </ul>
45+
* </p>
46+
*
47+
* <p>{@code init} and {@code serviceStart} are invoked during
48+
* {@code DAGAppMaster.serviceInit()}, while {@code serviceStop} is called
49+
* when {@code DAGAppMaster} shuts down.</p>
4250
*/
4351
public abstract class AMRegistry extends AbstractService {
4452

4553
private static final Logger LOG = LoggerFactory.getLogger(AMRegistry.class);
46-
protected List<AMRecord> amRecords = new ArrayList<>();
54+
private List<AMRecord> amRecords = new ArrayList<>();
4755

4856
@Override
4957
public void init(Configuration conf) {
@@ -79,13 +87,14 @@ public void add(AMRecord server) throws Exception {
7987

8088
public abstract void remove(AMRecord server) throws Exception;
8189

82-
public Optional<ApplicationId> generateNewId() throws Exception {
83-
return Optional.empty();
90+
public ApplicationId generateNewId() throws Exception {
91+
return null;
8492
}
8593

8694
public abstract AMRecord createAmRecord(ApplicationId appId, String hostName, int port);
8795

88-
@Override public void serviceStop() throws Exception {
96+
@Override
97+
public void serviceStop() throws Exception {
8998
List<AMRecord> records = new ArrayList<>(amRecords);
9099
for(AMRecord record : records) {
91100
remove(record);

0 commit comments

Comments
 (0)