Skip to content

Commit 1b01383

Browse files
committed
PR comments
1 parent b3ca9d5 commit 1b01383

File tree

24 files changed

+984
-243
lines changed

24 files changed

+984
-243
lines changed

tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,9 @@ public static FrameworkClient createFrameworkClient(TezConfiguration tezConf) {
6767
throw new TezUncheckedException("Fail to create LocalClient", e);
6868
}
6969
} else {
70-
ClientFrameworkService clientFrameworkService = FrameworkUtils.get(ClientFrameworkService.class, tezConf);
71-
return clientFrameworkService == null ? new YarnClientFrameworkService().newFrameworkClient()
72-
: clientFrameworkService.newFrameworkClient();
70+
ClientFrameworkService clientFrameworkService = FrameworkUtils.get(ClientFrameworkService.class, tezConf,
71+
YarnClientFrameworkService.class);
72+
return clientFrameworkService.newFrameworkClient();
7373
}
7474
}
7575

tez-api/src/main/java/org/apache/tez/client/registry/AMRecord.java

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.tez.client.registry;
2020

2121
import java.util.Objects;
22+
import java.util.Optional;
2223

2324
import org.apache.hadoop.classification.InterfaceAudience;
2425
import org.apache.hadoop.registry.client.types.ServiceRecord;
@@ -51,6 +52,8 @@ public class AMRecord {
5152
private final String externalId;
5253
private final String computeName;
5354

55+
private ServiceRecord serviceRecord;
56+
5457
/**
5558
* Creates a new {@code AMRecord} with the given application ID, host, port, and identifier.
5659
* <p>
@@ -74,8 +77,8 @@ public AMRecord(ApplicationId appId, String hostName, String hostIp, int port, S
7477
this.hostIp = hostIp;
7578
this.port = port;
7679
//externalId is optional, if not provided, convert to empty string
77-
this.externalId = (externalId == null) ? "" : externalId;
78-
this.computeName = (computeName == null) ? ZkConfig.DEFAULT_COMPUTE_GROUP_NAME : computeName;
80+
this.externalId = Optional.ofNullable(externalId).orElse("");
81+
this.computeName = Optional.ofNullable(computeName).orElse(ZkConfig.DEFAULT_COMPUTE_GROUP_NAME);
7982
}
8083

8184
/**
@@ -89,12 +92,15 @@ public AMRecord(ApplicationId appId, String hostName, String hostIp, int port, S
8992
* @param other the {@code AMRecord} instance to copy
9093
*/
9194
public AMRecord(AMRecord other) {
92-
this.appId = other.getApplicationId();
93-
this.hostName = other.getHost();
94-
this.hostIp = other.getHostIp();
95-
this.port = other.getPort();
96-
this.externalId = other.getExternalId();
97-
this.computeName = other.getComputeName();
95+
this.appId = other.appId;
96+
this.hostName = other.hostName;
97+
this.hostIp = other.hostIp;
98+
this.port = other.port;
99+
this.externalId = other.externalId;
100+
this.computeName = other.computeName;
101+
// all fields are final immutable, we can copy the serviceRecord,
102+
// if it's initialized there already, as it won't change
103+
this.serviceRecord = other.serviceRecord;
98104
}
99105

100106
/**
@@ -121,10 +127,6 @@ public ApplicationId getApplicationId() {
121127
return appId;
122128
}
123129

124-
public String getHost() {
125-
return hostName;
126-
}
127-
128130
public String getHostName() {
129131
return hostName;
130132
}
@@ -150,8 +152,7 @@ public boolean equals(Object other) {
150152
if (this == other) {
151153
return true;
152154
}
153-
if (other instanceof AMRecord) {
154-
AMRecord otherRecord = (AMRecord) other;
155+
if (other instanceof AMRecord otherRecord) {
155156
return appId.equals(otherRecord.appId)
156157
&& hostName.equals(otherRecord.hostName)
157158
&& hostIp.equals(otherRecord.hostIp)
@@ -178,13 +179,17 @@ public boolean equals(Object other) {
178179
* @return a {@link ServiceRecord} populated with the values of this {@code AMRecord}
179180
*/
180181
public ServiceRecord toServiceRecord() {
181-
ServiceRecord serviceRecord = new ServiceRecord();
182+
if (serviceRecord != null) {
183+
return serviceRecord;
184+
}
185+
serviceRecord = new ServiceRecord();
182186
serviceRecord.set(APP_ID_RECORD_KEY, appId);
183187
serviceRecord.set(HOST_NAME_RECORD_KEY, hostName);
184188
serviceRecord.set(HOST_IP_RECORD_KEY, hostIp);
185189
serviceRecord.set(PORT_RECORD_KEY, port);
186190
serviceRecord.set(EXTERNAL_ID_KEY, externalId);
187191
serviceRecord.set(COMPUTE_GROUP_NAME_KEY, computeName);
192+
188193
return serviceRecord;
189194
}
190195

tez-api/src/main/java/org/apache/tez/client/registry/AMRegistry.java

Lines changed: 2 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,10 @@
1818

1919
package org.apache.tez.client.registry;
2020

21-
import java.util.ArrayList;
22-
import java.util.List;
2321

24-
import org.apache.hadoop.conf.Configuration;
2522
import org.apache.hadoop.service.AbstractService;
26-
import org.apache.hadoop.service.ServiceStateException;
2723
import org.apache.hadoop.yarn.api.records.ApplicationId;
2824

29-
import org.slf4j.Logger;
30-
import org.slf4j.LoggerFactory;
3125

3226
/**
3327
* Base class for {@code AMRegistry} implementations.
@@ -43,62 +37,19 @@
4337
* <li>{@code serviceStop}</li>
4438
* </ul>
4539
* </p>
46-
*
47-
* <p>{@code init} and {@code serviceStart} are invoked during
48-
* {@code DAGAppMaster.serviceInit()}, while {@code serviceStop} is called
49-
* when {@code DAGAppMaster} shuts down.</p>
5040
*/
5141
public abstract class AMRegistry extends AbstractService {
52-
53-
private static final Logger LOG = LoggerFactory.getLogger(AMRegistry.class);
54-
private List<AMRecord> amRecords = new ArrayList<>();
55-
56-
@Override
57-
public void init(Configuration conf) {
58-
try {
59-
this.serviceInit(conf);
60-
} catch (Exception e) {
61-
LOG.error("Failed to init AMRegistry: name={}, type={}", getName(), getClass().getName());
62-
throw ServiceStateException.convert(e);
63-
}
64-
}
65-
66-
@Override
67-
public void start() {
68-
try {
69-
this.serviceStart();
70-
} catch(Exception e) {
71-
LOG.error("Failed to start AMRegistry: name={}, type={}", getName(), getClass().getName());
72-
throw ServiceStateException.convert(e);
73-
}
74-
}
75-
7642
/* Implementations should provide a public no-arg constructor */
7743
protected AMRegistry(String name) {
7844
super(name);
7945
}
8046

81-
/* Under typical usage, add will be called once automatically with an AMRecord
82-
for the DAGClientServer servicing an AM
83-
*/
84-
public void add(AMRecord server) throws Exception {
85-
amRecords.add(server);
86-
}
47+
public abstract void add(AMRecord server) throws Exception;
8748

8849
public abstract void remove(AMRecord server) throws Exception;
8950

90-
public ApplicationId generateNewId() throws Exception {
91-
return null;
92-
}
51+
public abstract ApplicationId generateNewId() throws Exception;
9352

9453
public abstract AMRecord createAmRecord(ApplicationId appId, String hostName, String hostIp, int port,
9554
String computeName);
96-
97-
@Override
98-
public void serviceStop() throws Exception {
99-
List<AMRecord> records = new ArrayList<>(amRecords);
100-
for(AMRecord record : records) {
101-
remove(record);
102-
}
103-
}
10455
}

tez-api/src/main/java/org/apache/tez/client/registry/AMRegistryClient.java

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@
2323
import java.util.ArrayList;
2424
import java.util.List;
2525

26+
import org.apache.hadoop.yarn.api.records.ApplicationId;
27+
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
2631
/**
2732
* Client-side interface for discovering Application Master (AM) instances
2833
* registered in the AM registry.
@@ -35,26 +40,18 @@
3540
* appear or are removed.</p>
3641
*/
3742
public abstract class AMRegistryClient implements Closeable {
43+
private static final Logger LOG = LoggerFactory.getLogger(AMRegistryClient.class);
3844

3945
private final List<AMRegistryClientListener> listeners = new ArrayList<>();
4046

41-
/**
42-
* Returns the current set of registered listeners.
43-
*
44-
* @return a mutable list of listeners
45-
*/
46-
protected List<AMRegistryClientListener> getListeners() {
47-
return listeners;
48-
}
49-
5047
/**
5148
* Lookup AM metadata for the given application ID.
5249
*
5350
* @param appId the application ID
5451
* @return the AM record if found, otherwise {@code null}
5552
* @throws IOException if the lookup fails
5653
*/
57-
public abstract AMRecord getRecord(String appId) throws IOException;
54+
public abstract AMRecord getRecord(ApplicationId appId) throws IOException;
5855

5956
/**
6057
* Retrieve all AM records known in the registry.
@@ -81,7 +78,26 @@ public synchronized void addListener(AMRegistryClientListener listener) {
8178
*/
8279
protected synchronized void notifyOnAdded(AMRecord record) {
8380
for (AMRegistryClientListener listener : listeners) {
84-
listener.onAdd(record);
81+
try {
82+
listener.onAdd(record);
83+
} catch (Exception e) {
84+
LOG.warn("Exception while calling AM add listener, AM record {}", record, e);
85+
}
86+
}
87+
}
88+
89+
/**
90+
* Notify listeners of an updated AM record.
91+
*
92+
* @param record the updated AM record
93+
*/
94+
protected synchronized void notifyOnUpdated(AMRecord record) {
95+
for (AMRegistryClientListener listener : listeners) {
96+
try {
97+
listener.onUpdate(record);
98+
} catch (Exception e) {
99+
LOG.warn("Exception while calling AM update listener, AM record {}", record, e);
100+
}
85101
}
86102
}
87103

@@ -92,7 +108,11 @@ protected synchronized void notifyOnAdded(AMRecord record) {
92108
*/
93109
protected synchronized void notifyOnRemoved(AMRecord record) {
94110
for (AMRegistryClientListener listener : listeners) {
95-
listener.onRemove(record);
111+
try {
112+
listener.onRemove(record);
113+
} catch (Exception e) {
114+
LOG.warn("Exception while calling AM remove listener, AM record {}", record, e);
115+
}
96116
}
97117
}
98118
}

tez-api/src/main/java/org/apache/tez/client/registry/AMRegistryClientListener.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,23 @@
1919
package org.apache.tez.client.registry;
2020

2121
public interface AMRegistryClientListener {
22+
2223
void onAdd(AMRecord record);
24+
25+
/**
26+
* Default implementation of {@code onUpdate} delegates to {@code onAdd}.
27+
*
28+
* <p>This provides a convenient backward-compatible behavior for consumers that
29+
* store {@link AMRecord} instances in collections keyed by something stable
30+
* (such as {@link ApplicationId}). In such cases, re-adding an {@link AMRecord}
31+
* effectively overwrites the previous entry, making an explicit update handler
32+
* unnecessary for many implementations.</p>
33+
*
34+
* @param record the updated {@link AMRecord} instance
35+
*/
36+
default void onUpdate(AMRecord record){
37+
onAdd(record);
38+
}
39+
2340
void onRemove(AMRecord record);
2441
}

tez-dag/src/main/java/org/apache/tez/frameworkplugins/TaskResourceException.java renamed to tez-api/src/main/java/org/apache/tez/client/registry/AMRegistryUtils.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,23 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18-
package org.apache.tez.frameworkplugins;
18+
package org.apache.tez.client.registry;
1919

20-
public class TaskResourceException extends Exception {
21-
public TaskResourceException(String msg) {
22-
super(msg);
20+
import java.io.IOException;
21+
22+
import org.apache.hadoop.registry.client.binding.RegistryUtils;
23+
import org.apache.hadoop.registry.client.types.ServiceRecord;
24+
25+
public class AMRegistryUtils {
26+
27+
public static AMRecord jsonStringToRecord(String json) throws IOException {
28+
RegistryUtils.ServiceRecordMarshal marshal = new RegistryUtils.ServiceRecordMarshal();
29+
ServiceRecord serviceRecord = marshal.fromJson(json);
30+
return new AMRecord(serviceRecord);
31+
}
32+
33+
public static String recordToJsonString(AMRecord amRecord) throws IOException {
34+
RegistryUtils.ServiceRecordMarshal marshal = new RegistryUtils.ServiceRecordMarshal();
35+
return marshal.toJson(amRecord.toServiceRecord());
2336
}
2437
}

0 commit comments

Comments
 (0)