Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
DolphinScheduler
提交
3ff7b34a
DolphinScheduler
项目概览
apache
/
DolphinScheduler
上一次同步 1 年多
通知
704
Star
9572
Fork
3514
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
DolphinScheduler
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
3ff7b34a
编写于
2月 13, 2020
作者:
Q
qiaozhanwei
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'remotes/upstream/dev' into dev
上级
fb5de178
44b76468
变更
23
隐藏空白更改
内联
并排
Showing
23 changed file
with
339 addition
and
110 deletion
+339
-110
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/Constants.java
...va/org/apache/dolphinscheduler/alert/utils/Constants.java
+3
-1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
...apache/dolphinscheduler/api/service/ResourcesService.java
+1
-1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java
...ava/org/apache/dolphinscheduler/api/utils/CheckUtils.java
+3
-1
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java
...org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java
+111
-0
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
...in/java/org/apache/dolphinscheduler/common/Constants.java
+23
-29
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/subprocess/SubProcessParameters.java
...cheduler/common/task/subprocess/SubProcessParameters.java
+1
-1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java
...apache/dolphinscheduler/common/utils/CollectionUtils.java
+5
-22
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java
...org/apache/dolphinscheduler/common/utils/CommonUtils.java
+3
-4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java
...ava/org/apache/dolphinscheduler/common/utils/OSUtils.java
+6
-17
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CollectionUtilsTest.java
...he/dolphinscheduler/common/utils/CollectionUtilsTest.java
+38
-5
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CommonUtilsTest.java
...apache/dolphinscheduler/common/utils/CommonUtilsTest.java
+36
-0
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java
...org/apache/dolphinscheduler/common/utils/OSUtilsTest.java
+83
-0
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ClickHouseDataSource.java
...dolphinscheduler/dao/datasource/ClickHouseDataSource.java
+1
-1
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ConnectionFactory.java
...he/dolphinscheduler/dao/datasource/ConnectionFactory.java
+0
-5
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
...org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
+1
-4
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ConnectionFactoryTest.java
...he/dolphinscheduler/dao/mapper/ConnectionFactoryTest.java
+1
-1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
...g/apache/dolphinscheduler/server/master/MasterServer.java
+1
-1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
.../apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
+1
-1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
...g/apache/dolphinscheduler/server/worker/WorkerServer.java
+1
-1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
...scheduler/server/worker/task/AbstractCommandExecutor.java
+1
-1
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java
...pache/dolphinscheduler/service/quartz/cron/CronUtils.java
+5
-3
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CycleFactory.java
...he/dolphinscheduler/service/quartz/cron/CycleFactory.java
+3
-1
dolphinscheduler-service/src/test/java/cron/CronUtilsTest.java
...inscheduler-service/src/test/java/cron/CronUtilsTest.java
+11
-10
未找到文件。
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/Constants.java
浏览文件 @
3ff7b34a
...
...
@@ -20,7 +20,9 @@ package org.apache.dolphinscheduler.alert.utils;
* constants
*/
public
class
Constants
{
private
Constants
()
{
throw
new
IllegalStateException
(
"Constants class"
);
}
/**
* alert properties path
*/
...
...
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
浏览文件 @
3ff7b34a
...
...
@@ -119,7 +119,7 @@ public class ResourcesService extends BaseService {
putMsg
(
result
,
Status
.
UDF_RESOURCE_SUFFIX_NOT_JAR
);
return
result
;
}
if
(
file
.
getSize
()
>
Constants
.
maxFileSize
)
{
if
(
file
.
getSize
()
>
Constants
.
MAX_FILE_SIZE
)
{
logger
.
error
(
"file size is too large: {}"
,
file
.
getOriginalFilename
());
putMsg
(
result
,
Status
.
RESOURCE_SIZE_EXCEED_LIMIT
);
return
result
;
...
...
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java
浏览文件 @
3ff7b34a
...
...
@@ -35,7 +35,9 @@ import java.util.regex.Pattern;
*/
public
class
CheckUtils
{
private
CheckUtils
()
{
throw
new
IllegalStateException
(
"CheckUtils class"
);
}
/**
* check username
*
...
...
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java
浏览文件 @
3ff7b34a
...
...
@@ -17,8 +17,26 @@
package
org.apache.dolphinscheduler.api.utils
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.dolphinscheduler.api.enums.Status
;
import
org.apache.dolphinscheduler.common.Constants
;
import
org.apache.dolphinscheduler.common.enums.ProgramType
;
import
org.apache.dolphinscheduler.common.enums.TaskType
;
import
org.apache.dolphinscheduler.common.process.ResourceInfo
;
import
org.apache.dolphinscheduler.common.task.AbstractParameters
;
import
org.apache.dolphinscheduler.common.task.datax.DataxParameters
;
import
org.apache.dolphinscheduler.common.task.dependent.DependentParameters
;
import
org.apache.dolphinscheduler.common.task.flink.FlinkParameters
;
import
org.apache.dolphinscheduler.common.task.http.HttpParameters
;
import
org.apache.dolphinscheduler.common.task.mr.MapreduceParameters
;
import
org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters
;
import
org.apache.dolphinscheduler.common.task.python.PythonParameters
;
import
org.apache.dolphinscheduler.common.task.shell.ShellParameters
;
import
org.apache.dolphinscheduler.common.task.spark.SparkParameters
;
import
org.apache.dolphinscheduler.common.task.sql.SqlParameters
;
import
org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters
;
import
org.apache.dolphinscheduler.common.utils.JSONUtils
;
import
org.apache.dolphinscheduler.common.utils.TaskParametersUtils
;
import
org.junit.After
;
import
org.junit.Before
;
import
org.junit.Test
;
...
...
@@ -78,6 +96,14 @@ public class CheckUtilsTest {
}
@Test
public
void
testCheckOtherParams
()
{
assertFalse
(
CheckUtils
.
checkOtherParams
(
null
));
assertFalse
(
CheckUtils
.
checkOtherParams
(
""
));
assertTrue
(
CheckUtils
.
checkOtherParams
(
"xxx"
));
assertFalse
(
CheckUtils
.
checkOtherParams
(
"{}"
));
assertFalse
(
CheckUtils
.
checkOtherParams
(
"{\"key1\":111}"
));
}
/**
* check passwd
*/
...
...
@@ -106,5 +132,90 @@ public class CheckUtilsTest {
assertTrue
(
CheckUtils
.
checkPhone
(
"17362537263"
));
}
@Test
public
void
testCheckTaskNodeParameters
()
{
assertFalse
(
CheckUtils
.
checkTaskNodeParameters
(
null
,
null
));
assertFalse
(
CheckUtils
.
checkTaskNodeParameters
(
null
,
"unKnown"
));
assertFalse
(
CheckUtils
.
checkTaskNodeParameters
(
"unKnown"
,
"unKnown"
));
assertFalse
(
CheckUtils
.
checkTaskNodeParameters
(
"unKnown"
,
null
));
// sub SubProcessParameters
SubProcessParameters
subProcessParameters
=
new
SubProcessParameters
();
assertFalse
(
CheckUtils
.
checkTaskNodeParameters
(
JSONUtils
.
toJsonString
(
subProcessParameters
),
TaskType
.
SUB_PROCESS
.
toString
()));
subProcessParameters
.
setProcessDefinitionId
(
1234
);
assertTrue
(
CheckUtils
.
checkTaskNodeParameters
(
JSONUtils
.
toJsonString
(
subProcessParameters
),
TaskType
.
SUB_PROCESS
.
toString
()));
// ShellParameters
ShellParameters
shellParameters
=
new
ShellParameters
();
assertFalse
(
CheckUtils
.
checkTaskNodeParameters
(
JSONUtils
.
toJsonString
(
shellParameters
),
TaskType
.
SHELL
.
toString
()));
shellParameters
.
setRawScript
(
""
);
assertFalse
(
CheckUtils
.
checkTaskNodeParameters
(
JSONUtils
.
toJsonString
(
shellParameters
),
TaskType
.
SHELL
.
toString
()));
shellParameters
.
setRawScript
(
"sss"
);
assertTrue
(
CheckUtils
.
checkTaskNodeParameters
(
JSONUtils
.
toJsonString
(
shellParameters
),
TaskType
.
SHELL
.
toString
()));
// ProcedureParameters
ProcedureParameters
procedureParameters
=
new
ProcedureParameters
();
assertFalse
(
CheckUtils
.
checkTaskNodeParameters
(
JSONUtils
.
toJsonString
(
procedureParameters
),
TaskType
.
PROCEDURE
.
toString
()));
procedureParameters
.
setDatasource
(
1
);
procedureParameters
.
setType
(
"xx"
);
procedureParameters
.
setMethod
(
"yy"
);
assertTrue
(
CheckUtils
.
checkTaskNodeParameters
(
JSONUtils
.
toJsonString
(
procedureParameters
),
TaskType
.
PROCEDURE
.
toString
()));
// SqlParameters
SqlParameters
sqlParameters
=
new
SqlParameters
();
assertFalse
(
CheckUtils
.
checkTaskNodeParameters
(
JSONUtils
.
toJsonString
(
sqlParameters
),
TaskType
.
SQL
.
toString
()));
sqlParameters
.
setDatasource
(
1
);
sqlParameters
.
setType
(
"xx"
);
sqlParameters
.
setSql
(
"yy"
);
assertTrue
(
CheckUtils
.
checkTaskNodeParameters
(
JSONUtils
.
toJsonString
(
sqlParameters
),
TaskType
.
SQL
.
toString
()));
// MapreduceParameters
MapreduceParameters
mapreduceParameters
=
new
MapreduceParameters
();
assertFalse
(
CheckUtils
.
checkTaskNodeParameters
(
JSONUtils
.
toJsonString
(
mapreduceParameters
),
TaskType
.
MR
.
toString
()));
mapreduceParameters
.
setMainJar
(
new
ResourceInfo
());
mapreduceParameters
.
setProgramType
(
ProgramType
.
JAVA
);
assertTrue
(
CheckUtils
.
checkTaskNodeParameters
(
JSONUtils
.
toJsonString
(
mapreduceParameters
),
TaskType
.
MR
.
toString
()));
// SparkParameters
SparkParameters
sparkParameters
=
new
SparkParameters
();
assertFalse
(
CheckUtils
.
checkTaskNodeParameters
(
JSONUtils
.
toJsonString
(
sparkParameters
),
TaskType
.
SPARK
.
toString
()));
sparkParameters
.
setMainJar
(
new
ResourceInfo
());
sparkParameters
.
setProgramType
(
ProgramType
.
SCALA
);
sparkParameters
.
setSparkVersion
(
"1.1.1"
);
assertTrue
(
CheckUtils
.
checkTaskNodeParameters
(
JSONUtils
.
toJsonString
(
sparkParameters
),
TaskType
.
SPARK
.
toString
()));
// PythonParameters
PythonParameters
pythonParameters
=
new
PythonParameters
();
assertFalse
(
CheckUtils
.
checkTaskNodeParameters
(
JSONUtils
.
toJsonString
(
pythonParameters
),
TaskType
.
PYTHON
.
toString
()));
pythonParameters
.
setRawScript
(
"ss"
);
assertTrue
(
CheckUtils
.
checkTaskNodeParameters
(
JSONUtils
.
toJsonString
(
pythonParameters
),
TaskType
.
PYTHON
.
toString
()));
// DependentParameters
DependentParameters
dependentParameters
=
new
DependentParameters
();
assertTrue
(
CheckUtils
.
checkTaskNodeParameters
(
JSONUtils
.
toJsonString
(
dependentParameters
),
TaskType
.
DEPENDENT
.
toString
()));
// FlinkParameters
FlinkParameters
flinkParameters
=
new
FlinkParameters
();
assertFalse
(
CheckUtils
.
checkTaskNodeParameters
(
JSONUtils
.
toJsonString
(
flinkParameters
),
TaskType
.
FLINK
.
toString
()));
flinkParameters
.
setMainJar
(
new
ResourceInfo
());
flinkParameters
.
setProgramType
(
ProgramType
.
JAVA
);
assertTrue
(
CheckUtils
.
checkTaskNodeParameters
(
JSONUtils
.
toJsonString
(
flinkParameters
),
TaskType
.
FLINK
.
toString
()));
// HTTP
HttpParameters
httpParameters
=
new
HttpParameters
();
assertFalse
(
CheckUtils
.
checkTaskNodeParameters
(
JSONUtils
.
toJsonString
(
httpParameters
),
TaskType
.
HTTP
.
toString
()));
httpParameters
.
setUrl
(
"httpUrl"
);
assertTrue
(
CheckUtils
.
checkTaskNodeParameters
(
JSONUtils
.
toJsonString
(
httpParameters
),
TaskType
.
HTTP
.
toString
()));
// DataxParameters
DataxParameters
dataxParameters
=
new
DataxParameters
();
assertFalse
(
CheckUtils
.
checkTaskNodeParameters
(
JSONUtils
.
toJsonString
(
dataxParameters
),
TaskType
.
DATAX
.
toString
()));
dataxParameters
.
setDataSource
(
111
);
dataxParameters
.
setDataTarget
(
333
);
dataxParameters
.
setSql
(
"sql"
);
dataxParameters
.
setTargetTable
(
"tar"
);
assertTrue
(
CheckUtils
.
checkTaskNodeParameters
(
JSONUtils
.
toJsonString
(
dataxParameters
),
TaskType
.
DATAX
.
toString
()));
}
}
\ No newline at end of file
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
浏览文件 @
3ff7b34a
...
...
@@ -25,7 +25,9 @@ import java.util.regex.Pattern;
* Constants
*/
public
final
class
Constants
{
private
Constants
()
{
throw
new
IllegalStateException
(
"Constants class"
);
}
/**
* common properties path
*/
...
...
@@ -124,49 +126,41 @@ public final class Constants {
/**
* MasterServer directory registered in zookeeper
*/
//public static final String ZOOKEEPER_DOLPHINSCHEDULER_MASTERS = "zookeeper.dolphinscheduler.masters";
public
static
final
String
ZOOKEEPER_DOLPHINSCHEDULER_MASTERS
=
"/masters"
;
/**
* WorkerServer directory registered in zookeeper
*/
//public static final String ZOOKEEPER_DOLPHINSCHEDULER_WORKERS = "zookeeper.dolphinscheduler.workers";
public
static
final
String
ZOOKEEPER_DOLPHINSCHEDULER_WORKERS
=
"/workers"
;
/**
* all servers directory registered in zookeeper
*/
//public static final String ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS = "zookeeper.dolphinscheduler.dead.servers";
public
static
final
String
ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS
=
"/dead-servers"
;
/**
* MasterServer lock directory registered in zookeeper
*/
//public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS = "zookeeper.dolphinscheduler.lock.masters";
public
static
final
String
ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS
=
"/lock/masters"
;
/**
* WorkerServer lock directory registered in zookeeper
*/
//public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_WORKERS = "zookeeper.dolphinscheduler.lock.workers";
public
static
final
String
ZOOKEEPER_DOLPHINSCHEDULER_LOCK_WORKERS
=
"/lock/workers"
;
/**
* MasterServer failover directory registered in zookeeper
*/
//public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "zookeeper.dolphinscheduler.lock.failover.masters";
public
static
final
String
ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS
=
"/lock/failover/masters"
;
/**
* WorkerServer failover directory registered in zookeeper
*/
//public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS = "zookeeper.dolphinscheduler.lock.failover.workers";
public
static
final
String
ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS
=
"/lock/failover/workers"
;
/**
* MasterServer startup failover runing and fault tolerance process
*/
//public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "zookeeper.dolphinscheduler.lock.failover.startup.masters";
public
static
final
String
ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS
=
"/lock/failover/startup-masters"
;
/**
...
...
@@ -354,87 +348,87 @@ public final class Constants {
/**
* heartbeat threads number
*/
public
static
final
int
defaulWorkerHeartbeatThreadNum
=
1
;
public
static
final
int
DEFAUL_WORKER_HEARTBEAT_THREAD_NUM
=
1
;
/**
* heartbeat interval
*/
public
static
final
int
defaultWorkerHeartbeatInterval
=
60
;
public
static
final
int
DEFAULT_WORKER_HEARTBEAT_INTERVAL
=
60
;
/**
* worker fetch task number
*/
public
static
final
int
defaultWorkerFetchTaskNum
=
1
;
public
static
final
int
DEFAULT_WORKER_FETCH_TASK_NUM
=
1
;
/**
* worker execute threads number
*/
public
static
final
int
defaultWorkerExecThreadNum
=
10
;
public
static
final
int
DEFAULT_WORKER_EXEC_THREAD_NUM
=
10
;
/**
* master cpu load
*/
public
static
final
int
defaultMasterCpuLoad
=
Runtime
.
getRuntime
().
availableProcessors
()
*
2
;
public
static
final
int
DEFAULT_MASTER_CPU_LOAD
=
Runtime
.
getRuntime
().
availableProcessors
()
*
2
;
/**
* master reserved memory
*/
public
static
final
double
defaultMasterReservedMemory
=
OSUtils
.
totalMemorySize
()
/
10
;
public
static
final
double
DEFAULT_MASTER_RESERVED_MEMORY
=
OSUtils
.
totalMemorySize
()
/
10
;
/**
* worker cpu load
*/
public
static
final
int
defaultWorkerCpuLoad
=
Runtime
.
getRuntime
().
availableProcessors
()
*
2
;
public
static
final
int
DEFAULT_WORKER_CPU_LOAD
=
Runtime
.
getRuntime
().
availableProcessors
()
*
2
;
/**
* worker reserved memory
*/
public
static
final
double
defaultWorkerReservedMemory
=
OSUtils
.
totalMemorySize
()
/
10
;
public
static
final
double
DEFAULT_WORKER_RESERVED_MEMORY
=
OSUtils
.
totalMemorySize
()
/
10
;
/**
* master execute threads number
*/
public
static
final
int
defaultMasterExecThreadNum
=
100
;
public
static
final
int
DEFAULT_MASTER_EXEC_THREAD_NUM
=
100
;
/**
* default master concurrent task execute num
*/
public
static
final
int
defaultMasterTaskExecNum
=
20
;
public
static
final
int
DEFAULT_MASTER_TASK_EXEC_NUM
=
20
;
/**
* default log cache rows num,output when reach the number
*/
public
static
final
int
defaultLogRowsNum
=
4
*
16
;
public
static
final
int
DEFAULT_LOG_ROWS_NUM
=
4
*
16
;
/**
* log flush interval,output when reach the interval
*/
public
static
final
int
defaultLogFlushInterval
=
1000
;
public
static
final
int
DEFAULT_LOG_FLUSH_INTERVAL
=
1000
;
/**
* default master heartbeat thread number
*/
public
static
final
int
defaulMasterHeartbeatThreadNum
=
1
;
public
static
final
int
DEFAULT_MASTER_HEARTBEAT_THREAD_NUM
=
1
;
/**
* default master heartbeat interval
*/
public
static
final
int
defaultMasterHeartbeatInterval
=
60
;
public
static
final
int
DEFAULT_MASTER_HEARTBEAT_INTERVAL
=
60
;
/**
* default master commit retry times
*/
public
static
final
int
defaultMasterCommitRetryTimes
=
5
;
public
static
final
int
DEFAULT_MASTER_COMMIT_RETRY_TIMES
=
5
;
/**
* default master commit retry interval
*/
public
static
final
int
defaultMasterCommitRetryInterval
=
3000
;
public
static
final
int
DEFAULT_MASTER_COMMIT_RETRY_INTERVAL
=
3000
;
/**
* time unit secong to minutes
...
...
@@ -474,9 +468,9 @@ public final class Constants {
public
static
final
String
THREAD_NAME_MASTER_SERVER
=
"Master-Server"
;
public
static
final
String
THREAD_NAME_WORKER_SERVER
=
"Worker-Server"
;
public
static
String
TASK_RECORD_TABLE_HIVE_LOG
=
"eamp_hive_log_hd"
;
public
static
final
String
TASK_RECORD_TABLE_HIVE_LOG
=
"eamp_hive_log_hd"
;
public
static
String
TASK_RECORD_TABLE_HISTORY_HIVE_LOG
=
"eamp_hive_hist_log_hd"
;
public
static
final
String
TASK_RECORD_TABLE_HISTORY_HIVE_LOG
=
"eamp_hive_hist_log_hd"
;
/**
...
...
@@ -874,7 +868,7 @@ public final class Constants {
public
static
final
String
FLINK_JOB_MANAGE_MEM
=
"-yjm"
;
public
static
final
String
FLINK_TASK_MANAGE_MEM
=
"-ytm"
;
public
static
final
String
FLINK_
detach
=
"-d"
;
public
static
final
String
FLINK_
DETACH
=
"-d"
;
public
static
final
String
FLINK_MAIN_CLASS
=
"-c"
;
...
...
@@ -989,7 +983,7 @@ public final class Constants {
* session timeout
*/
public
static
final
int
SESSION_TIME_OUT
=
7200
;
public
static
final
int
maxFileSize
=
1024
*
1024
*
1024
;
public
static
final
int
MAX_FILE_SIZE
=
1024
*
1024
*
1024
;
public
static
final
String
UDF
=
"UDF"
;
public
static
final
String
CLASS
=
"class"
;
public
static
final
String
RECEIVERS
=
"receivers"
;
...
...
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/subprocess/SubProcessParameters.java
浏览文件 @
3ff7b34a
...
...
@@ -38,7 +38,7 @@ public class SubProcessParameters extends AbstractParameters {
@Override
public
boolean
checkParameters
()
{
return
this
.
processDefinitionId
!=
0
;
return
this
.
processDefinitionId
!=
null
&&
this
.
processDefinitionId
!=
0
;
}
@Override
...
...
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java
浏览文件 @
3ff7b34a
...
...
@@ -37,6 +37,9 @@ import java.util.*;
*/
public
class
CollectionUtils
{
private
CollectionUtils
()
{
throw
new
IllegalStateException
(
"CollectionUtils class"
);
}
/**
* Returns a new {@link Collection} containing <i>a</i> minus a subset of
* <i>b</i>. Only the elements of <i>b</i> that satisfy the predicate
...
...
@@ -139,26 +142,6 @@ public class CollectionUtils {
cardinalityB
=
CollectionUtils
.<
O
>
getCardinalityMap
(
b
);
}
/**
* Returns the maximum frequency of an object.
*
* @param obj the object
* @return the maximum frequency of the object
*/
private
int
max
(
final
Object
obj
)
{
return
Math
.
max
(
freqA
(
obj
),
freqB
(
obj
));
}
/**
* Returns the minimum frequency of an object.
*
* @param obj the object
* @return the minimum frequency of the object
*/
private
int
min
(
final
Object
obj
)
{
return
Math
.
min
(
freqA
(
obj
),
freqB
(
obj
));
}
/**
* Returns the frequency of this object in collection A.
*
...
...
@@ -225,7 +208,7 @@ public class CollectionUtils {
if
(
a
.
size
()
!=
b
.
size
())
{
return
false
;
}
final
CardinalityHelper
<
Object
>
helper
=
new
CardinalityHelper
<
Object
>(
a
,
b
);
final
CardinalityHelper
<
Object
>
helper
=
new
CardinalityHelper
<>(
a
,
b
);
if
(
helper
.
cardinalityA
.
size
()
!=
helper
.
cardinalityB
.
size
())
{
return
false
;
}
...
...
@@ -250,7 +233,7 @@ public class CollectionUtils {
* @return the populated cardinality map
*/
public
static
<
O
>
Map
<
O
,
Integer
>
getCardinalityMap
(
final
Iterable
<?
extends
O
>
coll
)
{
final
Map
<
O
,
Integer
>
count
=
new
HashMap
<
O
,
Integer
>();
final
Map
<
O
,
Integer
>
count
=
new
HashMap
<>();
for
(
final
O
obj
:
coll
)
{
count
.
put
(
obj
,
count
.
getOrDefault
(
obj
,
0
)
+
1
);
}
...
...
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java
浏览文件 @
3ff7b34a
...
...
@@ -20,8 +20,6 @@ import org.apache.dolphinscheduler.common.Constants;
import
org.apache.dolphinscheduler.common.enums.ResUploadType
;
import
org.apache.hadoop.conf.Configuration
;
import
org.apache.hadoop.security.UserGroupInformation
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.io.File
;
...
...
@@ -29,8 +27,9 @@ import java.io.File;
* common utils
*/
public
class
CommonUtils
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
CommonUtils
.
class
);
private
CommonUtils
()
{
throw
new
IllegalStateException
(
"CommonUtils class"
);
}
/**
* @return get the path of system environment variables
...
...
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java
浏览文件 @
3ff7b34a
...
...
@@ -370,25 +370,14 @@ public class OSUtils {
double
systemCpuLoad
;
double
systemReservedMemory
;
if
(
isMaster
){
systemCpuLoad
=
conf
.
getDouble
(
Constants
.
MASTER_MAX_CPULOAD_AVG
,
Constants
.
defaultMasterCpuLoad
);
systemReservedMemory
=
conf
.
getDouble
(
Constants
.
MASTER_RESERVED_MEMORY
,
Constants
.
defaultMasterReservedMemory
);
if
(
Boolean
.
TRUE
.
equals
(
isMaster
)
){
systemCpuLoad
=
conf
.
getDouble
(
Constants
.
MASTER_MAX_CPULOAD_AVG
,
Constants
.
DEFAULT_MASTER_CPU_LOAD
);
systemReservedMemory
=
conf
.
getDouble
(
Constants
.
MASTER_RESERVED_MEMORY
,
Constants
.
DEFAULT_MASTER_RESERVED_MEMORY
);
}
else
{
systemCpuLoad
=
conf
.
getDouble
(
Constants
.
WORKER_MAX_CPULOAD_AVG
,
Constants
.
defaultWorkerCpuLoad
);
systemReservedMemory
=
conf
.
getDouble
(
Constants
.
WORKER_RESERVED_MEMORY
,
Constants
.
defaultWorkerReservedMemory
);
}
// judging usage
double
loadAverage
=
OSUtils
.
loadAverage
();
//
double
availablePhysicalMemorySize
=
OSUtils
.
availablePhysicalMemorySize
();
if
(
loadAverage
>
systemCpuLoad
||
availablePhysicalMemorySize
<
systemReservedMemory
){
logger
.
warn
(
"load or availablePhysicalMemorySize(G) is too high, it's availablePhysicalMemorySize(G):{},loadAvg:{}"
,
availablePhysicalMemorySize
,
loadAverage
);
return
false
;
}
else
{
return
true
;
systemCpuLoad
=
conf
.
getDouble
(
Constants
.
WORKER_MAX_CPULOAD_AVG
,
Constants
.
DEFAULT_WORKER_CPU_LOAD
);
systemReservedMemory
=
conf
.
getDouble
(
Constants
.
WORKER_RESERVED_MEMORY
,
Constants
.
DEFAULT_WORKER_RESERVED_MEMORY
);
}
return
checkResource
(
systemCpuLoad
,
systemReservedMemory
);
}
}
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CollectionUtilsTest.java
浏览文件 @
3ff7b34a
...
...
@@ -76,11 +76,11 @@ public class CollectionUtilsTest {
a
=
CollectionUtils
.
stringToMap
(
"a=b;c=d"
,
null
);
Assert
.
assertTrue
(
a
.
isEmpty
());
a
=
CollectionUtils
.
stringToMap
(
"a=b;c=d;e=f"
,
";"
);
Assert
.
assertEquals
(
a
.
size
(),
3
);
Assert
.
assertEquals
(
3
,
a
.
size
()
);
a
=
CollectionUtils
.
stringToMap
(
"a;b=f"
,
";"
);
Assert
.
assertTrue
(
a
.
isEmpty
());
a
=
CollectionUtils
.
stringToMap
(
"a=b;c=d;e=f;"
,
";"
,
"test"
);
Assert
.
assertEquals
(
a
.
size
(),
3
);
Assert
.
assertEquals
(
3
,
a
.
size
()
);
Assert
.
assertNotNull
(
a
.
get
(
"testa"
));
}
...
...
@@ -91,14 +91,14 @@ public class CollectionUtilsTest {
originList
.
add
(
1
);
originList
.
add
(
2
);
List
<
Map
<
String
,
Object
>>
ret
=
CollectionUtils
.
getListByExclusion
(
originList
,
null
);
Assert
.
assertEquals
(
ret
.
size
(),
2
);
Assert
.
assertEquals
(
2
,
ret
.
size
()
);
ret
=
CollectionUtils
.
getListByExclusion
(
originList
,
new
HashSet
<>());
Assert
.
assertEquals
(
ret
.
size
(),
2
);
Assert
.
assertEquals
(
2
,
ret
.
size
()
);
Assert
.
assertFalse
(
ret
.
get
(
0
).
isEmpty
());
Set
<
String
>
exclusion
=
new
HashSet
<>();
exclusion
.
add
(
Constants
.
CLASS
);
ret
=
CollectionUtils
.
getListByExclusion
(
originList
,
exclusion
);
Assert
.
assertEquals
(
ret
.
size
(),
2
);
Assert
.
assertEquals
(
2
,
ret
.
size
()
);
Assert
.
assertTrue
(
ret
.
get
(
0
).
isEmpty
());
}
...
...
@@ -108,5 +108,38 @@ public class CollectionUtilsTest {
Assert
.
assertFalse
(
CollectionUtils
.
isNotEmpty
(
list
));
Assert
.
assertFalse
(
CollectionUtils
.
isNotEmpty
(
null
));
}
@Test
public
void
isEmpty
(){
List
<
Integer
>
list
=
new
ArrayList
<>();
Assert
.
assertTrue
(
CollectionUtils
.
isEmpty
(
list
));
Assert
.
assertTrue
(
CollectionUtils
.
isEmpty
(
null
));
list
.
add
(
1
);
Assert
.
assertFalse
(
CollectionUtils
.
isEmpty
(
list
));
}
@Test
public
void
isEqualCollection
()
{
List
<
Integer
>
a
=
new
ArrayList
<>();
a
.
add
(
1
);
List
<
Integer
>
b
=
new
ArrayList
<>();
b
.
add
(
1
);
Assert
.
assertTrue
(
CollectionUtils
.
isEqualCollection
(
a
,
b
));
b
.
add
(
2
);
Assert
.
assertFalse
(
CollectionUtils
.
isEqualCollection
(
a
,
b
));
}
@Test
public
void
getCardinalityMap
(){
List
<
Integer
>
a
=
new
ArrayList
<>();
a
.
add
(
1
);
a
.
add
(
2
);
a
.
add
(
2
);
a
.
add
(
3
);
a
.
add
(
3
);
a
.
add
(
3
);
Map
<
Integer
,
Integer
>
cardinalityMap
=
CollectionUtils
.
getCardinalityMap
(
a
);
Assert
.
assertEquals
(
3
,
cardinalityMap
.
size
());
Assert
.
assertEquals
(
1
,
cardinalityMap
.
get
(
1
).
intValue
());
Assert
.
assertEquals
(
2
,
cardinalityMap
.
get
(
2
).
intValue
());
Assert
.
assertEquals
(
3
,
cardinalityMap
.
get
(
3
).
intValue
());
}
}
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CommonUtilsTest.java
浏览文件 @
3ff7b34a
...
...
@@ -16,6 +16,7 @@
*/
package
org.apache.dolphinscheduler.common.utils
;
import
org.junit.Assert
;
import
org.junit.Test
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
...
...
@@ -28,24 +29,58 @@ import java.net.UnknownHostException;
*/
public
class
CommonUtilsTest
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
CommonUtilsTest
.
class
);
@Test
public
void
getSystemEnvPath
()
{
logger
.
info
(
CommonUtils
.
getSystemEnvPath
());
Assert
.
assertTrue
(
true
);
}
@Test
public
void
getQueueImplValue
(){
logger
.
info
(
CommonUtils
.
getQueueImplValue
());
Assert
.
assertTrue
(
true
);
}
@Test
public
void
isDevelopMode
()
{
logger
.
info
(
"develop mode: {}"
,
CommonUtils
.
isDevelopMode
());
Assert
.
assertTrue
(
true
);
}
@Test
public
void
getKerberosStartupState
(){
logger
.
info
(
"kerberos startup state: {}"
,
CommonUtils
.
getKerberosStartupState
());
Assert
.
assertTrue
(
true
);
}
@Test
public
void
loadKerberosConf
(){
try
{
CommonUtils
.
loadKerberosConf
();
Assert
.
assertTrue
(
true
);
}
catch
(
Exception
e
)
{
Assert
.
fail
(
"load Kerberos Conf failed"
);
}
}
@Test
public
void
getHdfsDataBasePath
()
{
logger
.
info
(
HadoopUtils
.
getHdfsDataBasePath
());
Assert
.
assertTrue
(
true
);
}
@Test
public
void
getDownloadFilename
()
{
logger
.
info
(
FileUtils
.
getDownloadFilename
(
"a.txt"
));
Assert
.
assertTrue
(
true
);
}
@Test
public
void
getUploadFilename
()
{
logger
.
info
(
FileUtils
.
getUploadFilename
(
"1234"
,
"a.txt"
));
Assert
.
assertTrue
(
true
);
}
@Test
public
void
getHdfsDir
()
{
logger
.
info
(
HadoopUtils
.
getHdfsResDir
(
"1234"
));
Assert
.
assertTrue
(
true
);
}
@Test
...
...
@@ -57,5 +92,6 @@ public class CommonUtilsTest {
}
catch
(
UnknownHostException
e
)
{
e
.
printStackTrace
();
}
Assert
.
assertTrue
(
true
);
}
}
\ No newline at end of file
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java
浏览文件 @
3ff7b34a
...
...
@@ -16,10 +16,16 @@
*/
package
org.apache.dolphinscheduler.common.utils
;
import
org.apache.commons.configuration.Configuration
;
import
org.apache.commons.configuration.PropertiesConfiguration
;
import
org.apache.dolphinscheduler.common.Constants
;
import
org.apache.yetus.audience.InterfaceAudience
;
import
org.junit.Assert
;
import
org.junit.Test
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.io.IOException
;
import
java.util.List
;
public
class
OSUtilsTest
{
...
...
@@ -31,4 +37,81 @@ public class OSUtilsTest {
Assert
.
assertNotEquals
(
"System user list should not be empty"
,
userList
.
size
(),
0
);
logger
.
info
(
"OS user list : {}"
,
userList
.
toString
());
}
@Test
public
void
testOSMetric
(){
double
availablePhysicalMemorySize
=
OSUtils
.
availablePhysicalMemorySize
();
Assert
.
assertTrue
(
availablePhysicalMemorySize
>
0.0f
);
double
totalMemorySize
=
OSUtils
.
totalMemorySize
();
Assert
.
assertTrue
(
totalMemorySize
>
0.0f
);
double
loadAverage
=
OSUtils
.
loadAverage
();
logger
.
info
(
"loadAverage {}"
,
loadAverage
);
double
memoryUsage
=
OSUtils
.
memoryUsage
();
Assert
.
assertTrue
(
memoryUsage
>
0.0f
);
double
cpuUsage
=
OSUtils
.
cpuUsage
();
Assert
.
assertTrue
(
cpuUsage
>
0.0f
);
}
@Test
public
void
getGroup
()
{
if
(
OSUtils
.
isMacOS
()
||
!
OSUtils
.
isWindows
()){
try
{
String
group
=
OSUtils
.
getGroup
();
Assert
.
assertNotNull
(
group
);
}
catch
(
IOException
e
)
{
Assert
.
fail
(
"get group failed "
+
e
.
getMessage
());
}
}
}
@Test
public
void
exeCmd
()
{
if
(
OSUtils
.
isMacOS
()
||
!
OSUtils
.
isWindows
()){
try
{
String
result
=
OSUtils
.
exeCmd
(
"echo helloWorld"
);
Assert
.
assertEquals
(
"helloWorld\n"
,
result
);
}
catch
(
IOException
e
)
{
Assert
.
fail
(
"exeCmd "
+
e
.
getMessage
());
}
}
}
@Test
public
void
getProcessID
(){
int
processId
=
OSUtils
.
getProcessID
();
Assert
.
assertNotEquals
(
0
,
processId
);
}
@Test
public
void
getHost
(){
String
host
=
OSUtils
.
getHost
();
Assert
.
assertNotNull
(
host
);
Assert
.
assertNotEquals
(
""
,
host
);
}
@Test
public
void
checkResource
(){
boolean
resource
=
OSUtils
.
checkResource
(
100
,
0
);
Assert
.
assertTrue
(
resource
);
resource
=
OSUtils
.
checkResource
(
0
,
Double
.
MAX_VALUE
);
Assert
.
assertFalse
(
resource
);
Configuration
configuration
=
new
PropertiesConfiguration
();
configuration
.
setProperty
(
Constants
.
MASTER_MAX_CPULOAD_AVG
,
100
);
configuration
.
setProperty
(
Constants
.
MASTER_RESERVED_MEMORY
,
0
);
resource
=
OSUtils
.
checkResource
(
configuration
,
true
);
Assert
.
assertTrue
(
resource
);
configuration
.
setProperty
(
Constants
.
MASTER_MAX_CPULOAD_AVG
,
0
);
configuration
.
setProperty
(
Constants
.
MASTER_RESERVED_MEMORY
,
Double
.
MAX_VALUE
);
resource
=
OSUtils
.
checkResource
(
configuration
,
true
);
Assert
.
assertFalse
(
resource
);
configuration
.
setProperty
(
Constants
.
WORKER_MAX_CPULOAD_AVG
,
100
);
configuration
.
setProperty
(
Constants
.
WORKER_RESERVED_MEMORY
,
0
);
resource
=
OSUtils
.
checkResource
(
configuration
,
false
);
Assert
.
assertTrue
(
resource
);
configuration
.
setProperty
(
Constants
.
WORKER_MAX_CPULOAD_AVG
,
0
);
configuration
.
setProperty
(
Constants
.
WORKER_RESERVED_MEMORY
,
Double
.
MAX_VALUE
);
resource
=
OSUtils
.
checkResource
(
configuration
,
false
);
Assert
.
assertFalse
(
resource
);
}
}
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ClickHouseDataSource.java
浏览文件 @
3ff7b34a
...
...
@@ -38,7 +38,7 @@ public class ClickHouseDataSource extends BaseDataSource {
@Override
public
String
getJdbcUrl
()
{
String
jdbcUrl
=
getAddress
();
if
(
jdbcUrl
.
lastIndexOf
(
"/"
)
!=
(
jdbcUrl
.
length
()
-
1
))
{
if
(
jdbcUrl
.
lastIndexOf
(
'/'
)
!=
(
jdbcUrl
.
length
()
-
1
))
{
jdbcUrl
+=
"/"
;
}
...
...
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ConnectionFactory.java
浏览文件 @
3ff7b34a
...
...
@@ -20,8 +20,6 @@ import com.alibaba.druid.pool.DruidDataSource;
import
com.baomidou.mybatisplus.core.MybatisConfiguration
;
import
com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor
;
import
com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean
;
import
org.apache.commons.configuration.ConfigurationException
;
import
org.apache.commons.configuration.PropertiesConfiguration
;
import
org.apache.dolphinscheduler.common.Constants
;
import
org.apache.ibatis.mapping.Environment
;
import
org.apache.ibatis.session.SqlSession
;
...
...
@@ -31,8 +29,6 @@ import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
import
org.mybatis.spring.SqlSessionTemplate
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.stereotype.Service
;
import
javax.sql.DataSource
;
...
...
@@ -117,7 +113,6 @@ public class ConnectionFactory extends SpringConnectionFactory{
sqlSessionFactoryBean
.
setTypeEnumsPackage
(
"org.apache.dolphinscheduler.*.enums"
);
sqlSessionFactory
=
sqlSessionFactoryBean
.
getObject
();
return
sqlSessionFactory
;
}
}
}
...
...
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
浏览文件 @
3ff7b34a
...
...
@@ -16,13 +16,10 @@
*/
package
org.apache.dolphinscheduler.dao.mapper
;
import
com.baomidou.mybatisplus.core.mapper.BaseMapper
;
import
org.apache.dolphinscheduler.dao.entity.Command
;
import
org.apache.dolphinscheduler.dao.entity.CommandCount
;
import
com.baomidou.mybatisplus.core.conditions.Wrapper
;
import
com.baomidou.mybatisplus.core.mapper.BaseMapper
;
import
com.baomidou.mybatisplus.core.toolkit.Constants
;
import
org.apache.ibatis.annotations.Param
;
import
org.apache.ibatis.annotations.Select
;
import
java.util.Date
;
import
java.util.List
;
...
...
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ConnectionFactoryTest.java
浏览文件 @
3ff7b34a
...
...
@@ -32,6 +32,6 @@ public class ConnectionFactoryTest {
@Test
public
void
testConnection
()
throws
Exception
{
Connection
connection
=
ConnectionFactory
.
getDataSource
().
getPooledConnection
().
getConnection
();
Assert
.
assert
Equals
(
connection
!=
null
,
true
);
Assert
.
assert
True
(
connection
!=
null
);
}
}
\ No newline at end of file
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
浏览文件 @
3ff7b34a
...
...
@@ -112,7 +112,7 @@ public class MasterServer implements IStoppable {
masterSchedulerService
=
ThreadUtils
.
newDaemonSingleThreadExecutor
(
"Master-Scheduler-Thread"
);
heartbeatMasterService
=
ThreadUtils
.
newDaemonThreadScheduledExecutor
(
"Master-Main-Thread"
,
Constants
.
defaulMasterHeartbeatThreadNum
);
heartbeatMasterService
=
ThreadUtils
.
newDaemonThreadScheduledExecutor
(
"Master-Main-Thread"
,
Constants
.
DEFAULT_MASTER_HEARTBEAT_THREAD_NUM
);
// heartbeat thread implement
Runnable
heartBeatThread
=
heartBeatThread
();
...
...
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
浏览文件 @
3ff7b34a
...
...
@@ -87,7 +87,7 @@ public class FlinkArgsUtils {
args
.
add
(
taskManagerMemory
);
}
args
.
add
(
Constants
.
FLINK_
detach
);
//-d
args
.
add
(
Constants
.
FLINK_
DETACH
);
//-d
}
...
...
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
浏览文件 @
3ff7b34a
...
...
@@ -149,7 +149,7 @@ public class WorkerServer implements IStoppable {
this
.
fetchTaskExecutorService
=
ThreadUtils
.
newDaemonSingleThreadExecutor
(
"Worker-Fetch-Thread-Executor"
);
heartbeatWorkerService
=
ThreadUtils
.
newDaemonThreadScheduledExecutor
(
"Worker-Heartbeat-Thread-Executor"
,
Constants
.
defaulWorkerHeartbeatThreadNum
);
heartbeatWorkerService
=
ThreadUtils
.
newDaemonThreadScheduledExecutor
(
"Worker-Heartbeat-Thread-Executor"
,
Constants
.
DEFAUL_WORKER_HEARTBEAT_THREAD_NUM
);
// heartbeat thread implement
Runnable
heartBeatThread
=
heartBeatThread
();
...
...
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
浏览文件 @
3ff7b34a
...
...
@@ -535,7 +535,7 @@ public abstract class AbstractCommandExecutor {
/**
* when log buffer siz or flush time reach condition , then flush
*/
if
(
logBuffer
.
size
()
>=
Constants
.
defaultLogRowsNum
||
now
-
lastFlushTime
>
Constants
.
defaultLogFlushInterval
)
{
if
(
logBuffer
.
size
()
>=
Constants
.
DEFAULT_LOG_ROWS_NUM
||
now
-
lastFlushTime
>
Constants
.
DEFAULT_LOG_FLUSH_INTERVAL
)
{
lastFlushTime
=
now
;
/** log handle */
logHandler
.
accept
(
logBuffer
);
...
...
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java
浏览文件 @
3ff7b34a
...
...
@@ -38,7 +38,9 @@ import static org.apache.dolphinscheduler.service.quartz.cron.CycleFactory.*;
* cron utils
*/
public
class
CronUtils
{
private
CronUtils
()
{
throw
new
IllegalStateException
(
"CronUtils class"
);
}
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
CronUtils
.
class
);
...
...
@@ -169,7 +171,7 @@ public class CronUtils {
cronExpression
=
parse2CronExpression
(
cron
);
}
catch
(
ParseException
e
){
logger
.
error
(
e
.
getMessage
(),
e
);
return
Collections
.
EMPTY_LIST
;
return
Collections
.
emptyList
()
;
}
return
getSelfFireDateList
(
startTime
,
endTime
,
cronExpression
);
}
...
...
@@ -202,7 +204,7 @@ public class CronUtils {
calendar
.
add
(
Calendar
.
DATE
,
1
);
break
;
default
:
logger
.
error
(
"Dependent process definition's cycleEnum is {},not support!!"
,
cycleEnum
.
name
()
);
logger
.
error
(
"Dependent process definition's cycleEnum is {},not support!!"
,
cycleEnum
);
break
;
}
maxExpirationTime
=
calendar
.
getTime
();
...
...
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CycleFactory.java
浏览文件 @
3ff7b34a
...
...
@@ -25,7 +25,9 @@ import org.apache.dolphinscheduler.common.enums.CycleEnum;
* Crontab Cycle Tool Factory
*/
public
class
CycleFactory
{
private
CycleFactory
()
{
throw
new
IllegalStateException
(
"CycleFactory class"
);
}
/**
* min
* @param cron cron
...
...
dolphinscheduler-service/src/test/java/cron/CronUtilsTest.java
浏览文件 @
3ff7b34a
...
...
@@ -61,7 +61,7 @@ public class CronUtilsTest {
String
cronAsString
=
cron
.
asString
();
// 0 */5 * * * ? * Every five minutes(once every 5 minutes)
Assert
.
assertEquals
(
cronAsString
,
"0 */5 * * * ? *"
);
Assert
.
assertEquals
(
"0 */5 * * * ? *"
,
cronAsString
);
}
...
...
@@ -74,12 +74,12 @@ public class CronUtilsTest {
String
strCrontab
=
"0 1 2 3 * ? *"
;
Cron
depCron
=
CronUtils
.
parse2Cron
(
strCrontab
);
Assert
.
assertEquals
(
depCron
.
retrieve
(
CronFieldName
.
SECOND
).
getExpression
().
asString
(),
"0"
);
Assert
.
assertEquals
(
depCron
.
retrieve
(
CronFieldName
.
MINUTE
).
getExpression
().
asString
(),
"1"
);
Assert
.
assertEquals
(
depCron
.
retrieve
(
CronFieldName
.
HOUR
).
getExpression
().
asString
(),
"2"
);
Assert
.
assertEquals
(
depCron
.
retrieve
(
CronFieldName
.
DAY_OF_MONTH
).
getExpression
().
asString
(),
"3"
);
Assert
.
assertEquals
(
depCron
.
retrieve
(
CronFieldName
.
MONTH
).
getExpression
().
asString
(),
"*"
);
Assert
.
assertEquals
(
depCron
.
retrieve
(
CronFieldName
.
YEAR
).
getExpression
().
asString
(),
"*"
);
Assert
.
assertEquals
(
"0"
,
depCron
.
retrieve
(
CronFieldName
.
SECOND
).
getExpression
().
asString
()
);
Assert
.
assertEquals
(
"1"
,
depCron
.
retrieve
(
CronFieldName
.
MINUTE
).
getExpression
().
asString
()
);
Assert
.
assertEquals
(
"2"
,
depCron
.
retrieve
(
CronFieldName
.
HOUR
).
getExpression
().
asString
()
);
Assert
.
assertEquals
(
"3"
,
depCron
.
retrieve
(
CronFieldName
.
DAY_OF_MONTH
).
getExpression
().
asString
()
);
Assert
.
assertEquals
(
"*"
,
depCron
.
retrieve
(
CronFieldName
.
MONTH
).
getExpression
().
asString
()
);
Assert
.
assertEquals
(
"*"
,
depCron
.
retrieve
(
CronFieldName
.
YEAR
).
getExpression
().
asString
()
);
}
/**
...
...
@@ -89,13 +89,13 @@ public class CronUtilsTest {
@Test
public
void
testScheduleType
()
throws
ParseException
{
CycleEnum
cycleEnum
=
CronUtils
.
getMaxCycle
(
CronUtils
.
parse2Cron
(
"0 */1 * * * ? *"
));
Assert
.
assertEquals
(
cycleEnum
.
name
(),
"MINUTE"
);
Assert
.
assertEquals
(
"MINUTE"
,
cycleEnum
.
name
()
);
CycleEnum
cycleEnum2
=
CronUtils
.
getMaxCycle
(
"0 * * * * ? *"
);
Assert
.
assertEquals
(
cycleEnum2
.
name
(),
"MINUTE"
);
Assert
.
assertEquals
(
"MINUTE"
,
cycleEnum2
.
name
()
);
CycleEnum
cycleEnum3
=
CronUtils
.
getMiniCycle
(
CronUtils
.
parse2Cron
(
"0 * * * * ? *"
));
Assert
.
assertEquals
(
cycleEnum3
.
name
(),
"MINUTE"
);
Assert
.
assertEquals
(
"MINUTE"
,
cycleEnum3
.
name
()
);
}
/**
...
...
@@ -164,6 +164,7 @@ public class CronUtilsTest {
logger
.
info
(
"can't get scheduleType"
);
}
}
Assert
.
assertTrue
(
true
);
}
@Test
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录