Commit 02cae2a2 authored by Zesong Sun, committed by Jialin Qiao

[IOTDB-271] Add configuration for HDFS HA (#486)

* Add configuration for HDFS HA
Parent ee0c57f3
@@ -306,7 +306,7 @@

|Name| hdfs\_ip |
|:---:|:---|
|Description| IP of HDFS if Tsfile and related data files are stored in HDFS|
|Description| IP of HDFS if Tsfile and related data files are stored in HDFS. **If more than one hdfs\_ip is configured, Hadoop HA is used.**|
|Type| String |
|Default|localhost |
|Effective|After restart system|
@@ -319,3 +319,39 @@

|Type| String |
|Default|9000 |
|Effective|After restart system|

* dfs\_nameservices

|Name| dfs\_nameservices |
|:---:|:---|
|Description| Nameservices of HDFS if using Hadoop HA|
|Type| String |
|Default|hdfsnamespace |
|Effective|After restart system|

* dfs\_ha\_namenodes

|Name| dfs\_ha\_namenodes |
|:---:|:---|
|Description| Namenodes under the DFS nameservices if using Hadoop HA|
|Type| String |
|Default|nn1,nn2 |
|Effective|After restart system|

* dfs\_ha\_automatic\_failover\_enabled

|Name| dfs\_ha\_automatic\_failover\_enabled |
|:---:|:---|
|Description| Whether to use automatic failover if using Hadoop HA|
|Type| Boolean |
|Default|true |
|Effective|After restart system|

* dfs\_client\_failover\_proxy\_provider

|Name| dfs\_client\_failover\_proxy\_provider |
|:---:|:---|
|Description| Proxy provider implementation if using Hadoop HA and automatic failover is enabled|
|Type| String |
|Default|org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider |
|Effective|After restart system|
\ No newline at end of file
@@ -54,7 +54,7 @@

|Name| hdfs\_ip |
|:---:|:---|
|Description| IP of HDFS if Tsfile and related data files are stored in HDFS|
|Description| IP of HDFS if Tsfile and related data files are stored in HDFS. **If more than one hdfs\_ip is configured, Hadoop HA is used.**|
|Type| String |
|Default|localhost |
|Effective|After restart system|
@@ -68,6 +68,41 @@

|Default|9000 |
|Effective|After restart system|

* dfs\_nameservices

|Name| dfs\_nameservices |
|:---:|:---|
|Description| Nameservices of HDFS if using Hadoop HA|
|Type| String |
|Default|hdfsnamespace |
|Effective|After restart system|

* dfs\_ha\_namenodes

|Name| dfs\_ha\_namenodes |
|:---:|:---|
|Description| Namenodes under the DFS nameservices if using Hadoop HA|
|Type| String |
|Default|nn1,nn2 |
|Effective|After restart system|

* dfs\_ha\_automatic\_failover\_enabled

|Name| dfs\_ha\_automatic\_failover\_enabled |
|:---:|:---|
|Description| Whether to use automatic failover if using Hadoop HA|
|Type| Boolean |
|Default|true |
|Effective|After restart system|

* dfs\_client\_failover\_proxy\_provider

|Name| dfs\_client\_failover\_proxy\_provider |
|:---:|:---|
|Description| Proxy provider implementation if using Hadoop HA and automatic failover is enabled|
|Type| String |
|Default|org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider |
|Effective|After restart system|

Start the server, and TsFile will be stored on HDFS.
@@ -341,7 +341,7 @@ The details of each variable are as follows:
|Name| hdfs\_ip |
|:---:|:---|
|Description| IP of HDFS if Tsfile and related data files are stored in HDFS|
|Description| IP of HDFS if Tsfile and related data files are stored in HDFS. **If more than one hdfs\_ip is configured, Hadoop HA is used.**|
|Type| String |
|Default|localhost |
|Effective|After restart system|
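For example, `hdfs_ip=192.168.10.1,192.168.10.2` (hypothetical addresses) lists two namenode hosts and thereby enables HA mode.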
@@ -354,3 +354,39 @@ The details of each variable are as follows:
|Type| String |
|Default|9000 |
|Effective|After restart system|

* dfs\_nameservices

|Name| dfs\_nameservices |
|:---:|:---|
|Description| Nameservices of HDFS if using Hadoop HA|
|Type| String |
|Default|hdfsnamespace |
|Effective|After restart system|

* dfs\_ha\_namenodes

|Name| dfs\_ha\_namenodes |
|:---:|:---|
|Description| Namenodes under the DFS nameservices if using Hadoop HA|
|Type| String |
|Default|nn1,nn2 |
|Effective|After restart system|

* dfs\_ha\_automatic\_failover\_enabled

|Name| dfs\_ha\_automatic\_failover\_enabled |
|:---:|:---|
|Description| Whether to use automatic failover if using Hadoop HA|
|Type| Boolean |
|Default|true |
|Effective|After restart system|

* dfs\_client\_failover\_proxy\_provider

|Name| dfs\_client\_failover\_proxy\_provider |
|:---:|:---|
|Description| Proxy provider implementation if using Hadoop HA and automatic failover is enabled|
|Type| String |
|Default|org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider |
|Effective|After restart system|
\ No newline at end of file
@@ -54,7 +54,7 @@ Edit user config in `iotdb-engine.properties`. Related configurations are:
|Name| hdfs\_ip |
|:---:|:---|
|Description| IP of HDFS if Tsfile and related data files are stored in HDFS|
|Description| IP of HDFS if Tsfile and related data files are stored in HDFS. **If more than one hdfs\_ip is configured, Hadoop HA is used.**|
|Type| String |
|Default|localhost |
|Effective|After restart system|
@@ -68,6 +68,42 @@ Edit user config in `iotdb-engine.properties`. Related configurations are:
|Default|9000 |
|Effective|After restart system|

* dfs\_nameservices

|Name| dfs\_nameservices |
|:---:|:---|
|Description| Nameservices of HDFS if using Hadoop HA|
|Type| String |
|Default|hdfsnamespace |
|Effective|After restart system|

* dfs\_ha\_namenodes

|Name| dfs\_ha\_namenodes |
|:---:|:---|
|Description| Namenodes under the DFS nameservices if using Hadoop HA|
|Type| String |
|Default|nn1,nn2 |
|Effective|After restart system|

* dfs\_ha\_automatic\_failover\_enabled

|Name| dfs\_ha\_automatic\_failover\_enabled |
|:---:|:---|
|Description| Whether to use automatic failover if using Hadoop HA|
|Type| Boolean |
|Default|true |
|Effective|After restart system|

* dfs\_client\_failover\_proxy\_provider

|Name| dfs\_client\_failover\_proxy\_provider |
|:---:|:---|
|Description| Proxy provider implementation if using Hadoop HA and automatic failover is enabled|
|Type| String |
|Default|org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider |
|Effective|After restart system|

Start the server, and TsFile will be stored on HDFS.
If you'd like to switch the storage file system back to local, simply set `tsfile_storage_fs` to `LOCAL`. In that case, if you already have data files on HDFS, you should either download them and move them into your configured data file folder (`../server/target/iotdb-server-0.9.0-SNAPSHOT/data/data` by default), or restart the process and import the data into IoTDB.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.hadoop.fileSystem;

import org.apache.hadoop.conf.Configuration;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;

public class HDFSConfUtil {

  private static TSFileConfig tsFileConfig = TSFileDescriptor.getInstance().getConfig();

  public static Configuration setConf(Configuration conf) {
    conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
    conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
    conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");

    // HA configuration: more than one hdfs_ip means Hadoop HA is in use
    String[] hdfsIps = tsFileConfig.getHdfsIp();
    if (hdfsIps.length > 1) {
      String dfsNameservices = tsFileConfig.getDfsNameServices();
      String[] dfsHaNamenodes = tsFileConfig.getDfsHaNamenodes();
      conf.set("dfs.nameservices", dfsNameservices);
      conf.set("dfs.ha.namenodes." + dfsNameservices, String.join(",", dfsHaNamenodes));
      // Map each namenode ID to its RPC address; this assumes hdfs_ip and
      // dfs_ha_namenodes contain the same number of entries
      for (int i = 0; i < dfsHaNamenodes.length; i++) {
        conf.set("dfs.namenode.rpc-address." + dfsNameservices + "." + dfsHaNamenodes[i].trim(),
            hdfsIps[i] + ":" + tsFileConfig.getHdfsPort());
      }
      boolean dfsHaAutomaticFailoverEnabled = tsFileConfig.isDfsHaAutomaticFailoverEnabled();
      conf.set("dfs.ha.automatic-failover.enabled",
          String.valueOf(dfsHaAutomaticFailoverEnabled));
      if (dfsHaAutomaticFailoverEnabled) {
        conf.set("dfs.client.failover.proxy.provider." + dfsNameservices,
            tsFileConfig.getDfsClientFailoverProxyProvider());
      }
    }
    return conf;
  }
}
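For context, here is a minimal usage sketch (illustrative, not part of this commit; the path is hypothetical): callers enrich a fresh Hadoop `Configuration` and then ask the `Path` for its `FileSystem`, exactly as `HDFSFile` and `HDFSOutput` do in the hunks below.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iotdb.hadoop.fileSystem.HDFSConfUtil;

public class HDFSConfUtilDemo {
  public static void main(String[] args) throws Exception {
    // With HA configured, the URI authority is the nameservice, e.g. hdfs://hdfsnamespace/...
    Configuration conf = HDFSConfUtil.setConf(new Configuration());
    Path path = new Path("/iotdb/data/demo.tsfile"); // hypothetical path
    FileSystem fs = path.getFileSystem(conf);
    System.out.println("FileSystem URI: " + fs.getUri());
  }
}
```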
@@ -39,6 +39,8 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,10 +76,7 @@ public class HDFSFile extends File {
}
private void setConfAndGetFS() {
Configuration conf = new Configuration();
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
Configuration conf = HDFSConfUtil.setConf(new Configuration());
try {
fs = hdfsPath.getFileSystem(conf);
} catch (IOException e) {
@@ -55,10 +55,7 @@ public class HDFSOutput implements TsFileOutput {
public HDFSOutput(Path path, Configuration configuration, boolean overwrite)
throws IOException {
fs = path.getFileSystem(configuration);
configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
configuration.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
configuration.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
fs = path.getFileSystem(HDFSConfUtil.setConf(configuration));
fsDataOutputStream = fs.exists(path) ? fs.append(path) : fs.create(path, overwrite);
this.path = path;
}
@@ -131,12 +131,25 @@ timestamp_precision=ms
# TSFile storage file system. Currently, TsFiles can be stored in the LOCAL file system or in HDFS.
tsfile_storage_fs=LOCAL

# If using HDFS, hadoop ip can be configured
# If using HDFS, hadoop ip can be configured. If more than one hdfs_ip is configured, Hadoop HA is used
hdfs_ip=localhost

# If using HDFS, hadoop port can be configured
hdfs_port=9000

# If more than one hdfs_ip is configured, Hadoop HA is used. Below are the configurations for HA
# If using Hadoop HA, the nameservices of hdfs can be configured
dfs_nameservices=hdfsnamespace

# If using Hadoop HA, the namenodes under dfs nameservices can be configured
dfs_ha_namenodes=nn1,nn2

# If using Hadoop HA, automatic failover can be enabled or disabled
dfs_ha_automatic_failover_enabled=true

# If using Hadoop HA and enabling automatic failover, the proxy provider can be configured
dfs_client_failover_proxy_provider=org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
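# Example (hypothetical hostnames): a two-namenode HA deployment would keep the
# defaults above and set hdfs_ip=namenode1.example.com,namenode2.example.com;
# clients would then address HDFS as hdfs://hdfsnamespace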
####################
### Memory Control Configuration
####################
@@ -391,6 +391,26 @@ public class IoTDBConfig {
*/
private String hdfsPort = "9000";
/**
* Default DFS NameServices is hdfsnamespace
*/
private String dfsNameServices = "hdfsnamespace";
/**
* Default DFS HA name nodes are nn1 and nn2
*/
private String dfsHaNamenodes = "nn1,nn2";
/**
* Default DFS HA automatic failover is enabled
*/
private boolean dfsHaAutomaticFailoverEnabled = true;
/**
* Default DFS client failover proxy provider is "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
*/
private String dfsClientFailoverProxyProvider = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider";
/**
* default TTL for storage groups that are not set TTL by statements, in ms
* Notice: if this property is changed, previous created storage group which are not set TTL will
@@ -430,8 +450,13 @@
}
if (TSFileDescriptor.getInstance().getConfig().getTSFileStorageFs().equals(FSType.HDFS)) {
String hdfsDir = "hdfs://" + TSFileDescriptor.getInstance().getConfig().getHdfsIp() + ":"
+ TSFileDescriptor.getInstance().getConfig().getHdfsPort();
String[] hdfsIps = TSFileDescriptor.getInstance().getConfig().getHdfsIp();
String hdfsDir = "hdfs://";
if (hdfsIps.length > 1) {
hdfsDir += TSFileDescriptor.getInstance().getConfig().getDfsNameServices();
} else {
hdfsDir += hdfsIps[0] + ":" + TSFileDescriptor.getInstance().getConfig().getHdfsPort();
}
for (int i = 5; i < dirs.size(); i++) {
String dir = dirs.get(i);
dir = hdfsDir + File.separatorChar + dir;
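In other words, with HA the data-directory URI uses the nameservice as its authority (e.g. `hdfs://hdfsnamespace/data`), while a single-IP setup keeps the `hdfs://ip:port/data` form.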
@@ -494,7 +519,7 @@
public void setMetricsPort(int metricsPort) {
this.metricsPort = metricsPort;
}
public String getRpcAddress() {
return rpcAddress;
}
@@ -903,6 +928,7 @@
public void setMergeFileSelectionTimeBudget(long mergeFileSelectionTimeBudget) {
this.mergeFileSelectionTimeBudget = mergeFileSelectionTimeBudget;
}
public boolean isRpcThriftCompressionEnable() {
return rpcThriftCompressionEnable;
}
@@ -1077,12 +1103,12 @@
this.tsFileStorageFs = FSType.valueOf(tsFileStorageFs);
}
public String getHdfsIp() {
return hdfsIp;
public String[] getHdfsIp() {
return hdfsIp.split(",");
}
public void setHdfsIp(String hdfsIp) {
this.hdfsIp = hdfsIp;
public void setHdfsIp(String[] hdfsIp) {
this.hdfsIp = String.join(",", hdfsIp);
}
public String getHdfsPort() {
@@ -1093,6 +1119,38 @@
this.hdfsPort = hdfsPort;
}
public String getDfsNameServices() {
return dfsNameServices;
}
public void setDfsNameServices(String dfsNameServices) {
this.dfsNameServices = dfsNameServices;
}
public String[] getDfsHaNamenodes() {
return dfsHaNamenodes.split(",");
}
public void setDfsHaNamenodes(String[] dfsHaNamenodes) {
this.dfsHaNamenodes = String.join(",", dfsHaNamenodes);
}
public boolean isDfsHaAutomaticFailoverEnabled() {
return dfsHaAutomaticFailoverEnabled;
}
public void setDfsHaAutomaticFailoverEnabled(boolean dfsHaAutomaticFailoverEnabled) {
this.dfsHaAutomaticFailoverEnabled = dfsHaAutomaticFailoverEnabled;
}
public String getDfsClientFailoverProxyProvider() {
return dfsClientFailoverProxyProvider;
}
public void setDfsClientFailoverProxyProvider(String dfsClientFailoverProxyProvider) {
this.dfsClientFailoverProxyProvider = dfsClientFailoverProxyProvider;
}
public long getDefaultTTL() {
return defaultTTL;
}
@@ -18,15 +18,19 @@
*/
package org.apache.iotdb.db.conf;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.time.ZoneId;
import java.util.Properties;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
public class IoTDBDescriptor {
private static final Logger logger = LoggerFactory.getLogger(IoTDBDescriptor.class);
@@ -297,17 +301,28 @@ public class IoTDBDescriptor {
conf.setRpcMaxConcurrentClientNum(maxConcurrentClientNum);
conf.setTsFileStorageFs(properties.getProperty("tsfile_storage_fs"));
conf.setHdfsIp(properties.getProperty("hdfs_ip"));
conf.setHdfsIp(properties.getProperty("hdfs_ip").split(","));
conf.setHdfsPort(properties.getProperty("hdfs_port"));
conf.setDfsNameServices(properties.getProperty("dfs_nameservices"));
conf.setDfsHaNamenodes(properties.getProperty("dfs_ha_namenodes").split(","));
conf.setDfsHaAutomaticFailoverEnabled(
Boolean.parseBoolean(properties.getProperty("dfs_ha_automatic_failover_enabled")));
conf.setDfsClientFailoverProxyProvider(
properties.getProperty("dfs_client_failover_proxy_provider"));
conf.setDefaultTTL(Long.parseLong(properties.getProperty("default_ttl",
String.valueOf(conf.getDefaultTTL()))));
// At the same time, set TSFileConfig
TSFileDescriptor.getInstance().getConfig()
.setTSFileStorageFs(properties.getProperty("tsfile_storage_fs"));
TSFileDescriptor.getInstance().getConfig().setHdfsIp(properties.getProperty("hdfs_ip"));
TSFileDescriptor.getInstance().getConfig().setHdfsPort(properties.getProperty("hdfs_port"));
TSFileConfig tsFileConfig = TSFileDescriptor.getInstance().getConfig();
tsFileConfig.setTSFileStorageFs(properties.getProperty("tsfile_storage_fs"));
tsFileConfig.setHdfsIp(properties.getProperty("hdfs_ip").split(","));
tsFileConfig.setHdfsPort(properties.getProperty("hdfs_port"));
tsFileConfig.setDfsNameServices(properties.getProperty("dfs_nameservices"));
tsFileConfig.setDfsHaNamenodes(properties.getProperty("dfs_ha_namenodes").split(","));
tsFileConfig.setDfsHaAutomaticFailoverEnabled(
Boolean.parseBoolean(properties.getProperty("dfs_ha_automatic_failover_enabled")));
tsFileConfig.setDfsClientFailoverProxyProvider(
properties.getProperty("dfs_client_failover_proxy_provider"));
// set tsfile-format config
TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte(Integer
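Note that `properties.getProperty("hdfs_ip").split(",")` assumes the key is present: with the defaults shipped in `iotdb-engine.properties` above it is, but a config file missing the key would make this call throw a `NullPointerException`.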
@@ -298,6 +298,26 @@ public class TSFileConfig {
*/
private String hdfsPort = "9000";
/**
* Default DFS NameServices is hdfsnamespace
*/
private String dfsNameServices = "hdfsnamespace";
/**
* Default DFS HA name nodes are nn1 and nn2
*/
private String dfsHaNamenodes = "nn1,nn2";
/**
* Default DFS HA automatic failover is enabled
*/
private boolean dfsHaAutomaticFailoverEnabled = true;
/**
* Default DFS client failover proxy provider is "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
*/
private String dfsClientFailoverProxyProvider = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider";
/**
* The acceptable error rate of bloom filter
*/
@@ -324,12 +344,12 @@
this.TSFileStorageFs = FSType.valueOf(TSFileStorageFs);
}
public String getHdfsIp() {
return this.hdfsIp;
public String[] getHdfsIp() {
return hdfsIp.split(",");
}
public void setHdfsIp(String hdfsIp) {
this.hdfsIp = hdfsIp;
public void setHdfsIp(String[] hdfsIp) {
this.hdfsIp = String.join(",", hdfsIp);
}
public String getHdfsPort() {
@@ -340,4 +360,36 @@
this.hdfsPort = hdfsPort;
}
public String getDfsNameServices() {
return dfsNameServices;
}
public void setDfsNameServices(String dfsNameServices) {
this.dfsNameServices = dfsNameServices;
}
public String[] getDfsHaNamenodes() {
return dfsHaNamenodes.split(",");
}
public void setDfsHaNamenodes(String[] dfsHaNamenodes) {
this.dfsHaNamenodes = String.join(",", dfsHaNamenodes);
}
public boolean isDfsHaAutomaticFailoverEnabled() {
return dfsHaAutomaticFailoverEnabled;
}
public void setDfsHaAutomaticFailoverEnabled(boolean dfsHaAutomaticFailoverEnabled) {
this.dfsHaAutomaticFailoverEnabled = dfsHaAutomaticFailoverEnabled;
}
public String getDfsClientFailoverProxyProvider() {
return dfsClientFailoverProxyProvider;
}
public void setDfsClientFailoverProxyProvider(String dfsClientFailoverProxyProvider) {
this.dfsClientFailoverProxyProvider = dfsClientFailoverProxyProvider;
}
}
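A small sketch (illustrative, not part of this commit) of how these accessors behave: list-valued settings are stored as comma-joined strings, so the setters join and the getters split.

```java
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;

public class HdfsHaConfigDemo {
  public static void main(String[] args) {
    TSFileConfig cfg = TSFileDescriptor.getInstance().getConfig();
    // Hypothetical two-namenode setup; the setter joins the array with commas internally
    cfg.setHdfsIp(new String[]{"10.0.0.1", "10.0.0.2"});
    cfg.setDfsHaNamenodes(new String[]{"nn1", "nn2"});
    // The getter splits the stored string back into an array
    System.out.println(cfg.getHdfsIp().length);                // 2 -> HA mode
    System.out.println(cfg.isDfsHaAutomaticFailoverEnabled()); // true by default
  }
}
```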