diff --git a/pom.xml b/pom.xml
index 0e6955030c..1369e75b24 100644
--- a/pom.xml
+++ b/pom.xml
@@ -69,6 +69,7 @@
The specific implementation is configured via the
+ * {@code tez.am.registry.class} property. Implementations are expected to provide appropriate service lifecycle
+ * behavior, including:
+ *
+ *
+ *
Implementations are responsible for locating AM endpoints and returning + * their metadata. This API is used by client components to discover running + * Tez AMs.
+ * + *Listeners may be registered to receive notifications when AM records + * appear or are removed.
+ */ +public abstract class AMRegistryClient implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(AMRegistryClient.class); + + private final List+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.client.registry; + +public interface AMRegistryClientListener { + + void onAdd(AMRecord record); + + /** + * Default implementation of {@code onUpdate} delegates to {@code onAdd}. + * + *
This provides a convenient backward-compatible behavior for consumers that + * store {@link AMRecord} instances in collections keyed by something stable + * (such as ApplicationId). In such cases, re-adding an {@link AMRecord} + * effectively overwrites the previous entry, making an explicit update handler + * unnecessary for many implementations.
+ * + * @param record the updated {@link AMRecord} instance + */ + default void onUpdate(AMRecord record){ + onAdd(record); + } + + void onRemove(AMRecord record); +} diff --git a/tez-api/src/main/java/org/apache/tez/client/registry/AMRegistryUtils.java b/tez-api/src/main/java/org/apache/tez/client/registry/AMRegistryUtils.java new file mode 100644 index 0000000000..79b372c683 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/client/registry/AMRegistryUtils.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.client.registry; + +import java.io.IOException; + +import org.apache.hadoop.registry.client.binding.RegistryUtils; +import org.apache.hadoop.registry.client.types.ServiceRecord; + +public final class AMRegistryUtils { + + private AMRegistryUtils() {} + + public static AMRecord jsonStringToRecord(String json) throws IOException { + RegistryUtils.ServiceRecordMarshal marshal = new RegistryUtils.ServiceRecordMarshal(); + ServiceRecord serviceRecord = marshal.fromJson(json); + return new AMRecord(serviceRecord); + } + + public static String recordToJsonString(AMRecord amRecord) throws IOException { + RegistryUtils.ServiceRecordMarshal marshal = new RegistryUtils.ServiceRecordMarshal(); + return marshal.toJson(amRecord.toServiceRecord()); + } +} diff --git a/tez-api/src/main/java/org/apache/tez/client/registry/zookeeper/ZkAMRegistryClient.java b/tez-api/src/main/java/org/apache/tez/client/registry/zookeeper/ZkAMRegistryClient.java new file mode 100644 index 0000000000..7e189490da --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/client/registry/zookeeper/ZkAMRegistryClient.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client.registry.zookeeper;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.client.registry.AMRecord;
+import org.apache.tez.client.registry.AMRegistryClient;
+import org.apache.tez.client.registry.AMRegistryUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Curator/Zookeeper implementation of {@link AMRegistryClient}.
+ */
+@InterfaceAudience.Public
+public final class ZkAMRegistryClient extends AMRegistryClient {
+ private static final Logger LOG = LoggerFactory.getLogger(ZkAMRegistryClient.class);
+ private static final Map
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client.registry.zookeeper;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.client.FrameworkClient;
+import org.apache.tez.client.registry.AMRecord;
+import org.apache.tez.dag.api.TezConfiguration;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class ZkFrameworkClient extends FrameworkClient {
+
+ private AMRecord amRecord;
+ private ZkAMRegistryClient amRegistryClient = null;
+ private volatile boolean isRunning = false;
+ private String amHost;
+ private int amPort;
+
+ @Override
+ public synchronized void init(TezConfiguration tezConf) {
+ if (amRegistryClient == null) {
+ try {
+ amRegistryClient = ZkAMRegistryClient.getClient(tezConf);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public void start() {
+ try {
+ amRegistryClient.start();
+ isRunning = true;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ isRunning = false;
+ close();
+ }
+
+ @Override
+ public void close() {
+ if (amRegistryClient != null) {
+ amRegistryClient.close();
+ }
+ }
+
+ /**
+ * Creates a dummy {@link YarnClientApplication} using a pre-existing {@link ApplicationId}
+ * rather than requesting a new one from the ResourceManager.
+ *
+ * Note: This is a dummy, backward-compatibility implementation.
+ * Instead of allocating a fresh application ID from the ResourceManager, this method
+ * reuses the {@code applicationId} already obtained via {@code getApplicationReport()}.
+ * This allows legacy code paths to continue operating without requiring actual
+ * creation of a new application. Hidden assumption here: this method assumes that
+ * {@code getApplicationReport()} has already been called before
+ * {@code createApplication()}, ensuring that {@code amRecord.getApplicationId()}
+ * is always available. This assumption holds in all supported usage patterns:
+ * the only code path where {@code createApplication()} might be called first is
+ * {@code TezClient.submitDAGApplication()}, but that path is never exercised in
+ * Zookeeper standalone mode because that mode assumes applications are already
+ * running. Therefore, the ordering guarantee is valid in practice.
+ * The method constructs a minimal {@link ApplicationSubmissionContext} and a
+ * synthetic {@link GetNewApplicationResponse}, both populated with the already
+ * known application ID. These objects are then wrapped into a
+ * {@link YarnClientApplication} instance and returned.
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.frameworkplugins;
+
+
+import org.apache.tez.client.FrameworkClient;
+
+/**
+ * A {@code FrameworkService} that runs within the client process using {@code TezClient}.
+ *
+ * This service bundles together a compatible {@code FrameworkClient} and
+ * {@code AMRegistryClient} to enable communication and coordination with the
+ * Application Master. Implementations must provide a {@link FrameworkClient} instance that will
+ * be used by the Tez client layer.
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.frameworkplugins;
+
+/*
+ A FrameworkMode is a pair of classes implementing (ClientFrameworkService, ServerFrameworkService)
+ Clients using one FrameworkMode should only connect to AMs using the same FrameworkMode
+ It is the responsibility of the user to setup their environment/configs to ensure matching modes
+ e.g. a client using a mode that requires a Zookeeper-based registry should not be configured
+ to interact with AMs that do not keep a Zookeeper-based registry
+ */
+public enum FrameworkMode {
+
+ STANDALONE_ZOOKEEPER(
+ "org.apache.tez.frameworkplugins.zookeeper.ZkStandaloneClientFrameworkService",
+ "org.apache.tez.frameworkplugins.zookeeper.ZkStandaloneServerFrameworkService"),
+
+ YARN(
+ "org.apache.tez.frameworkplugins.yarn.YarnClientFrameworkService",
+ "org.apache.tez.frameworkplugins.yarn.YarnServerFrameworkService");
+
+ private final String clientClassName;
+ private final String serverClassName;
+
+ FrameworkMode(String clientClassName, String serverClassName) {
+ this.clientClassName = clientClassName;
+ this.serverClassName = serverClassName;
+ }
+
+ public String getClientClassName() {
+ return clientClassName;
+ }
+
+ public String getServerClassName() {
+ return serverClassName;
+ }
+}
diff --git a/tez-api/src/main/java/org/apache/tez/frameworkplugins/FrameworkService.java b/tez-api/src/main/java/org/apache/tez/frameworkplugins/FrameworkService.java
new file mode 100644
index 0000000000..f27daf6577
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/frameworkplugins/FrameworkService.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.frameworkplugins;
+
+/**
+ * Marker interface for framework-level services in Tez.
+ *
+ * This interface is extended by ClientFrameworkService and ServerFrameworkService
+ * to represent client-side and server-side framework service implementations, respectively.
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.frameworkplugins.yarn;
+
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.tez.client.FrameworkClient;
+import org.apache.tez.client.TezYarnClient;
+import org.apache.tez.frameworkplugins.ClientFrameworkService;
+
+/**
+ * YARN-based client framework service implementation.
+ * Provides default YARN framework client functionality.
+ */
+public class YarnClientFrameworkService implements ClientFrameworkService {
+
+ @Override
+ public FrameworkClient newFrameworkClient() {
+ return new TezYarnClient(YarnClient.createYarnClient());
+ }
+}
diff --git a/tez-api/src/main/java/org/apache/tez/frameworkplugins/yarn/package-info.java b/tez-api/src/main/java/org/apache/tez/frameworkplugins/yarn/package-info.java
new file mode 100644
index 0000000000..ffc5b78fb9
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/frameworkplugins/yarn/package-info.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@Public
+@Evolving
+package org.apache.tez.frameworkplugins.yarn;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
diff --git a/tez-api/src/main/java/org/apache/tez/frameworkplugins/zookeeper/ZkStandaloneClientFrameworkService.java b/tez-api/src/main/java/org/apache/tez/frameworkplugins/zookeeper/ZkStandaloneClientFrameworkService.java
new file mode 100644
index 0000000000..43b11140cc
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/frameworkplugins/zookeeper/ZkStandaloneClientFrameworkService.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.frameworkplugins.zookeeper;
+
+
+import org.apache.tez.client.FrameworkClient;
+import org.apache.tez.client.registry.zookeeper.ZkFrameworkClient;
+import org.apache.tez.frameworkplugins.ClientFrameworkService;
+
+public class ZkStandaloneClientFrameworkService implements ClientFrameworkService {
+ @Override public FrameworkClient newFrameworkClient() {
+ return new ZkFrameworkClient();
+ }
+}
diff --git a/tez-api/src/main/java/org/apache/tez/frameworkplugins/zookeeper/package-info.java b/tez-api/src/main/java/org/apache/tez/frameworkplugins/zookeeper/package-info.java
new file mode 100644
index 0000000000..3acc21feae
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/frameworkplugins/zookeeper/package-info.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@Public
+@Evolving
+package org.apache.tez.frameworkplugins.zookeeper;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index 46a972c1d6..9d004e8303 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -221,6 +221,16 @@ public void testTezClientSession() throws Exception {
testTezClient(true, true, "testTezClientSession");
}
+ @Test (timeout = 5000)
+ public void testTezClientReconnect() throws Exception {
+ testTezClientReconnect(true);
+ }
+
+ @Test (timeout = 5000, expected = IllegalStateException.class)
+ public void testTezClientReconnectNoSession() throws Exception {
+ testTezClientReconnect(false);
+ }
+
@Test (timeout = 5000)
public void testTezClientSessionLargeDAGPlan() throws Exception {
// request size is within threshold of being serialized
@@ -387,18 +397,18 @@ public TezClientForTest testTezClient(boolean isSession, boolean shouldStop, Str
assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
lrName1));
}
-
+
// add resources
String lrName2 = "LR2";
lrs.clear();
lrs.put(lrName2, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test2"),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
client.addAppMasterLocalFiles(lrs);
-
+
ApplicationId appId2 = ApplicationId.newInstance(0, 2);
when(client.mockYarnClient.createApplication().getNewApplicationResponse().getApplicationId())
.thenReturn(appId2);
-
+
when(client.mockYarnClient.getApplicationReport(appId2).getYarnApplicationState())
.thenReturn(YarnApplicationState.RUNNING);
dag = DAG.create("DAG-2-" + dagName).addVertex(
@@ -447,6 +457,97 @@ public TezClientForTest testTezClient(boolean isSession, boolean shouldStop, Str
return client;
}
+ public void testTezClientReconnect(boolean isSession) throws Exception {
+ //Setup 1
+ Map
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.client.registry;
+
+import static org.junit.Assert.*;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.client.registry.zookeeper.ZkConfig;
+
+import org.junit.Test;
+
+public class TestAMRecord {
+
+ @Test
+ public void testConstructorWithAllParameters() {
+ ApplicationId appId = ApplicationId.newInstance(12345L, 1);
+ String hostName = "test-host.example.com";
+ String hostIp = "192.168.1.100";
+ int port = 8080;
+ String externalId = "external-123";
+ String computeName = "test-compute";
+
+ AMRecord record = new AMRecord(appId, hostName, hostIp, port, externalId, computeName);
+
+ assertEquals(appId, record.getApplicationId());
+ assertEquals(hostName, record.getHostName());
+ assertEquals(hostName, record.getHostName());
+ assertEquals(hostIp, record.getHostIp());
+ assertEquals(port, record.getPort());
+ assertEquals(externalId, record.getExternalId());
+ assertEquals(computeName, record.getComputeName());
+ }
+
+ @Test
+ public void testConstructorWithNullExternalIdAndComputeName() {
+ ApplicationId appId = ApplicationId.newInstance(12345L, 1);
+ String hostName = "test-host.example.com";
+ String hostIp = "192.168.1.100";
+ int port = 8080;
+
+ AMRecord record = new AMRecord(appId, hostName, hostIp, port, null, null);
+
+ assertEquals("", record.getExternalId());
+ assertEquals(ZkConfig.DEFAULT_COMPUTE_GROUP_NAME, record.getComputeName());
+ }
+
+ @Test
+ public void testCopyConstructor() {
+ ApplicationId appId = ApplicationId.newInstance(12345L, 1);
+ String hostName = "test-host.example.com";
+ String hostIp = "192.168.1.100";
+ int port = 8080;
+ String externalId = "external-123";
+ String computeName = "test-compute";
+
+ AMRecord original = new AMRecord(appId, hostName, hostIp, port, externalId, computeName);
+ AMRecord copy = new AMRecord(original);
+
+ assertEquals(original.getApplicationId(), copy.getApplicationId());
+ assertEquals(original.getHostName(), copy.getHostName());
+ assertEquals(original.getHostIp(), copy.getHostIp());
+ assertEquals(original.getPort(), copy.getPort());
+ assertEquals(original.getExternalId(), copy.getExternalId());
+ assertEquals(original.getComputeName(), copy.getComputeName());
+ assertEquals(original, copy);
+ assertEquals(original.hashCode(), copy.hashCode());
+ }
+
+ @Test
+ public void testConstructorFromServiceRecord() {
+ ApplicationId appId = ApplicationId.newInstance(12345L, 1);
+ String hostName = "test-host.example.com";
+ String hostIp = "192.168.1.100";
+ int port = 8080;
+ String externalId = "external-123";
+ String computeName = "test-compute";
+
+ AMRecord original = new AMRecord(appId, hostName, hostIp, port, externalId, computeName);
+ ServiceRecord serviceRecord = original.toServiceRecord();
+ AMRecord reconstructed = new AMRecord(serviceRecord);
+
+ assertEquals(original.getApplicationId(), reconstructed.getApplicationId());
+ assertEquals(original.getHostName(), reconstructed.getHostName());
+ assertEquals(original.getHostIp(), reconstructed.getHostIp());
+ assertEquals(original.getPort(), reconstructed.getPort());
+ assertEquals(original.getExternalId(), reconstructed.getExternalId());
+ assertEquals(original.getComputeName(), reconstructed.getComputeName());
+ assertEquals(original, reconstructed);
+ }
+
+ @Test
+ public void testConstructorFromServiceRecordWithNullDefaults() {
+ ApplicationId appId = ApplicationId.newInstance(12345L, 1);
+ String hostName = "test-host.example.com";
+ String hostIp = "192.168.1.100";
+ int port = 8080;
+
+ // Create record with null externalId and computeName
+ AMRecord original = new AMRecord(appId, hostName, hostIp, port, null, null);
+
+ // Convert to ServiceRecord and back
+ ServiceRecord serviceRecord = original.toServiceRecord();
+ AMRecord reconstructed = new AMRecord(serviceRecord);
+
+ // Verify defaults are preserved
+ assertEquals("", reconstructed.getExternalId());
+ assertEquals(ZkConfig.DEFAULT_COMPUTE_GROUP_NAME, reconstructed.getComputeName());
+ assertEquals(original, reconstructed);
+ }
+
+ @Test
+ public void testToServiceRecord() {
+ ApplicationId appId = ApplicationId.newInstance(12345L, 1);
+ String hostName = "test-host.example.com";
+ String hostIp = "192.168.1.100";
+ int port = 8080;
+ String externalId = "external-123";
+ String computeName = "test-compute";
+
+ AMRecord record = new AMRecord(appId, hostName, hostIp, port, externalId, computeName);
+ ServiceRecord serviceRecord = record.toServiceRecord();
+
+ assertNotNull(serviceRecord);
+ assertEquals(appId.toString(), serviceRecord.get("appId"));
+ assertEquals(hostName, serviceRecord.get("hostName"));
+ assertEquals(hostIp, serviceRecord.get("hostIp"));
+ assertEquals(String.valueOf(port), serviceRecord.get("port"));
+ assertEquals(externalId, serviceRecord.get("externalId"));
+ assertEquals(computeName, serviceRecord.get("computeName"));
+ }
+
+ @Test
+ public void testToServiceRecordCaching() {
+ ApplicationId appId = ApplicationId.newInstance(12345L, 1);
+ String hostName = "test-host.example.com";
+ String hostIp = "192.168.1.100";
+ int port = 8080;
+
+ AMRecord record = new AMRecord(appId, hostName, hostIp, port, "external-123", "test-compute");
+ ServiceRecord serviceRecord1 = record.toServiceRecord();
+ ServiceRecord serviceRecord2 = record.toServiceRecord();
+
+ // Should return the same cached instance
+ assertSame(serviceRecord1, serviceRecord2);
+ }
+
+ @Test
+ public void testEquals() {
+ ApplicationId appId1 = ApplicationId.newInstance(12345L, 1);
+ ApplicationId appId2 = ApplicationId.newInstance(12345L, 1);
+ ApplicationId appId3 = ApplicationId.newInstance(12345L, 2);
+
+ AMRecord record1 = new AMRecord(appId1, "host1", "192.168.1.1", 8080, "ext1", "compute1");
+ AMRecord record2 = new AMRecord(appId2, "host1", "192.168.1.1", 8080, "ext1", "compute1");
+ AMRecord record3 = new AMRecord(appId3, "host1", "192.168.1.1", 8080, "ext1", "compute1");
+ AMRecord record4 = new AMRecord(appId1, "host2", "192.168.1.1", 8080, "ext1", "compute1");
+ AMRecord record5 = new AMRecord(appId1, "host1", "192.168.1.2", 8080, "ext1", "compute1");
+ AMRecord record6 = new AMRecord(appId1, "host1", "192.168.1.1", 8081, "ext1", "compute1");
+ AMRecord record7 = new AMRecord(appId1, "host1", "192.168.1.1", 8080, "ext2", "compute1");
+ AMRecord record8 = new AMRecord(appId1, "host1", "192.168.1.1", 8080, "ext1", "compute2");
+
+ // Same values should be equal
+ assertEquals(record1, record2);
+ assertEquals(record2, record1);
+ // Different appId
+ assertNotEquals(record1, record3);
+ // Different hostName
+ assertNotEquals(record1, record4);
+ // Different hostIp
+ assertNotEquals(record1, record5);
+ // Different port
+ assertNotEquals(record1, record6);
+ // Different externalId
+ assertNotEquals(record1, record7);
+ // Different computeName
+ assertNotEquals(record1, record8);
+ // Self equality
+ assertEquals(record1, record1);
+ // Null equality
+ assertNotEquals(null, record1);
+ // Different type
+ assertNotEquals("not an AMRecord", record1);
+ }
+
+ @Test
+ public void testHashCode() {
+ ApplicationId appId1 = ApplicationId.newInstance(12345L, 1);
+ ApplicationId appId2 = ApplicationId.newInstance(12345L, 1);
+
+ AMRecord record1 = new AMRecord(appId1, "host1", "192.168.1.1", 8080, "ext1", "compute1");
+ AMRecord record2 = new AMRecord(appId2, "host1", "192.168.1.1", 8080, "ext1", "compute1");
+ AMRecord record3 = new AMRecord(appId1, "host2", "192.168.1.1", 8080, "ext1", "compute1");
+
+ // Equal objects should have same hashCode
+ assertEquals(record1.hashCode(), record2.hashCode());
+ }
+
+ @Test
+ public void testToString() {
+ ApplicationId appId = ApplicationId.newInstance(12345L, 1);
+ String hostName = "test-host.example.com";
+ String hostIp = "192.168.1.100";
+ int port = 8080;
+ String externalId = "external-123";
+ String computeName = "test-compute";
+
+ AMRecord record = new AMRecord(appId, hostName, hostIp, port, externalId, computeName);
+ String str = record.toString();
+
+ assertNotNull(str);
+ // Validate actual JSON-like snippets from the string
+ assertTrue("Should contain appId=value snippet", str.contains("appId=" + appId.toString()));
+ assertTrue("Should contain hostName=value snippet", str.contains("hostName=" + hostName));
+ assertTrue("Should contain hostIp=value snippet", str.contains("hostIp=" + hostIp));
+ assertTrue("Should contain port=value snippet", str.contains("port=" + port));
+ assertTrue("Should contain externalId=value snippet", str.contains("externalId=" + externalId));
+ assertTrue("Should contain computeName=value snippet", str.contains("computeName=" + computeName));
+ }
+
+ @Test
+ public void testRemoveFromCacheByDeserializedRecordAppId() throws Exception {
+ ConcurrentHashMap
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.client.registry.zookeeper;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.TezConfiguration;
+
+import org.junit.Test;
+
+public class TestZkConfig {
+
+ @Test
+ public void testBasicConfiguration() {
+ TezConfiguration conf = new TezConfiguration();
+ conf.set(TezConfiguration.TEZ_AM_ZOOKEEPER_QUORUM, "localhost:2181");
+ conf.set(TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE, "/test-namespace");
+
+ ZkConfig zkConfig = new ZkConfig(conf);
+
+ assertEquals("localhost:2181", zkConfig.getZkQuorum());
+ assertEquals("/tez-external-sessions/test-namespace", zkConfig.getZkNamespace());
+ }
+
+ @Test
+ public void testDefaultValues() {
+ TezConfiguration conf = new TezConfiguration();
+ conf.set(TezConfiguration.TEZ_AM_ZOOKEEPER_QUORUM, "localhost:2181");
+
+ ZkConfig zkConfig = new ZkConfig(conf);
+
+ Configuration defaultConf = new Configuration();
+ long expectedBackoffSleep = defaultConf.getTimeDuration(
+ TezConfiguration.TEZ_AM_CURATOR_BACKOFF_SLEEP,
+ TezConfiguration.TEZ_AM_CURATOR_BACKOFF_SLEEP_DEFAULT, TimeUnit.MILLISECONDS);
+ long expectedSessionTimeout = defaultConf.getTimeDuration(
+ TezConfiguration.TEZ_AM_CURATOR_SESSION_TIMEOUT,
+ TezConfiguration.TEZ_AM_CURATOR_SESSION_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
+ long expectedConnectionTimeout = defaultConf.getTimeDuration(
+ TezConfiguration.TEZ_AM_CURATOR_CONNECTION_TIMEOUT,
+ TezConfiguration.TEZ_AM_CURATOR_CONNECTION_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
+
+ assertEquals(expectedBackoffSleep, zkConfig.getCuratorBackoffSleepMs());
+ assertEquals(TezConfiguration.TEZ_AM_CURATOR_MAX_RETRIES_DEFAULT, zkConfig.getCuratorMaxRetries());
+ assertEquals(expectedSessionTimeout, zkConfig.getSessionTimeoutMs());
+ assertEquals(expectedConnectionTimeout, zkConfig.getConnectionTimeoutMs());
+ }
+
+ @Test
+ public void testCustomConfigurationValues() {
+ TezConfiguration conf = new TezConfiguration();
+ conf.set(TezConfiguration.TEZ_AM_ZOOKEEPER_QUORUM, "zk1:2181,zk2:2181");
+ conf.set(TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE, "/custom-namespace");
+ conf.set(TezConfiguration.TEZ_AM_CURATOR_BACKOFF_SLEEP, "2000ms");
+ conf.setInt(TezConfiguration.TEZ_AM_CURATOR_MAX_RETRIES, 5);
+ conf.set(TezConfiguration.TEZ_AM_CURATOR_SESSION_TIMEOUT, "200000ms");
+ conf.set(TezConfiguration.TEZ_AM_CURATOR_CONNECTION_TIMEOUT, "20000ms");
+
+ ZkConfig zkConfig = new ZkConfig(conf);
+
+ assertEquals("zk1:2181,zk2:2181", zkConfig.getZkQuorum());
+ assertEquals("/tez-external-sessions/custom-namespace", zkConfig.getZkNamespace());
+ assertEquals(2000, zkConfig.getCuratorBackoffSleepMs());
+ assertEquals(5, zkConfig.getCuratorMaxRetries());
+ assertEquals(200000, zkConfig.getSessionTimeoutMs());
+ assertEquals(20000, zkConfig.getConnectionTimeoutMs());
+ }
+
+ @Test
+ public void testNamespaceWithLeadingSlash() {
+ TezConfiguration conf = new TezConfiguration();
+ conf.set(TezConfiguration.TEZ_AM_ZOOKEEPER_QUORUM, "localhost:2181");
+ conf.set(TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE, "/namespace-with-slash");
+
+ ZkConfig zkConfig = new ZkConfig(conf);
+
+ assertEquals("/tez-external-sessions/namespace-with-slash", zkConfig.getZkNamespace());
+ }
+
+ @Test
+ public void testNamespaceWithoutLeadingSlash() {
+ TezConfiguration conf = new TezConfiguration();
+ conf.set(TezConfiguration.TEZ_AM_ZOOKEEPER_QUORUM, "localhost:2181");
+ conf.set(TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE, "namespace-without-slash");
+
+ ZkConfig zkConfig = new ZkConfig(conf);
+
+ assertEquals("/tez-external-sessions/namespace-without-slash", zkConfig.getZkNamespace());
+ }
+
+ @Test
+ public void testComputeGroupsDisabled() {
+ TezConfiguration conf = new TezConfiguration();
+ conf.set(TezConfiguration.TEZ_AM_ZOOKEEPER_QUORUM, "localhost:2181");
+ conf.set(TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE, "/test-namespace");
+ conf.setBoolean(TezConfiguration.TEZ_AM_REGISTRY_ENABLE_COMPUTE_GROUPS, false);
+
+ ZkConfig zkConfig = new ZkConfig(conf);
+
+ assertEquals("/tez-external-sessions/test-namespace", zkConfig.getZkNamespace());
+ }
+
+ @Test
+ public void testComputeGroupsEnabledWithoutEnvVar() {
+ TezConfiguration conf = new TezConfiguration();
+ conf.set(TezConfiguration.TEZ_AM_ZOOKEEPER_QUORUM, "localhost:2181");
+ conf.set(TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE, "/test-namespace");
+ conf.setBoolean(TezConfiguration.TEZ_AM_REGISTRY_ENABLE_COMPUTE_GROUPS, true);
+
+ // When compute groups are enabled but env var is not set, namespace should not include sub-namespace
+ ZkConfig zkConfig = new ZkConfig(conf);
+
+ // Namespace should start with base namespace (env var not set, so no sub-namespace added)
+ assertEquals("/tez-external-sessions/test-namespace", zkConfig.getZkNamespace());
+ }
+
+ @Test
+ public void testGetRetryPolicy() {
+ TezConfiguration conf = new TezConfiguration();
+ conf.set(TezConfiguration.TEZ_AM_ZOOKEEPER_QUORUM, "localhost:2181");
+ conf.set(TezConfiguration.TEZ_AM_CURATOR_BACKOFF_SLEEP, "1500ms");
+ conf.setInt(TezConfiguration.TEZ_AM_CURATOR_MAX_RETRIES, 4);
+
+ ZkConfig zkConfig = new ZkConfig(conf);
+ RetryPolicy retryPolicy = zkConfig.getRetryPolicy();
+
+ assertNotNull(retryPolicy);
+ // Verify it's an ExponentialBackoffRetry instance
+ assertEquals("org.apache.curator.retry.ExponentialBackoffRetry", retryPolicy.getClass().getName());
+ }
+
+ @Test
+ public void testTimeUnitSupport() {
+ TezConfiguration conf = new TezConfiguration();
+ conf.set(TezConfiguration.TEZ_AM_ZOOKEEPER_QUORUM, "localhost:2181");
+ // Test different time units
+ conf.set(TezConfiguration.TEZ_AM_CURATOR_BACKOFF_SLEEP, "2s");
+ conf.set(TezConfiguration.TEZ_AM_CURATOR_SESSION_TIMEOUT, "3m");
+ conf.set(TezConfiguration.TEZ_AM_CURATOR_CONNECTION_TIMEOUT, "5s");
+
+ ZkConfig zkConfig = new ZkConfig(conf);
+
+ assertEquals(2000, zkConfig.getCuratorBackoffSleepMs());
+ assertEquals(180000, zkConfig.getSessionTimeoutMs());
+ assertEquals(5000, zkConfig.getConnectionTimeoutMs());
+
+ // Unit-less values should default to milliseconds
+ conf.set(TezConfiguration.TEZ_AM_CURATOR_BACKOFF_SLEEP, "2000");
+ conf.set(TezConfiguration.TEZ_AM_CURATOR_SESSION_TIMEOUT, "300000");
+ conf.set(TezConfiguration.TEZ_AM_CURATOR_CONNECTION_TIMEOUT, "15000");
+
+ ZkConfig unitlessConfig = new ZkConfig(conf);
+ assertEquals(2000, unitlessConfig.getCuratorBackoffSleepMs());
+ assertEquals(300000, unitlessConfig.getSessionTimeoutMs());
+ assertEquals(15000, unitlessConfig.getConnectionTimeoutMs());
+ }
+
+ @Test
+ public void testCreateCuratorFramework() {
+ TezConfiguration conf = new TezConfiguration();
+ conf.set(TezConfiguration.TEZ_AM_ZOOKEEPER_QUORUM, "localhost:2181");
+
+ ZkConfig zkConfig = new ZkConfig(conf);
+ CuratorFramework curator = zkConfig.createCuratorFramework();
+
+ assertNotNull(curator);
+ assertEquals(zkConfig.getZkQuorum(), curator.getZookeeperClient().getCurrentConnectionString());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNullZkQuorum() {
+ TezConfiguration conf = new TezConfiguration();
+ // Don't set zkQuorum
+ new ZkConfig(conf);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testEmptyZkQuorum() {
+ TezConfiguration conf = new TezConfiguration();
+ conf.set(TezConfiguration.TEZ_AM_ZOOKEEPER_QUORUM, "");
+ new ZkConfig(conf);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNullNamespace() {
+ TezConfiguration conf = new TezConfiguration();
+ conf.set(TezConfiguration.TEZ_AM_ZOOKEEPER_QUORUM, "localhost:2181");
+ conf.set(TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE, null);
+ new ZkConfig(conf);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testEmptyNamespace() {
+ TezConfiguration conf = new TezConfiguration();
+ conf.set(TezConfiguration.TEZ_AM_ZOOKEEPER_QUORUM, "localhost:2181");
+ conf.set(TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE, "");
+ new ZkConfig(conf);
+ }
+
+ @Test
+ public void testDefaultNamespace() {
+ TezConfiguration conf = new TezConfiguration();
+ conf.set(TezConfiguration.TEZ_AM_ZOOKEEPER_QUORUM, "localhost:2181");
+ // Don't set namespace, should use default
+ ZkConfig zkConfig = new ZkConfig(conf);
+ assertEquals("/tez-external-sessions" + TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE_DEFAULT,
+ zkConfig.getZkNamespace());
+ }
+}
diff --git a/tez-api/src/test/java/org/apache/tez/client/registry/zookeeper/TestZkFrameworkClient.java b/tez-api/src/test/java/org/apache/tez/client/registry/zookeeper/TestZkFrameworkClient.java
new file mode 100644
index 0000000000..efb5f98733
--- /dev/null
+++ b/tez-api/src/test/java/org/apache/tez/client/registry/zookeeper/TestZkFrameworkClient.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client.registry.zookeeper;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.tez.client.registry.AMRecord;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.zookeeper.CreateMode;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for {@link ZkFrameworkClient}.
+ *
+ * This test class validates the ZooKeeper-based framework client that discovers
+ * and communicates with Application Masters through ZooKeeper registry.
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.client.registry.zookeeper;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.curator.RetryLoop;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.client.registry.AMRecord;
+import org.apache.tez.client.registry.AMRegistry;
+import org.apache.tez.client.registry.AMRegistryUtils;
+import org.apache.tez.client.registry.zookeeper.ZkConfig;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Curator/Zookeeper impl of AMRegistry (for internal use only)
+ * Clients should use org.apache.tez.dag.api.client.registry.zookeeper.ZkAMRegistryClient instead.
+ */
+@InterfaceAudience.Private
+public class ZkAMRegistry implements AMRegistry {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ZkAMRegistry.class);
+
+ private final List After all removal attempts, the ZooKeeper client is closed and the shutdown
+ * is logged. These hooks allow injecting alternate or additional logic into the
+ * Application Master without requiring a standalone service. They are
+ * intended for behaviors that are too small or cross-cutting to justify
+ * a dedicated service.
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.frameworkplugins;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.client.registry.AMRegistry;
+
+/**
+ * A {@code FrameworkService} that runs inside the Application Master (AM) process.
+ *
+ * This service bundles together an {@code AMRegistry} and an
+ * {@code AMExtensions} implementation that are designed to be compatible and
+ * work together within the AM lifecycle.
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Provides basic classes of framework plugins for Apache Tez.
+ */
+package org.apache.tez.frameworkplugins;
diff --git a/tez-dag/src/main/java/org/apache/tez/frameworkplugins/yarn/YarnServerFrameworkService.java b/tez-dag/src/main/java/org/apache/tez/frameworkplugins/yarn/YarnServerFrameworkService.java
new file mode 100644
index 0000000000..40d6eb1d9c
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/frameworkplugins/yarn/YarnServerFrameworkService.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.frameworkplugins.yarn;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.client.registry.AMRegistry;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.app.ClusterInfo;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.frameworkplugins.AMExtensions;
+import org.apache.tez.frameworkplugins.ServerFrameworkService;
+
+/**
+ * YARN-based server framework service implementation.
+ * Provides default YARN framework server functionality with default implementations
+ * for all AmExtensions methods.
+ */
+public class YarnServerFrameworkService implements ServerFrameworkService {
+
+ private final YarnAMExtensions amExtensions = new YarnAMExtensions();
+
+ @Override
+ public AMRegistry getAMRegistry(Configuration conf) {
+ // YARN mode doesn't require a custom AM registry
+ return null;
+ }
+
+ @Override
+ public AMExtensions getAMExtensions() {
+ return amExtensions;
+ }
+
+ /**
+ * Default YARN implementation of AmExtensions.
+ * Provides sensible defaults for all methods.
+ */
+ public static class YarnAMExtensions implements AMExtensions {
+
+ @Override
+ public DAGProtos.ConfigurationProto loadConfigurationProto() throws IOException {
+ return TezUtilsInternal
+ .readUserSpecifiedTezConfiguration(System.getenv(ApplicationConstants.Environment.PWD.name()));
+ }
+
+ @Override
+ public ContainerId allocateContainerId(Configuration conf) {
+ String containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
+ return ContainerId.fromString(containerIdStr);
+ }
+
+ @Override
+ public void checkTaskResources(Map
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Provides YARN-specific framework plugins for Apache Tez.
+ */
+package org.apache.tez.frameworkplugins.yarn;
diff --git a/tez-dag/src/main/java/org/apache/tez/frameworkplugins/zookeeper/ZkStandaloneAMExtensions.java b/tez-dag/src/main/java/org/apache/tez/frameworkplugins/zookeeper/ZkStandaloneAMExtensions.java
new file mode 100644
index 0000000000..6f95bcfb56
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/frameworkplugins/zookeeper/ZkStandaloneAMExtensions.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.frameworkplugins.zookeeper;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.client.registry.AMRegistry;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.app.ClusterInfo;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.frameworkplugins.AMExtensions;
+import org.apache.tez.frameworkplugins.ServerFrameworkService;
+
+public class ZkStandaloneAMExtensions implements AMExtensions {
+
+ private final ServerFrameworkService frameworkService;
+
+ public ZkStandaloneAMExtensions(ServerFrameworkService frameworkService) {
+ this.frameworkService = frameworkService;
+ }
+
+ @Override
+ public ContainerId allocateContainerId(Configuration conf) {
+ try {
+ AMRegistry amRegistry = frameworkService.getAMRegistry(conf);
+ if (amRegistry != null) {
+ ApplicationId appId = amRegistry.generateNewId();
+ // attemptId is set to 1 only then APP_LAUNCHED event gets triggered
+ ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+ return ContainerId.newContainerId(applicationAttemptId, 0);
+ } else {
+ throw new IllegalStateException("AMRegistry must not be null for standalone AM mode");
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void checkTaskResources(Map
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.frameworkplugins.zookeeper;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.client.registry.AMRegistry;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.client.registry.zookeeper.ZkAMRegistry;
+import org.apache.tez.frameworkplugins.AMExtensions;
+import org.apache.tez.frameworkplugins.ServerFrameworkService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZkStandaloneServerFrameworkService implements ServerFrameworkService {
+ private static final Logger LOG = LoggerFactory.getLogger(ZkStandaloneServerFrameworkService.class);
+ private final ZkStandaloneAMExtensions amExtensions = new ZkStandaloneAMExtensions(this);
+ private volatile ZkAMRegistry amRegistry;
+
+ /**
+ * Returns a singleton {@link AMRegistry} instance backed by ZooKeeper.
+ *
+ * If the registry has not yet been created, this method initializes and starts
+ * a new {@link ZkAMRegistry} using the external AM identifier obtained from the
+ * {@code TEZ_AM_EXTERNAL_ID} environment variable. When the registry is used as a service within the DAGAppMaster, the
+ * DAGAppMaster is responsible for managing its lifecycle, including closure.
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Provides Zookeeper-specific framework plugins for Apache Tez.
+ */
+package org.apache.tez.frameworkplugins.zookeeper;
diff --git a/tez-dag/src/test/java/org/apache/tez/client/registry/zookeeper/TestZkAMRegistryClient.java b/tez-dag/src/test/java/org/apache/tez/client/registry/zookeeper/TestZkAMRegistryClient.java
new file mode 100644
index 0000000000..bbeee6b3c2
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/client/registry/zookeeper/TestZkAMRegistryClient.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client.registry.zookeeper;
+
+
+import static org.apache.tez.frameworkplugins.FrameworkMode.STANDALONE_ZOOKEEPER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.client.registry.AMRecord;
+import org.apache.tez.client.registry.AMRegistryClientListener;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.DAGAppMaster;
+import org.apache.tez.dag.app.MockDAGAppMaster;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for {@link ZkAMRegistryClient}.
+ *
+ * This test class validates the ZooKeeper-based AM (Application Master) registry and discovery
+ * mechanism. It tests that when a DAGAppMaster is started with STANDALONE_ZOOKEEPER framework mode,
+ * it properly registers itself to ZooKeeper and can be discovered by a {@link ZkAMRegistryClient}.
+ *
+ * The tests use an embedded ZooKeeper {@link TestingServer} to avoid external dependencies
+ * and ensure test isolation.
+ *
+ * This test validates the following workflow:
+ *
+ * The test uses a {@link CountDownLatch} to synchronize between the AM registration
+ * event and the test assertions, ensuring the AM has fully registered before validation.
+ * This test class focuses on the low-level AM registry implementation that runs
+ * inside the AM process. It validates that:
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@Public
+@Evolving
+package org.apache.tez.runtime.library;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
*
+ *
+ *
+ *
+ */
+public class TestZkAMRegistry {
+
+ private TestingServer zkServer;
+
+ @Before
+ public void setup() throws Exception {
+ zkServer = new TestingServer();
+ zkServer.start();
+ }
+
+ @After
+ public void teardown() throws Exception {
+ if (zkServer != null) {
+ zkServer.close();
+ }
+ }
+
+ @Test
+ public void testGenerateNewIdProducesUniqueIds() throws Exception {
+ TezConfiguration conf = createTezConf();
+ try (ZkAMRegistry registry = new ZkAMRegistry("external-id")) {
+ registry.init(conf);
+ registry.start();
+
+ ApplicationId first = registry.generateNewId();
+ ApplicationId second = registry.generateNewId();
+
+ assertNotNull(first);
+ assertNotNull(second);
+ assertEquals("Cluster timestamps should match", first.getClusterTimestamp(), second.getClusterTimestamp());
+ assertEquals("Second id should be first id + 1", first.getId() + 1, second.getId());
+ }
+ }
+
+ @Test(timeout = 120000)
+ public void testGenerateNewIdFromParallelThreads() throws Exception {
+ final int threadCount = 50;
+
+ TezConfiguration conf = createTezConf();
+ // this is the maxRetries for ExponentialBackoffRetry, let's use it to be able to test high concurrency
+ conf.setInt(TezConfiguration.TEZ_AM_CURATOR_MAX_RETRIES, 29);
+
+ try (ZkAMRegistry registry = new ZkAMRegistry("external-id")) {
+ registry.init(conf);
+ registry.start();
+
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch doneLatch = new CountDownLatch(threadCount);
+
+ Set