Commit c0d667d

feat: add HA deployment function for HDFS (#268)

1 parent dc5824d commit c0d667d

9 files changed: +547 -12 lines changed

bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceAddJob.java

Lines changed: 3 additions & 1 deletion
@@ -93,7 +93,9 @@ protected Map<String, List<String>> getComponentHostsMap() {
             String componentName = componentHost.getComponentName();
             List<String> hostnames = componentHost.getHostnames();
             if (CollectionUtils.isEmpty(hostnames)) {
-                throw new RuntimeException("No hostnames found for component " + componentName);
+                // throw new RuntimeException("No hostnames found for component "
+                // + componentName);
+                continue;
             }
             componentHostsMap.put(componentName, hostnames);
         }

bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/hadoop/metainfo.xml

Lines changed: 13 additions & 1 deletion
@@ -52,11 +52,23 @@
         <category>server</category>
         <cardinality>1+</cardinality>
       </component>
+      <component>
+        <name>zkfc</name>
+        <display-name>zkfc</display-name>
+        <category>server</category>
+        <cardinality>0-2</cardinality>
+      </component>
+      <component>
+        <name>journalnode</name>
+        <display-name>journalnode</display-name>
+        <category>server</category>
+        <cardinality>0-3</cardinality>
+      </component>
       <component>
         <name>secondarynamenode</name>
         <display-name>SNameNode</display-name>
         <category>server</category>
-        <cardinality>1</cardinality>
+        <cardinality>0-1</cardinality>
       </component>

       <!-- Yarn Components -->

bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopParams.java

Lines changed: 67 additions & 4 deletions
@@ -33,11 +33,14 @@
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;

+import java.io.File;
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;

 @Getter
 @Slf4j
@@ -64,6 +67,9 @@ public class HadoopParams extends BigtopParams {
     private String dfsNameNodeDir;
     private String dfsNameNodeCheckPointDir;
     private String dfsDomainSocketPathPrefix;
+    private String dfsJourNalNodeDir;
+    private String dfsHttpPort;
+    private String journalHttpPort;

     private String nodeManagerLogDir = "/hadoop/yarn/log";
     private String nodeManagerLocalDir = "/hadoop/yarn/local";
@@ -103,9 +109,27 @@ public Map<String, Object> hdfsLog4j() {
     public Map<String, Object> coreSite() {
         Map<String, Object> coreSite = LocalSettings.configurations(getServiceName(), "core-site");
         List<String> namenodeList = LocalSettings.componentHosts("namenode");
-        if (!namenodeList.isEmpty()) {
+        List<String> zookeeperServerHosts = LocalSettings.componentHosts("zookeeper_server");
+        Map<String, Object> ZKPort = LocalSettings.configurations("zookeeper", "zoo.cfg");
+        String clientPort = (String) ZKPort.get("clientPort");
+        StringBuilder zkString = new StringBuilder();
+        for (int i = 0; i < zookeeperServerHosts.size(); i++) {
+            String host = zookeeperServerHosts.get(i);
+            if (host == null || host.trim().isEmpty()) {
+                continue;
+            }
+            zkString.append(host.trim()).append(":").append(clientPort);
+            if (i != zookeeperServerHosts.size() - 1) {
+                zkString.append(",");
+            }
+        }
+        if (!namenodeList.isEmpty() && namenodeList.size() == 1) {
             coreSite.put(
                     "fs.defaultFS", ((String) coreSite.get("fs.defaultFS")).replace("localhost", namenodeList.get(0)));
+        } else if (!namenodeList.isEmpty() && namenodeList.size() == 2) {
+            coreSite.put(
+                    "fs.defaultFS", ((String) coreSite.get("fs.defaultFS")).replace("localhost:8020", "nameservice1"));
+            coreSite.put("ha.zookeeper.quorum", zkString);
         }
         return coreSite;
     }
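Note: as a rough standalone sketch of what the loop above assembles, the snippet below uses hypothetical ZooKeeper hosts and a zoo.cfg clientPort of 2181 (illustrative values, not taken from this commit) and produces the same kind of quorum string that is written to ha.zookeeper.quorum.

import java.util.List;

public class ZkQuorumSketch {
    public static void main(String[] args) {
        // Hypothetical hosts; the real values come from
        // LocalSettings.componentHosts("zookeeper_server") and zoo.cfg.
        List<String> zookeeperServerHosts = List.of("zk1.example.com", "zk2.example.com", "zk3.example.com");
        String clientPort = "2181";
        StringBuilder zkString = new StringBuilder();
        for (int i = 0; i < zookeeperServerHosts.size(); i++) {
            String host = zookeeperServerHosts.get(i);
            if (host == null || host.trim().isEmpty()) {
                continue;
            }
            zkString.append(host.trim()).append(":").append(clientPort);
            if (i != zookeeperServerHosts.size() - 1) {
                zkString.append(",");
            }
        }
        // Prints: zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181
        System.out.println(zkString);
    }
}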
@@ -119,7 +143,8 @@ public Map<String, Object> hadoopPolicy() {
     public Map<String, Object> hdfsSite() {
         Map<String, Object> hdfsSite = LocalSettings.configurations(getServiceName(), "hdfs-site");
         List<String> namenodeList = LocalSettings.componentHosts("namenode");
-        if (!namenodeList.isEmpty()) {
+        List<String> journalNodeList = LocalSettings.componentHosts("journalnode");
+        if (!namenodeList.isEmpty() && namenodeList.size() == 1) {
             hdfsSite.put(
                     "dfs.namenode.rpc-address",
                     ((String) hdfsSite.get("dfs.namenode.rpc-address")).replace("0.0.0.0", namenodeList.get(0)));
@@ -129,6 +154,26 @@ public Map<String, Object> hdfsSite() {
             hdfsSite.put(
                     "dfs.namenode.https-address",
                     ((String) hdfsSite.get("dfs.namenode.https-address")).replace("0.0.0.0", namenodeList.get(0)));
+        } else if (!namenodeList.isEmpty() && namenodeList.size() == 2) {
+            hdfsSite.remove("dfs.namenode.http-address");
+            hdfsSite.put("dfs.ha.automatic-failover.enabled", "true");
+            hdfsSite.put("dfs.nameservices", "nameservice1");
+            hdfsSite.put("dfs.ha.namenodes.nameservice1", "nn1,nn2");
+            hdfsSite.put("dfs.namenode.rpc-address.nameservice1.nn1", namenodeList.get(0) + ":8020");
+            hdfsSite.put("dfs.namenode.rpc-address.nameservice1.nn2", namenodeList.get(1) + ":8020");
+            hdfsSite.put("dfs.namenode.http-address.nameservice1.nn1", namenodeList.get(0) + ":9870");
+            hdfsSite.put("dfs.namenode.http-address.nameservice1.nn2", namenodeList.get(1) + ":9870");
+            hdfsSite.put(
+                    "dfs.namenode.shared.edits.dir",
+                    "qjournal://" + journalNodeList.get(0) + ":8485;" + journalNodeList.get(1) + ":8485;"
+                            + journalNodeList.get(2) + ":8485" + "/nameservice1");
+            hdfsSite.put("dfs.journalnode.edits.dir", "/hadoop/dfs/journal");
+            hdfsSite.put(
+                    "dfs.client.failover.proxy.provider.nameservice1",
+                    "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
+            hdfsSite.put("dfs.journalnode.edits.dir", "/hadoop/dfs/journal");
+            hdfsSite.put("dfs.ha.fencing.methods", "shell(/bin/true)");
+            hdfsSite.put("dfs.replication", "3");
         }

         // Configure native library dependent settings
@@ -139,12 +184,30 @@ public Map<String, Object> hdfsSite() {
         nameNodeFormattedDirs = Arrays.stream(dfsNameNodeDir.split(","))
                 .map(x -> x + "/namenode-formatted/")
                 .toList();
-
+        String dfsHttpAddress = (String) hdfsSite.get("dfs.namenode.http-address.nameservice1.nn1");
+        if (dfsHttpAddress != null && dfsHttpAddress.contains(":")) {
+            String[] parts = dfsHttpAddress.split(":");
+            if (parts.length >= 2) {
+                dfsHttpPort = parts[1].trim();
+            }
+        }
+        String journalHttpAddress = (String) hdfsSite.get("dfs.namenode.shared.edits.dir");
+        Pattern pattern = Pattern.compile(":(\\d{1,5})");
+        Matcher matcher = pattern.matcher(journalHttpAddress);
+        if (matcher.find()) {
+            journalHttpPort = matcher.group(1);
+            log.info("find jounalnode port: " + journalHttpPort);
+        } else {
+            log.warn("not found journalnode port!");
+        }
         String dfsDomainSocketPath = (String) hdfsSite.get("dfs.domain.socket.path");
         if (StringUtils.isNotBlank(dfsDomainSocketPath)) {
-            dfsDomainSocketPathPrefix = dfsDomainSocketPath.replace("dn._PORT", "");
+            File file = new File(dfsDomainSocketPath);
+            dfsDomainSocketPathPrefix = file.getParent();
+            // dfsDomainSocketPathPrefix = dfsDomainSocketPath.replace("dn._PORT", "");
         }
         dfsNameNodeCheckPointDir = (String) hdfsSite.get("dfs.namenode.checkpoint.dir");
+        dfsJourNalNodeDir = (String) hdfsSite.get("dfs.journalnode.edits.dir");
         return hdfsSite;
     }
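Note: the port parsing in the hunk above can be illustrated with a small standalone sketch. The qjournal URI below is a hypothetical example shaped like the dfs.namenode.shared.edits.dir value written earlier in this file; the ":(\d{1,5})" pattern captures the first port it encounters.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JournalPortSketch {
    public static void main(String[] args) {
        // Hypothetical shared-edits URI in the shape produced by hdfsSite() above.
        String journalHttpAddress =
                "qjournal://jn1.example.com:8485;jn2.example.com:8485;jn3.example.com:8485/nameservice1";
        Pattern pattern = Pattern.compile(":(\\d{1,5})");
        Matcher matcher = pattern.matcher(journalHttpAddress);
        if (matcher.find()) {
            // Prints: 8485 (the first port found after a colon)
            System.out.println(matcher.group(1));
        }
    }
}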

bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopSetup.java

Lines changed: 71 additions & 1 deletion
@@ -23,6 +23,7 @@
 import org.apache.bigtop.manager.stack.core.enums.ConfigType;
 import org.apache.bigtop.manager.stack.core.exception.StackException;
 import org.apache.bigtop.manager.stack.core.spi.param.Params;
+import org.apache.bigtop.manager.stack.core.utils.LocalSettings;
 import org.apache.bigtop.manager.stack.core.utils.linux.LinuxFileUtils;
 import org.apache.bigtop.manager.stack.core.utils.linux.LinuxOSUtils;

@@ -33,8 +34,12 @@
 import lombok.extern.slf4j.Slf4j;

 import java.io.File;
+import java.net.InetSocketAddress;
+import java.net.Socket;
 import java.text.MessageFormat;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;

 @Slf4j
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
@@ -75,6 +80,14 @@ public static ShellResult configure(Params params, String componentName) {
                         Constants.PERMISSION_755,
                         true);
             }
+            case "journalnode": {
+                LinuxFileUtils.createDirectories(
+                        hadoopParams.getDfsJourNalNodeDir(),
+                        hadoopUser,
+                        hadoopGroup,
+                        Constants.PERMISSION_755,
+                        true);
+            }
             case "datanode": {
                 LinuxFileUtils.createDirectories(
                         hadoopParams.getDfsDomainSocketPathPrefix(),
@@ -229,9 +242,13 @@ public static ShellResult configure(Params params, String componentName) {
     public static void formatNameNode(HadoopParams hadoopParams) {
         if (!isNameNodeFormatted(hadoopParams)) {
             String formatCmd = MessageFormat.format(
-                    "{0}/hdfs --config {1} namenode -format -nonInteractive",
+                    "{0}/hdfs --config {1} namenode -format -nonInteractive -force",
                     hadoopParams.binDir(), hadoopParams.confDir());
             try {
+                boolean allJnReachable = checkAllJournalNodesPortReachable(hadoopParams);
+                if (!allJnReachable) {
+                    throw new StackException("Cannot format NameNode: Some JournalNodes are unreachable.");
+                }
                 LinuxOSUtils.sudoExecCmd(formatCmd, hadoopParams.user());
             } catch (Exception e) {
                 throw new StackException(e);
@@ -248,6 +265,59 @@ public static void formatNameNode(HadoopParams hadoopParams) {
         }
     }

+    private static boolean checkAllJournalNodesPortReachable(HadoopParams hadoopParams) throws InterruptedException {
+        List<String> journalNodeList = LocalSettings.componentHosts("journalnode");
+        String port = hadoopParams.getJournalHttpPort();
+        if (journalNodeList == null || journalNodeList.isEmpty()) {
+            throw new IllegalArgumentException("JournalNode host list cannot be empty!");
+        }
+        int retryCount = 0;
+        int maxRetry = 100;
+        long retryIntervalMs = 2000;
+        int connectTimeoutMs = 1000;
+        while (retryCount < maxRetry) {
+            boolean allReachable = true;
+            for (String host : journalNodeList) {
+                boolean isReachable = false;
+                Socket socket = null;
+                try {
+                    socket = new Socket();
+                    socket.connect(new InetSocketAddress(host, Integer.parseInt(port)), connectTimeoutMs);
+                    isReachable = true;
+                    log.info("JournalNode [{}:{}] is reachable.", host, port);
+                } catch (Exception e) {
+                    allReachable = false;
+                    log.warn(
+                            "JournalNode [{}:{}] is NOT reachable (retry {}/{}). Error: {}",
+                            host,
+                            port,
+                            retryCount + 1,
+                            maxRetry,
+                            e.getMessage());
+                } finally {
+                    if (socket != null && !socket.isClosed()) {
+                        try {
+                            socket.close();
+                        } catch (Exception e) {
+                            log.debug("Failed to close socket for [{}:{}].", host, port, e);
+                        }
+                    }
+                }
+            }
+            if (allReachable) {
+                log.info("All {} JournalNodes are reachable. Proceeding to format NameNode.", journalNodeList.size());
+                return true;
+            }
+            retryCount++;
+            if (retryCount < maxRetry) {
+                log.info("Waiting {}ms before next retry ({} remaining).", retryIntervalMs, maxRetry - retryCount);
+                TimeUnit.MILLISECONDS.sleep(retryIntervalMs);
+            }
+        }
+        log.error("Failed to reach all JournalNodes after {} retries. JournalNode list: {}", maxRetry, journalNodeList);
+        return false;
+    }
+
     public static boolean isNameNodeFormatted(HadoopParams hadoopParams) {

         boolean isFormatted = false;
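Note: with the constants above, the reachability check sleeps for roughly maxRetry × retryIntervalMs ≈ 100 × 2 s ≈ 200 s in the worst case, plus up to connectTimeoutMs = 1 s per JournalNode on each pass, before NameNode formatting is abandoned.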
bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/JournalNodeScript.java

Lines changed: 86 additions & 0 deletions
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bigtop.manager.stack.bigtop.v3_3_0.hadoop;
+
+import org.apache.bigtop.manager.common.shell.ShellResult;
+import org.apache.bigtop.manager.stack.core.exception.StackException;
+import org.apache.bigtop.manager.stack.core.spi.param.Params;
+import org.apache.bigtop.manager.stack.core.spi.script.AbstractServerScript;
+import org.apache.bigtop.manager.stack.core.spi.script.Script;
+import org.apache.bigtop.manager.stack.core.utils.linux.LinuxOSUtils;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+
+import java.text.MessageFormat;
+import java.util.Properties;
+
+@Slf4j
+@AutoService(Script.class)
+public class JournalNodeScript extends AbstractServerScript {
+    @Override
+    public ShellResult add(Params params) {
+        Properties properties = new Properties();
+        properties.setProperty(PROPERTY_KEY_SKIP_LEVELS, "1");
+
+        return super.add(params, properties);
+    }
+
+    @Override
+    public ShellResult configure(Params params) {
+        super.configure(params);
+
+        return HadoopSetup.configure(params, getComponentName());
+    }
+
+    @Override
+    public ShellResult start(Params params) {
+        configure(params);
+        HadoopParams hadoopParams = (HadoopParams) params;
+
+        String cmd = MessageFormat.format("{0}/hdfs --daemon start journalnode", hadoopParams.binDir());
+        try {
+            return LinuxOSUtils.sudoExecCmd(cmd, hadoopParams.user());
+        } catch (Exception e) {
+            throw new StackException(e);
+        }
+    }
+
+    @Override
+    public ShellResult stop(Params params) {
+        HadoopParams hadoopParams = (HadoopParams) params;
+        String cmd = MessageFormat.format("{0}/hdfs --daemon stop journalnode", hadoopParams.binDir());
+        try {
+            return LinuxOSUtils.sudoExecCmd(cmd, hadoopParams.user());
+        } catch (Exception e) {
+            throw new StackException(e);
+        }
+    }
+
+    @Override
+    public ShellResult status(Params params) {
+        HadoopParams hadoopParams = (HadoopParams) params;
+        return LinuxOSUtils.checkProcess(hadoopParams.getJournalNodePidFile());
+    }
+
+    @Override
+    public String getComponentName() {
+        return "journalnode";
+    }
+}
