Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
pentaLiker
DolphinScheduler
提交
eef8bf4e
DolphinScheduler
项目概览
pentaLiker
/
DolphinScheduler
与 Fork 源项目一致
Fork自
apache / DolphinScheduler
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
DolphinScheduler
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
eef8bf4e
编写于
3月 26, 2020
作者:
G
gabry.wu
提交者:
GitHub
3月 26, 2020
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Adapting partial code(file name start with P #2) to the sonar cloud rule (#2260)
上级
d8cba5ff
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
94 addition
and
96 deletion
+94
-96
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
...inscheduler/api/controller/ProcessInstanceController.java
+1
-1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java
...java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java
+0
-3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
...olphinscheduler/api/service/ProcessDefinitionService.java
+5
-8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
.../dolphinscheduler/api/service/ProcessInstanceService.java
+7
-8
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessEnvironmentForWin32.java
...uler/common/utils/process/ProcessEnvironmentForWin32.java
+13
-7
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessImplForWin32.java
...inscheduler/common/utils/process/ProcessImplForWin32.java
+56
-57
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
...g/apache/dolphinscheduler/dao/entity/ProcessInstance.java
+1
-1
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapperTest.java
...hinscheduler/dao/mapper/ProcessInstanceMapMapperTest.java
+3
-3
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
...olphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
+7
-7
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
...lphinscheduler/server/master/runner/MasterExecThread.java
+1
-1
未找到文件。
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
浏览文件 @
eef8bf4e
...
...
@@ -391,7 +391,7 @@ public class ProcessInstanceController extends BaseController{
}
}
}
if
(
deleteFailedIdList
.
size
()
>
0
){
if
(
!
deleteFailedIdList
.
isEmpty
()
){
putMsg
(
result
,
Status
.
BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_ERROR
,
String
.
join
(
","
,
deleteFailedIdList
));
}
else
{
putMsg
(
result
,
Status
.
SUCCESS
);
...
...
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java
浏览文件 @
eef8bf4e
...
...
@@ -106,9 +106,6 @@ public class ProcessMeta {
*/
private
String
scheduleWorkerGroupName
;
public
ProcessMeta
()
{
}
public
String
getProjectName
()
{
return
projectName
;
}
...
...
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
浏览文件 @
eef8bf4e
...
...
@@ -148,7 +148,7 @@ public class ProcessDefinitionService extends BaseDAGService {
//custom global params
List
<
Property
>
globalParamsList
=
processData
.
getGlobalParams
();
if
(
globalParamsList
!=
null
&&
globalParamsList
.
size
()
>
0
)
{
if
(
CollectionUtils
.
isNotEmpty
(
globalParamsList
)
)
{
Set
<
Property
>
globalParamsSet
=
new
HashSet
<>(
globalParamsList
);
globalParamsList
=
new
ArrayList
<>(
globalParamsSet
);
processDefine
.
setGlobalParamList
(
globalParamsList
);
...
...
@@ -314,7 +314,7 @@ public class ProcessDefinitionService extends BaseDAGService {
//custom global params
List
<
Property
>
globalParamsList
=
new
ArrayList
<>();
if
(
processData
.
getGlobalParams
()
!=
null
&&
processData
.
getGlobalParams
().
size
()
>
0
)
{
if
(
CollectionUtils
.
isNotEmpty
(
processData
.
getGlobalParams
())
)
{
Set
<
Property
>
userDefParamsSet
=
new
HashSet
<>(
processData
.
getGlobalParams
());
globalParamsList
=
new
ArrayList
<>(
userDefParamsSet
);
}
...
...
@@ -453,12 +453,11 @@ public class ProcessDefinitionService extends BaseDAGService {
ProcessDefinition
processDefinition
=
processDefineMapper
.
selectById
(
id
);
switch
(
state
)
{
case
ONLINE:
{
case
ONLINE:
processDefinition
.
setReleaseState
(
state
);
processDefineMapper
.
updateById
(
processDefinition
);
break
;
}
case
OFFLINE:
{
case
OFFLINE:
processDefinition
.
setReleaseState
(
state
);
processDefineMapper
.
updateById
(
processDefinition
);
List
<
Schedule
>
scheduleList
=
scheduleMapper
.
selectAllByProcessDefineArray
(
...
...
@@ -473,11 +472,9 @@ public class ProcessDefinitionService extends BaseDAGService {
SchedulerService
.
deleteSchedule
(
project
.
getId
(),
schedule
.
getId
());
}
break
;
}
default
:
{
default
:
putMsg
(
result
,
Status
.
REQUEST_PARAMS_NOT_VALID_ERROR
,
"releaseState"
);
return
result
;
}
}
putMsg
(
result
,
Status
.
SUCCESS
);
...
...
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
浏览文件 @
eef8bf4e
...
...
@@ -239,7 +239,7 @@ public class ProcessInstanceService extends BaseDAGService {
}
ProcessInstance
processInstance
=
processService
.
findProcessInstanceDetailById
(
processId
);
List
<
TaskInstance
>
taskInstanceList
=
processService
.
findValidTaskListByProcessId
(
processId
);
A
ddDependResultForTaskList
(
taskInstanceList
);
a
ddDependResultForTaskList
(
taskInstanceList
);
Map
<
String
,
Object
>
resultMap
=
new
HashMap
<>();
resultMap
.
put
(
PROCESS_INSTANCE_STATE
,
processInstance
.
getState
().
toString
());
resultMap
.
put
(
TASK_LIST
,
taskInstanceList
);
...
...
@@ -253,9 +253,9 @@ public class ProcessInstanceService extends BaseDAGService {
* add dependent result for dependent task
* @param taskInstanceList
*/
private
void
A
ddDependResultForTaskList
(
List
<
TaskInstance
>
taskInstanceList
)
throws
IOException
{
private
void
a
ddDependResultForTaskList
(
List
<
TaskInstance
>
taskInstanceList
)
throws
IOException
{
for
(
TaskInstance
taskInstance:
taskInstanceList
){
if
(
taskInstance
.
getTaskType
().
toUpperCase
().
equals
(
TaskType
.
DEPENDENT
.
toString
())){
if
(
taskInstance
.
getTaskType
().
equalsIgnoreCase
(
TaskType
.
DEPENDENT
.
toString
())){
Result
logResult
=
loggerService
.
queryLog
(
taskInstance
.
getId
(),
0
,
4098
);
if
(
logResult
.
getCode
()
==
Status
.
SUCCESS
.
ordinal
()){
...
...
@@ -414,11 +414,10 @@ public class ProcessInstanceService extends BaseDAGService {
processInstance
.
setProcessInstanceJson
(
processInstanceJson
);
processInstance
.
setGlobalParams
(
globalParams
);
}
// int update = processDao.updateProcessInstance(processInstanceId, processInstanceJson,
// globalParams, schedule, flag, locations, connects);
int
update
=
processService
.
updateProcessInstance
(
processInstance
);
int
updateDefine
=
1
;
if
(
syncDefine
&&
StringUtils
.
isNotEmpty
(
processInstanceJson
))
{
if
(
Boolean
.
TRUE
.
equals
(
syncDefine
)
&&
StringUtils
.
isNotEmpty
(
processInstanceJson
))
{
processDefinition
.
setProcessDefinitionJson
(
processInstanceJson
);
processDefinition
.
setGlobalParams
(
originDefParams
);
processDefinition
.
setLocations
(
locations
);
...
...
@@ -544,7 +543,7 @@ public class ProcessInstanceService extends BaseDAGService {
nodeValueSb
.
append
(
ipSb
);
}
logger
.
info
(
"delete task queue node : {}"
,
nodeValueSb
.
toString
()
);
logger
.
info
(
"delete task queue node : {}"
,
nodeValueSb
);
tasksQueue
.
removeNode
(
org
.
apache
.
dolphinscheduler
.
common
.
Constants
.
DOLPHINSCHEDULER_TASKS_QUEUE
,
nodeValueSb
.
toString
());
}
...
...
@@ -621,7 +620,7 @@ public class ProcessInstanceService extends BaseDAGService {
Map
<
String
,
Object
>
localParamsMap
=
new
HashMap
<>();
localParamsMap
.
put
(
"taskType"
,
taskNode
.
getType
());
localParamsMap
.
put
(
"localParamsList"
,
localParamsList
);
if
(
localParamsList
.
size
()
>
0
)
{
if
(
CollectionUtils
.
isNotEmpty
(
localParamsList
)
)
{
localUserDefParams
.
put
(
taskNode
.
getName
(),
localParamsMap
);
}
}
...
...
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessEnvironmentForWin32.java
浏览文件 @
eef8bf4e
...
...
@@ -46,22 +46,23 @@ final class ProcessEnvironmentForWin32 extends HashMap<String,String> {
return
(
String
)
o
;
}
@Override
public
String
put
(
String
key
,
String
value
)
{
return
super
.
put
(
validateName
(
key
),
validateValue
(
value
));
}
@Override
public
String
get
(
Object
key
)
{
return
super
.
get
(
nonNullString
(
key
));
}
@Override
public
boolean
containsKey
(
Object
key
)
{
return
super
.
containsKey
(
nonNullString
(
key
));
}
@Override
public
boolean
containsValue
(
Object
value
)
{
return
super
.
containsValue
(
nonNullString
(
value
));
}
@Override
public
String
remove
(
Object
key
)
{
return
super
.
remove
(
nonNullString
(
key
));
}
...
...
@@ -92,6 +93,7 @@ final class ProcessEnvironmentForWin32 extends HashMap<String,String> {
public
Entry
<
String
,
String
>
next
()
{
return
new
CheckedEntry
(
i
.
next
());
}
@Override
public
void
remove
()
{
i
.
remove
();}
};
}
...
...
@@ -110,10 +112,14 @@ final class ProcessEnvironmentForWin32 extends HashMap<String,String> {
private
final
Collection
<
String
>
c
;
public
CheckedValues
(
Collection
<
String
>
c
)
{
this
.
c
=
c
;}
public
int
size
()
{
return
c
.
size
();}
@Override
public
boolean
isEmpty
()
{
return
c
.
isEmpty
();}
@Override
public
void
clear
()
{
c
.
clear
();}
public
Iterator
<
String
>
iterator
()
{
return
c
.
iterator
();}
@Override
public
boolean
contains
(
Object
o
)
{
return
c
.
contains
(
nonNullString
(
o
));}
@Override
public
boolean
remove
(
Object
o
)
{
return
c
.
remove
(
nonNullString
(
o
));}
}
...
...
@@ -127,15 +133,15 @@ final class ProcessEnvironmentForWin32 extends HashMap<String,String> {
public
boolean
contains
(
Object
o
)
{
return
s
.
contains
(
nonNullString
(
o
));}
public
boolean
remove
(
Object
o
)
{
return
s
.
remove
(
nonNullString
(
o
));}
}
@Override
public
Set
<
String
>
keySet
()
{
return
new
CheckedKeySet
(
super
.
keySet
());
}
@Override
public
Collection
<
String
>
values
()
{
return
new
CheckedValues
(
super
.
values
());
}
@Override
public
Set
<
Entry
<
String
,
String
>>
entrySet
()
{
return
new
CheckedEntrySet
(
super
.
entrySet
());
}
...
...
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessImplForWin32.java
浏览文件 @
eef8bf4e
...
...
@@ -113,7 +113,7 @@ public class ProcessImplForWin32 extends Process {
// System-dependent portion of ProcessBuilderForWindows.start()
static
Process
start
(
String
username
,
String
password
,
String
cmdarray
[]
,
String
[]
cmdarray
,
java
.
util
.
Map
<
String
,
String
>
environment
,
String
dir
,
ProcessBuilderForWin32
.
Redirect
[]
redirects
,
...
...
@@ -178,10 +178,10 @@ public class ProcessImplForWin32 extends Process {
private
static
class
LazyPattern
{
// Escape-support version:
// "(\")((?:\\\\\\1|.)+?)\\1|([^\\s\"]+)"
;
// "(\")((?:\\\\\\1|.)+?)\\1|([^\\s\"]+)"
private
static
final
Pattern
PATTERN
=
Pattern
.
compile
(
"[^\\s\"]+|\"[^\"]*\""
);
}
;
}
/* Parses the command string parameter into the executable name and
* program arguments.
...
...
@@ -204,7 +204,7 @@ public class ProcessImplForWin32 extends Process {
private
static
final
int
VERIFICATION_LEGACY
=
3
;
// See Command shell overview for documentation of special characters.
// https://docs.microsoft.com/en-us/previous-versions/windows/it-pro/windows-xp/bb490954(v=technet.10)
private
static
final
char
ESCAPE_VERIFICATION
[][]
=
{
private
static
final
char
[][]
ESCAPE_VERIFICATION
=
{
// We guarantee the only command file execution for implicit [cmd.exe] run.
// http://technet.microsoft.com/en-us/library/bb490954.aspx
{
' '
,
'\t'
,
'<'
,
'>'
,
'&'
,
'|'
,
'^'
},
...
...
@@ -215,7 +215,7 @@ public class ProcessImplForWin32 extends Process {
private
static
String
createCommandLine
(
int
verificationType
,
final
String
executablePath
,
final
String
cmd
[]
)
final
String
[]
cmd
)
{
StringBuilder
cmdbuf
=
new
StringBuilder
(
80
);
...
...
@@ -310,7 +310,7 @@ public class ProcessImplForWin32 extends Process {
}
if
(!
argIsQuoted
)
{
char
testEscape
[]
=
ESCAPE_VERIFICATION
[
verificationType
];
char
[]
testEscape
=
ESCAPE_VERIFICATION
[
verificationType
];
for
(
int
i
=
0
;
i
<
testEscape
.
length
;
++
i
)
{
if
(
arg
.
indexOf
(
testEscape
[
i
])
>=
0
)
{
return
true
;
...
...
@@ -391,14 +391,14 @@ public class ProcessImplForWin32 extends Process {
private
static
final
char
BACKSLASH
=
'\\'
;
private
WinNT
.
HANDLE
handle
;
private
OutputStream
stdin
_s
tream
;
private
InputStream
stdout
_s
tream
;
private
InputStream
stderr
_s
tream
;
private
OutputStream
stdin
S
tream
;
private
InputStream
stdout
S
tream
;
private
InputStream
stderr
S
tream
;
private
ProcessImplForWin32
(
String
username
,
String
password
,
String
cmd
[]
,
String
[]
cmd
,
final
String
envblock
,
final
String
path
,
final
long
[]
stdHandles
,
...
...
@@ -473,44 +473,44 @@ public class ProcessImplForWin32 extends Process {
new
PrivilegedAction
<
Void
>()
{
public
Void
run
()
{
if
(
stdHandles
[
0
]
==
-
1L
)
stdin
_s
tream
=
ProcessBuilderForWin32
.
NullOutputStream
.
INSTANCE
;
stdin
S
tream
=
ProcessBuilderForWin32
.
NullOutputStream
.
INSTANCE
;
else
{
FileDescriptor
stdin
_f
d
=
new
FileDescriptor
();
setHandle
(
stdin
_f
d
,
stdHandles
[
0
]);
stdin
_s
tream
=
new
BufferedOutputStream
(
new
FileOutputStream
(
stdin
_f
d
));
FileDescriptor
stdin
F
d
=
new
FileDescriptor
();
setHandle
(
stdin
F
d
,
stdHandles
[
0
]);
stdin
S
tream
=
new
BufferedOutputStream
(
new
FileOutputStream
(
stdin
F
d
));
}
if
(
stdHandles
[
1
]
==
-
1L
)
stdout
_s
tream
=
ProcessBuilderForWin32
.
NullInputStream
.
INSTANCE
;
stdout
S
tream
=
ProcessBuilderForWin32
.
NullInputStream
.
INSTANCE
;
else
{
FileDescriptor
stdout
_f
d
=
new
FileDescriptor
();
setHandle
(
stdout
_f
d
,
stdHandles
[
1
]);
stdout
_s
tream
=
new
BufferedInputStream
(
new
FileInputStream
(
stdout
_f
d
));
FileDescriptor
stdout
F
d
=
new
FileDescriptor
();
setHandle
(
stdout
F
d
,
stdHandles
[
1
]);
stdout
S
tream
=
new
BufferedInputStream
(
new
FileInputStream
(
stdout
F
d
));
}
if
(
stdHandles
[
2
]
==
-
1L
)
stderr
_s
tream
=
ProcessBuilderForWin32
.
NullInputStream
.
INSTANCE
;
stderr
S
tream
=
ProcessBuilderForWin32
.
NullInputStream
.
INSTANCE
;
else
{
FileDescriptor
stderr
_f
d
=
new
FileDescriptor
();
setHandle
(
stderr
_f
d
,
stdHandles
[
2
]);
stderr
_stream
=
new
FileInputStream
(
stderr_f
d
);
FileDescriptor
stderr
F
d
=
new
FileDescriptor
();
setHandle
(
stderr
F
d
,
stdHandles
[
2
]);
stderr
Stream
=
new
FileInputStream
(
stderrF
d
);
}
return
null
;
}});
}
public
OutputStream
getOutputStream
()
{
return
stdin
_s
tream
;
return
stdin
S
tream
;
}
public
InputStream
getInputStream
()
{
return
stdout
_s
tream
;
return
stdout
S
tream
;
}
public
InputStream
getErrorStream
()
{
return
stderr
_s
tream
;
return
stderr
S
tream
;
}
protected
void
finalize
()
{
...
...
@@ -558,11 +558,12 @@ public class ProcessImplForWin32 extends Process {
public
void
destroy
()
{
terminateProcess
(
handle
);
}
@Override
public
Process
destroyForcibly
()
{
destroy
();
return
this
;
}
@Override
public
boolean
isAlive
()
{
return
isProcessAlive
(
handle
);
}
...
...
@@ -583,7 +584,7 @@ public class ProcessImplForWin32 extends Process {
pjhandles
.
setValue
(
thisProcessEnd
);
}
}
Kernel32
.
INSTANCE
.
SetHandleInformation
(
phStd
.
getValue
(),
Kernel32
.
HANDLE_FLAG_INHERIT
,
Kernel32
.
HANDLE_FLAG_INHERIT
);
Kernel32
.
INSTANCE
.
SetHandleInformation
(
phStd
.
getValue
(),
WinBase
.
HANDLE_FLAG_INHERIT
,
WinBase
.
HANDLE_FLAG_INHERIT
);
return
true
;
}
...
...
@@ -597,17 +598,17 @@ public class ProcessImplForWin32 extends Process {
private
static
void
prepareIOEHandleState
(
WinNT
.
HANDLE
[]
stdIOE
,
Boolean
[]
inherit
)
{
for
(
int
i
=
0
;
i
<
HANDLE_STORAGE_SIZE
;
++
i
)
{
WinNT
.
HANDLE
hstd
=
stdIOE
[
i
];
if
(!
Kernel32
.
INVALID_HANDLE_VALUE
.
equals
(
hstd
))
{
if
(!
WinBase
.
INVALID_HANDLE_VALUE
.
equals
(
hstd
))
{
inherit
[
i
]
=
Boolean
.
TRUE
;
Kernel32
.
INSTANCE
.
SetHandleInformation
(
hstd
,
Kernel32
.
HANDLE_FLAG_INHERIT
,
0
);
Kernel32
.
INSTANCE
.
SetHandleInformation
(
hstd
,
WinBase
.
HANDLE_FLAG_INHERIT
,
0
);
}
}
}
private
static
void
restoreIOEHandleState
(
WinNT
.
HANDLE
[]
stdIOE
,
Boolean
[]
inherit
)
{
for
(
int
i
=
HANDLE_STORAGE_SIZE
-
1
;
i
>=
0
;
--
i
)
{
if
(!
Kernel32
.
INVALID_HANDLE_VALUE
.
equals
(
stdIOE
[
i
]))
{
Kernel32
.
INSTANCE
.
SetHandleInformation
(
stdIOE
[
i
],
Kernel32
.
HANDLE_FLAG_INHERIT
,
inherit
[
i
]
?
Kernel32
.
HANDLE_FLAG_INHERIT
:
0
);
if
(!
WinBase
.
INVALID_HANDLE_VALUE
.
equals
(
stdIOE
[
i
]))
{
Kernel32
.
INSTANCE
.
SetHandleInformation
(
stdIOE
[
i
],
WinBase
.
HANDLE_FLAG_INHERIT
,
Boolean
.
TRUE
.
equals
(
inherit
[
i
])
?
WinBase
.
HANDLE_FLAG_INHERIT
:
0
);
}
}
}
...
...
@@ -622,12 +623,12 @@ public class ProcessImplForWin32 extends Process {
WinNT
.
HANDLE
ret
=
new
WinNT
.
HANDLE
(
Pointer
.
createConstant
(
0
));
WinNT
.
HANDLE
[]
stdIOE
=
new
WinNT
.
HANDLE
[]
{
Kernel32
.
INVALID_HANDLE_VALUE
,
Kernel32
.
INVALID_HANDLE_VALUE
,
Kernel32
.
INVALID_HANDLE_VALUE
,
WinBase
.
INVALID_HANDLE_VALUE
,
WinBase
.
INVALID_HANDLE_VALUE
,
WinBase
.
INVALID_HANDLE_VALUE
,
stdHandles
[
0
].
getValue
(),
stdHandles
[
1
].
getValue
(),
stdHandles
[
2
].
getValue
()
};
stdIOE
[
0
]
=
Kernel32
.
INSTANCE
.
GetStdHandle
(
Kernel32
.
STD_INPUT_HANDLE
);
stdIOE
[
1
]
=
Kernel32
.
INSTANCE
.
GetStdHandle
(
Kernel32
.
STD_OUTPUT_HANDLE
);
stdIOE
[
2
]
=
Kernel32
.
INSTANCE
.
GetStdHandle
(
Kernel32
.
STD_ERROR_HANDLE
);
stdIOE
[
0
]
=
Kernel32
.
INSTANCE
.
GetStdHandle
(
Wincon
.
STD_INPUT_HANDLE
);
stdIOE
[
1
]
=
Kernel32
.
INSTANCE
.
GetStdHandle
(
Wincon
.
STD_OUTPUT_HANDLE
);
stdIOE
[
2
]
=
Kernel32
.
INSTANCE
.
GetStdHandle
(
Wincon
.
STD_ERROR_HANDLE
);
Boolean
[]
inherit
=
new
Boolean
[]
{
Boolean
.
FALSE
,
Boolean
.
FALSE
,
Boolean
.
FALSE
,
...
...
@@ -639,17 +640,17 @@ public class ProcessImplForWin32 extends Process {
// input
WinNT
.
HANDLEByReference
hStdInput
=
new
WinNT
.
HANDLEByReference
();
WinNT
.
HANDLEByReference
[]
pipeIn
=
new
WinNT
.
HANDLEByReference
[]
{
new
WinNT
.
HANDLEByReference
(
Kernel32
.
INVALID_HANDLE_VALUE
),
new
WinNT
.
HANDLEByReference
(
Kernel32
.
INVALID_HANDLE_VALUE
)
};
new
WinNT
.
HANDLEByReference
(
WinBase
.
INVALID_HANDLE_VALUE
),
new
WinNT
.
HANDLEByReference
(
WinBase
.
INVALID_HANDLE_VALUE
)
};
// output
WinNT
.
HANDLEByReference
hStdOutput
=
new
WinNT
.
HANDLEByReference
();
WinNT
.
HANDLEByReference
[]
pipeOut
=
new
WinNT
.
HANDLEByReference
[]
{
new
WinNT
.
HANDLEByReference
(
Kernel32
.
INVALID_HANDLE_VALUE
),
new
WinNT
.
HANDLEByReference
(
Kernel32
.
INVALID_HANDLE_VALUE
)
};
new
WinNT
.
HANDLEByReference
(
WinBase
.
INVALID_HANDLE_VALUE
),
new
WinNT
.
HANDLEByReference
(
WinBase
.
INVALID_HANDLE_VALUE
)
};
// error
WinNT
.
HANDLEByReference
hStdError
=
new
WinNT
.
HANDLEByReference
();
WinNT
.
HANDLEByReference
[]
pipeError
=
new
WinNT
.
HANDLEByReference
[]
{
new
WinNT
.
HANDLEByReference
(
Kernel32
.
INVALID_HANDLE_VALUE
),
new
WinNT
.
HANDLEByReference
(
Kernel32
.
INVALID_HANDLE_VALUE
)
};
new
WinNT
.
HANDLEByReference
(
WinBase
.
INVALID_HANDLE_VALUE
),
new
WinNT
.
HANDLEByReference
(
WinBase
.
INVALID_HANDLE_VALUE
)
};
boolean
success
;
if
(
initHolder
(
stdHandles
[
0
],
pipeIn
,
OFFSET_READ
,
hStdInput
))
{
...
...
@@ -669,8 +670,8 @@ public class ProcessImplForWin32 extends Process {
if
(
success
)
{
WTypes
.
LPSTR
lpEnvironment
=
envblock
==
null
?
new
WTypes
.
LPSTR
()
:
new
WTypes
.
LPSTR
(
envblock
);
Kernel32
.
PROCESS_INFORMATION
pi
=
new
WinBase
.
PROCESS_INFORMATION
();
si
.
dwFlags
=
Kernel32
.
STARTF_USESTDHANDLES
;
WinBase
.
PROCESS_INFORMATION
pi
=
new
WinBase
.
PROCESS_INFORMATION
();
si
.
dwFlags
=
WinBase
.
STARTF_USESTDHANDLES
;
if
(!
Advapi32
.
INSTANCE
.
CreateProcessWithLogonW
(
username
,
null
...
...
@@ -678,7 +679,7 @@ public class ProcessImplForWin32 extends Process {
,
Advapi32
.
LOGON_WITH_PROFILE
,
null
,
cmd
,
Kernel32
.
CREATE_NO_WINDOW
,
WinBase
.
CREATE_NO_WINDOW
,
lpEnvironment
.
getPointer
()
,
path
,
si
...
...
@@ -710,13 +711,11 @@ public class ProcessImplForWin32 extends Process {
for
(
int
i
=
0
;
i
<
stdHandles
.
length
;
i
++)
{
handles
[
i
]
=
new
WinNT
.
HANDLEByReference
(
new
WinNT
.
HANDLE
(
Pointer
.
createConstant
(
stdHandles
[
i
])));
}
if
(
cmd
!=
null
)
{
if
(
username
!=
null
&&
password
!=
null
)
{
ret
=
processCreate
(
username
,
password
,
cmd
,
envblock
,
path
,
handles
,
redirectErrorStream
);
}
if
(
cmd
!=
null
&&
username
!=
null
&&
password
!=
null
)
{
ret
=
processCreate
(
username
,
password
,
cmd
,
envblock
,
path
,
handles
,
redirectErrorStream
);
}
for
(
int
i
=
0
;
i
<
stdHandles
.
length
;
i
++)
{
stdHandles
[
i
]
=
handles
[
i
].
getPointer
().
getLong
(
0
);
}
...
...
@@ -756,15 +755,15 @@ public class ProcessImplForWin32 extends Process {
* @return the native HANDLE
*/
private
static
long
openForAtomicAppend
(
String
path
)
throws
IOException
{
int
access
=
Kernel32
.
GENERIC_READ
|
Kernel32
.
GENERIC_WRITE
;
int
sharing
=
Kernel32
.
FILE_SHARE_READ
|
Kernel32
.
FILE_SHARE_WRITE
;
int
disposition
=
Kernel32
.
OPEN_ALWAYS
;
int
flagsAndAttributes
=
Kernel32
.
FILE_ATTRIBUTE_NORMAL
;
int
access
=
WinNT
.
GENERIC_READ
|
WinNT
.
GENERIC_WRITE
;
int
sharing
=
WinNT
.
FILE_SHARE_READ
|
WinNT
.
FILE_SHARE_WRITE
;
int
disposition
=
WinNT
.
OPEN_ALWAYS
;
int
flagsAndAttributes
=
WinNT
.
FILE_ATTRIBUTE_NORMAL
;
if
(
path
==
null
||
path
.
isEmpty
())
{
return
-
1
;
}
else
{
WinNT
.
HANDLE
handle
=
Kernel32
.
INSTANCE
.
CreateFile
(
path
,
access
,
sharing
,
null
,
disposition
,
flagsAndAttributes
,
null
);
if
(
handle
==
Kernel32
.
INVALID_HANDLE_VALUE
)
{
if
(
handle
==
WinBase
.
INVALID_HANDLE_VALUE
)
{
throw
new
Win32Exception
(
Kernel32
.
INSTANCE
.
GetLastError
());
}
return
handle
.
getPointer
().
getLong
(
0
);
...
...
@@ -772,15 +771,15 @@ public class ProcessImplForWin32 extends Process {
}
private
static
void
waitForInterruptibly
(
WinNT
.
HANDLE
handle
)
{
int
result
=
Kernel32
.
INSTANCE
.
WaitForMultipleObjects
(
1
,
new
WinNT
.
HANDLE
[]{
handle
},
false
,
Kernel32
.
INFINITE
);
if
(
result
==
Kernel32
.
WAIT_FAILED
)
{
int
result
=
Kernel32
.
INSTANCE
.
WaitForMultipleObjects
(
1
,
new
WinNT
.
HANDLE
[]{
handle
},
false
,
WinBase
.
INFINITE
);
if
(
result
==
WinBase
.
WAIT_FAILED
)
{
throw
new
Win32Exception
(
Kernel32
.
INSTANCE
.
GetLastError
());
}
}
private
static
void
waitForTimeoutInterruptibly
(
WinNT
.
HANDLE
handle
,
long
timeout
)
{
int
result
=
Kernel32
.
INSTANCE
.
WaitForMultipleObjects
(
1
,
new
WinNT
.
HANDLE
[]{
handle
},
false
,
(
int
)
timeout
);
if
(
result
==
Kernel32
.
WAIT_FAILED
)
{
if
(
result
==
WinBase
.
WAIT_FAILED
)
{
throw
new
Win32Exception
(
Kernel32
.
INSTANCE
.
GetLastError
());
}
}
...
...
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
浏览文件 @
eef8bf4e
...
...
@@ -366,7 +366,7 @@ public class ProcessInstance {
}
public
boolean
I
sProcessInstanceStop
(){
public
boolean
i
sProcessInstanceStop
(){
return
this
.
state
.
typeIsFinished
();
}
...
...
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapperTest.java
浏览文件 @
eef8bf4e
...
...
@@ -60,7 +60,7 @@ public class ProcessInstanceMapMapperTest {
//update
processInstanceMap
.
setParentProcessInstanceId
(
1
);
int
update
=
processInstanceMapMapper
.
updateById
(
processInstanceMap
);
Assert
.
assertEquals
(
update
,
1
);
Assert
.
assertEquals
(
1
,
update
);
processInstanceMapMapper
.
deleteById
(
processInstanceMap
.
getId
());
}
...
...
@@ -71,7 +71,7 @@ public class ProcessInstanceMapMapperTest {
public
void
testDelete
(){
ProcessInstanceMap
processInstanceMap
=
insertOne
();
int
delete
=
processInstanceMapMapper
.
deleteById
(
processInstanceMap
.
getId
());
Assert
.
assertEquals
(
delete
,
1
);
Assert
.
assertEquals
(
1
,
delete
);
}
/**
...
...
@@ -132,7 +132,7 @@ public class ProcessInstanceMapMapperTest {
int
delete
=
processInstanceMapMapper
.
deleteByParentProcessId
(
processInstanceMap
.
getParentProcessInstanceId
()
);
Assert
.
assertEquals
(
delete
,
1
);
Assert
.
assertEquals
(
1
,
delete
);
}
/**
...
...
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
浏览文件 @
eef8bf4e
...
...
@@ -74,7 +74,7 @@ public class ProcessInstanceMapperTest {
ProcessInstance
processInstanceMap
=
insertOne
();
//update
int
update
=
processInstanceMapper
.
updateById
(
processInstanceMap
);
Assert
.
assertEquals
(
update
,
1
);
Assert
.
assertEquals
(
1
,
update
);
processInstanceMapper
.
deleteById
(
processInstanceMap
.
getId
());
}
...
...
@@ -85,7 +85,7 @@ public class ProcessInstanceMapperTest {
public
void
testDelete
(){
ProcessInstance
processInstanceMap
=
insertOne
();
int
delete
=
processInstanceMapper
.
deleteById
(
processInstanceMap
.
getId
());
Assert
.
assertEquals
(
delete
,
1
);
Assert
.
assertEquals
(
1
,
delete
);
}
/**
...
...
@@ -197,7 +197,7 @@ public class ProcessInstanceMapperTest {
Assert
.
assertNotEquals
(
update
,
0
);
processInstance
=
processInstanceMapper
.
selectById
(
processInstance
.
getId
());
Assert
.
assert
Equals
(
processInstance
.
getHost
(),
null
);
Assert
.
assert
Null
(
processInstance
.
getHost
()
);
processInstanceMapper
.
deleteById
(
processInstance
.
getId
());
}
...
...
@@ -217,7 +217,7 @@ public class ProcessInstanceMapperTest {
ProcessInstance
processInstance1
=
processInstanceMapper
.
selectById
(
processInstance
.
getId
());
processInstanceMapper
.
deleteById
(
processInstance
.
getId
());
Assert
.
assertEquals
(
processInstance1
.
getState
(),
ExecutionStatus
.
SUCCESS
);
Assert
.
assertEquals
(
ExecutionStatus
.
SUCCESS
,
processInstance1
.
getState
()
);
}
...
...
@@ -261,10 +261,10 @@ public class ProcessInstanceMapperTest {
List
<
ProcessInstance
>
processInstances
=
processInstanceMapper
.
queryByProcessDefineId
(
processInstance
.
getProcessDefinitionId
(),
1
);
Assert
.
assertEquals
(
processInstances
.
size
(),
1
);
Assert
.
assertEquals
(
1
,
processInstances
.
size
()
);
processInstances
=
processInstanceMapper
.
queryByProcessDefineId
(
processInstance
.
getProcessDefinitionId
(),
2
);
Assert
.
assertEquals
(
processInstances
.
size
(),
2
);
Assert
.
assertEquals
(
2
,
processInstances
.
size
()
);
processInstanceMapper
.
deleteById
(
processInstance
.
getId
());
processInstanceMapper
.
deleteById
(
processInstance1
.
getId
());
...
...
@@ -320,7 +320,7 @@ public class ProcessInstanceMapperTest {
start
=
new
Date
(
2019
-
1900
,
1
-
1
,
01
,
1
,
0
,
0
);
processInstance1
=
processInstanceMapper
.
queryLastManualProcess
(
processInstance
.
getProcessDefinitionId
(),
start
,
end
);
Assert
.
assert
Equals
(
processInstance1
,
null
);
Assert
.
assert
Null
(
processInstance1
);
processInstanceMapper
.
deleteById
(
processInstance
.
getId
());
...
...
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
浏览文件 @
eef8bf4e
...
...
@@ -937,7 +937,7 @@ public class MasterExecThread implements Runnable {
// submit start node
submitPostNode
(
null
);
boolean
sendTimeWarning
=
false
;
while
(!
processInstance
.
I
sProcessInstanceStop
()){
while
(!
processInstance
.
i
sProcessInstanceStop
()){
// send warning email if process time out.
if
(
!
sendTimeWarning
&&
checkProcessTimeOut
(
processInstance
)
){
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录