未验证 提交 ace907e7 编写于 作者: D dailidong 提交者: GitHub

[refactor-worker] simplify master、 worker、alert、dao properties (#2304)

* update logback

* update log

* refactor worker registry (#2107)

* Refactor worker (#2115)

* refactor worker registry

* refactor master server

* Modify workgroupid parameter name (#2105)

* Delete worker group management page

* Modify workgroupid parameter name

* Refactor worker (#2121)

* refactor worker registry

* refactor master server

* refactor MasterSchedulerService

* cancelTaskInstance set TaskExecutionContext host,logPath,executePath (#2126)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

* add TaskInstanceCacheManager receive Worker report result

* TaskInstance setExecutePath

* add TaskInstanceCacheManager to receive Worker Task result report

* TaskInstanceCacheManager add remove method

* add license

* add dispatcht task method

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify

* worker remove db

* ShellTask modify

* master persistence processId and appIds

* master persistence processId and appIds

* master add kill task logic

* master add kill task logic

* master add kill task logic

* javadoc error modify

* remove chinese log

* executeDirectly method add Override

* remote module modify

* TaskKillResponseProcessor command type modify

* create buildKillCommand

* host add host:port format

* host add host:port format

* TaskAckProcessor modify

* TaskAckProcessor modify

* task prioriry refator

* remove ITaskQueue

* task prioriry refator

* remove ITaskQueue

* TaskPriority refactor

* remove logs

* WorkerServer refactor

* MasterSchedulerService modify

* WorkerConfig listen port modify

* modify master and worker listen port

* cancelTaskInstance set TaskExecutionContext host,logPath,executePath

* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
Co-authored-by: Nqiaozhanwei <qiaozhanwei@analysys.com.cn>

* not exist in openjdk,just delete

* add master and worker properties

* add master and worker properties

* add master and worker properties

* fix cpu 100% bug

* simplify master、 worker、alert、dao properties

* add master and worker properties

* add master and worker properties

* add master and worker properties
Co-authored-by: NTboy <guo.jiwei@immomo.com>
Co-authored-by: Nbreak60 <790061044@qq.com>
Co-authored-by: Nqiaozhanwei <qiaozhanwei@outlook.com>
Co-authored-by: Nqiaozhanwei <qiaozhanwei@analysys.com.cn>
上级 f112415b
...@@ -55,7 +55,7 @@ public class MailUtils { ...@@ -55,7 +55,7 @@ public class MailUtils {
public static final Boolean mailUseSSL = PropertyUtils.getBoolean(Constants.MAIL_SMTP_SSL_ENABLE); public static final Boolean mailUseSSL = PropertyUtils.getBoolean(Constants.MAIL_SMTP_SSL_ENABLE);
public static final String xlsFilePath = PropertyUtils.getString(Constants.XLS_FILE_PATH); public static final String xlsFilePath = PropertyUtils.getString(Constants.XLS_FILE_PATH,"/tmp/xls");
public static final String starttlsEnable = PropertyUtils.getString(Constants.MAIL_SMTP_STARTTLS_ENABLE); public static final String starttlsEnable = PropertyUtils.getString(Constants.MAIL_SMTP_STARTTLS_ENABLE);
......
...@@ -79,6 +79,18 @@ public class PropertyUtils { ...@@ -79,6 +79,18 @@ public class PropertyUtils {
return properties.getProperty(key.trim()); return properties.getProperty(key.trim());
} }
/**
* get property value
*
* @param key property name
* @param defaultVal default value
* @return property value
*/
public static String getString(String key, String defaultVal) {
String val = properties.getProperty(key.trim());
return val == null ? defaultVal : val;
}
/** /**
* get property value * get property value
* *
......
...@@ -36,18 +36,18 @@ mail.smtp.ssl.enable=false ...@@ -36,18 +36,18 @@ mail.smtp.ssl.enable=false
mail.smtp.ssl.trust=xxx.xxx.com mail.smtp.ssl.trust=xxx.xxx.com
#xls file path,need create if not exist #xls file path,need create if not exist
xls.file.path=/tmp/xls #xls.file.path=/tmp/xls
# Enterprise WeChat configuration # Enterprise WeChat configuration
enterprise.wechat.enable=false enterprise.wechat.enable=false
enterprise.wechat.corp.id=xxxxxxx #enterprise.wechat.corp.id=xxxxxxx
enterprise.wechat.secret=xxxxxxx #enterprise.wechat.secret=xxxxxxx
enterprise.wechat.agent.id=xxxxxxx #enterprise.wechat.agent.id=xxxxxxx
enterprise.wechat.users=xxxxxxx #enterprise.wechat.users=xxxxxxx
enterprise.wechat.token.url=https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=$corpId&corpsecret=$secret #enterprise.wechat.token.url=https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=$corpId&corpsecret=$secret
enterprise.wechat.push.url=https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=$token #enterprise.wechat.push.url=https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=$token
enterprise.wechat.team.send.msg={\"toparty\":\"$toParty\",\"agentid\":\"$agentId\",\"msgtype\":\"text\",\"text\":{\"content\":\"$msg\"},\"safe\":\"0\"} #enterprise.wechat.team.send.msg={\"toparty\":\"$toParty\",\"agentid\":\"$agentId\",\"msgtype\":\"text\",\"text\":{\"content\":\"$msg\"},\"safe\":\"0\"}
enterprise.wechat.user.send.msg={\"touser\":\"$toUser\",\"agentid\":\"$agentId\",\"msgtype\":\"markdown\",\"markdown\":{\"content\":\"$msg\"}} #enterprise.wechat.user.send.msg={\"touser\":\"$toUser\",\"agentid\":\"$agentId\",\"msgtype\":\"markdown\",\"markdown\":{\"content\":\"$msg\"}}
...@@ -25,9 +25,43 @@ import java.util.regex.Pattern; ...@@ -25,9 +25,43 @@ import java.util.regex.Pattern;
* Constants * Constants
*/ */
public final class Constants { public final class Constants {
private Constants() { private Constants() {
throw new IllegalStateException("Constants class"); throw new IllegalStateException("Constants class");
} }
/**
* quartz config
*/
public static final String ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS = "org.quartz.jobStore.driverDelegateClass";
public static final String ORG_QUARTZ_SCHEDULER_INSTANCENAME = "org.quartz.scheduler.instanceName";
public static final String ORG_QUARTZ_SCHEDULER_INSTANCEID = "org.quartz.scheduler.instanceId";
public static final String ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON = "org.quartz.scheduler.makeSchedulerThreadDaemon";
public static final String ORG_QUARTZ_JOBSTORE_USEPROPERTIES = "org.quartz.jobStore.useProperties";
public static final String ORG_QUARTZ_THREADPOOL_CLASS = "org.quartz.threadPool.class";
public static final String ORG_QUARTZ_THREADPOOL_THREADCOUNT = "org.quartz.threadPool.threadCount";
public static final String ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS = "org.quartz.threadPool.makeThreadsDaemons";
public static final String ORG_QUARTZ_THREADPOOL_THREADPRIORITY = "org.quartz.threadPool.threadPriority";
public static final String ORG_QUARTZ_JOBSTORE_CLASS = "org.quartz.jobStore.class";
public static final String ORG_QUARTZ_JOBSTORE_TABLEPREFIX = "org.quartz.jobStore.tablePrefix";
public static final String ORG_QUARTZ_JOBSTORE_ISCLUSTERED = "org.quartz.jobStore.isClustered";
public static final String ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD = "org.quartz.jobStore.misfireThreshold";
public static final String ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL = "org.quartz.jobStore.clusterCheckinInterval";
public static final String ORG_QUARTZ_JOBSTORE_DATASOURCE = "org.quartz.jobStore.dataSource";
public static final String ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS = "org.quartz.dataSource.myDs.connectionProvider.class";
/**
* quartz config default value
*/
public static final String QUARTZ_TABLE_PREFIX = "QRTZ_";
public static final String QUARTZ_MISFIRETHRESHOLD = "60000";
public static final String QUARTZ_CLUSTERCHECKININTERVAL = "5000";
public static final String QUARTZ_DATASOURCE = "myDs";
public static final String QUARTZ_THREADCOUNT = "25";
public static final String QUARTZ_THREADPRIORITY = "5";
public static final String QUARTZ_INSTANCENAME = "DolphinScheduler";
public static final String QUARTZ_INSTANCEID = "AUTO";
/** /**
* common properties path * common properties path
*/ */
...@@ -56,9 +90,11 @@ public final class Constants { ...@@ -56,9 +90,11 @@ public final class Constants {
/** /**
* yarn.resourcemanager.ha.rm.idsfs.defaultFS * yarn.resourcemanager.ha.rm.ids
*/ */
public static final String YARN_RESOURCEMANAGER_HA_RM_IDS = "yarn.resourcemanager.ha.rm.ids"; public static final String YARN_RESOURCEMANAGER_HA_RM_IDS = "yarn.resourcemanager.ha.rm.ids";
public static final String YARN_RESOURCEMANAGER_HA_XX = "xx";
/** /**
* yarn.application.status.address * yarn.application.status.address
...@@ -72,31 +108,25 @@ public final class Constants { ...@@ -72,31 +108,25 @@ public final class Constants {
public static final String HDFS_ROOT_USER = "hdfs.root.user"; public static final String HDFS_ROOT_USER = "hdfs.root.user";
/** /**
* hdfs configuration * hdfs/s3 configuration
* data.store2hdfs.basepath * resource.upload.path
*/ */
public static final String DATA_STORE_2_HDFS_BASEPATH = "data.store2hdfs.basepath"; public static final String RESOURCE_UPLOAD_PATH = "resource.upload.path";
/** /**
* data.basedir.path * data basedir path
*/ */
public static final String DATA_BASEDIR_PATH = "data.basedir.path"; public static final String DATA_BASEDIR_PATH = "data.basedir.path";
/**
* data.download.basedir.path
*/
public static final String DATA_DOWNLOAD_BASEDIR_PATH = "data.download.basedir.path";
/**
* process.exec.basepath
*/
public static final String PROCESS_EXEC_BASEPATH = "process.exec.basepath";
/** /**
* dolphinscheduler.env.path * dolphinscheduler.env.path
*/ */
public static final String DOLPHINSCHEDULER_ENV_PATH = "dolphinscheduler.env.path"; public static final String DOLPHINSCHEDULER_ENV_PATH = "dolphinscheduler.env.path";
/**
* environment properties default path
*/
public static final String ENV_PATH = "env/dolphinscheduler_env.sh";
/** /**
* python home * python home
...@@ -108,15 +138,23 @@ public final class Constants { ...@@ -108,15 +138,23 @@ public final class Constants {
*/ */
public static final String RESOURCE_VIEW_SUFFIXS = "resource.view.suffixs"; public static final String RESOURCE_VIEW_SUFFIXS = "resource.view.suffixs";
public static final String RESOURCE_VIEW_SUFFIXS_DEFAULT_VALUE = "txt,log,sh,conf,cfg,py,java,sql,hql,xml,properties";
/** /**
* development.state * development.state
*/ */
public static final String DEVELOPMENT_STATE = "development.state"; public static final String DEVELOPMENT_STATE = "development.state";
public static final String DEVELOPMENT_STATE_DEFAULT_VALUE = "true";
/**
* string true
*/
public static final String STRING_TRUE = "true";
/** /**
* res.upload.startup.type * resource storage type
*/ */
public static final String RES_UPLOAD_STARTUP_TYPE = "res.upload.startup.type"; public static final String RESOURCE_STORAGE_TYPE = "resource.storage.type";
/** /**
* MasterServer directory registered in zookeeper * MasterServer directory registered in zookeeper
...@@ -346,9 +384,9 @@ public final class Constants { ...@@ -346,9 +384,9 @@ public final class Constants {
public static final String FLOWNODE_RUN_FLAG_FORBIDDEN = "FORBIDDEN"; public static final String FLOWNODE_RUN_FLAG_FORBIDDEN = "FORBIDDEN";
/** /**
* task record configuration path * datasource configuration path
*/ */
public static final String APPLICATION_PROPERTIES = "application.properties"; public static final String DATASOURCE_PROPERTIES = "/datasource.properties";
public static final String TASK_RECORD_URL = "task.record.datasource.url"; public static final String TASK_RECORD_URL = "task.record.datasource.url";
......
...@@ -20,13 +20,18 @@ import org.apache.dolphinscheduler.common.Constants; ...@@ -20,13 +20,18 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ResUploadType; import org.apache.dolphinscheduler.common.enums.ResUploadType;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.net.URL;
/** /**
* common utils * common utils
*/ */
public class CommonUtils { public class CommonUtils {
private static final Logger logger = LoggerFactory.getLogger(CommonUtils.class);
private CommonUtils() { private CommonUtils() {
throw new IllegalStateException("CommonUtils class"); throw new IllegalStateException("CommonUtils class");
} }
...@@ -37,7 +42,14 @@ public class CommonUtils { ...@@ -37,7 +42,14 @@ public class CommonUtils {
public static String getSystemEnvPath() { public static String getSystemEnvPath() {
String envPath = PropertyUtils.getString(Constants.DOLPHINSCHEDULER_ENV_PATH); String envPath = PropertyUtils.getString(Constants.DOLPHINSCHEDULER_ENV_PATH);
if (StringUtils.isEmpty(envPath)) { if (StringUtils.isEmpty(envPath)) {
envPath = System.getProperty("user.home") + File.separator + ".bash_profile"; URL envDefaultPath = CommonUtils.class.getClassLoader().getResource(Constants.ENV_PATH);
if (envDefaultPath != null){
envPath = envDefaultPath.getPath();
logger.debug("env path :{}", envPath);
}else{
envPath = System.getProperty("user.home") + File.separator + ".bash_profile";
}
} }
return envPath; return envPath;
...@@ -55,7 +67,7 @@ public class CommonUtils { ...@@ -55,7 +67,7 @@ public class CommonUtils {
* @return is develop mode * @return is develop mode
*/ */
public static boolean isDevelopMode() { public static boolean isDevelopMode() {
return PropertyUtils.getBoolean(Constants.DEVELOPMENT_STATE); return PropertyUtils.getBoolean(Constants.DEVELOPMENT_STATE, true);
} }
...@@ -65,9 +77,9 @@ public class CommonUtils { ...@@ -65,9 +77,9 @@ public class CommonUtils {
* @return true if upload resource is HDFS and kerberos startup * @return true if upload resource is HDFS and kerberos startup
*/ */
public static boolean getKerberosStartupState(){ public static boolean getKerberosStartupState(){
String resUploadStartupType = PropertyUtils.getString(Constants.RES_UPLOAD_STARTUP_TYPE); String resUploadStartupType = PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE);
ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType); ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
Boolean kerberosStartupState = PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE); Boolean kerberosStartupState = PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,false);
return resUploadType == ResUploadType.HDFS && kerberosStartupState; return resUploadType == ResUploadType.HDFS && kerberosStartupState;
} }
......
...@@ -34,6 +34,8 @@ import static org.apache.dolphinscheduler.common.Constants.*; ...@@ -34,6 +34,8 @@ import static org.apache.dolphinscheduler.common.Constants.*;
public class FileUtils { public class FileUtils {
public static final Logger logger = LoggerFactory.getLogger(FileUtils.class); public static final Logger logger = LoggerFactory.getLogger(FileUtils.class);
public static final String DATA_BASEDIR = PropertyUtils.getString(DATA_BASEDIR_PATH,"/tmp/dolphinscheduler");
/** /**
* get file suffix * get file suffix
* *
...@@ -59,7 +61,14 @@ public class FileUtils { ...@@ -59,7 +61,14 @@ public class FileUtils {
* @return download file name * @return download file name
*/ */
public static String getDownloadFilename(String filename) { public static String getDownloadFilename(String filename) {
return String.format("%s/%s/%s", PropertyUtils.getString(DATA_DOWNLOAD_BASEDIR_PATH), DateUtils.getCurrentTime(YYYYMMDDHHMMSS), filename); String fileName = String.format("%s/download/%s/%s", DATA_BASEDIR, DateUtils.getCurrentTime(YYYYMMDDHHMMSS), filename);
File file = new File(fileName);
if (!file.getParentFile().exists()){
file.getParentFile().mkdirs();
}
return fileName;
} }
/** /**
...@@ -70,7 +79,13 @@ public class FileUtils { ...@@ -70,7 +79,13 @@ public class FileUtils {
* @return local file path * @return local file path
*/ */
public static String getUploadFilename(String tenantCode, String filename) { public static String getUploadFilename(String tenantCode, String filename) {
return String.format("%s/%s/resources/%s", PropertyUtils.getString(DATA_BASEDIR_PATH), tenantCode, filename); String fileName = String.format("%s/%s/resources/%s", DATA_BASEDIR, tenantCode, filename);
File file = new File(fileName);
if (!file.getParentFile().exists()){
file.getParentFile().mkdirs();
}
return fileName;
} }
/** /**
...@@ -82,9 +97,14 @@ public class FileUtils { ...@@ -82,9 +97,14 @@ public class FileUtils {
* @return directory of process execution * @return directory of process execution
*/ */
public static String getProcessExecDir(int projectId, int processDefineId, int processInstanceId, int taskInstanceId) { public static String getProcessExecDir(int projectId, int processDefineId, int processInstanceId, int taskInstanceId) {
String fileName = String.format("%s/exec/process/%s/%s/%s/%s", DATA_BASEDIR, Integer.toString(projectId),
return String.format("%s/process/%s/%s/%s/%s", PropertyUtils.getString(PROCESS_EXEC_BASEPATH), Integer.toString(projectId),
Integer.toString(processDefineId), Integer.toString(processInstanceId),Integer.toString(taskInstanceId)); Integer.toString(processDefineId), Integer.toString(processInstanceId),Integer.toString(taskInstanceId));
File file = new File(fileName);
if (!file.getParentFile().exists()){
file.getParentFile().mkdirs();
}
return fileName;
} }
/** /**
...@@ -95,15 +115,21 @@ public class FileUtils { ...@@ -95,15 +115,21 @@ public class FileUtils {
* @return directory of process instances * @return directory of process instances
*/ */
public static String getProcessExecDir(int projectId, int processDefineId, int processInstanceId) { public static String getProcessExecDir(int projectId, int processDefineId, int processInstanceId) {
return String.format("%s/process/%s/%s/%s", PropertyUtils.getString(PROCESS_EXEC_BASEPATH), Integer.toString(projectId), String fileName = String.format("%s/exec/process/%s/%s/%s", DATA_BASEDIR, Integer.toString(projectId),
Integer.toString(processDefineId), Integer.toString(processInstanceId)); Integer.toString(processDefineId), Integer.toString(processInstanceId));
File file = new File(fileName);
if (!file.getParentFile().exists()){
file.getParentFile().mkdirs();
}
return fileName;
} }
/** /**
* @return get suffixes for resource files that support online viewing * @return get suffixes for resource files that support online viewing
*/ */
public static String getResourceViewSuffixs() { public static String getResourceViewSuffixs() {
return PropertyUtils.getString(RESOURCE_VIEW_SUFFIXS); return PropertyUtils.getString(RESOURCE_VIEW_SUFFIXS, RESOURCE_VIEW_SUFFIXS_DEFAULT_VALUE);
} }
/** /**
......
...@@ -38,6 +38,8 @@ import java.util.Map; ...@@ -38,6 +38,8 @@ import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_UPLOAD_PATH;
/** /**
* hadoop utils * hadoop utils
* single instance * single instance
...@@ -47,8 +49,11 @@ public class HadoopUtils implements Closeable { ...@@ -47,8 +49,11 @@ public class HadoopUtils implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(HadoopUtils.class); private static final Logger logger = LoggerFactory.getLogger(HadoopUtils.class);
private static String hdfsUser = PropertyUtils.getString(Constants.HDFS_ROOT_USER); private static String hdfsUser = PropertyUtils.getString(Constants.HDFS_ROOT_USER);
public static final String resourceUploadPath = PropertyUtils.getString(RESOURCE_UPLOAD_PATH,"/dolphinscheduler");
private static volatile HadoopUtils instance = new HadoopUtils(); private static volatile HadoopUtils instance = new HadoopUtils();
private static volatile Configuration configuration; private static volatile Configuration configuration;
private static volatile boolean yarnEnabled = false;
private static FileSystem fs; private static FileSystem fs;
...@@ -72,8 +77,7 @@ public class HadoopUtils implements Closeable { ...@@ -72,8 +77,7 @@ public class HadoopUtils implements Closeable {
* init dolphinscheduler root path in hdfs * init dolphinscheduler root path in hdfs
*/ */
private void initHdfsPath(){ private void initHdfsPath(){
String hdfsPath = PropertyUtils.getString(Constants.DATA_STORE_2_HDFS_BASEPATH); Path path = new Path(resourceUploadPath);
Path path = new Path(hdfsPath);
try { try {
if (!fs.exists(path)) { if (!fs.exists(path)) {
...@@ -95,11 +99,11 @@ public class HadoopUtils implements Closeable { ...@@ -95,11 +99,11 @@ public class HadoopUtils implements Closeable {
try { try {
configuration = new Configuration(); configuration = new Configuration();
String resUploadStartupType = PropertyUtils.getString(Constants.RES_UPLOAD_STARTUP_TYPE); String resUploadStartupType = PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE);
ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType); ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
if (resUploadType == ResUploadType.HDFS){ if (resUploadType == ResUploadType.HDFS){
if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE)){ if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,false)){
System.setProperty(Constants.JAVA_SECURITY_KRB5_CONF, System.setProperty(Constants.JAVA_SECURITY_KRB5_CONF,
PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH)); PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH));
configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION,"kerberos"); configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION,"kerberos");
...@@ -151,14 +155,28 @@ public class HadoopUtils implements Closeable { ...@@ -151,14 +155,28 @@ public class HadoopUtils implements Closeable {
fs = FileSystem.get(configuration); fs = FileSystem.get(configuration);
} }
/**
* if rmHaIds includes xx, it signs not use resourcemanager
* otherwise:
* if rmHaIds is empty, single resourcemanager enabled
* if rmHaIds not empty: resourcemanager HA enabled
*/
String rmHaIds = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS); String rmHaIds = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS);
String appAddress = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS); String appAddress = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS);
if (!StringUtils.isEmpty(rmHaIds)) { //not use resourcemanager
if (rmHaIds.contains(Constants.YARN_RESOURCEMANAGER_HA_XX)){
yarnEnabled = false;
} else if (!StringUtils.isEmpty(rmHaIds)) {
//resourcemanager HA enabled
appAddress = getAppAddress(appAddress, rmHaIds); appAddress = getAppAddress(appAddress, rmHaIds);
yarnEnabled = true;
logger.info("appAddress : {}", appAddress); logger.info("appAddress : {}", appAddress);
} else {
//single resourcemanager enabled
yarnEnabled = true;
} }
configuration.set(Constants.YARN_APPLICATION_STATUS_ADDRESS, appAddress); configuration.set(Constants.YARN_APPLICATION_STATUS_ADDRESS, appAddress);
} catch (Exception e) { } catch (Exception e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
} }
...@@ -361,6 +379,16 @@ public class HadoopUtils implements Closeable { ...@@ -361,6 +379,16 @@ public class HadoopUtils implements Closeable {
return fs.rename(new Path(src), new Path(dst)); return fs.rename(new Path(src), new Path(dst));
} }
/**
*
* haddop resourcemanager enabled or not
*
* @return true if haddop resourcemanager enabled
* @throws IOException errors
*/
public boolean isYarnEnabled() {
return yarnEnabled;
}
/** /**
* get the state of an application * get the state of an application
...@@ -405,12 +433,11 @@ public class HadoopUtils implements Closeable { ...@@ -405,12 +433,11 @@ public class HadoopUtils implements Closeable {
* @return data hdfs path * @return data hdfs path
*/ */
public static String getHdfsDataBasePath() { public static String getHdfsDataBasePath() {
String basePath = PropertyUtils.getString(Constants.DATA_STORE_2_HDFS_BASEPATH); if ("/".equals(resourceUploadPath)) {
if ("/".equals(basePath)) {
// if basepath is configured to /, the generated url may be //default/resources (with extra leading /) // if basepath is configured to /, the generated url may be //default/resources (with extra leading /)
return ""; return "";
} else { } else {
return basePath; return resourceUploadPath;
} }
} }
......
...@@ -74,7 +74,7 @@ public class PropertyUtils { ...@@ -74,7 +74,7 @@ public class PropertyUtils {
* @return judge whether resource upload startup * @return judge whether resource upload startup
*/ */
public static Boolean getResUploadStartupState(){ public static Boolean getResUploadStartupState(){
String resUploadStartupType = PropertyUtils.getString(Constants.RES_UPLOAD_STARTUP_TYPE); String resUploadStartupType = PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE);
ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType); ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
return resUploadType == ResUploadType.HDFS || resUploadType == ResUploadType.S3; return resUploadType == ResUploadType.HDFS || resUploadType == ResUploadType.S3;
} }
...@@ -89,6 +89,18 @@ public class PropertyUtils { ...@@ -89,6 +89,18 @@ public class PropertyUtils {
return properties.getProperty(key.trim()); return properties.getProperty(key.trim());
} }
/**
* get property value
*
* @param key property name
* @param defaultVal default value
* @return property value
*/
public static String getString(String key, String defaultVal) {
String val = properties.getProperty(key.trim());
return val == null ? defaultVal : val;
}
/** /**
* get property value * get property value
* *
...@@ -134,6 +146,22 @@ public class PropertyUtils { ...@@ -134,6 +146,22 @@ public class PropertyUtils {
return false; return false;
} }
/**
* get property value
*
* @param key property name
* @param defaultValue default value
* @return property value
*/
public static Boolean getBoolean(String key, boolean defaultValue) {
String value = properties.getProperty(key.trim());
if(null != value){
return Boolean.parseBoolean(value);
}
return defaultValue;
}
/** /**
* get property long value * get property long value
* @param key key * @param key key
......
...@@ -15,80 +15,53 @@ ...@@ -15,80 +15,53 @@
# limitations under the License. # limitations under the License.
# #
#task queue implementation, default "zookeeper" # task queue implementation, default "zookeeper" TODO
dolphinscheduler.queue.impl=zookeeper dolphinscheduler.queue.impl=zookeeper
#zookeeper cluster. multiple are separated by commas. eg. 192.168.xx.xx:2181,192.168.xx.xx:2181,192.168.xx.xx:2181 # resource storage type : HDFS,S3,NONE
zookeeper.quorum=localhost:2181 resource.storage.type=HDFS
#dolphinscheduler root directory # resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions。"/dolphinscheduler" is recommended
zookeeper.dolphinscheduler.root=/dolphinscheduler #resource.upload.path=/dolphinscheduler
#dolphinscheduler failover directory
zookeeper.session.timeout=300
zookeeper.connection.timeout=300
zookeeper.retry.base.sleep=100
zookeeper.retry.max.sleep=30000
zookeeper.retry.maxtime=5
# resource upload startup type : HDFS,S3,NONE
res.upload.startup.type=NONE
# Users who have permission to create directories under the HDFS root path
hdfs.root.user=hdfs
# data base dir, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions。"/dolphinscheduler" is recommended
data.store2hdfs.basepath=/dolphinscheduler
# user data directory path, self configuration, please make sure the directory exists and have read write permissions
data.basedir.path=/tmp/dolphinscheduler
# directory path for user data download. self configuration, please make sure the directory exists and have read write permissions
data.download.basedir.path=/tmp/dolphinscheduler/download
# process execute directory. self configuration, please make sure the directory exists and have read write permissions
process.exec.basepath=/tmp/dolphinscheduler/exec
# user data local directory path, please make sure the directory exists and have read write permissions
#data.basedir.path=/tmp/dolphinscheduler
# whether kerberos starts # whether kerberos starts
hadoop.security.authentication.startup.state=false #hadoop.security.authentication.startup.state=false
# java.security.krb5.conf path # java.security.krb5.conf path
java.security.krb5.conf.path=/opt/krb5.conf #java.security.krb5.conf.path=/opt/krb5.conf
# loginUserFromKeytab user # loginUserFromKeytab user
login.user.keytab.username=hdfs-mycluster@ESZ.COM #login.user.keytab.username=hdfs-mycluster@ESZ.COM
# loginUserFromKeytab path # loginUserFromKeytab path
login.user.keytab.path=/opt/hdfs.headless.keytab #login.user.keytab.path=/opt/hdfs.headless.keytab
# system env path. self configuration, please make sure the directory and file exists and have read write execute permissions
dolphinscheduler.env.path=/opt/dolphinscheduler_env.sh
#resource.view.suffixs #resource.view.suffixs
resource.view.suffixs=txt,log,sh,conf,cfg,py,java,sql,hql,xml,properties #resource.view.suffixs=txt,log,sh,conf,cfg,py,java,sql,hql,xml,properties
# is development state? default "false"
development.state=true
# if resource.storage.type=HDFS, the user need to have permission to create directories under the HDFS root path
hdfs.root.user=hdfs
# ha or single namenode,If namenode ha needs to copy core-site.xml and hdfs-site.xml # if resource.storage.type=S3,the value like: s3a://dolphinscheduler ; if resource.storage.type=HDFS, When namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
# to the conf directory,support s3,for example : s3a://dolphinscheduler fs.defaultFS=hdfs://l:8020
fs.defaultFS=hdfs://mycluster:8020
# s3 need,s3 endpoint # if resource.storage.type=S3,s3 endpoint
fs.s3a.endpoint=http://192.168.199.91:9010 #fs.s3a.endpoint=http://192.168.199.91:9010
# s3 need,s3 access key # if resource.storage.type=S3,s3 access key
fs.s3a.access.key=A3DXS30FO22544RE #fs.s3a.access.key=A3DXS30FO22544RE
# s3 need,s3 secret key # if resource.storage.type=S3,s3 secret key
fs.s3a.secret.key=OloCLq3n+8+sdPHUhJ21XrSxTC+JK #fs.s3a.secret.key=OloCLq3n+8+sdPHUhJ21XrSxTC+JK
#resourcemanager ha note this need ips , this empty if single # if not use hadoop resourcemanager, please keep default value; if resourcemanager HA enable, please type the HA ips ; if resourcemanager is single, make this value empty TODO
yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
# If it is a single resourcemanager, you only need to configure one host name. If it is resourcemanager HA, the default configuration is fine # If resourcemanager HA enable or not use resourcemanager, please keep the default value; If resourcemanager is single, you only need to replace ark1 to actual resourcemanager hostname.
yarn.application.status.address=http://ark1:8088/ws/v1/cluster/apps/%s yarn.application.status.address=http://ark1:8088/ws/v1/cluster/apps/%s
# system env path. self configuration, please make sure the directory and file exists and have read write execute permissions, TODO
#dolphinscheduler.env.path=env/dolphinscheduler_env.sh
...@@ -22,9 +22,7 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils; ...@@ -22,9 +22,7 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.TaskRecord; import org.apache.dolphinscheduler.dao.entity.TaskRecord;
import org.apache.commons.configuration.Configuration; import org.apache.dolphinscheduler.dao.utils.PropertyUtils;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -42,26 +40,12 @@ public class TaskRecordDao { ...@@ -42,26 +40,12 @@ public class TaskRecordDao {
private static Logger logger = LoggerFactory.getLogger(TaskRecordDao.class.getName()); private static Logger logger = LoggerFactory.getLogger(TaskRecordDao.class.getName());
/**
* load conf
*/
private static Configuration conf;
static {
try {
conf = new PropertiesConfiguration(Constants.APPLICATION_PROPERTIES);
}catch (ConfigurationException e){
logger.error("load configuration exception",e);
System.exit(1);
}
}
/** /**
* get task record flag * get task record flag
* @return whether startup taskrecord * @return whether startup taskrecord
*/ */
public static boolean getTaskRecordFlag(){ public static boolean getTaskRecordFlag(){
return conf.getBoolean(Constants.TASK_RECORD_FLAG,false); return PropertyUtils.getBoolean(Constants.TASK_RECORD_FLAG,false);
} }
/** /**
* create connection * create connection
...@@ -72,9 +56,9 @@ public class TaskRecordDao { ...@@ -72,9 +56,9 @@ public class TaskRecordDao {
return null; return null;
} }
String driver = "com.mysql.jdbc.Driver"; String driver = "com.mysql.jdbc.Driver";
String url = conf.getString(Constants.TASK_RECORD_URL); String url = PropertyUtils.getString(Constants.TASK_RECORD_URL);
String username = conf.getString(Constants.TASK_RECORD_USER); String username = PropertyUtils.getString(Constants.TASK_RECORD_USER);
String password = conf.getString(Constants.TASK_RECORD_PWD); String password = PropertyUtils.getString(Constants.TASK_RECORD_PWD);
Connection conn = null; Connection conn = null;
try { try {
//classLoader,load driver //classLoader,load driver
......
...@@ -58,32 +58,7 @@ public class ConnectionFactory extends SpringConnectionFactory{ ...@@ -58,32 +58,7 @@ public class ConnectionFactory extends SpringConnectionFactory{
*/ */
public static DruidDataSource getDataSource() { public static DruidDataSource getDataSource() {
DruidDataSource druidDataSource = new DruidDataSource(); DruidDataSource druidDataSource = dataSource();
druidDataSource.setDriverClassName(conf.getString(Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME));
druidDataSource.setUrl(conf.getString(Constants.SPRING_DATASOURCE_URL));
druidDataSource.setUsername(conf.getString(Constants.SPRING_DATASOURCE_USERNAME));
druidDataSource.setPassword(conf.getString(Constants.SPRING_DATASOURCE_PASSWORD));
druidDataSource.setValidationQuery(conf.getString(Constants.SPRING_DATASOURCE_VALIDATION_QUERY));
druidDataSource.setPoolPreparedStatements(conf.getBoolean(Constants.SPRING_DATASOURCE_POOL_PREPARED_STATEMENTS));
druidDataSource.setTestWhileIdle(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_WHILE_IDLE));
druidDataSource.setTestOnBorrow(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_BORROW));
druidDataSource.setTestOnReturn(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_RETURN));
druidDataSource.setKeepAlive(conf.getBoolean(Constants.SPRING_DATASOURCE_KEEP_ALIVE));
druidDataSource.setMinIdle(conf.getInt(Constants.SPRING_DATASOURCE_MIN_IDLE));
druidDataSource.setMaxActive(conf.getInt(Constants.SPRING_DATASOURCE_MAX_ACTIVE));
druidDataSource.setMaxWait(conf.getInt(Constants.SPRING_DATASOURCE_MAX_WAIT));
druidDataSource.setMaxPoolPreparedStatementPerConnectionSize(conf.getInt(Constants.SPRING_DATASOURCE_MAX_POOL_PREPARED_STATEMENT_PER_CONNECTION_SIZE));
druidDataSource.setInitialSize(conf.getInt(Constants.SPRING_DATASOURCE_INITIAL_SIZE));
druidDataSource.setTimeBetweenEvictionRunsMillis(conf.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_EVICTION_RUNS_MILLIS));
druidDataSource.setTimeBetweenConnectErrorMillis(conf.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_CONNECT_ERROR_MILLIS));
druidDataSource.setMinEvictableIdleTimeMillis(conf.getLong(Constants.SPRING_DATASOURCE_MIN_EVICTABLE_IDLE_TIME_MILLIS));
druidDataSource.setValidationQueryTimeout(conf.getInt(Constants.SPRING_DATASOURCE_VALIDATION_QUERY_TIMEOUT));
//auto commit
druidDataSource.setDefaultAutoCommit(conf.getBoolean(Constants.SPRING_DATASOURCE_DEFAULT_AUTO_COMMIT));
return druidDataSource; return druidDataSource;
} }
......
...@@ -25,6 +25,7 @@ import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean; ...@@ -25,6 +25,7 @@ import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.utils.PropertyUtils;
import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory; import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.type.JdbcType; import org.apache.ibatis.type.JdbcType;
...@@ -48,19 +49,6 @@ public class SpringConnectionFactory { ...@@ -48,19 +49,6 @@ public class SpringConnectionFactory {
private static final Logger logger = LoggerFactory.getLogger(SpringConnectionFactory.class); private static final Logger logger = LoggerFactory.getLogger(SpringConnectionFactory.class);
/**
* Load configuration file
*/
protected static org.apache.commons.configuration.Configuration conf;
static {
try {
conf = new PropertiesConfiguration(Constants.APPLICATION_PROPERTIES);
} catch (ConfigurationException e) {
logger.error("load configuration exception", e);
System.exit(1);
}
}
/** /**
* pagination interceptor * pagination interceptor
...@@ -76,33 +64,33 @@ public class SpringConnectionFactory { ...@@ -76,33 +64,33 @@ public class SpringConnectionFactory {
* @return druid dataSource * @return druid dataSource
*/ */
@Bean(destroyMethod="") @Bean(destroyMethod="")
public DruidDataSource dataSource() { public static DruidDataSource dataSource() {
DruidDataSource druidDataSource = new DruidDataSource(); DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setDriverClassName(conf.getString(Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME)); druidDataSource.setDriverClassName(PropertyUtils.getString(Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME));
druidDataSource.setUrl(conf.getString(Constants.SPRING_DATASOURCE_URL)); druidDataSource.setUrl(PropertyUtils.getString(Constants.SPRING_DATASOURCE_URL));
druidDataSource.setUsername(conf.getString(Constants.SPRING_DATASOURCE_USERNAME)); druidDataSource.setUsername(PropertyUtils.getString(Constants.SPRING_DATASOURCE_USERNAME));
druidDataSource.setPassword(conf.getString(Constants.SPRING_DATASOURCE_PASSWORD)); druidDataSource.setPassword(PropertyUtils.getString(Constants.SPRING_DATASOURCE_PASSWORD));
druidDataSource.setValidationQuery(conf.getString(Constants.SPRING_DATASOURCE_VALIDATION_QUERY)); druidDataSource.setValidationQuery(PropertyUtils.getString(Constants.SPRING_DATASOURCE_VALIDATION_QUERY,"SELECT 1"));
druidDataSource.setPoolPreparedStatements(conf.getBoolean(Constants.SPRING_DATASOURCE_POOL_PREPARED_STATEMENTS)); druidDataSource.setPoolPreparedStatements(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_POOL_PREPARED_STATEMENTS,true));
druidDataSource.setTestWhileIdle(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_WHILE_IDLE)); druidDataSource.setTestWhileIdle(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_TEST_WHILE_IDLE,true));
druidDataSource.setTestOnBorrow(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_BORROW)); druidDataSource.setTestOnBorrow(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_BORROW,true));
druidDataSource.setTestOnReturn(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_RETURN)); druidDataSource.setTestOnReturn(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_RETURN,true));
druidDataSource.setKeepAlive(conf.getBoolean(Constants.SPRING_DATASOURCE_KEEP_ALIVE)); druidDataSource.setKeepAlive(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_KEEP_ALIVE,true));
druidDataSource.setMinIdle(conf.getInt(Constants.SPRING_DATASOURCE_MIN_IDLE)); druidDataSource.setMinIdle(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_MIN_IDLE,5));
druidDataSource.setMaxActive(conf.getInt(Constants.SPRING_DATASOURCE_MAX_ACTIVE)); druidDataSource.setMaxActive(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_MAX_ACTIVE,50));
druidDataSource.setMaxWait(conf.getInt(Constants.SPRING_DATASOURCE_MAX_WAIT)); druidDataSource.setMaxWait(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_MAX_WAIT,60000));
druidDataSource.setMaxPoolPreparedStatementPerConnectionSize(conf.getInt(Constants.SPRING_DATASOURCE_MAX_POOL_PREPARED_STATEMENT_PER_CONNECTION_SIZE)); druidDataSource.setMaxPoolPreparedStatementPerConnectionSize(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_MAX_POOL_PREPARED_STATEMENT_PER_CONNECTION_SIZE,20));
druidDataSource.setInitialSize(conf.getInt(Constants.SPRING_DATASOURCE_INITIAL_SIZE)); druidDataSource.setInitialSize(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_INITIAL_SIZE,5));
druidDataSource.setTimeBetweenEvictionRunsMillis(conf.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_EVICTION_RUNS_MILLIS)); druidDataSource.setTimeBetweenEvictionRunsMillis(PropertyUtils.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_EVICTION_RUNS_MILLIS,60000));
druidDataSource.setTimeBetweenConnectErrorMillis(conf.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_CONNECT_ERROR_MILLIS)); druidDataSource.setTimeBetweenConnectErrorMillis(PropertyUtils.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_CONNECT_ERROR_MILLIS,60000));
druidDataSource.setMinEvictableIdleTimeMillis(conf.getLong(Constants.SPRING_DATASOURCE_MIN_EVICTABLE_IDLE_TIME_MILLIS)); druidDataSource.setMinEvictableIdleTimeMillis(PropertyUtils.getLong(Constants.SPRING_DATASOURCE_MIN_EVICTABLE_IDLE_TIME_MILLIS,300000));
druidDataSource.setValidationQueryTimeout(conf.getInt(Constants.SPRING_DATASOURCE_VALIDATION_QUERY_TIMEOUT)); druidDataSource.setValidationQueryTimeout(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_VALIDATION_QUERY_TIMEOUT,3));
//auto commit //auto commit
druidDataSource.setDefaultAutoCommit(conf.getBoolean(Constants.SPRING_DATASOURCE_DEFAULT_AUTO_COMMIT)); druidDataSource.setDefaultAutoCommit(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_DEFAULT_AUTO_COMMIT,true));
return druidDataSource; return druidDataSource;
} }
......
...@@ -49,7 +49,7 @@ public class PropertyUtils { ...@@ -49,7 +49,7 @@ public class PropertyUtils {
* init * init
*/ */
private void init(){ private void init(){
String[] propertyFiles = new String[]{Constants.APPLICATION_PROPERTIES}; String[] propertyFiles = new String[]{Constants.DATASOURCE_PROPERTIES};
for (String fileName : propertyFiles) { for (String fileName : propertyFiles) {
InputStream fis = null; InputStream fis = null;
try { try {
...@@ -77,6 +77,17 @@ public class PropertyUtils { ...@@ -77,6 +77,17 @@ public class PropertyUtils {
return properties.getProperty(key); return properties.getProperty(key);
} }
/**
* get property value
*
* @param key property name
* @param defaultVal default value
* @return property value
*/
public static String getString(String key, String defaultVal) {
String val = properties.getProperty(key.trim());
return val == null ? defaultVal : val;
}
/** /**
* get property value * get property value
...@@ -106,4 +117,46 @@ public class PropertyUtils { ...@@ -106,4 +117,46 @@ public class PropertyUtils {
} }
return defaultValue; return defaultValue;
} }
/**
* get property value
*
* @param key property name
* @return property value
*/
public static Boolean getBoolean(String key) {
String value = properties.getProperty(key.trim());
if(null != value){
return Boolean.parseBoolean(value);
}
return false;
}
/**
* get property value
*
* @param key property name
* @param defaultValue default value
* @return property value
*/
public static Boolean getBoolean(String key, boolean defaultValue) {
String value = properties.getProperty(key.trim());
if(null != value){
return Boolean.parseBoolean(value);
}
return defaultValue;
}
/**
* get property long value
* @param key key
* @param defaultVal default value
* @return property value
*/
public static long getLong(String key, long defaultVal) {
String val = getString(key);
return val == null ? defaultVal : Long.parseLong(val);
}
} }
...@@ -15,59 +15,61 @@ ...@@ -15,59 +15,61 @@
# limitations under the License. # limitations under the License.
# #
# base spring data source configuration
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
# postgre # postgre
spring.datasource.driver-class-name=org.postgresql.Driver #spring.datasource.driver-class-name=org.postgresql.Driver
spring.datasource.url=jdbc:postgresql://localhost:5432/dolphinscheduler #spring.datasource.url=jdbc:postgresql://localhost:5432/dolphinscheduler
# mysql # mysql
#spring.datasource.driver-class-name=com.mysql.jdbc.Driver spring.datasource.driver-class-name=com.mysql.jdbc.Driver
#spring.datasource.url=jdbc:mysql://localhost:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8 spring.datasource.url=jdbc:mysql://localhost:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
spring.datasource.username=test spring.datasource.username=root
spring.datasource.password=test spring.datasource.password=root@123
## base spring data source configuration todo need to remove
#spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
# connection configuration # connection configuration
spring.datasource.initialSize=5 #spring.datasource.initialSize=5
# min connection number # min connection number
spring.datasource.minIdle=5 #spring.datasource.minIdle=5
# max connection number # max connection number
spring.datasource.maxActive=50 #spring.datasource.maxActive=50
# max wait time for get a connection in milliseconds. if configuring maxWait, fair locks are enabled by default and concurrency efficiency decreases. # max wait time for get a connection in milliseconds. if configuring maxWait, fair locks are enabled by default and concurrency efficiency decreases.
# If necessary, unfair locks can be used by configuring the useUnfairLock attribute to true. # If necessary, unfair locks can be used by configuring the useUnfairLock attribute to true.
spring.datasource.maxWait=60000 #spring.datasource.maxWait=60000
# milliseconds for check to close free connections # milliseconds for check to close free connections
spring.datasource.timeBetweenEvictionRunsMillis=60000 #spring.datasource.timeBetweenEvictionRunsMillis=60000
# the Destroy thread detects the connection interval and closes the physical connection in milliseconds if the connection idle time is greater than or equal to minEvictableIdleTimeMillis. # the Destroy thread detects the connection interval and closes the physical connection in milliseconds if the connection idle time is greater than or equal to minEvictableIdleTimeMillis.
spring.datasource.timeBetweenConnectErrorMillis=60000 #spring.datasource.timeBetweenConnectErrorMillis=60000
# the longest time a connection remains idle without being evicted, in milliseconds # the longest time a connection remains idle without being evicted, in milliseconds
spring.datasource.minEvictableIdleTimeMillis=300000 #spring.datasource.minEvictableIdleTimeMillis=300000
#the SQL used to check whether the connection is valid requires a query statement. If validation Query is null, testOnBorrow, testOnReturn, and testWhileIdle will not work. #the SQL used to check whether the connection is valid requires a query statement. If validation Query is null, testOnBorrow, testOnReturn, and testWhileIdle will not work.
spring.datasource.validationQuery=SELECT 1 #spring.datasource.validationQuery=SELECT 1
#check whether the connection is valid for timeout, in seconds #check whether the connection is valid for timeout, in seconds
spring.datasource.validationQueryTimeout=3 #spring.datasource.validationQueryTimeout=3
# when applying for a connection, if it is detected that the connection is idle longer than time Between Eviction Runs Millis, # when applying for a connection, if it is detected that the connection is idle longer than time Between Eviction Runs Millis,
# validation Query is performed to check whether the connection is valid # validation Query is performed to check whether the connection is valid
spring.datasource.testWhileIdle=true #spring.datasource.testWhileIdle=true
#execute validation to check if the connection is valid when applying for a connection #execute validation to check if the connection is valid when applying for a connection
spring.datasource.testOnBorrow=true #spring.datasource.testOnBorrow=true
#execute validation to check if the connection is valid when the connection is returned #execute validation to check if the connection is valid when the connection is returned
spring.datasource.testOnReturn=false #spring.datasource.testOnReturn=false
spring.datasource.defaultAutoCommit=true #spring.datasource.defaultAutoCommit=true
spring.datasource.keepAlive=true #spring.datasource.keepAlive=true
# open PSCache, specify count PSCache for every connection # open PSCache, specify count PSCache for every connection
spring.datasource.poolPreparedStatements=true #spring.datasource.poolPreparedStatements=true
spring.datasource.maxPoolPreparedStatementPerConnectionSize=20 #spring.datasource.maxPoolPreparedStatementPerConnectionSize=20
spring.datasource.spring.datasource.filters=stat,wall,log4j #spring.datasource.filters=stat,wall,log4j
spring.datasource.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 #spring.datasource.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.consumer; package org.apache.dolphinscheduler.server.master.consumer;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UdfType; import org.apache.dolphinscheduler.common.enums.UdfType;
......
...@@ -16,13 +16,20 @@ ...@@ -16,13 +16,20 @@
*/ */
package org.apache.dolphinscheduler.service.quartz; package org.apache.dolphinscheduler.service.quartz;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.quartz.*; import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory; import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.jdbcjobstore.JobStoreTX;
import org.quartz.impl.jdbcjobstore.PostgreSQLDelegate;
import org.quartz.impl.jdbcjobstore.StdJDBCDelegate;
import org.quartz.impl.matchers.GroupMatcher; import org.quartz.impl.matchers.GroupMatcher;
import org.quartz.simpl.SimpleThreadPool;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -30,6 +37,7 @@ import java.util.*; ...@@ -30,6 +37,7 @@ import java.util.*;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.dolphinscheduler.common.Constants.ORG_POSTGRESQL_DRIVER;
import static org.quartz.CronScheduleBuilder.cronSchedule; import static org.quartz.CronScheduleBuilder.cronSchedule;
import static org.quartz.JobBuilder.newJob; import static org.quartz.JobBuilder.newJob;
import static org.quartz.TriggerBuilder.newTrigger; import static org.quartz.TriggerBuilder.newTrigger;
...@@ -59,7 +67,21 @@ public class QuartzExecutors { ...@@ -59,7 +67,21 @@ public class QuartzExecutors {
*/ */
private static volatile QuartzExecutors INSTANCE = null; private static volatile QuartzExecutors INSTANCE = null;
private QuartzExecutors() {} /**
* load conf
*/
private static Configuration conf;
static {
try {
conf = new PropertiesConfiguration(Constants.QUARTZ_PROPERTIES_PATH);
}catch (ConfigurationException e){
logger.warn("not loaded quartz configuration file, will used default value",e);
}
}
private QuartzExecutors() {
}
/** /**
* thread safe and performance promote * thread safe and performance promote
...@@ -87,7 +109,32 @@ public class QuartzExecutors { ...@@ -87,7 +109,32 @@ public class QuartzExecutors {
*/ */
private void init() { private void init() {
try { try {
SchedulerFactory schedulerFactory = new StdSchedulerFactory(Constants.QUARTZ_PROPERTIES_PATH); StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
Properties properties = new Properties();
String dataSourceDriverClass = org.apache.dolphinscheduler.dao.utils.PropertyUtils.getString(Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME);
if (dataSourceDriverClass.contains(ORG_POSTGRESQL_DRIVER)){
properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, PostgreSQLDelegate.class.getName()));
} else {
properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, StdJDBCDelegate.class.getName()));
}
properties.setProperty(Constants.ORG_QUARTZ_SCHEDULER_INSTANCENAME, conf.getString(Constants.ORG_QUARTZ_SCHEDULER_INSTANCENAME, Constants.QUARTZ_INSTANCENAME));
properties.setProperty(Constants.ORG_QUARTZ_SCHEDULER_INSTANCEID, conf.getString(Constants.ORG_QUARTZ_SCHEDULER_INSTANCEID, Constants.QUARTZ_INSTANCEID));
properties.setProperty(Constants.ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON,conf.getString(Constants.ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON,Constants.STRING_TRUE));
properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_USEPROPERTIES,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_USEPROPERTIES,Constants.STRING_TRUE));
properties.setProperty(Constants.ORG_QUARTZ_THREADPOOL_CLASS,conf.getString(Constants.ORG_QUARTZ_THREADPOOL_CLASS, SimpleThreadPool.class.getName()));
properties.setProperty(Constants.ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS,conf.getString(Constants.ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS,Constants.STRING_TRUE));
properties.setProperty(Constants.ORG_QUARTZ_THREADPOOL_THREADCOUNT,conf.getString(Constants.ORG_QUARTZ_THREADPOOL_THREADCOUNT, Constants.QUARTZ_THREADCOUNT));
properties.setProperty(Constants.ORG_QUARTZ_THREADPOOL_THREADPRIORITY,conf.getString(Constants.ORG_QUARTZ_THREADPOOL_THREADPRIORITY, Constants.QUARTZ_THREADPRIORITY));
properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_CLASS,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_CLASS, JobStoreTX.class.getName()));
properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_TABLEPREFIX,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_TABLEPREFIX, Constants.QUARTZ_TABLE_PREFIX));
properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_ISCLUSTERED,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_ISCLUSTERED,Constants.STRING_TRUE));
properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD, Constants.QUARTZ_MISFIRETHRESHOLD));
properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL, Constants.QUARTZ_CLUSTERCHECKININTERVAL));
properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_DATASOURCE,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_DATASOURCE, Constants.QUARTZ_DATASOURCE));
properties.setProperty(Constants.ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,conf.getString(Constants.ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,DruidConnectionProvider.class.getName()));
schedulerFactory.initialize(properties);
scheduler = schedulerFactory.getScheduler(); scheduler = schedulerFactory.getScheduler();
} catch (SchedulerException e) { } catch (SchedulerException e) {
......
...@@ -24,7 +24,7 @@ import org.springframework.stereotype.Component; ...@@ -24,7 +24,7 @@ import org.springframework.stereotype.Component;
* zookeeper conf * zookeeper conf
*/ */
@Component @Component
@PropertySource("classpath:common.properties") @PropertySource("classpath:zookeeper.properties")
public class ZookeeperConfig { public class ZookeeperConfig {
//zk connect config //zk connect config
......
...@@ -18,45 +18,36 @@ ...@@ -18,45 +18,36 @@
#============================================================================ #============================================================================
# Configure Main Scheduler Properties # Configure Main Scheduler Properties
#============================================================================ #============================================================================
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate #org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.PostgreSQLDelegate #org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
# postgre #org.quartz.scheduler.instanceName = DolphinScheduler
#org.quartz.dataSource.myDs.driver = org.postgresql.Driver #org.quartz.scheduler.instanceId = AUTO
#org.quartz.dataSource.myDs.URL = jdbc:postgresql://localhost:5432/dolphinscheduler?characterEncoding=utf8 #org.quartz.scheduler.makeSchedulerThreadDaemon = true
# mysql #org.quartz.jobStore.useProperties = false
#org.quartz.dataSource.myDs.driver = com.mysql.jdbc.Driver
#org.quartz.dataSource.myDs.URL = jdbc:mysql://localhost:3306/dolphinscheduler?characterEncoding=utf8
#org.quartz.dataSource.myDs.user = root
#org.quartz.dataSource.myDs.password = 123456
org.quartz.scheduler.instanceName = DolphinScheduler
org.quartz.scheduler.instanceId = AUTO
org.quartz.scheduler.makeSchedulerThreadDaemon = true
org.quartz.jobStore.useProperties = false
#============================================================================ #============================================================================
# Configure ThreadPool # Configure ThreadPool
#============================================================================ #============================================================================
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool #org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.makeThreadsDaemons = true #org.quartz.threadPool.makeThreadsDaemons = true
org.quartz.threadPool.threadCount = 25 #org.quartz.threadPool.threadCount = 25
org.quartz.threadPool.threadPriority = 5 #org.quartz.threadPool.threadPriority = 5
#============================================================================ #============================================================================
# Configure JobStore # Configure JobStore
#============================================================================ #============================================================================
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX #org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.tablePrefix = QRTZ_ #org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.isClustered = true #org.quartz.jobStore.isClustered = true
org.quartz.jobStore.misfireThreshold = 60000 #org.quartz.jobStore.misfireThreshold = 60000
org.quartz.jobStore.clusterCheckinInterval = 5000 #org.quartz.jobStore.clusterCheckinInterval = 5000
org.quartz.jobStore.acquireTriggersWithinLock=true org.quartz.jobStore.acquireTriggersWithinLock=true
org.quartz.jobStore.dataSource = myDs #org.quartz.jobStore.dataSource = myDs
#============================================================================ #============================================================================
# Configure Datasources # Configure Datasources
#============================================================================ #============================================================================
org.quartz.dataSource.myDs.connectionProvider.class = org.apache.dolphinscheduler.service.quartz.DruidConnectionProvider #org.quartz.dataSource.myDs.connectionProvider.class = org.apache.dolphinscheduler.service.quartz.DruidConnectionProvider
\ No newline at end of file \ No newline at end of file
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# task queue implementation, default "zookeeper"
dolphinscheduler.queue.impl=zookeeper
# zookeeper cluster. multiple are separated by commas. eg. 192.168.xx.xx:2181,192.168.xx.xx:2181,192.168.xx.xx:2181
zookeeper.quorum=192.168.xx.xx:2181,192.168.xx.xx:2181,192.168.xx.xx:2181
# dolphinscheduler root directory
#zookeeper.dolphinscheduler.root=/dolphinscheduler
# dolphinscheduler failover directory
#zookeeper.session.timeout=60000
#zookeeper.connection.timeout=300
#zookeeper.retry.base.sleep=100
#zookeeper.retry.max.sleep=30000
#zookeeper.retry.maxtime=5
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册