Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
DolphinScheduler
提交
4bdb60f2
DolphinScheduler
项目概览
apache
/
DolphinScheduler
上一次同步 1 年多
通知
707
Star
9572
Fork
3514
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
DolphinScheduler
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
4bdb60f2
编写于
4月 17, 2019
作者:
G
gongzijian
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'upstream/dev-20190415' into dev-20190415
上级
3fde5cef
fb0cc5b3
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
110 addition
and
31 deletion
+110
-31
escheduler-api/src/main/java/cn/escheduler/api/controller/UsersController.java
...in/java/cn/escheduler/api/controller/UsersController.java
+8
-6
escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java
...src/main/java/cn/escheduler/api/service/UsersService.java
+11
-2
escheduler-dao/readme.txt
escheduler-dao/readme.txt
+1
-0
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
...duler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
+16
-4
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapper.java
...ao/src/main/java/cn/escheduler/dao/mapper/UserMapper.java
+9
-0
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapperProvider.java
...ain/java/cn/escheduler/dao/mapper/UserMapperProvider.java
+18
-0
escheduler-dao/src/main/java/cn/escheduler/dao/model/User.java
...duler-dao/src/main/java/cn/escheduler/dao/model/User.java
+32
-17
escheduler-dao/src/test/java/cn/escheduler/dao/mapper/UserMapperTest.java
...rc/test/java/cn/escheduler/dao/mapper/UserMapperTest.java
+6
-0
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java
...n/escheduler/server/worker/runner/TaskScheduleThread.java
+9
-2
未找到文件。
escheduler-api/src/main/java/cn/escheduler/api/controller/UsersController.java
浏览文件 @
4bdb60f2
...
...
@@ -64,13 +64,14 @@ public class UsersController extends BaseController{
@RequestParam
(
value
=
"userName"
)
String
userName
,
@RequestParam
(
value
=
"userPassword"
)
String
userPassword
,
@RequestParam
(
value
=
"tenantId"
)
int
tenantId
,
@RequestParam
(
value
=
"queue"
)
String
queue
,
@RequestParam
(
value
=
"email"
)
String
email
,
@RequestParam
(
value
=
"phone"
,
required
=
false
)
String
phone
)
{
logger
.
info
(
"login user {}, create user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {},
proxyUsers
: {}"
,
loginUser
.
getUserName
(),
userName
,
email
,
tenantId
,
Constants
.
PASSWORD_DEFAULT
,
phone
);
logger
.
info
(
"login user {}, create user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {},
user queue
: {}"
,
loginUser
.
getUserName
(),
userName
,
email
,
tenantId
,
Constants
.
PASSWORD_DEFAULT
,
phone
,
queue
);
try
{
Map
<
String
,
Object
>
result
=
usersService
.
createUser
(
loginUser
,
userName
,
userPassword
,
email
,
tenantId
,
phon
e
);
Map
<
String
,
Object
>
result
=
usersService
.
createUser
(
loginUser
,
userName
,
userPassword
,
email
,
tenantId
,
phone
,
queu
e
);
return
returnDataList
(
result
);
}
catch
(
Exception
e
){
logger
.
error
(
CREATE_USER_ERROR
.
getMsg
(),
e
);
...
...
@@ -127,13 +128,14 @@ public class UsersController extends BaseController{
@RequestParam
(
value
=
"id"
)
int
id
,
@RequestParam
(
value
=
"userName"
)
String
userName
,
@RequestParam
(
value
=
"userPassword"
)
String
userPassword
,
@RequestParam
(
value
=
"queue"
)
String
queue
,
@RequestParam
(
value
=
"email"
)
String
email
,
@RequestParam
(
value
=
"tenantId"
)
int
tenantId
,
@RequestParam
(
value
=
"phone"
,
required
=
false
)
String
phone
)
{
logger
.
info
(
"login user {}, updateProcessInstance user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {},
proxyUsers
: {}"
,
loginUser
.
getUserName
(),
userName
,
email
,
tenantId
,
Constants
.
PASSWORD_DEFAULT
,
phone
);
logger
.
info
(
"login user {}, updateProcessInstance user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {},
user queue
: {}"
,
loginUser
.
getUserName
(),
userName
,
email
,
tenantId
,
Constants
.
PASSWORD_DEFAULT
,
phone
,
queue
);
try
{
Map
<
String
,
Object
>
result
=
usersService
.
updateUser
(
id
,
userName
,
userPassword
,
email
,
tenantId
,
phone
);
Map
<
String
,
Object
>
result
=
usersService
.
updateUser
(
id
,
userName
,
userPassword
,
email
,
tenantId
,
phone
,
queue
);
return
returnDataList
(
result
);
}
catch
(
Exception
e
){
logger
.
error
(
UPDATE_USER_ERROR
.
getMsg
(),
e
);
...
...
escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java
浏览文件 @
4bdb60f2
...
...
@@ -87,7 +87,8 @@ public class UsersService extends BaseService {
String
userPassword
,
String
email
,
int
tenantId
,
String
phone
)
throws
Exception
{
String
phone
,
String
queue
)
throws
Exception
{
Map
<
String
,
Object
>
result
=
new
HashMap
<>(
5
);
result
=
CheckUtils
.
checkUserParams
(
userName
,
userPassword
,
email
,
phone
);
...
...
@@ -114,6 +115,7 @@ public class UsersService extends BaseService {
user
.
setUserType
(
UserType
.
GENERAL_USER
);
user
.
setCreateTime
(
now
);
user
.
setUpdateTime
(
now
);
user
.
setQueue
(
queue
);
// save user
userMapper
.
insert
(
user
);
...
...
@@ -194,7 +196,13 @@ public class UsersService extends BaseService {
* @param phone
* @return
*/
public
Map
<
String
,
Object
>
updateUser
(
int
userId
,
String
userName
,
String
userPassword
,
String
email
,
int
tenantId
,
String
phone
)
throws
Exception
{
public
Map
<
String
,
Object
>
updateUser
(
int
userId
,
String
userName
,
String
userPassword
,
String
email
,
int
tenantId
,
String
phone
,
String
queue
)
throws
Exception
{
Map
<
String
,
Object
>
result
=
new
HashMap
<>(
5
);
result
.
put
(
Constants
.
STATUS
,
false
);
...
...
@@ -218,6 +226,7 @@ public class UsersService extends BaseService {
if
(
StringUtils
.
isNotEmpty
(
email
))
{
user
.
setEmail
(
email
);
}
user
.
setQueue
(
queue
);
user
.
setPhone
(
phone
);
user
.
setUpdateTime
(
now
);
...
...
escheduler-dao/readme.txt
0 → 100644
浏览文件 @
4bdb60f2
alter table t_escheduler_user add queue varchar(64);
\ No newline at end of file
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
浏览文件 @
4bdb60f2
...
...
@@ -59,7 +59,7 @@ public class ProcessDao extends AbstractBaseDao {
ExecutionStatus
.
READY_STOP
.
ordinal
()};
@Autowired
private
ProjectMapper
project
Mapper
;
private
UserMapper
user
Mapper
;
@Autowired
private
ProcessDefinitionMapper
processDefineMapper
;
...
...
@@ -102,7 +102,7 @@ public class ProcessDao extends AbstractBaseDao {
*/
@Override
protected
void
init
()
{
projectMapper
=
getMapper
(
Project
Mapper
.
class
);
userMapper
=
getMapper
(
User
Mapper
.
class
);
processDefineMapper
=
getMapper
(
ProcessDefinitionMapper
.
class
);
processInstanceMapper
=
getMapper
(
ProcessInstanceMapper
.
class
);
dataSourceMapper
=
getMapper
(
DataSourceMapper
.
class
);
...
...
@@ -261,7 +261,7 @@ public class ProcessDao extends AbstractBaseDao {
public
ProcessInstance
findProcessInstanceByScheduleTime
(
int
defineId
,
Date
scheduleTime
){
return
processInstanceMapper
.
queryByScheduleTime
(
defineId
,
DateUtils
.
dateToString
(
scheduleTime
),
0
,
null
,
null
);
DateUtils
.
dateToString
(
scheduleTime
),
0
,
null
,
null
);
}
/**
...
...
@@ -1210,7 +1210,7 @@ public class ProcessDao extends AbstractBaseDao {
public
int
updateProcessInstance
(
Integer
processInstanceId
,
String
processJson
,
String
globalParams
,
Date
scheduleTime
,
Flag
flag
,
String
locations
,
String
connects
){
return
processInstanceMapper
.
updateProcessInstance
(
processInstanceId
,
processJson
,
return
processInstanceMapper
.
updateProcessInstance
(
processInstanceId
,
processJson
,
globalParams
,
scheduleTime
,
locations
,
connects
,
flag
);
}
...
...
@@ -1554,4 +1554,16 @@ public class ProcessDao extends AbstractBaseDao {
DateUtils
.
dateToString
(
dateInterval
.
getEndTime
()),
stateArray
);
}
/**
* query user queue by process instance id
* @param processInstanceId
* @return
*/
public
String
queryQueueByProcessInstanceId
(
int
processInstanceId
){
return
userMapper
.
queryQueueByProcessInstanceId
(
processInstanceId
);
}
}
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapper.java
浏览文件 @
4bdb60f2
...
...
@@ -222,4 +222,13 @@ public interface UserMapper {
})
@SelectProvider
(
type
=
UserMapperProvider
.
class
,
method
=
"queryTenantCodeByUserId"
)
User
queryTenantCodeByUserId
(
@Param
(
"userId"
)
int
userId
);
/**
* query user queue by process instance id
* @param processInstanceId
* @return
*/
@SelectProvider
(
type
=
UserMapperProvider
.
class
,
method
=
"queryQueueByProcessInstanceId"
)
String
queryQueueByProcessInstanceId
(
@Param
(
"processInstanceId"
)
int
processInstanceId
);
}
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapperProvider.java
浏览文件 @
4bdb60f2
...
...
@@ -47,6 +47,7 @@ public class UserMapperProvider {
VALUES
(
"`phone`"
,
"#{user.phone}"
);
VALUES
(
"`user_type`"
,
EnumFieldUtil
.
genFieldStr
(
"user.userType"
,
UserType
.
class
));
VALUES
(
"`tenant_id`"
,
"#{user.tenantId}"
);
VALUES
(
"`queue`"
,
"#{user.queue}"
);
VALUES
(
"`create_time`"
,
"#{user.createTime}"
);
VALUES
(
"`update_time`"
,
"#{user.updateTime}"
);
}
...
...
@@ -86,6 +87,7 @@ public class UserMapperProvider {
SET
(
"`phone`=#{user.phone}"
);
SET
(
"`user_type`="
+
EnumFieldUtil
.
genFieldStr
(
"user.userType"
,
UserType
.
class
));
SET
(
"`tenant_id`=#{user.tenantId}"
);
SET
(
"`queue`=#{user.queue}"
);
SET
(
"`create_time`=#{user.createTime}"
);
SET
(
"`update_time`=#{user.updateTime}"
);
...
...
@@ -247,4 +249,20 @@ public class UserMapperProvider {
}.
toString
();
}
/**
* query tenant code by user id
* @param parameter
* @return
*/
public
String
queryQueueByProcessInstanceId
(
Map
<
String
,
Object
>
parameter
)
{
return
new
SQL
()
{
{
SELECT
(
"queue"
);
FROM
(
TABLE_NAME
+
" u,t_escheduler_process_instance p"
);
WHERE
(
"u.id = p.executor_id and p.id=#{processInstanceId}"
);
}
}.
toString
();
}
}
escheduler-dao/src/main/java/cn/escheduler/dao/model/User.java
浏览文件 @
4bdb60f2
...
...
@@ -79,6 +79,12 @@ public class User {
* alert group
*/
private
String
alertGroup
;
/**
* user specified queue
*/
private
String
queue
;
/**
* create time
*/
...
...
@@ -194,23 +200,12 @@ public class User {
this
.
tenantCode
=
tenantCode
;
}
@Override
public
String
toString
()
{
return
"User{"
+
"id="
+
id
+
", userName='"
+
userName
+
'\''
+
", userPassword='"
+
userPassword
+
'\''
+
", email='"
+
email
+
'\''
+
", phone='"
+
phone
+
'\''
+
", userType="
+
userType
+
", tenantId="
+
tenantId
+
", tenantCode='"
+
tenantCode
+
'\''
+
", tenantName='"
+
tenantName
+
'\''
+
", queueName='"
+
queueName
+
'\''
+
", alertGroup='"
+
alertGroup
+
'\''
+
", createTime="
+
createTime
+
", updateTime="
+
updateTime
+
'}'
;
public
String
getQueue
()
{
return
queue
;
}
public
void
setQueue
(
String
queue
)
{
this
.
queue
=
queue
;
}
@Override
...
...
@@ -237,4 +232,24 @@ public class User {
result
=
31
*
result
+
userName
.
hashCode
();
return
result
;
}
@Override
public
String
toString
()
{
return
"User{"
+
"id="
+
id
+
", userName='"
+
userName
+
'\''
+
", userPassword='"
+
userPassword
+
'\''
+
", email='"
+
email
+
'\''
+
", phone='"
+
phone
+
'\''
+
", userType="
+
userType
+
", tenantId="
+
tenantId
+
", tenantCode='"
+
tenantCode
+
'\''
+
", tenantName='"
+
tenantName
+
'\''
+
", queueName='"
+
queueName
+
'\''
+
", alertGroup='"
+
alertGroup
+
'\''
+
", queue='"
+
queue
+
'\''
+
", createTime="
+
createTime
+
", updateTime="
+
updateTime
+
'}'
;
}
}
escheduler-dao/src/test/java/cn/escheduler/dao/mapper/UserMapperTest.java
浏览文件 @
4bdb60f2
...
...
@@ -60,4 +60,10 @@ public class UserMapperTest {
}
@Test
public
void
queryQueueByProcessInstanceId
(){
String
queue
=
userMapper
.
queryQueueByProcessInstanceId
(
41388
);
Assert
.
assertEquals
(
queue
,
"ait"
);
}
}
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java
浏览文件 @
4bdb60f2
...
...
@@ -36,6 +36,7 @@ import cn.escheduler.server.worker.task.AbstractTask;
import
cn.escheduler.server.worker.task.TaskManager
;
import
cn.escheduler.server.worker.task.TaskProps
;
import
com.alibaba.fastjson.JSONObject
;
import
org.apache.commons.lang.StringUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
...
...
@@ -154,12 +155,18 @@ public class TaskScheduleThread implements Callable<Boolean> {
taskProps
.
setTenantCode
(
taskInstance
.
getProcessInstance
().
getTenantCode
());
ProcessInstance
processInstance
=
processDao
.
findProcessInstanceByTaskId
(
taskInstance
.
getId
());
String
queue
=
processDao
.
queryQueueByProcessInstanceId
(
processInstance
.
getId
());
taskProps
.
setScheduleTime
(
processInstance
.
getScheduleTime
());
taskProps
.
setNodeName
(
taskInstance
.
getName
());
taskProps
.
setTaskInstId
(
taskInstance
.
getId
());
taskProps
.
setEnvFile
(
CommonUtils
.
getSystemEnvPath
());
// set queue
taskProps
.
setQueue
(
taskInstance
.
getProcessInstance
().
getQueue
());
if
(
StringUtils
.
isEmpty
(
queue
)){
taskProps
.
setQueue
(
taskInstance
.
getProcessInstance
().
getQueue
());
}
else
{
taskProps
.
setQueue
(
queue
);
}
taskProps
.
setTaskStartTime
(
taskInstance
.
getStartTime
());
taskProps
.
setDefinedParams
(
allParamMap
);
...
...
@@ -188,7 +195,7 @@ public class TaskScheduleThread implements Callable<Boolean> {
task
.
handle
();
logger
.
info
(
"task : {} exit status code : {}"
,
taskProps
.
getTaskAppId
(),
task
.
getExitStatusCode
());
logger
.
info
(
"task : {} exit status code : {}"
,
taskProps
.
getTaskAppId
(),
task
.
getExitStatusCode
());
if
(
task
.
getExitStatusCode
()
==
Constants
.
EXIT_CODE_SUCCESS
){
status
=
ExecutionStatus
.
SUCCESS
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录