2 changes: 1 addition & 1 deletion README.md
@@ -701,7 +701,7 @@ In Kubernetes, the most commonly used mechanism for topology awareness is labels
The most prevalent example of this is the node label [topology.kubernetes.io/zone](https://kubernetes.io/docs/reference/labels-annotations-taints/#topologykubernetesiozone), which typically refers to availability zones at cloud providers or comparable failure domains.

The purpose of this tool is to feed information from Kubernetes into the HDFS rack awareness functionality.
In order to do this, it implements the Hadoop interface `org.apache.hadoop.net.DNSToSwitchMapping` which then allows this tool to be configured on the NameNode via the parameter `net.topology.node.switch.mapping.impl`.
In order to do this, `tech.stackable.hadoop.StackableTopologyProvider` implements the Hadoop interface `org.apache.hadoop.net.DNSToSwitchMapping` which then allows this tool to be configured on the NameNode via the parameter `net.topology.node.switch.mapping.impl`.

The topology provider watches all HDFS pods deployed by Stackable, as well as the Kubernetes nodes, and keeps an in-memory cache of the current state of these objects.
From this state store, the tool can calculate rack IDs for any node HDFS asks about without talking to the API server, avoiding an extra network round-trip.
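
For orientation, the interface contract is small. Below is a minimal, hypothetical sketch of an implementation, assuming only the public Hadoop API - the class name and the fallback rack ID are illustrative, not the actual Stackable implementation:

```java
package tech.stackable.hadoop;

import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.net.DNSToSwitchMapping;

// Illustrative skeleton only - the real logic lives in StackableTopologyProvider.
public class ExampleTopologyProvider implements DNSToSwitchMapping {

  // The NameNode passes in datanode IPs/hostnames and expects one rack ID
  // (e.g. "/zone-a/rack-1") per input name, in the same order.
  @Override
  public List<String> resolve(List<String> names) {
    return names.stream()
        .map(name -> "/default-rack") // real impl: derive from cached k8s labels
        .collect(Collectors.toList());
  }

  // Drop all cached mappings.
  @Override
  public void reloadCachedMappings() {}

  // Drop cached mappings for the given hosts only.
  @Override
  public void reloadCachedMappings(List<String> names) {}
}
```

The NameNode picks such a class up once `net.topology.node.switch.mapping.impl` is set to its fully qualified name.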
899 changes: 426 additions & 473 deletions src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java

Large diffs are not rendered by default.

75 changes: 75 additions & 0 deletions src/main/java/tech/stackable/hadoop/TopologyCache.java
@@ -0,0 +1,75 @@
package tech.stackable.hadoop;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
import io.fabric8.kubernetes.api.model.Node;
import io.fabric8.kubernetes.api.model.Pod;
import java.util.List;
import java.util.concurrent.TimeUnit;

/** Manages all caching layers for the topology provider. */
public class TopologyCache {
private final Cache<String, String> topology;
private final Cache<String, Node> nodes;
private final Cache<String, GenericKubernetesResource> listeners;
private final Cache<String, Pod> pods;

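  /**
   * @param expirationSeconds TTL for resolved rack IDs in the topology cache
   * @param defaultExpirationSeconds TTL for the cached Kubernetes objects (nodes, listeners, pods)
   */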
TopologyCache(int expirationSeconds, int defaultExpirationSeconds) {
this.topology =
Caffeine.newBuilder().expireAfterWrite(expirationSeconds, TimeUnit.SECONDS).build();

this.nodes =
Caffeine.newBuilder().expireAfterWrite(defaultExpirationSeconds, TimeUnit.SECONDS).build();

this.listeners =
Caffeine.newBuilder().expireAfterWrite(defaultExpirationSeconds, TimeUnit.SECONDS).build();

this.pods =
Caffeine.newBuilder().expireAfterWrite(defaultExpirationSeconds, TimeUnit.SECONDS).build();
}

String getTopology(String key) {
return topology.getIfPresent(key);
}

void putTopology(String key, String value) {
topology.put(key, value);
}

void invalidateAllTopologyKeys() {
topology.invalidateAll();
}

void invalidateTopologyKeys(List<String> keys) {
keys.forEach(topology::invalidate);
}

Node getNode(String name) {
return nodes.getIfPresent(name);
}

void putNode(String name, Node node) {
nodes.put(name, node);
}

GenericKubernetesResource getListener(String name) {
return listeners.getIfPresent(name);
}

void putListener(String name, GenericKubernetesResource listener) {
listeners.put(name, listener);
}

Pod getPod(String name) {
return pods.getIfPresent(name);
}

void putPod(String name, Pod pod) {
pods.put(name, pod);
}

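  /** Returns true only if a cached entry exists for every one of the given pod names. */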
boolean hasAllPods(List<String> names) {
return names.stream().noneMatch(name -> pods.getIfPresent(name) == null);
}
}
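
A short sketch of how this cache might be exercised by a caller in the same package - the TTLs, addresses, and pod names below are made up for illustration:

```java
package tech.stackable.hadoop;

import io.fabric8.kubernetes.api.model.Pod;
import java.util.List;

public class TopologyCacheExample {
  public static void main(String[] args) {
    // Hypothetical TTLs: 30s for resolved rack IDs, 300s for k8s objects.
    TopologyCache cache = new TopologyCache(30, 300);

    // Rack IDs are cached per datanode address...
    cache.putTopology("10.0.0.12", "/zone-a/rack-1");
    String rack = cache.getTopology("10.0.0.12"); // non-null until the TTL expires

    // ...while nodes, listeners and pods are cached by name.
    cache.putPod("simple-hdfs-datanode-default-0", new Pod());
    boolean allCached = cache.hasAllPods(List.of("simple-hdfs-datanode-default-0")); // true

    // Targeted invalidation, e.g. when HDFS asks to reload specific hosts.
    cache.invalidateTopologyKeys(List.of("10.0.0.12"));
    System.out.println(rack + " " + allCached);
  }
}
```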
116 changes: 116 additions & 0 deletions src/main/java/tech/stackable/hadoop/TopologyLabel.java
@@ -0,0 +1,116 @@
package tech.stackable.hadoop;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopologyLabel {
private static final Logger LOG = LoggerFactory.getLogger(TopologyLabel.class);
public static final String VARNAME_LABELS = "TOPOLOGY_LABELS";
public static final String VARNAME_MAX_LEVELS = "TOPOLOGY_MAX_LEVELS";
private static final int MAX_LEVELS_DEFAULT = 2;

public enum Type {
NODE,
POD,
UNDEFINED
}

private final Type type;
private final String name;

TopologyLabel(String config) {
if (config == null || config.isEmpty()) {
this.type = Type.UNDEFINED;
this.name = null;
return;
}

String[] parts = config.toLowerCase(Locale.ROOT).split(":", 2);

if (parts.length != 2) {
LOG.warn("Invalid topology label format '{}' - expected '[node|pod]:<label>'", config);
this.type = Type.UNDEFINED;
this.name = null;
return;
}

this.name = parts[1];

switch (parts[0]) {
case "node":
this.type = Type.NODE;
break;
case "pod":
this.type = Type.POD;
break;
default:
LOG.warn("Unsupported label type '{}' - must be 'node' or 'pod'", parts[0]);
this.type = Type.UNDEFINED;
}
}

boolean isNodeLabel() {
return type == Type.NODE;
}

boolean isUndefined() {
return type == Type.UNDEFINED;
}

String getName() {
return name;
}

Type getType() {
return type;
}

public static List<TopologyLabel> initializeTopologyLabels() {
    // Read the labels used to build a topology from the environment variable "TOPOLOGY_LABELS".
    // Labels are specified in the form "[node|pod]:<labelname>" and separated by ";".
    // A valid configuration that reads topology information from the labels
    // "kubernetes.io/zone" and "kubernetes.io/rack" on the k8s node running a datanode pod
    // would look like this: "node:kubernetes.io/zone;node:kubernetes.io/rack".
    // By default, at most 2 labels are processed, because this is what Hadoop traditionally
    // allows - the limit can be overridden via the environment variable "TOPOLOGY_MAX_LEVELS".
String topologyConfig = System.getenv(VARNAME_LABELS);

if (topologyConfig == null || topologyConfig.isEmpty()) {
      LOG.error(
          "Missing env var [{}] - this is required for rack awareness to work.", VARNAME_LABELS);
throw new RuntimeException("TOPOLOGY_LABELS environment variable not set");
}

String[] labelConfigs = topologyConfig.split(";");

if (labelConfigs.length > getMaxLabels()) {
LOG.error(
"Found [{}] topology labels configured, but maximum allowed number is [{}]: "
+ "please check your config or raise the number of allowed labels.",
labelConfigs.length,
getMaxLabels());
throw new RuntimeException("Too many topology labels configured");
}
// Create TopologyLabels from config strings
List<TopologyLabel> labels =
Arrays.stream(labelConfigs).map(TopologyLabel::new).collect(Collectors.toList());

if (labels.stream().anyMatch(TopologyLabel::isUndefined)) {
LOG.error(
"Invalid topology label configuration - labels must be in format '[pod|node]:<label>'");
throw new RuntimeException("Invalid topology label configuration");
}

return labels;
}

private static int getMaxLabels() {
return TopologyUtils.parseIntFromEnv(
VARNAME_MAX_LEVELS, MAX_LEVELS_DEFAULT, "maximum topology levels");
}
}
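
A sketch of the parsing behaviour, again assuming a caller in the same package (the label names are chosen for illustration):

```java
package tech.stackable.hadoop;

public class TopologyLabelExample {
  public static void main(String[] args) {
    // One entry of a TOPOLOGY_LABELS value such as
    // "node:topology.kubernetes.io/zone;pod:app.kubernetes.io/role-group".
    TopologyLabel zone = new TopologyLabel("node:topology.kubernetes.io/zone");
    System.out.println(zone.isNodeLabel()); // true - read from the k8s node
    System.out.println(zone.getName());     // "topology.kubernetes.io/zone"

    // Malformed or unsupported entries become UNDEFINED, which
    // initializeTopologyLabels() later turns into a hard failure.
    System.out.println(new TopologyLabel("namespace:foo").isUndefined()); // true
    System.out.println(new TopologyLabel("justalabel").isUndefined());    // true
  }
}
```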
22 changes: 22 additions & 0 deletions src/main/java/tech/stackable/hadoop/TopologyUtils.java
@@ -4,8 +4,12 @@
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopologyUtils {
private static final Logger LOG = LoggerFactory.getLogger(TopologyUtils.class);

private static final String ADDRESS = "address";
private static final String STATUS = "status";
private static final String INGRESS_ADDRESSES = "ingressAddresses";
@@ -21,4 +25,22 @@ public static List<String> getIngressAddresses(GenericKubernetesResource listener)
.map(ingress -> (String) ingress.get(ADDRESS))
.collect(Collectors.toList());
}

public static int parseIntFromEnv(String varName, int defaultValue, String description) {
String value = System.getenv(varName);
if (value == null || value.isEmpty()) {
LOG.info("Set {} to default value {}", description, defaultValue);
return defaultValue;
}

try {
int parsed = Integer.parseInt(value);
LOG.info("Set {} to {} from environment variable {}", description, parsed, varName);
return parsed;
} catch (NumberFormatException e) {
LOG.warn(
"Invalid integer value '{}' for {} - using default: {}", value, varName, defaultValue);
return defaultValue;
}
}
}
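
A sketch of the fallback behaviour this helper gives callers, mirroring how TopologyLabel uses it for its level limit:

```java
package tech.stackable.hadoop;

public class ParseIntFromEnvExample {
  public static void main(String[] args) {
    // TOPOLOGY_MAX_LEVELS unset/empty -> logs and returns the default (2).
    // TOPOLOGY_MAX_LEVELS="4"         -> returns 4.
    // TOPOLOGY_MAX_LEVELS="abc"       -> warns and falls back to 2.
    int maxLevels =
        TopologyUtils.parseIntFromEnv("TOPOLOGY_MAX_LEVELS", 2, "maximum topology levels");
    System.out.println("max topology levels: " + maxLevels);
  }
}
```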
8 changes: 4 additions & 4 deletions test/topology-provider/stack/01-install-krb5-kdc.yaml
@@ -31,7 +31,7 @@ spec:
spec:
initContainers:
- name: init
image: oci.stackable.tech/sdp/krb5:1.18.2-stackable0.0.0-dev
image: oci.stackable.tech/sdp/krb5:1.21.1-stackable0.0.0-dev
args:
- sh
- -euo
@@ -52,7 +52,7 @@ spec:
name: data
containers:
- name: kdc
image: oci.stackable.tech/sdp/krb5:1.18.2-stackable0.0.0-dev
image: oci.stackable.tech/sdp/krb5:1.21.1-stackable0.0.0-dev
args:
- krb5kdc
- -n
@@ -65,7 +65,7 @@
- mountPath: /var/kerberos/krb5kdc
name: data
- name: kadmind
image: oci.stackable.tech/sdp/krb5:1.18.2-stackable0.0.0-dev
image: oci.stackable.tech/sdp/krb5:1.21.1-stackable0.0.0-dev
args:
- kadmind
- -nofork
@@ -78,7 +78,7 @@
- mountPath: /var/kerberos/krb5kdc
name: data
- name: client
image: oci.stackable.tech/sdp/krb5:1.18.2-stackable0.0.0-dev
image: oci.stackable.tech/sdp/krb5:1.21.1-stackable0.0.0-dev
tty: true
stdin: true
env:
38 changes: 21 additions & 17 deletions test/topology-provider/stack/03-hdfs.yaml
@@ -5,51 +5,55 @@ metadata:
name: simple-zk
spec:
image:
productVersion: 3.8.3
productVersion: 3.9.4
pullPolicy: IfNotPresent
servers:
roleGroups:
default:
replicas: 1
---
apiVersion: zookeeper.stackable.tech/v1alpha1
kind: ZookeeperZnode
metadata:
name: simple-hdfs-znode
spec:
clusterRef:
name: simple-zk
---
apiVersion: hdfs.stackable.tech/v1alpha1
kind: HdfsCluster
metadata:
name: simple-hdfs
spec:
image:
productVersion: 3.4.0
custom: hdfs # updated by tilt
productVersion: 3.4.2
custom: oci.stackable.tech/sandbox/andrew/hadoop:3.4.2-stackable0.0.0-topprov
pullPolicy: IfNotPresent
clusterConfig:
dfsReplication: 1
zookeeperConfigMapName: simple-hdfs-znode
zookeeperConfigMapName: simple-zk
rackAwareness:
- labelType: node
labelName: kubernetes.io/hostname
- labelType: pod
labelName: app.kubernetes.io/role-group
- nodeLabel: kubernetes.io/hostname
- podLabel: app.kubernetes.io/role-group
authentication:
tlsSecretClass: tls
kerberos:
secretClass: kerberos-default
nameNodes:
config:
listenerClass: external-stable # We want to access the Web UI
listenerClass: external-stable
logging:
enableVectorAgent: false
containers:
hdfs:
console:
level: DEBUG
file:
level: DEBUG
loggers:
ROOT:
level: DEBUG
configOverrides: &configOverrides
core-site.xml:
hadoop.user.group.static.mapping.overrides: "dr.who=;nn=;nm=;jn=;testuser=supergroup;"
roleGroups:
default:
replicas: 2
dataNodes:
config:
listenerClass: external-stable
configOverrides: *configOverrides
roleGroups:
default:
@@ -5,14 +5,14 @@ metadata:
name: spark-teragen
spec:
sparkImage:
custom: docker.stackable.tech/stackable/spark-k8s-with-teragen:3.5.0-stackable0.0.0-dev
productVersion: 3.5.0
custom: oci.stackable.tech/sandbox/spark-k8s:3.5.6-stackable0.0.0-terasort
productVersion: 3.5.6
pullPolicy: IfNotPresent
mode: cluster
mainApplicationFile: local:///tmp/spark-terasort-1.2-SNAPSHOT.jar
mainClass: com.github.ehiggs.spark.terasort.TeraGen
args:
- "10M"
- "100M"
- "hdfs://simple-hdfs/user/stackable/teragen_output"
sparkConf:
"spark.driver.extraClassPath": "/etc/hadoop/conf/:/stackable/spark/extra-jars/*"