Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 0 additions & 116 deletions hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,9 @@
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_DNS_INTERFACE_KEY;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_DNS_NAMESERVER_KEY;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_HOST_NAME_KEY;
import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_ADDRESS_KEY;
import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_DATANODE_PORT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_ADDRESS_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEFAULT_SERVICE_ID;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_SERVICE_IDS_KEY;
Expand All @@ -58,12 +52,9 @@
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import javax.management.ObjectName;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.ConfigRedactor;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
Expand Down Expand Up @@ -280,113 +271,6 @@ public static OptionalInt getPortNumberFromConfigKeys(
return OptionalInt.empty();
}

/**
* Retrieve the socket addresses of all storage container managers.
*
* @return A collection of SCM addresses
* @throws IllegalArgumentException If the configuration is invalid
*/
public static Collection<InetSocketAddress> getSCMAddressForDatanodes(
ConfigurationSource conf) {

// First check HA style config, if not defined fall back to OZONE_SCM_NAMES

if (getScmServiceId(conf) != null) {
List<SCMNodeInfo> scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf);
Collection<InetSocketAddress> scmAddressList =
new HashSet<>(scmNodeInfoList.size());
for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) {
scmAddressList.add(
NetUtils.createSocketAddr(scmNodeInfo.getScmDatanodeAddress()));
}
return scmAddressList;
} else {
// fall back to OZONE_SCM_NAMES.
Collection<String> names =
conf.getTrimmedStringCollection(ScmConfigKeys.OZONE_SCM_NAMES);
if (names.isEmpty()) {
throw new IllegalArgumentException(ScmConfigKeys.OZONE_SCM_NAMES
+ " need to be a set of valid DNS names or IP addresses."
+ " Empty address list found.");
}

Collection<InetSocketAddress> addresses = new HashSet<>(names.size());
for (String address : names) {
Optional<String> hostname = getHostName(address);
if (!hostname.isPresent()) {
throw new IllegalArgumentException("Invalid hostname for SCM: "
+ address);
}
int port = getHostPort(address)
.orElse(conf.getInt(OZONE_SCM_DATANODE_PORT_KEY,
OZONE_SCM_DATANODE_PORT_DEFAULT));
InetSocketAddress addr = NetUtils.createSocketAddr(hostname.get(),
port);
addresses.add(addr);
}

if (addresses.size() > 1) {
LOG.warn("When SCM HA is configured, configure {} appended with " +
"serviceId and nodeId. {} is deprecated.", OZONE_SCM_ADDRESS_KEY,
OZONE_SCM_NAMES);
}
return addresses;
}
}

/**
* Returns the SCM address for datanodes based on the service ID and the SCM addresses.
* @param conf Configuration
* @param scmServiceId SCM service ID
* @param scmNodeIds Requested SCM node IDs
* @return A collection with addresses of the request SCM node IDs.
* Null if there is any wrongly configured SCM address. Note that the returned collection
* might not be ordered the same way as the requested SCM node IDs
*/
public static Collection<Pair<String, InetSocketAddress>> getSCMAddressForDatanodes(
ConfigurationSource conf, String scmServiceId, Set<String> scmNodeIds) {
Collection<Pair<String, InetSocketAddress>> scmNodeAddress = new HashSet<>(scmNodeIds.size());
for (String scmNodeId : scmNodeIds) {
String addressKey = ConfUtils.addKeySuffixes(
OZONE_SCM_ADDRESS_KEY, scmServiceId, scmNodeId);
String scmAddress = conf.get(addressKey);
if (scmAddress == null) {
LOG.warn("The SCM address configuration {} is not defined, return nothing", addressKey);
return null;
}

int scmDatanodePort = SCMNodeInfo.getPort(conf, scmServiceId, scmNodeId,
OZONE_SCM_DATANODE_ADDRESS_KEY, OZONE_SCM_DATANODE_PORT_KEY,
OZONE_SCM_DATANODE_PORT_DEFAULT);

String scmDatanodeAddressStr = SCMNodeInfo.buildAddress(scmAddress, scmDatanodePort);
InetSocketAddress scmDatanodeAddress = NetUtils.createSocketAddr(scmDatanodeAddressStr);
scmNodeAddress.add(Pair.of(scmNodeId, scmDatanodeAddress));
}
return scmNodeAddress;
}

