未验证 提交 1513aae3 编写于 作者: I itbasketplayer 提交者: GitHub

add job history to judge application status/2625 (#2848)

* job history status url when application number threshold is reached(default 10000,maybe it was set to 1000)

* job history status url when application number threshold is reached(default 10000,maybe it was set to 1000)

* job history status url when application number threshold is reached(default 10000,maybe it was set to 1000)

Co-authored-by: yuhaibin@lizhi.fm <35716fc5847f6d154cf556296453ca91>
Co-authored-by: Ndailidong <dailidong66@gmail.com>
上级 d00627ff
......@@ -103,6 +103,11 @@ public final class Constants {
*/
public static final String YARN_APPLICATION_STATUS_ADDRESS = "yarn.application.status.address";
/**
* yarn.job.history.status.address
*/
public static final String YARN_JOB_HISTORY_STATUS_ADDRESS = "yarn.job.history.status.address";
/**
* hdfs configuration
* hdfs.root.user
......
......@@ -16,16 +16,16 @@
*/
package org.apache.dolphinscheduler.common.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.commons.io.IOUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ResUploadType;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.io.IOUtils;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
......@@ -59,6 +59,7 @@ public class HadoopUtils implements Closeable {
public static final String resourceUploadPath = PropertyUtils.getString(RESOURCE_UPLOAD_PATH, "/dolphinscheduler");
public static final String rmHaIds = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS);
public static final String appAddress = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS);
public static final String jobHistoryAddress = PropertyUtils.getString(Constants.YARN_JOB_HISTORY_STATUS_ADDRESS);
private static final String HADOOP_UTILS_KEY = "HADOOP_UTILS_KEY";
......@@ -114,11 +115,11 @@ public class HadoopUtils implements Closeable {
String resourceStorageType = PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE);
ResUploadType resUploadType = ResUploadType.valueOf(resourceStorageType);
if (resUploadType == ResUploadType.HDFS){
if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,false)){
if (resUploadType == ResUploadType.HDFS) {
if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)) {
System.setProperty(Constants.JAVA_SECURITY_KRB5_CONF,
PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH));
configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION,"kerberos");
configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
hdfsUser = "";
UserGroupInformation.setConfiguration(configuration);
UserGroupInformation.loginUserFromKeytab(PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME),
......@@ -195,7 +196,7 @@ public class HadoopUtils implements Closeable {
*/
String appUrl = "";
//not use resourcemanager
if (rmHaIds.contains(Constants.YARN_RESOURCEMANAGER_HA_XX)){
if (rmHaIds.contains(Constants.YARN_RESOURCEMANAGER_HA_XX)) {
yarnEnabled = false;
logger.warn("should not step here");
......@@ -212,6 +213,12 @@ public class HadoopUtils implements Closeable {
return String.format(appUrl, applicationId);
}
public String getJobHistoryUrl(String applicationId) {
//eg:application_1587475402360_712719 -> job_1587475402360_712719
String jobId = applicationId.replace("application", "job");
return String.format(jobHistoryAddress, jobId);
}
/**
* cat file on hdfs
*
......@@ -389,9 +396,10 @@ public class HadoopUtils implements Closeable {
/**
* hadoop resourcemanager enabled or not
*
* @return result
*/
public boolean isYarnEnabled() {
public boolean isYarnEnabled() {
return yarnEnabled;
}
......@@ -407,12 +415,22 @@ public class HadoopUtils implements Closeable {
return null;
}
String result = Constants.FAILED;
String applicationUrl = getApplicationUrl(applicationId);
logger.info("applicationUrl={}", applicationUrl);
String responseContent = HttpUtils.get(applicationUrl);
JSONObject jsonObject = JSON.parseObject(responseContent);
String result = jsonObject.getJSONObject("app").getString("finalStatus");
if (responseContent != null) {
JSONObject jsonObject = JSON.parseObject(responseContent);
result = jsonObject.getJSONObject("app").getString("finalStatus");
} else {
//may be in job history
String jobHistoryUrl = getJobHistoryUrl(applicationId);
logger.info("jobHistoryUrl={}", jobHistoryUrl);
responseContent = HttpUtils.get(jobHistoryUrl);
JSONObject jsonObject = JSONObject.parseObject(responseContent);
result = jsonObject.getJSONObject("job").getString("state");
}
switch (result) {
case Constants.ACCEPTED:
......@@ -435,6 +453,7 @@ public class HadoopUtils implements Closeable {
/**
* get data hdfs path
*
* @return data hdfs path
*/
public static String getHdfsDataBasePath() {
......@@ -452,7 +471,7 @@ public class HadoopUtils implements Closeable {
* @param tenantCode tenant code
* @return hdfs resource dir
*/
public static String getHdfsDir(ResourceType resourceType,String tenantCode) {
public static String getHdfsDir(ResourceType resourceType, String tenantCode) {
String hdfsDir = "";
if (resourceType.equals(ResourceType.FILE)) {
hdfsDir = getHdfsResDir(tenantCode);
......@@ -497,16 +516,16 @@ public class HadoopUtils implements Closeable {
/**
* get hdfs file name
*
* @param resourceType resource type
* @param tenantCode tenant code
* @param fileName file name
* @param resourceType resource type
* @param tenantCode tenant code
* @param fileName file name
* @return hdfs file name
*/
public static String getHdfsFileName(ResourceType resourceType, String tenantCode, String fileName) {
if (fileName.startsWith("/")) {
fileName = fileName.replaceFirst("/","");
fileName = fileName.replaceFirst("/", "");
}
return String.format("%s/%s", getHdfsDir(resourceType,tenantCode), fileName);
return String.format("%s/%s", getHdfsDir(resourceType, tenantCode), fileName);
}
/**
......@@ -518,7 +537,7 @@ public class HadoopUtils implements Closeable {
*/
public static String getHdfsResourceFileName(String tenantCode, String fileName) {
if (fileName.startsWith("/")) {
fileName = fileName.replaceFirst("/","");
fileName = fileName.replaceFirst("/", "");
}
return String.format("%s/%s", getHdfsResDir(tenantCode), fileName);
}
......@@ -532,7 +551,7 @@ public class HadoopUtils implements Closeable {
*/
public static String getHdfsUdfFileName(String tenantCode, String fileName) {
if (fileName.startsWith("/")) {
fileName = fileName.replaceFirst("/","");
fileName = fileName.replaceFirst("/", "");
}
return String.format("%s/%s", getHdfsUdfDir(tenantCode), fileName);
}
......
......@@ -18,7 +18,7 @@
# resource storage type : HDFS,S3,NONE
resource.storage.type=NONE
# resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions"/dolphinscheduler" is recommended
# resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions."/dolphinscheduler" is recommended
#resource.upload.path=/dolphinscheduler
# user data local directory path, please make sure the directory exists and have read write permissions
......@@ -42,16 +42,16 @@ resource.storage.type=NONE
# if resource.storage.type=HDFS, the user need to have permission to create directories under the HDFS root path
hdfs.root.user=hdfs
# if resource.storage.type=S3the value like: s3a://dolphinscheduler ; if resource.storage.type=HDFS, When namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
# if resource.storage.type=S3,the value like: s3a://dolphinscheduler ; if resource.storage.type=HDFS, When namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
fs.defaultFS=hdfs://mycluster:8020
# if resource.storage.type=S3s3 endpoint
# if resource.storage.type=S3,s3 endpoint
#fs.s3a.endpoint=http://192.168.199.91:9010
# if resource.storage.type=S3s3 access key
# if resource.storage.type=S3,s3 access key
#fs.s3a.access.key=A3DXS30FO22544RE
# if resource.storage.type=S3s3 secret key
# if resource.storage.type=S3,s3 secret key
#fs.s3a.secret.key=OloCLq3n+8+sdPHUhJ21XrSxTC+JK
# if not use hadoop resourcemanager, please keep default value; if resourcemanager HA enable, please type the HA ips ; if resourcemanager is single, make this value empty
......@@ -59,8 +59,10 @@ yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
# If resourcemanager HA enable or not use resourcemanager, please keep the default value; If resourcemanager is single, you only need to replace ark1 to actual resourcemanager hostname.
yarn.application.status.address=http://ark1:8088/ws/v1/cluster/apps/%s
# job history status url when application number threshold is reached(default 10000,maybe it was set to 1000)
yarn.job.history.status.address=http://ark1:19888/ws/v1/history/mapreduce/jobs/%s
# system env path
#dolphinscheduler.env.path=env/dolphinscheduler_env.sh
development.state=false
kerberos.expire.time=7
\ No newline at end of file
kerberos.expire.time=7
......@@ -190,6 +190,12 @@ public class HadoopUtilsTest {
logger.info(application_1516778421218_0042);
}
@Test
public void getJobHistoryUrl(){
String application_1516778421218_0042 = hadoopUtils.getJobHistoryUrl("application_1529051418016_0167");
logger.info(application_1516778421218_0042);
}
@Test
public void catFileWithLimitTest() {
List<String> stringList = new ArrayList<>();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册