未验证 提交 45886e46 编写于 作者: journey2018's avatar journey2018 提交者: GitHub

Merge pull request #267 from Baoqi/bwu_s3_storage

close #106 support to store resources in s3a(support aws s3 & minio)
...@@ -230,6 +230,29 @@ ...@@ -230,6 +230,29 @@
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId> <artifactId>commons-lang3</artifactId>
......
...@@ -34,12 +34,14 @@ import org.slf4j.LoggerFactory; ...@@ -34,12 +34,14 @@ import org.slf4j.LoggerFactory;
import java.io.*; import java.io.*;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static cn.escheduler.common.Constants.*; import static cn.escheduler.common.Constants.*;
import static cn.escheduler.common.utils.PropertyUtils.getInt; import static cn.escheduler.common.utils.PropertyUtils.getInt;
import static cn.escheduler.common.utils.PropertyUtils.getString; import static cn.escheduler.common.utils.PropertyUtils.getString;
import static cn.escheduler.common.utils.PropertyUtils.getPrefixedProperties;
/** /**
* hadoop utils * hadoop utils
...@@ -76,7 +78,9 @@ public class HadoopUtils implements Closeable { ...@@ -76,7 +78,9 @@ public class HadoopUtils implements Closeable {
if(defaultFS.startsWith("file")){ if(defaultFS.startsWith("file")){
String defaultFSProp = getString(FS_DEFAULTFS); String defaultFSProp = getString(FS_DEFAULTFS);
if(StringUtils.isNotBlank(defaultFSProp)){ if(StringUtils.isNotBlank(defaultFSProp)){
Map<String, String> fsRelatedProps = getPrefixedProperties("fs.");
configuration.set(FS_DEFAULTFS,defaultFSProp); configuration.set(FS_DEFAULTFS,defaultFSProp);
fsRelatedProps.entrySet().stream().forEach(entry -> configuration.set(entry.getKey(), entry.getValue()));
}else{ }else{
logger.error("property:{} can not to be empty, please set!"); logger.error("property:{} can not to be empty, please set!");
throw new RuntimeException("property:{} can not to be empty, please set!"); throw new RuntimeException("property:{} can not to be empty, please set!");
...@@ -316,7 +320,13 @@ public class HadoopUtils implements Closeable { ...@@ -316,7 +320,13 @@ public class HadoopUtils implements Closeable {
* @return data hdfs path * @return data hdfs path
*/ */
public static String getHdfsDataBasePath() { public static String getHdfsDataBasePath() {
return getString(DATA_STORE_2_HDFS_BASEPATH); String basePath = getString(DATA_STORE_2_HDFS_BASEPATH);
if ("/".equals(basePath)) {
// if basepath is configured to /, the generated url may be //default/resources (with extra leading /)
return "";
} else {
return basePath;
}
} }
/** /**
...@@ -365,7 +375,7 @@ public class HadoopUtils implements Closeable { ...@@ -365,7 +375,7 @@ public class HadoopUtils implements Closeable {
* @return file directory of tenants on hdfs * @return file directory of tenants on hdfs
*/ */
private static String getHdfsTenantDir(String tenantCode) { private static String getHdfsTenantDir(String tenantCode) {
return String.format("%s/%s", getString(DATA_STORE_2_HDFS_BASEPATH), tenantCode); return String.format("%s/%s", getHdfsDataBasePath(), tenantCode);
} }
......
...@@ -22,6 +22,8 @@ import org.slf4j.LoggerFactory; ...@@ -22,6 +22,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties; import java.util.Properties;
import static cn.escheduler.common.Constants.COMMON_PROPERTIES_PATH; import static cn.escheduler.common.Constants.COMMON_PROPERTIES_PATH;
...@@ -189,4 +191,19 @@ public class PropertyUtils { ...@@ -189,4 +191,19 @@ public class PropertyUtils {
String val = getString(key); String val = getString(key);
return val == null ? defaultValue : Enum.valueOf(type, val); return val == null ? defaultValue : Enum.valueOf(type, val);
} }
/**
* get all properties with specified prefix, like: fs.
* @param prefix prefix to search
* @return
*/
public static Map<String, String> getPrefixedProperties(String prefix) {
Map<String, String> matchedProperties = new HashMap<>();
for (String propName : properties.stringPropertyNames()) {
if (propName.startsWith(prefix)) {
matchedProperties.put(propName, properties.getProperty(propName));
}
}
return matchedProperties;
}
} }
...@@ -323,6 +323,11 @@ ...@@ -323,6 +323,11 @@
<artifactId>hadoop-yarn-common</artifactId> <artifactId>hadoop-yarn-common</artifactId>
<version>${hadoop.version}</version> <version>${hadoop.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency> <dependency>
<groupId>javax.servlet</groupId> <groupId>javax.servlet</groupId>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册