/**
* Retrieve the socket addresses of recon.
*
* @return Recon address
* @throws IllegalArgumentException If the configuration is invalid
*/
public static InetSocketAddress getReconAddresses(
ConfigurationSource conf) {
String name = conf.get(OZONE_RECON_ADDRESS_KEY);
if (StringUtils.isEmpty(name)) {
return null;
}
Optional<String> hostname = getHostName(name);
if (!hostname.isPresent()) {
throw new IllegalArgumentException("Invalid hostname for Recon: "
+ name);
}
int port = getHostPort(name).orElse(OZONE_RECON_DATANODE_PORT_DEFAULT);
return NetUtils.createSocketAddr(hostname.get(), port);
}

/**
* Returns the hostname for this datanode. If the hostname is not
* explicitly configured in the given config, then it is determined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,14 @@

package org.apache.hadoop.hdds;

import static org.apache.hadoop.hdds.HddsUtils.getSCMAddressForDatanodes;
import static org.apache.hadoop.hdds.HddsUtils.processForLogging;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_ADDRESS_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.InetSocketAddress;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -103,128 +92,6 @@ void validatePathRejectsInvalidPath(String path, String ancestor) {
() -> HddsUtils.validatePath(Paths.get(path), Paths.get(ancestor)));
}

@Test
void testGetSCMAddresses() {
final OzoneConfiguration conf = new OzoneConfiguration();
Collection<InetSocketAddress> addresses;
InetSocketAddress addr;
Iterator<InetSocketAddress> it;

// Verify valid IP address setup
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "1.2.3.4");
addresses = getSCMAddressForDatanodes(conf);
assertEquals(1, addresses.size());
addr = addresses.iterator().next();
assertEquals("1.2.3.4", addr.getHostName());
assertEquals(OZONE_SCM_DATANODE_PORT_DEFAULT, addr.getPort());

// Verify valid hostname setup
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm1");
addresses = getSCMAddressForDatanodes(conf);
assertEquals(1, addresses.size());
addr = addresses.iterator().next();
assertEquals("scm1", addr.getHostName());
assertEquals(OZONE_SCM_DATANODE_PORT_DEFAULT, addr.getPort());

// Verify valid hostname and port
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm1:1234");
addresses = getSCMAddressForDatanodes(conf);
assertEquals(1, addresses.size());
addr = addresses.iterator().next();
assertEquals("scm1", addr.getHostName());
assertEquals(1234, addr.getPort());

final Map<String, Integer> hostsAndPorts = new HashMap<>();
hostsAndPorts.put("scm1", 1234);
hostsAndPorts.put("scm2", 2345);
hostsAndPorts.put("scm3", 3456);

// Verify multiple hosts and port
conf.setStrings(
ScmConfigKeys.OZONE_SCM_NAMES, "scm1:1234,scm2:2345,scm3:3456");
addresses = getSCMAddressForDatanodes(conf);
assertEquals(3, addresses.size());
it = addresses.iterator();
HashMap<String, Integer> expected1 = new HashMap<>(hostsAndPorts);
while (it.hasNext()) {
InetSocketAddress current = it.next();
assertTrue(expected1.remove(current.getHostName(),
current.getPort()));
}
assertThat(expected1).isEmpty();

// Verify names with spaces
conf.setStrings(
ScmConfigKeys.OZONE_SCM_NAMES, " scm1:1234, scm2:2345 , scm3:3456 ");
addresses = getSCMAddressForDatanodes(conf);
assertEquals(3, addresses.size());
it = addresses.iterator();
HashMap<String, Integer> expected2 = new HashMap<>(hostsAndPorts);
while (it.hasNext()) {
InetSocketAddress current = it.next();
assertTrue(expected2.remove(current.getHostName(),
current.getPort()));
}
assertThat(expected2).isEmpty();

// Verify empty value
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "");
assertThrows(IllegalArgumentException.class,
() -> getSCMAddressForDatanodes(conf),
"Empty value should cause an IllegalArgumentException");

// Verify invalid hostname
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "s..x..:1234");
assertThrows(IllegalArgumentException.class,
() -> getSCMAddressForDatanodes(conf),
"An invalid hostname should cause an IllegalArgumentException");

// Verify invalid port
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm:xyz");
assertThrows(IllegalArgumentException.class,
() -> getSCMAddressForDatanodes(conf),
"An invalid port should cause an IllegalArgumentException");

// Verify a mixed case (valid and invalid value both appears)
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm1:1234, scm:xyz");
assertThrows(IllegalArgumentException.class,
() -> getSCMAddressForDatanodes(conf),
"An invalid value should cause an IllegalArgumentException");
}

@Test
void testGetSCMAddressesWithHAConfig() {
OzoneConfiguration conf = new OzoneConfiguration();
String scmServiceId = "scmserviceId";
String[] nodes = new String[]{"scm1", "scm2", "scm3"};
conf.set(ScmConfigKeys.OZONE_SCM_SERVICE_IDS_KEY, scmServiceId);
conf.set(ScmConfigKeys.OZONE_SCM_NODES_KEY + "." + scmServiceId,
"scm1,scm2,scm3");

int port = 9880;
List<String> expected = new ArrayList<>();
for (String nodeId : nodes) {
conf.set(ConfUtils.addKeySuffixes(OZONE_SCM_ADDRESS_KEY,
scmServiceId, nodeId), "scm");
conf.setInt(ConfUtils.addKeySuffixes(OZONE_SCM_DATANODE_PORT_KEY,
scmServiceId, nodeId), ++port);
expected.add("scm" + ":" + port);
}

Collection<InetSocketAddress> scmAddressList =
HddsUtils.getSCMAddressForDatanodes(conf);

assertNotNull(scmAddressList);
assertEquals(3, scmAddressList.size());

for (InetSocketAddress next : scmAddressList) {
expected.remove(next.getHostName() + ":" + next.getPort());
}

assertEquals(0, expected.size());

}

@Test
void testGetNumberFromConfigKeys() {
final String testnum1 = "8";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,12 +739,12 @@ private String reconfigScmNodes(String value) {
LOG.info("Reconfiguring SCM nodes for service ID {} with new SCM nodes {} and remove SCM nodes {}",
scmServiceId, scmNodesIdsToAdd, scmNodesIdsToRemove);

Collection<Pair<String, InetSocketAddress>> scmToAdd = HddsUtils.getSCMAddressForDatanodes(
Collection<Pair<String, InetSocketAddress>> scmToAdd = HddsServerUtil.getSCMAddressForDatanodes(
getConf(), scmServiceId, scmNodesIdsToAdd);
if (scmToAdd == null) {
throw new IllegalStateException("Reconfiguration failed to get SCM address to add due to wrong configuration");
}
Collection<Pair<String, InetSocketAddress>> scmToRemove = HddsUtils.getSCMAddressForDatanodes(
Collection<Pair<String, InetSocketAddress>> scmToRemove = HddsServerUtil.getSCMAddressForDatanodes(
getConf(), scmServiceId, scmNodesIdsToRemove);
if (scmToRemove == null) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
Expand Down Expand Up @@ -314,7 +313,7 @@ private int getEndPointTaskThreadPoolSize() {
int totalServerCount = 1;

try {
totalServerCount += HddsUtils.getSCMAddressForDatanodes(conf).size();
totalServerCount += HddsServerUtil.getSCMAddressForDatanodes(conf).size();
} catch (Exception e) {
LOG.error("Fail to get scm addresses", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.hadoop.ozone.container.common.states.datanode;

import static org.apache.hadoop.hdds.HddsUtils.getReconAddresses;
import static org.apache.hadoop.hdds.HddsUtils.getSCMAddressForDatanodes;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getReconAddressForDatanodes;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getSCMAddressForDatanodes;

import com.google.common.base.Strings;
import java.io.File;
Expand Down Expand Up @@ -104,7 +104,7 @@ public DatanodeStateMachine.DatanodeStates call() throws Exception {
connectionManager.addSCMServer(addr, context.getThreadNamePrefix());
this.context.addEndpoint(addr);
}
InetSocketAddress reconAddress = getReconAddresses(conf);
InetSocketAddress reconAddress = getReconAddressForDatanodes(conf);
if (reconAddress != null) {
connectionManager.addReconServer(reconAddress,
context.getThreadNamePrefix());
Expand Down
Loading