Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
DolphinScheduler
提交
d83a94a3
DolphinScheduler
项目概览
apache
/
DolphinScheduler
上一次同步 1 年多
通知
706
Star
9572
Fork
3514
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
DolphinScheduler
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
d83a94a3
编写于
3月 26, 2020
作者:
G
gabry.wu
提交者:
GitHub
3月 26, 2020
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Adapting partial code(file name start with T) to the sonar cloud rule (#2271)
上级
74525c28
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
68 addition
and
76 deletion
+68
-76
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java
...ava/org/apache/dolphinscheduler/api/dto/TaskCountDto.java
+18
-18
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
...che/dolphinscheduler/api/service/TaskInstanceService.java
+0
-4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskRecordService.java
...pache/dolphinscheduler/api/service/TaskRecordService.java
+0
-4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java
...rg/apache/dolphinscheduler/api/service/TenantService.java
+2
-2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskStateType.java
...g/apache/dolphinscheduler/common/enums/TaskStateType.java
+1
-1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
...va/org/apache/dolphinscheduler/common/model/TaskNode.java
+3
-3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadPoolExecutors.java
...e/dolphinscheduler/common/thread/ThreadPoolExecutors.java
+18
-18
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/TimePlaceholderUtils.java
...eduler/common/utils/placeholder/TimePlaceholderUtils.java
+4
-4
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java
...n/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java
+2
-2
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
...e/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
+2
-2
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TenantMapperTest.java
.../apache/dolphinscheduler/dao/mapper/TenantMapperTest.java
+2
-2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
...hinscheduler/server/worker/runner/TaskScheduleThread.java
+1
-1
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueZkImpl.java
...pache/dolphinscheduler/service/queue/TaskQueueZkImpl.java
+3
-3
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskQueueZKImplTest.java
...e/dolphinscheduler/service/queue/TaskQueueZKImplTest.java
+12
-12
未找到文件。
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java
浏览文件 @
d83a94a3
...
...
@@ -43,36 +43,36 @@ public class TaskCountDto {
}
private
void
countTaskDtos
(
List
<
ExecuteStatusCount
>
taskInstanceStateCounts
){
int
submitted
_s
uccess
=
0
;
int
running
_e
xeution
=
0
;
int
ready
_p
ause
=
0
;
int
submitted
S
uccess
=
0
;
int
running
E
xeution
=
0
;
int
ready
P
ause
=
0
;
int
pause
=
0
;
int
ready
_s
top
=
0
;
int
ready
S
top
=
0
;
int
stop
=
0
;
int
failure
=
0
;
int
success
=
0
;
int
need
_fault_t
olerance
=
0
;
int
need
FaultT
olerance
=
0
;
int
kill
=
0
;
int
waitting
_t
hread
=
0
;
int
waitting
T
hread
=
0
;
for
(
ExecuteStatusCount
taskInstanceStateCount
:
taskInstanceStateCounts
){
ExecutionStatus
status
=
taskInstanceStateCount
.
getExecutionStatus
();
totalCount
+=
taskInstanceStateCount
.
getCount
();
switch
(
status
){
case
SUBMITTED_SUCCESS:
submitted
_s
uccess
+=
taskInstanceStateCount
.
getCount
();
submitted
S
uccess
+=
taskInstanceStateCount
.
getCount
();
break
;
case
RUNNING_EXEUTION:
running
_e
xeution
+=
taskInstanceStateCount
.
getCount
();
running
E
xeution
+=
taskInstanceStateCount
.
getCount
();
break
;
case
READY_PAUSE:
ready
_p
ause
+=
taskInstanceStateCount
.
getCount
();
ready
P
ause
+=
taskInstanceStateCount
.
getCount
();
break
;
case
PAUSE:
pause
+=
taskInstanceStateCount
.
getCount
();
break
;
case
READY_STOP:
ready
_s
top
+=
taskInstanceStateCount
.
getCount
();
ready
S
top
+=
taskInstanceStateCount
.
getCount
();
break
;
case
STOP:
stop
+=
taskInstanceStateCount
.
getCount
();
...
...
@@ -84,13 +84,13 @@ public class TaskCountDto {
success
+=
taskInstanceStateCount
.
getCount
();
break
;
case
NEED_FAULT_TOLERANCE:
need
_fault_t
olerance
+=
taskInstanceStateCount
.
getCount
();
need
FaultT
olerance
+=
taskInstanceStateCount
.
getCount
();
break
;
case
KILL:
kill
+=
taskInstanceStateCount
.
getCount
();
break
;
case
WAITTING_THREAD:
waitting
_t
hread
+=
taskInstanceStateCount
.
getCount
();
waitting
T
hread
+=
taskInstanceStateCount
.
getCount
();
break
;
default
:
...
...
@@ -98,17 +98,17 @@ public class TaskCountDto {
}
}
this
.
taskCountDtos
=
new
ArrayList
<>();
this
.
taskCountDtos
.
add
(
new
TaskStateCount
(
ExecutionStatus
.
SUBMITTED_SUCCESS
,
submitted
_s
uccess
));
this
.
taskCountDtos
.
add
(
new
TaskStateCount
(
ExecutionStatus
.
RUNNING_EXEUTION
,
running
_e
xeution
));
this
.
taskCountDtos
.
add
(
new
TaskStateCount
(
ExecutionStatus
.
READY_PAUSE
,
ready
_p
ause
));
this
.
taskCountDtos
.
add
(
new
TaskStateCount
(
ExecutionStatus
.
SUBMITTED_SUCCESS
,
submitted
S
uccess
));
this
.
taskCountDtos
.
add
(
new
TaskStateCount
(
ExecutionStatus
.
RUNNING_EXEUTION
,
running
E
xeution
));
this
.
taskCountDtos
.
add
(
new
TaskStateCount
(
ExecutionStatus
.
READY_PAUSE
,
ready
P
ause
));
this
.
taskCountDtos
.
add
(
new
TaskStateCount
(
ExecutionStatus
.
PAUSE
,
pause
));
this
.
taskCountDtos
.
add
(
new
TaskStateCount
(
ExecutionStatus
.
READY_STOP
,
ready
_s
top
));
this
.
taskCountDtos
.
add
(
new
TaskStateCount
(
ExecutionStatus
.
READY_STOP
,
ready
S
top
));
this
.
taskCountDtos
.
add
(
new
TaskStateCount
(
ExecutionStatus
.
STOP
,
stop
));
this
.
taskCountDtos
.
add
(
new
TaskStateCount
(
ExecutionStatus
.
FAILURE
,
failure
));
this
.
taskCountDtos
.
add
(
new
TaskStateCount
(
ExecutionStatus
.
SUCCESS
,
success
));
this
.
taskCountDtos
.
add
(
new
TaskStateCount
(
ExecutionStatus
.
NEED_FAULT_TOLERANCE
,
need
_fault_t
olerance
));
this
.
taskCountDtos
.
add
(
new
TaskStateCount
(
ExecutionStatus
.
NEED_FAULT_TOLERANCE
,
need
FaultT
olerance
));
this
.
taskCountDtos
.
add
(
new
TaskStateCount
(
ExecutionStatus
.
KILL
,
kill
));
this
.
taskCountDtos
.
add
(
new
TaskStateCount
(
ExecutionStatus
.
WAITTING_THREAD
,
waitting
_t
hread
));
this
.
taskCountDtos
.
add
(
new
TaskStateCount
(
ExecutionStatus
.
WAITTING_THREAD
,
waitting
T
hread
));
}
...
...
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
浏览文件 @
d83a94a3
...
...
@@ -32,8 +32,6 @@ import org.apache.dolphinscheduler.dao.entity.User;
import
org.apache.dolphinscheduler.dao.mapper.ProjectMapper
;
import
org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper
;
import
org.apache.dolphinscheduler.service.process.ProcessService
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Service
;
...
...
@@ -46,8 +44,6 @@ import java.util.*;
@Service
public
class
TaskInstanceService
extends
BaseService
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
TaskInstanceService
.
class
);
@Autowired
ProjectMapper
projectMapper
;
...
...
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskRecordService.java
浏览文件 @
d83a94a3
...
...
@@ -21,8 +21,6 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import
org.apache.dolphinscheduler.common.Constants
;
import
org.apache.dolphinscheduler.dao.TaskRecordDao
;
import
org.apache.dolphinscheduler.dao.entity.TaskRecord
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.stereotype.Service
;
import
java.util.HashMap
;
...
...
@@ -37,8 +35,6 @@ import static org.apache.dolphinscheduler.common.Constants.*;
@Service
public
class
TaskRecordService
extends
BaseService
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
TaskRecordService
.
class
);
/**
* query task record list paging
*
...
...
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java
浏览文件 @
d83a94a3
...
...
@@ -310,7 +310,7 @@ public class TenantService extends BaseService{
Map
<
String
,
Object
>
result
=
new
HashMap
<>(
5
);
List
<
Tenant
>
resourceList
=
tenantMapper
.
queryByTenantCode
(
tenantCode
);
if
(
resourceList
!=
null
&&
resourceList
.
size
()
>
0
)
{
if
(
CollectionUtils
.
isNotEmpty
(
resourceList
)
)
{
result
.
put
(
Constants
.
DATA_LIST
,
resourceList
);
putMsg
(
result
,
Status
.
SUCCESS
);
}
else
{
...
...
@@ -346,6 +346,6 @@ public class TenantService extends BaseService{
*/
private
boolean
checkTenantExists
(
String
tenantCode
)
{
List
<
Tenant
>
tenants
=
tenantMapper
.
queryByTenantCode
(
tenantCode
);
return
(
tenants
!=
null
&&
tenants
.
size
()
>
0
);
return
CollectionUtils
.
isNotEmpty
(
tenants
);
}
}
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskStateType.java
浏览文件 @
d83a94a3
...
...
@@ -60,7 +60,7 @@ public enum TaskStateType {
default
:
break
;
}
return
n
ull
;
return
n
ew
int
[
0
]
;
}
}
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
浏览文件 @
d83a94a3
...
...
@@ -293,14 +293,14 @@ public class TaskNode {
public
TaskTimeoutParameter
getTaskTimeoutParameter
()
{
if
(
StringUtils
.
isNotEmpty
(
this
.
getTimeout
())){
String
formatStr
=
String
.
format
(
"%s,%s"
,
TaskTimeoutStrategy
.
WARN
.
name
(),
TaskTimeoutStrategy
.
FAILED
.
name
());
String
timeout
=
this
.
getTimeout
().
replace
(
formatStr
,
TaskTimeoutStrategy
.
WARNFAILED
.
name
());
return
JSON
.
parseObject
(
timeout
,
TaskTimeoutParameter
.
class
);
String
t
askT
imeout
=
this
.
getTimeout
().
replace
(
formatStr
,
TaskTimeoutStrategy
.
WARNFAILED
.
name
());
return
JSON
.
parseObject
(
t
askT
imeout
,
TaskTimeoutParameter
.
class
);
}
return
new
TaskTimeoutParameter
(
false
);
}
public
boolean
isConditionsTask
(){
return
this
.
getType
().
toUpperCase
().
equals
(
TaskType
.
CONDITIONS
.
toString
());
return
TaskType
.
CONDITIONS
.
toString
().
equalsIgnoreCase
(
this
.
getType
());
}
@Override
...
...
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadPoolExecutors.java
浏览文件 @
d83a94a3
...
...
@@ -74,21 +74,21 @@ public class ThreadPoolExecutors {
* @param event
*/
public
void
execute
(
final
Runnable
event
)
{
Executor
executor
=
getExecutor
();
if
(
executor
==
null
)
{
logger
.
error
(
"Cannot execute [
"
+
event
+
"] because the executor is missing."
);
Executor
e
ventE
xecutor
=
getExecutor
();
if
(
e
ventE
xecutor
==
null
)
{
logger
.
error
(
"Cannot execute [
{}}] because the executor is missing."
,
event
);
}
else
{
executor
.
execute
(
event
);
e
ventE
xecutor
.
execute
(
event
);
}
}
public
Future
<?>
submit
(
Runnable
event
)
{
Executor
executor
=
getExecutor
();
if
(
executor
==
null
)
{
logger
.
error
(
"Cannot submit [
"
+
event
+
"] because the executor is missing."
);
Executor
e
ventE
xecutor
=
getExecutor
();
if
(
e
ventE
xecutor
==
null
)
{
logger
.
error
(
"Cannot submit [
{}}] because the executor is missing."
,
event
);
}
else
{
return
executor
.
submit
(
event
);
return
e
ventE
xecutor
.
submit
(
event
);
}
return
null
;
...
...
@@ -97,11 +97,11 @@ public class ThreadPoolExecutors {
public
Future
<?>
submit
(
Callable
<?>
task
)
{
Executor
e
xecutor
=
getExecutor
();
if
(
e
xecutor
==
null
)
{
logger
.
error
(
"Cannot submit [
"
+
task
+
"] because the executor is missing."
);
Executor
taskE
xecutor
=
getExecutor
();
if
(
taskE
xecutor
==
null
)
{
logger
.
error
(
"Cannot submit [
{}] because the executor is missing."
,
task
);
}
else
{
return
e
xecutor
.
submit
(
task
);
return
taskE
xecutor
.
submit
(
task
);
}
return
null
;
...
...
@@ -110,8 +110,8 @@ public class ThreadPoolExecutors {
public
void
printStatus
()
{
Executor
e
xecutor
=
getExecutor
();
e
xecutor
.
getStatus
().
dumpInfo
();
Executor
printE
xecutor
=
getExecutor
();
printE
xecutor
.
getStatus
().
dumpInfo
();
}
...
...
@@ -125,7 +125,7 @@ public class ThreadPoolExecutors {
List
<
Runnable
>
wasRunning
=
executor
.
threadPoolExecutor
.
shutdownNow
();
if
(!
wasRunning
.
isEmpty
())
{
logger
.
info
(
executor
+
" had "
+
wasRunning
+
" on shutdown"
);
logger
.
info
(
"{} had {} on shutdown"
,
executor
,
wasRunning
);
}
}
}
...
...
@@ -138,7 +138,7 @@ public class ThreadPoolExecutors {
/**
* how long to retain excess threads
*/
final
long
keepAliveTimeInMillis
=
1000
;
static
final
long
KEEP_ALIVE_TIME_IN_MILLIS
=
1000
;
/**
* the thread pool executor that services the requests
*/
...
...
@@ -146,7 +146,7 @@ public class ThreadPoolExecutors {
/**
* work queue to use - unbounded queue
*/
final
BlockingQueue
<
Runnable
>
q
=
new
LinkedBlockingQueue
<
Runnable
>();
final
BlockingQueue
<
Runnable
>
q
=
new
LinkedBlockingQueue
<>();
private
final
String
name
;
private
static
final
AtomicLong
seqids
=
new
AtomicLong
(
0
);
private
final
long
id
;
...
...
@@ -156,7 +156,7 @@ public class ThreadPoolExecutors {
this
.
name
=
name
;
//create the thread pool executor
this
.
threadPoolExecutor
=
new
TrackingThreadPoolExecutor
(
maxThreads
,
maxThreads
,
keepAliveTimeInMillis
,
maxThreads
,
maxThreads
,
KEEP_ALIVE_TIME_IN_MILLIS
,
TimeUnit
.
MILLISECONDS
,
q
);
// name the threads for this threadpool
ThreadFactoryBuilder
tfb
=
new
ThreadFactoryBuilder
();
...
...
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/TimePlaceholderUtils.java
浏览文件 @
d83a94a3
...
...
@@ -35,12 +35,12 @@ public class TimePlaceholderUtils {
/**
* Prefix of the position to be replaced
*/
public
static
final
String
placeholderPrefix
=
"$["
;
public
static
final
String
PLACEHOLDER_PREFIX
=
"$["
;
/**
* The suffix of the position to be replaced
*/
public
static
final
String
placeholderSuffix
=
"]"
;
public
static
final
String
PLACEHOLDER_SUFFIX
=
"]"
;
/**
* Replaces all placeholders of format {@code ${name}} with the value returned
...
...
@@ -66,7 +66,7 @@ public class TimePlaceholderUtils {
* be ignored ({@code true}) or cause an exception ({@code false})
*/
private
static
PropertyPlaceholderHelper
getPropertyPlaceholderHelper
(
boolean
ignoreUnresolvablePlaceholders
)
{
return
new
PropertyPlaceholderHelper
(
placeholderPrefix
,
placeholderSuffix
,
null
,
ignoreUnresolvablePlaceholders
);
return
new
PropertyPlaceholderHelper
(
PLACEHOLDER_PREFIX
,
PLACEHOLDER_SUFFIX
,
null
,
ignoreUnresolvablePlaceholders
);
}
/**
...
...
@@ -503,7 +503,7 @@ public class TimePlaceholderUtils {
* @return calculate need minutes
*/
public
static
Integer
calcMinutes
(
String
minuteExpression
)
{
int
index
=
minuteExpression
.
indexOf
(
"/"
);
int
index
=
minuteExpression
.
indexOf
(
'/'
);
String
calcExpression
;
...
...
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java
浏览文件 @
d83a94a3
...
...
@@ -84,9 +84,9 @@ public class TaskRecordDao {
Class
.
forName
(
driver
);
conn
=
DriverManager
.
getConnection
(
url
,
username
,
password
);
}
catch
(
ClassNotFoundException
e
)
{
logger
.
error
(
"Exception "
,
e
);
logger
.
error
(
"
Class not found
Exception "
,
e
);
}
catch
(
SQLException
e
)
{
logger
.
error
(
"Exception "
,
e
);
logger
.
error
(
"
SQL
Exception "
,
e
);
}
return
conn
;
}
...
...
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
浏览文件 @
d83a94a3
...
...
@@ -78,7 +78,7 @@ public class TaskInstanceMapperTest {
TaskInstance
taskInstance
=
insertOne
();
//update
int
update
=
taskInstanceMapper
.
updateById
(
taskInstance
);
Assert
.
assertEquals
(
update
,
1
);
Assert
.
assertEquals
(
1
,
update
);
taskInstanceMapper
.
deleteById
(
taskInstance
.
getId
());
}
...
...
@@ -89,7 +89,7 @@ public class TaskInstanceMapperTest {
public
void
testDelete
(){
TaskInstance
taskInstance
=
insertOne
();
int
delete
=
taskInstanceMapper
.
deleteById
(
taskInstance
.
getId
());
Assert
.
assertEquals
(
delete
,
1
);
Assert
.
assertEquals
(
1
,
delete
);
}
/**
...
...
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TenantMapperTest.java
浏览文件 @
d83a94a3
...
...
@@ -64,7 +64,7 @@ public class TenantMapperTest {
tenant
.
setUpdateTime
(
new
Date
());
//update
int
update
=
tenantMapper
.
updateById
(
tenant
);
Assert
.
assertEquals
(
update
,
1
);
Assert
.
assertEquals
(
1
,
update
);
tenantMapper
.
deleteById
(
tenant
.
getId
());
}
...
...
@@ -75,7 +75,7 @@ public class TenantMapperTest {
public
void
testDelete
(){
Tenant
tenant
=
insertOne
();
int
delete
=
tenantMapper
.
deleteById
(
tenant
.
getId
());
Assert
.
assertEquals
(
delete
,
1
);
Assert
.
assertEquals
(
1
,
delete
);
}
/**
...
...
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
浏览文件 @
d83a94a3
...
...
@@ -230,7 +230,7 @@ public class TaskScheduleThread implements Runnable {
taskInstance
.
getId
()
+
".log"
;
}
}
catch
(
Exception
e
){
logger
.
error
(
"logger
"
+
e
);
logger
.
error
(
"logger
{}"
,
e
.
getMessage
(),
e
);
logPath
=
""
;
}
return
logPath
;
...
...
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueZkImpl.java
浏览文件 @
d83a94a3
...
...
@@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.service.queue;
import
org.apache.dolphinscheduler.common.Constants
;
import
org.apache.dolphinscheduler.common.utils.CollectionUtils
;
import
org.apache.dolphinscheduler.common.utils.IpUtils
;
import
org.apache.dolphinscheduler.common.utils.OSUtils
;
import
org.apache.dolphinscheduler.service.zk.ZookeeperOperator
;
...
...
@@ -67,8 +68,7 @@ public class TaskQueueZkImpl implements ITaskQueue {
@Override
public
List
<
String
>
getAllTasks
(
String
key
)
{
try
{
List
<
String
>
list
=
zookeeperOperator
.
getChildrenKeys
(
getTasksPath
(
key
));
return
list
;
return
zookeeperOperator
.
getChildrenKeys
(
getTasksPath
(
key
));
}
catch
(
Exception
e
)
{
logger
.
error
(
"get all tasks from tasks queue exception"
,
e
);
}
...
...
@@ -141,7 +141,7 @@ public class TaskQueueZkImpl implements ITaskQueue {
try
{
List
<
String
>
list
=
zookeeperOperator
.
getChildrenKeys
(
getTasksPath
(
key
));
if
(
list
!=
null
&&
list
.
size
()
>
0
){
if
(
CollectionUtils
.
isNotEmpty
(
list
)
){
String
workerIp
=
OSUtils
.
getHost
();
String
workerIpLongStr
=
String
.
valueOf
(
IpUtils
.
ipToLong
(
workerIp
));
...
...
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskQueueZKImplTest.java
浏览文件 @
d83a94a3
...
...
@@ -58,11 +58,11 @@ public class TaskQueueZKImplTest extends BaseTaskQueueTest {
init
();
// get all
List
<
String
>
allTasks
=
tasksQueue
.
getAllTasks
(
Constants
.
DOLPHINSCHEDULER_TASKS_QUEUE
);
assertEquals
(
allTasks
.
size
(),
2
);
assertEquals
(
2
,
allTasks
.
size
()
);
//delete all
tasksQueue
.
delete
();
allTasks
=
tasksQueue
.
getAllTasks
(
Constants
.
DOLPHINSCHEDULER_TASKS_QUEUE
);
assertEquals
(
allTasks
.
size
(),
0
);
assertEquals
(
0
,
allTasks
.
size
()
);
}
@Test
public
void
hasTask
(){
...
...
@@ -126,10 +126,10 @@ public class TaskQueueZKImplTest extends BaseTaskQueueTest {
//add
init
();
List
<
String
>
taskList
=
tasksQueue
.
poll
(
Constants
.
DOLPHINSCHEDULER_TASKS_QUEUE
,
2
);
assertEquals
(
taskList
.
size
(),
2
);
assertEquals
(
2
,
taskList
.
size
()
);
assertEquals
(
taskList
.
get
(
0
),
"0_1_1_1_-1"
);
assertEquals
(
taskList
.
get
(
1
),
"1_0_1_1_-1"
);
assertEquals
(
"0_1_1_1_-1"
,
taskList
.
get
(
0
)
);
assertEquals
(
"1_0_1_1_-1"
,
taskList
.
get
(
1
)
);
}
/**
...
...
@@ -153,7 +153,7 @@ public class TaskQueueZKImplTest extends BaseTaskQueueTest {
String
task
=
"1_0_1_1_-1"
;
tasksQueue
.
sadd
(
Constants
.
DOLPHINSCHEDULER_TASKS_QUEUE
,
task
);
//check size
assertEquals
(
tasksQueue
.
smembers
(
Constants
.
DOLPHINSCHEDULER_TASKS_QUEUE
).
size
(),
1
);
assertEquals
(
1
,
tasksQueue
.
smembers
(
Constants
.
DOLPHINSCHEDULER_TASKS_QUEUE
).
size
()
);
}
...
...
@@ -166,10 +166,10 @@ public class TaskQueueZKImplTest extends BaseTaskQueueTest {
String
task
=
"1_0_1_1_-1"
;
tasksQueue
.
sadd
(
Constants
.
DOLPHINSCHEDULER_TASKS_QUEUE
,
task
);
//check size
assertEquals
(
tasksQueue
.
smembers
(
Constants
.
DOLPHINSCHEDULER_TASKS_QUEUE
).
size
(),
1
);
assertEquals
(
1
,
tasksQueue
.
smembers
(
Constants
.
DOLPHINSCHEDULER_TASKS_QUEUE
).
size
()
);
//remove and get size
tasksQueue
.
srem
(
Constants
.
DOLPHINSCHEDULER_TASKS_QUEUE
,
task
);
assertEquals
(
tasksQueue
.
smembers
(
Constants
.
DOLPHINSCHEDULER_TASKS_QUEUE
).
size
(),
0
);
assertEquals
(
0
,
tasksQueue
.
smembers
(
Constants
.
DOLPHINSCHEDULER_TASKS_QUEUE
).
size
()
);
}
/**
...
...
@@ -179,17 +179,17 @@ public class TaskQueueZKImplTest extends BaseTaskQueueTest {
public
void
smembers
(){
//first init
assertEquals
(
tasksQueue
.
smembers
(
Constants
.
DOLPHINSCHEDULER_TASKS_QUEUE
).
size
(),
0
);
assertEquals
(
0
,
tasksQueue
.
smembers
(
Constants
.
DOLPHINSCHEDULER_TASKS_QUEUE
).
size
()
);
//add
String
task
=
"1_0_1_1_-1"
;
tasksQueue
.
sadd
(
Constants
.
DOLPHINSCHEDULER_TASKS_QUEUE
,
task
);
//check size
assertEquals
(
tasksQueue
.
smembers
(
Constants
.
DOLPHINSCHEDULER_TASKS_QUEUE
).
size
(),
1
);
assertEquals
(
1
,
tasksQueue
.
smembers
(
Constants
.
DOLPHINSCHEDULER_TASKS_QUEUE
).
size
()
);
//add
task
=
"0_1_1_1_"
;
tasksQueue
.
sadd
(
Constants
.
DOLPHINSCHEDULER_TASKS_QUEUE
,
task
);
//check size
assertEquals
(
tasksQueue
.
smembers
(
Constants
.
DOLPHINSCHEDULER_TASKS_QUEUE
).
size
(),
2
);
assertEquals
(
2
,
tasksQueue
.
smembers
(
Constants
.
DOLPHINSCHEDULER_TASKS_QUEUE
).
size
()
);
}
...
...
@@ -222,7 +222,7 @@ public class TaskQueueZKImplTest extends BaseTaskQueueTest {
}
String
node1
=
tasksQueue
.
poll
(
Constants
.
DOLPHINSCHEDULER_TASKS_QUEUE
,
1
).
get
(
0
);
assertEquals
(
node1
,
"0"
);
assertEquals
(
"0"
,
node1
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录