Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
lhongjum2003
DolphinScheduler
提交
c8c92e4b
DolphinScheduler
项目概览
lhongjum2003
/
DolphinScheduler
与 Fork 源项目一致
Fork自
apache / DolphinScheduler
通知
2
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
DolphinScheduler
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
c8c92e4b
编写于
7月 26, 2019
作者:
journey2018
提交者:
GitHub
7月 26, 2019
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #634 from qiaozhanwei/dev
worker code optimization
上级
73bb6d5f
b6343ee9
变更
33
展开全部
隐藏空白更改
内联
并排
Showing
33 changed file
with
1002 addition
and
744 deletion
+1002
-744
escheduler-api/src/main/java/cn/escheduler/api/controller/LoginController.java
...in/java/cn/escheduler/api/controller/LoginController.java
+1
-1
escheduler-api/src/main/java/cn/escheduler/api/log/LogClient.java
...er-api/src/main/java/cn/escheduler/api/log/LogClient.java
+3
-3
escheduler-api/src/main/java/cn/escheduler/api/service/SessionService.java
...c/main/java/cn/escheduler/api/service/SessionService.java
+3
-3
escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java
...src/main/java/cn/escheduler/api/service/UsersService.java
+1
-1
escheduler-common/src/main/java/cn/escheduler/common/Constants.java
...-common/src/main/java/cn/escheduler/common/Constants.java
+15
-10
escheduler-common/src/main/java/cn/escheduler/common/enums/ServerEnum.java
.../src/main/java/cn/escheduler/common/enums/ServerEnum.java
+29
-0
escheduler-common/src/main/java/cn/escheduler/common/job/db/DataSourceFactory.java
...n/java/cn/escheduler/common/job/db/DataSourceFactory.java
+38
-1
escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java
...src/main/java/cn/escheduler/common/utils/CommonUtils.java
+17
-0
escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java
...c/main/java/cn/escheduler/common/zk/AbstractZKClient.java
+13
-0
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
...duler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
+20
-1
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/SessionMapper.java
...src/main/java/cn/escheduler/dao/mapper/SessionMapper.java
+2
-4
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/SessionMapperProvider.java
.../java/cn/escheduler/dao/mapper/SessionMapperProvider.java
+0
-2
escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java
...rc/main/java/cn/escheduler/dao/model/ProcessInstance.java
+39
-34
escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskInstance.java
...o/src/main/java/cn/escheduler/dao/model/TaskInstance.java
+8
-0
escheduler-server/src/main/java/cn/escheduler/server/utils/ProcessUtils.java
...rc/main/java/cn/escheduler/server/utils/ProcessUtils.java
+1
-2
escheduler-server/src/main/java/cn/escheduler/server/worker/WorkerServer.java
...c/main/java/cn/escheduler/server/worker/WorkerServer.java
+1
-1
escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogAppender.java
...java/cn/escheduler/server/worker/log/TaskLogAppender.java
+0
-2
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
...a/cn/escheduler/server/worker/runner/FetchTaskThread.java
+108
-93
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java
...n/escheduler/server/worker/runner/TaskScheduleThread.java
+109
-174
escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractCommandExecutor.java
...scheduler/server/worker/task/AbstractCommandExecutor.java
+7
-1
escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractTask.java
...n/java/cn/escheduler/server/worker/task/AbstractTask.java
+112
-1
escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractYarnTask.java
...va/cn/escheduler/server/worker/task/AbstractYarnTask.java
+14
-11
escheduler-server/src/main/java/cn/escheduler/server/worker/task/PythonCommandExecutor.java
.../escheduler/server/worker/task/PythonCommandExecutor.java
+27
-14
escheduler-server/src/main/java/cn/escheduler/server/worker/task/ShellCommandExecutor.java
...n/escheduler/server/worker/task/ShellCommandExecutor.java
+10
-6
escheduler-server/src/main/java/cn/escheduler/server/worker/task/TaskProps.java
...main/java/cn/escheduler/server/worker/task/TaskProps.java
+70
-16
escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentExecute.java
...eduler/server/worker/task/dependent/DependentExecute.java
+0
-2
escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentTask.java
...scheduler/server/worker/task/dependent/DependentTask.java
+1
-1
escheduler-server/src/main/java/cn/escheduler/server/worker/task/mr/MapReduceTask.java
...va/cn/escheduler/server/worker/task/mr/MapReduceTask.java
+4
-3
escheduler-server/src/main/java/cn/escheduler/server/worker/task/processdure/ProcedureTask.java
...heduler/server/worker/task/processdure/ProcedureTask.java
+172
-165
escheduler-server/src/main/java/cn/escheduler/server/worker/task/python/PythonTask.java
...a/cn/escheduler/server/worker/task/python/PythonTask.java
+23
-49
escheduler-server/src/main/java/cn/escheduler/server/worker/task/shell/ShellTask.java
...ava/cn/escheduler/server/worker/task/shell/ShellTask.java
+17
-15
escheduler-server/src/main/java/cn/escheduler/server/worker/task/spark/SparkTask.java
...ava/cn/escheduler/server/worker/task/spark/SparkTask.java
+2
-4
escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java
...in/java/cn/escheduler/server/worker/task/sql/SqlTask.java
+135
-124
未找到文件。
escheduler-api/src/main/java/cn/escheduler/api/controller/LoginController.java
浏览文件 @
c8c92e4b
...
...
@@ -116,7 +116,7 @@ public class LoginController extends BaseController {
response
.
setStatus
(
HttpStatus
.
SC_OK
);
response
.
addCookie
(
new
Cookie
(
Constants
.
SESSION_ID
,
sessionId
));
logger
.
info
(
"sessionId
= "
+
sessionId
);
logger
.
info
(
"sessionId
: {}"
,
sessionId
);
return
success
(
LOGIN_SUCCESS
.
getMsg
(),
sessionId
);
}
catch
(
Exception
e
)
{
logger
.
error
(
USER_LOGIN_FAILURE
.
getMsg
(),
e
);
...
...
escheduler-api/src/main/java/cn/escheduler/api/log/LogClient.java
浏览文件 @
c8c92e4b
...
...
@@ -100,7 +100,7 @@ public class LogClient {
* @return
*/
public
String
viewLog
(
String
path
)
{
logger
.
info
(
"view
queryL
og path {}"
,
path
);
logger
.
info
(
"view
l
og path {}"
,
path
);
PathParameter
pathParameter
=
PathParameter
.
newBuilder
().
setPath
(
path
).
build
();
RetStrInfo
retStrInfo
;
try
{
...
...
@@ -119,14 +119,14 @@ public class LogClient {
* @return
*/
public
byte
[]
getLogBytes
(
String
path
)
{
logger
.
info
(
"
get
log path {}"
,
path
);
logger
.
info
(
"log path {}"
,
path
);
PathParameter
pathParameter
=
PathParameter
.
newBuilder
().
setPath
(
path
).
build
();
RetByteInfo
retByteInfo
;
try
{
retByteInfo
=
blockingStub
.
getLogBytes
(
pathParameter
);
return
retByteInfo
.
getData
().
toByteArray
();
}
catch
(
StatusRuntimeException
e
)
{
logger
.
error
(
"
get
log size error"
,
e
);
logger
.
error
(
"log size error"
,
e
);
return
null
;
}
}
...
...
escheduler-api/src/main/java/cn/escheduler/api/service/SessionService.java
浏览文件 @
c8c92e4b
...
...
@@ -68,7 +68,7 @@ public class SessionService extends BaseService{
String
ip
=
BaseController
.
getClientIpAddress
(
request
);
logger
.
info
(
"get session: {}, ip: {}"
,
sessionId
,
ip
);
return
sessionMapper
.
queryByIdAndIp
(
sessionId
,
ip
);
return
sessionMapper
.
queryByIdAndIp
(
sessionId
);
}
/**
...
...
@@ -80,7 +80,7 @@ public class SessionService extends BaseService{
*/
public
String
createSession
(
User
user
,
String
ip
)
{
// logined
Session
session
=
sessionMapper
.
queryByUserIdAndIp
(
user
.
getId
()
,
ip
);
Session
session
=
sessionMapper
.
queryByUserIdAndIp
(
user
.
getId
());
Date
now
=
new
Date
();
/**
...
...
@@ -126,7 +126,7 @@ public class SessionService extends BaseService{
/**
* query session by user id and ip
*/
Session
session
=
sessionMapper
.
queryByUserIdAndIp
(
loginUser
.
getId
()
,
ip
);
Session
session
=
sessionMapper
.
queryByUserIdAndIp
(
loginUser
.
getId
());
//delete session
sessionMapper
.
deleteById
(
session
.
getId
());
}
...
...
escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java
浏览文件 @
c8c92e4b
...
...
@@ -322,7 +322,7 @@ public class UsersService extends BaseService {
if
(
user
!=
null
)
{
if
(
PropertyUtils
.
getResUploadStartupState
())
{
String
userPath
=
HadoopUtils
.
getHdfs
DataBasePath
()
+
"/"
+
user
.
getTenantCode
()
+
"/home/"
+
id
;
String
userPath
=
HadoopUtils
.
getHdfs
UserDir
(
user
.
getTenantCode
(),
id
)
;
if
(
HadoopUtils
.
getInstance
().
exists
(
userPath
))
{
HadoopUtils
.
getInstance
().
delete
(
userPath
,
true
);
}
...
...
escheduler-common/src/main/java/cn/escheduler/common/Constants.java
浏览文件 @
c8c92e4b
...
...
@@ -119,10 +119,6 @@ public final class Constants {
*/
public
static
final
String
ESCHEDULER_ENV_PATH
=
"escheduler.env.path"
;
/**
* escheduler.env.sh
*/
public
static
final
String
ESCHEDULER_ENV_SH
=
".escheduler_env.sh"
;
/**
* python home
...
...
@@ -220,9 +216,9 @@ public final class Constants {
public
static
final
String
SEMICOLON
=
";"
;
/**
*
DOT .
*
EQUAL SIGN
*/
public
static
final
String
DOT
=
".
"
;
public
static
final
String
EQUAL_SIGN
=
"=
"
;
/**
* ZOOKEEPER_SESSION_TIMEOUT
...
...
@@ -283,10 +279,6 @@ public final class Constants {
*/
public
static
final
String
YYYY_MM_DD_HH_MM_SS
=
"yyyy-MM-dd HH:mm:ss"
;
/**
* date format of yyyyMMdd
*/
public
static
final
String
YYYYMMDD
=
"yyyyMMdd"
;
/**
* date format of yyyyMMddHHmmss
...
...
@@ -489,6 +481,7 @@ public final class Constants {
public
static
final
String
TASK_RECORD_PWD
=
"task.record.datasource.password"
;
public
static
final
String
DEFAULT
=
"Default"
;
public
static
final
String
USER
=
"user"
;
public
static
final
String
PASSWORD
=
"password"
;
public
static
final
String
XXXXXX
=
"******"
;
...
...
@@ -499,6 +492,7 @@ public final class Constants {
public
static
final
String
STATUS
=
"status"
;
/**
* command parameter keys
*/
...
...
@@ -866,6 +860,11 @@ public final class Constants {
*/
public
static
final
int
PREVIEW_SCHEDULE_EXECUTE_COUNT
=
5
;
/**
* kerberos
*/
public
static
final
String
KERBEROS
=
"kerberos"
;
/**
* java.security.krb5.conf
*/
...
...
@@ -901,4 +900,10 @@ public final class Constants {
* loginUserFromKeytab path
*/
public
static
final
String
LOGIN_USER_KEY_TAB_PATH
=
"login.user.keytab.path"
;
/**
* hive conf
*/
public
static
final
String
HIVE_CONF
=
"hiveconf:"
;
}
escheduler-common/src/main/java/cn/escheduler/common/enums/ServerEnum.java
0 → 100644
浏览文件 @
c8c92e4b
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
cn.escheduler.common.enums
;
/**
* cycle enums
*/
public
enum
ServerEnum
{
/**
* master server , worker server
*/
MASTER_SERVER
,
WORKER_SERVER
}
escheduler-common/src/main/java/cn/escheduler/common/job/db/DataSourceFactory.java
浏览文件 @
c8c92e4b
...
...
@@ -21,6 +21,8 @@ import cn.escheduler.common.utils.JSONUtils;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
static
cn
.
escheduler
.
common
.
Constants
.*;
/**
* produce datasource in this custom defined datasource factory.
*/
...
...
@@ -49,8 +51,43 @@ public class DataSourceFactory {
return
null
;
}
}
catch
(
Exception
e
)
{
logger
.
error
(
"
G
et datasource object error"
,
e
);
logger
.
error
(
"
g
et datasource object error"
,
e
);
return
null
;
}
}
/**
* load class
* @param dbType
* @throws Exception
*/
public
static
void
loadClass
(
DbType
dbType
)
throws
Exception
{
switch
(
dbType
){
case
MYSQL
:
Class
.
forName
(
JDBC_MYSQL_CLASS_NAME
);
break
;
case
POSTGRESQL
:
Class
.
forName
(
JDBC_POSTGRESQL_CLASS_NAME
);
break
;
case
HIVE
:
Class
.
forName
(
JDBC_HIVE_CLASS_NAME
);
break
;
case
SPARK
:
Class
.
forName
(
JDBC_SPARK_CLASS_NAME
);
break
;
case
CLICKHOUSE
:
Class
.
forName
(
JDBC_CLICKHOUSE_CLASS_NAME
);
break
;
case
ORACLE
:
Class
.
forName
(
JDBC_ORACLE_CLASS_NAME
);
break
;
case
SQLSERVER:
Class
.
forName
(
JDBC_SQLSERVER_CLASS_NAME
);
break
;
default
:
logger
.
error
(
"not support sql type: {},can't load class"
,
dbType
);
throw
new
IllegalArgumentException
(
"not support sql type,can't load class"
);
}
}
}
escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java
浏览文件 @
c8c92e4b
...
...
@@ -19,6 +19,8 @@ package cn.escheduler.common.utils;
import
cn.escheduler.common.Constants
;
import
cn.escheduler.common.enums.ResUploadType
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.hadoop.conf.Configuration
;
import
org.apache.hadoop.security.UserGroupInformation
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
...
...
@@ -74,4 +76,19 @@ public class CommonUtils {
Boolean
kerberosStartupState
=
getBoolean
(
cn
.
escheduler
.
common
.
Constants
.
HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE
);
return
resUploadType
==
ResUploadType
.
HDFS
&&
kerberosStartupState
;
}
/**
* load kerberos configuration
* @throws Exception
*/
public
static
void
loadKerberosConf
()
throws
Exception
{
if
(
CommonUtils
.
getKerberosStartupState
())
{
System
.
setProperty
(
JAVA_SECURITY_KRB5_CONF
,
getString
(
JAVA_SECURITY_KRB5_CONF_PATH
));
Configuration
configuration
=
new
Configuration
();
configuration
.
set
(
HADOOP_SECURITY_AUTHENTICATION
,
KERBEROS
);
UserGroupInformation
.
setConfiguration
(
configuration
);
UserGroupInformation
.
loginUserFromKeytab
(
getString
(
LOGIN_USER_KEY_TAB_USERNAME
),
getString
(
cn
.
escheduler
.
common
.
Constants
.
LOGIN_USER_KEY_TAB_PATH
));
}
}
}
escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java
浏览文件 @
c8c92e4b
...
...
@@ -18,6 +18,7 @@ package cn.escheduler.common.zk;
import
cn.escheduler.common.Constants
;
import
cn.escheduler.common.IStoppable
;
import
cn.escheduler.common.enums.ServerEnum
;
import
cn.escheduler.common.utils.DateUtils
;
import
cn.escheduler.common.utils.OSUtils
;
import
org.apache.commons.configuration.Configuration
;
...
...
@@ -27,6 +28,7 @@ import org.apache.curator.RetryPolicy;
import
org.apache.curator.framework.CuratorFramework
;
import
org.apache.curator.framework.CuratorFrameworkFactory
;
import
org.apache.curator.framework.imps.CuratorFrameworkState
;
import
org.apache.curator.framework.recipes.locks.InterProcessMutex
;
import
org.apache.curator.framework.state.ConnectionState
;
import
org.apache.curator.framework.state.ConnectionStateListener
;
import
org.apache.curator.retry.ExponentialBackoffRetry
;
...
...
@@ -415,6 +417,17 @@ public abstract class AbstractZKClient {
return
conf
.
getString
(
Constants
.
ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_WORKERS
);
}
/**
* acquire zk lock
* @param zkClient
* @param zNodeLockPath
* @throws Exception
*/
public
InterProcessMutex
acquireZkLock
(
CuratorFramework
zkClient
,
String
zNodeLockPath
)
throws
Exception
{
InterProcessMutex
mutex
=
new
InterProcessMutex
(
zkClient
,
zNodeLockPath
);
mutex
.
acquire
();
return
mutex
;
}
@Override
public
String
toString
()
{
...
...
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
浏览文件 @
c8c92e4b
...
...
@@ -1220,6 +1220,26 @@ public class ProcessDao extends AbstractBaseDao {
return
taskInstanceMapper
.
queryById
(
taskId
);
}
/**
* package task instance,associate processInstance and processDefine
* @param taskInstId
* @return
*/
public
TaskInstance
getTaskInstanceRelationByTaskId
(
int
taskInstId
){
// get task instance
TaskInstance
taskInstance
=
findTaskInstanceById
(
taskInstId
);
// get process instance
ProcessInstance
processInstance
=
findProcessInstanceDetailById
(
taskInstance
.
getProcessInstanceId
());
// get process define
ProcessDefinition
processDefine
=
findProcessDefineById
(
taskInstance
.
getProcessDefinitionId
());
taskInstance
.
setProcessInstance
(
processInstance
);
taskInstance
.
setProcessDefine
(
processDefine
);
return
taskInstance
;
}
/**
* get id list by task state
* @param instanceId
...
...
@@ -1324,7 +1344,6 @@ public class ProcessDao extends AbstractBaseDao {
String
executePath
,
String
logPath
,
int
taskInstId
)
{
TaskInstance
taskInstance
=
taskInstanceMapper
.
queryById
(
taskInstId
);
taskInstance
.
setState
(
state
);
taskInstance
.
setStartTime
(
startTime
);
...
...
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/SessionMapper.java
浏览文件 @
c8c92e4b
...
...
@@ -75,7 +75,6 @@ public interface SessionMapper {
* query by session id and ip
*
* @param sessionId
* @param ip
* @return
*/
@Results
(
value
=
{
...
...
@@ -85,13 +84,12 @@ public interface SessionMapper {
@Result
(
property
=
"lastLoginTime"
,
column
=
"last_login_time"
,
javaType
=
Timestamp
.
class
,
jdbcType
=
JdbcType
.
DATE
)
})
@SelectProvider
(
type
=
SessionMapperProvider
.
class
,
method
=
"queryByIdAndIp"
)
Session
queryByIdAndIp
(
@Param
(
"sessionId"
)
String
sessionId
,
@Param
(
"ip"
)
String
ip
);
Session
queryByIdAndIp
(
@Param
(
"sessionId"
)
String
sessionId
);
/**
* query by user id and ip
* @param userId
* @param ip
* @return
*/
@Results
(
value
=
{
...
...
@@ -101,6 +99,6 @@ public interface SessionMapper {
@Result
(
property
=
"lastLoginTime"
,
column
=
"last_login_time"
,
javaType
=
Timestamp
.
class
,
jdbcType
=
JdbcType
.
DATE
)
})
@SelectProvider
(
type
=
SessionMapperProvider
.
class
,
method
=
"queryByUserIdAndIp"
)
Session
queryByUserIdAndIp
(
@Param
(
"userId"
)
int
userId
,
@Param
(
"ip"
)
String
ip
);
Session
queryByUserIdAndIp
(
@Param
(
"userId"
)
int
userId
);
}
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/SessionMapperProvider.java
浏览文件 @
c8c92e4b
...
...
@@ -114,7 +114,6 @@ public class SessionMapperProvider {
FROM
(
TABLE_NAME
);
WHERE
(
"`id` = #{sessionId}"
);
WHERE
(
"`ip` = #{ip}"
);
}}.
toString
();
}
...
...
@@ -130,7 +129,6 @@ public class SessionMapperProvider {
FROM
(
TABLE_NAME
);
WHERE
(
"`user_id` = #{userId}"
);
WHERE
(
"`ip` = #{ip}"
);
}}.
toString
();
}
}
escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java
浏览文件 @
c8c92e4b
...
...
@@ -529,6 +529,39 @@ public class ProcessInstance {
this
.
timeout
=
timeout
;
}
public
void
setTenantId
(
int
tenantId
)
{
this
.
tenantId
=
tenantId
;
}
public
int
getTenantId
()
{
return
this
.
tenantId
;
}
public
String
getWorkerGroupName
()
{
return
workerGroupName
;
}
public
void
setWorkerGroupName
(
String
workerGroupName
)
{
this
.
workerGroupName
=
workerGroupName
;
}
public
String
getReceivers
()
{
return
receivers
;
}
public
void
setReceivers
(
String
receivers
)
{
this
.
receivers
=
receivers
;
}
public
String
getReceiversCc
()
{
return
receiversCc
;
}
public
void
setReceiversCc
(
String
receiversCc
)
{
this
.
receiversCc
=
receiversCc
;
}
@Override
public
String
toString
()
{
return
"ProcessInstance{"
+
...
...
@@ -555,7 +588,6 @@ public class ProcessInstance {
", processInstanceJson='"
+
processInstanceJson
+
'\''
+
", executorId="
+
executorId
+
", tenantCode='"
+
tenantCode
+
'\''
+
", tenantId='"
+
tenantId
+
'\''
+
", queue='"
+
queue
+
'\''
+
", isSubProcess="
+
isSubProcess
+
", locations='"
+
locations
+
'\''
+
...
...
@@ -563,40 +595,13 @@ public class ProcessInstance {
", historyCmd='"
+
historyCmd
+
'\''
+
", dependenceScheduleTimes='"
+
dependenceScheduleTimes
+
'\''
+
", duration="
+
duration
+
", timeout="
+
timeout
+
", processInstancePriority="
+
processInstancePriority
+
", workerGroupId="
+
workerGroupId
+
", timeout="
+
timeout
+
", tenantId="
+
tenantId
+
", workerGroupName='"
+
workerGroupName
+
'\''
+
", receivers='"
+
receivers
+
'\''
+
", receiversCc='"
+
receiversCc
+
'\''
+
'}'
;
}
public
void
setTenantId
(
int
tenantId
)
{
this
.
tenantId
=
tenantId
;
}
public
int
getTenantId
()
{
return
this
.
tenantId
;
}
public
String
getWorkerGroupName
()
{
return
workerGroupName
;
}
public
void
setWorkerGroupName
(
String
workerGroupName
)
{
this
.
workerGroupName
=
workerGroupName
;
}
public
String
getReceivers
()
{
return
receivers
;
}
public
void
setReceivers
(
String
receivers
)
{
this
.
receivers
=
receivers
;
}
public
String
getReceiversCc
()
{
return
receiversCc
;
}
public
void
setReceiversCc
(
String
receiversCc
)
{
this
.
receiversCc
=
receiversCc
;
}
}
escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskInstance.java
浏览文件 @
c8c92e4b
...
...
@@ -189,6 +189,14 @@ public class TaskInstance {
private
int
workerGroupId
;
public
void
init
(
String
host
,
Date
startTime
,
String
executePath
){
this
.
host
=
host
;
this
.
startTime
=
startTime
;
this
.
executePath
=
executePath
;
}
public
ProcessInstance
getProcessInstance
()
{
return
processInstance
;
}
...
...
escheduler-server/src/main/java/cn/escheduler/server/utils/ProcessUtils.java
浏览文件 @
c8c92e4b
...
...
@@ -314,8 +314,7 @@ public class ProcessUtils {
}
}
catch
(
Exception
e
)
{
logger
.
error
(
"kill yarn job failed : "
+
e
.
getMessage
(),
e
);
// throw new RuntimeException("kill yarn job fail");
logger
.
error
(
"kill yarn job failure"
,
e
);
}
}
}
escheduler-server/src/main/java/cn/escheduler/server/worker/WorkerServer.java
浏览文件 @
c8c92e4b
...
...
@@ -254,7 +254,7 @@ public class WorkerServer implements IStoppable {
try
{
Thread
.
sleep
(
Constants
.
SLEEP_TIME_MILLIS
);
}
catch
(
InterruptedException
e
)
{
logger
.
error
(
"interrupted exception
: "
+
e
.
getMessage
()
,
e
);
logger
.
error
(
"interrupted exception
"
,
e
);
}
// if set is null , return
if
(
CollectionUtils
.
isNotEmpty
(
taskInfoSet
)){
...
...
escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogAppender.java
浏览文件 @
c8c92e4b
...
...
@@ -26,8 +26,6 @@ import org.slf4j.LoggerFactory;
*/
public
class
TaskLogAppender
extends
FileAppender
<
ILoggingEvent
>
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
TaskLogAppender
.
class
);
private
String
currentlyActiveFile
;
@Override
...
...
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
浏览文件 @
c8c92e4b
...
...
@@ -20,6 +20,7 @@ import cn.escheduler.common.Constants;
import
cn.escheduler.common.queue.ITaskQueue
;
import
cn.escheduler.common.thread.Stopper
;
import
cn.escheduler.common.thread.ThreadUtils
;
import
cn.escheduler.common.utils.CollectionUtils
;
import
cn.escheduler.common.utils.FileUtils
;
import
cn.escheduler.common.utils.OSUtils
;
import
cn.escheduler.dao.ProcessDao
;
...
...
@@ -27,7 +28,6 @@ import cn.escheduler.dao.model.*;
import
cn.escheduler.server.zk.ZKWorkerClient
;
import
org.apache.commons.configuration.Configuration
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.curator.framework.CuratorFramework
;
import
org.apache.curator.framework.recipes.locks.InterProcessMutex
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
...
...
@@ -74,8 +74,20 @@ public class FetchTaskThread implements Runnable{
*/
private
int
workerExecNums
;
/**
* conf
*/
private
Configuration
conf
;
/**
* task instance
*/
private
TaskInstance
taskInstance
;
/**
* task instance id
*/
Integer
taskInstId
;
public
FetchTaskThread
(
int
taskNum
,
ZKWorkerClient
zkWorkerClient
,
ProcessDao
processDao
,
Configuration
conf
,
...
...
@@ -124,126 +136,101 @@ public class FetchTaskThread implements Runnable{
@Override
public
void
run
()
{
while
(
Stopper
.
isRunning
()){
InterProcessMutex
mutex
=
null
;
try
{
ThreadPoolExecutor
poolExecutor
=
(
ThreadPoolExecutor
)
workerExecService
;
//check memory and cpu usage and threads
if
(
OSUtils
.
checkResource
(
this
.
conf
,
false
)
&&
checkThreadCount
(
poolExecutor
))
{
//whether have tasks, if no tasks , no need lock //get all tasks
List
<
String
>
tasksQueueList
=
taskQueue
.
getAllTasks
(
Constants
.
SCHEDULER_TASKS_QUEUE
);
if
(
tasksQueueList
.
size
()
>
0
){
// creating distributed locks, lock path /escheduler/lock/worker
String
zNodeLockPath
=
zkWorkerClient
.
getWorkerLockPath
();
mutex
=
new
InterProcessMutex
(
zkWorkerClient
.
getZkClient
(),
zNodeLockPath
);
mutex
.
acquire
();
boolean
runCheckFlag
=
OSUtils
.
checkResource
(
this
.
conf
,
false
)
&&
checkThreadCount
(
poolExecutor
);
// task instance id str
List
<
String
>
taskQueueStrArr
=
taskQueue
.
poll
(
Constants
.
SCHEDULER_TASKS_QUEUE
,
taskNum
);
for
(
String
taskQueueStr
:
taskQueueStrArr
){
if
(
StringUtils
.
isNotBlank
(
taskQueueStr
))
{
if
(!
checkThreadCount
(
poolExecutor
))
{
break
;
}
String
[]
taskStringArray
=
taskQueueStr
.
split
(
Constants
.
UNDERLINE
);
String
taskInstIdStr
=
taskStringArray
[
3
];
Date
now
=
new
Date
();
Integer
taskId
=
Integer
.
parseInt
(
taskInstIdStr
);
// find task instance by task id
TaskInstance
taskInstance
=
processDao
.
findTaskInstanceById
(
taskId
);
logger
.
info
(
"worker fetch taskId : {} from queue "
,
taskId
);
int
retryTimes
=
30
;
// mainly to wait for the master insert task to succeed
while
(
taskInstance
==
null
&&
retryTimes
>
0
)
{
Thread
.
sleep
(
Constants
.
SLEEP_TIME_MILLIS
);
taskInstance
=
processDao
.
findTaskInstanceById
(
taskId
);
retryTimes
--;
}
if
(
taskInstance
==
null
)
{
logger
.
error
(
"task instance is null. task id : {} "
,
taskId
);
continue
;
}
if
(!
checkWorkerGroup
(
taskInstance
,
OSUtils
.
getHost
())){
continue
;
}
taskQueue
.
removeNode
(
Constants
.
SCHEDULER_TASKS_QUEUE
,
taskQueueStr
);
logger
.
info
(
"remove task:{} from queue"
,
taskQueueStr
);
Thread
.
sleep
(
Constants
.
SLEEP_TIME_MILLIS
);
// set execute task worker host
taskInstance
.
setHost
(
OSUtils
.
getHost
())
;
taskInstance
.
setStartTime
(
now
);
if
(!
runCheckFlag
)
{
continue
;
}
//whether have tasks, if no tasks , no need lock //get all tasks
List
<
String
>
tasksQueueList
=
taskQueue
.
getAllTasks
(
Constants
.
SCHEDULER_TASKS_QUEUE
);
if
(
CollectionUtils
.
isEmpty
(
tasksQueueList
)){
continue
;
}
// creating distributed locks, lock path /escheduler/lock/worker
mutex
=
zkWorkerClient
.
acquireZkLock
(
zkWorkerClient
.
getZkClient
(),
zkWorkerClient
.
getWorkerLockPath
());
// get process instance
ProcessInstance
processInstance
=
processDao
.
findProcessInstanceDetailById
(
taskInstance
.
getProcessInstanceId
());
// get process define
ProcessDefinition
processDefine
=
processDao
.
findProcessDefineById
(
taskInstance
.
getProcessDefinitionId
()
);
// task instance id str
List
<
String
>
taskQueueStrArr
=
taskQueue
.
poll
(
Constants
.
SCHEDULER_TASKS_QUEUE
,
taskNum
);
for
(
String
taskQueueStr
:
taskQueueStrArr
){
if
(
StringUtils
.
isEmpty
(
taskQueueStr
))
{
continue
;
}
taskInstance
.
setProcessInstance
(
processInstance
);
taskInstance
.
setProcessDefine
(
processDefine
);
if
(!
checkThreadCount
(
poolExecutor
))
{
break
;
}
// get task instance id
taskInstId
=
Integer
.
parseInt
(
taskQueueStr
.
split
(
Constants
.
UNDERLINE
)[
3
]);
// get task instance relation
taskInstance
=
processDao
.
getTaskInstanceRelationByTaskId
(
taskInstId
);
Tenant
tenant
=
processDao
.
getTenantForProcess
(
taskInstance
.
getProcessInstance
().
getTenantId
(),
taskInstance
.
getProcessDefine
().
getUserId
());
if
(
tenant
==
null
){
logger
.
error
(
"tenant not exists,process define id : {},process instance id : {},task instance id : {}"
,
taskInstance
.
getProcessDefine
().
getId
(),
taskInstance
.
getProcessInstance
().
getId
(),
taskInstance
.
getId
());
taskQueue
.
removeNode
(
Constants
.
SCHEDULER_TASKS_QUEUE
,
taskQueueStr
);
continue
;
}
// get local execute path
String
execLocalPath
=
FileUtils
.
getProcessExecDir
(
processDefine
.
getProjectId
(),
processDefine
.
getId
(),
processInstance
.
getId
(),
taskInstance
.
getId
());
logger
.
info
(
"task instance local execute path : {} "
,
execLocalPath
);
logger
.
info
(
"worker fetch taskId : {} from queue "
,
taskInstId
);
// mainly to wait for the master insert task to succeed
waitForMasterEnterQueue
();
// set task execute path
taskInstance
.
setExecutePath
(
execLocalPath
);
if
(
taskInstance
==
null
)
{
logger
.
error
(
"task instance is null. task id : {} "
,
taskInstId
);
taskQueue
.
removeNode
(
Constants
.
SCHEDULER_TASKS_QUEUE
,
taskQueueStr
);
continue
;
}
Tenant
tenant
=
processDao
.
getTenantForProcess
(
processInstance
.
getTenantId
(),
processDefine
.
getUserId
());
if
(
tenant
==
null
){
logger
.
error
(
"cannot find suitable tenant for the task:{}, process instance tenant:{}, process definition tenant:{}"
,
taskInstance
.
getName
(),
processInstance
.
getTenantId
(),
processDefine
.
getTenantId
());
continue
;
}
if
(!
checkWorkerGroup
(
taskInstance
,
OSUtils
.
getHost
())){
continue
;
}
// check and create Linux users
FileUtils
.
createWorkDirAndUserIfAbsent
(
execLocalPath
,
tenant
.
getTenantCode
(),
logger
);
// get local execute path
logger
.
info
(
"task instance local execute path : {} "
,
getExecLocalPath
());
logger
.
info
(
"task : {} ready to submit to task scheduler thread"
,
taskId
);
// submit task
workerExecService
.
submit
(
new
TaskScheduleThread
(
taskInstance
,
processDao
));
// init task
taskInstance
.
init
(
OSUtils
.
getHost
(),
new
Date
(),
getExecLocalPath
());
}
}
// check and create Linux users
FileUtils
.
createWorkDirAndUserIfAbsent
(
getExecLocalPath
(),
tenant
.
getTenantCode
(),
logger
);
}
logger
.
info
(
"task : {} ready to submit to task scheduler thread"
,
taskInstId
);
// submit task
workerExecService
.
submit
(
new
TaskScheduleThread
(
taskInstance
,
processDao
));
// remove node from zk
taskQueue
.
removeNode
(
Constants
.
SCHEDULER_TASKS_QUEUE
,
taskQueueStr
);
}
Thread
.
sleep
(
Constants
.
SLEEP_TIME_MILLIS
);
}
catch
(
Exception
e
){
logger
.
error
(
"fetch task thread
exception : "
+
e
.
getMessage
()
,
e
);
logger
.
error
(
"fetch task thread
failure"
,
e
);
}
finally
{
if
(
mutex
!=
null
){
try
{
mutex
.
release
();
}
catch
(
Exception
e
)
{
if
(
e
.
getMessage
().
equals
(
"instance must be started before calling this method"
)){
logger
.
warn
(
"fetch task lock release"
);
}
else
{
logger
.
error
(
"fetch task lock release failed : "
+
e
.
getMessage
(),
e
);
}
logger
.
error
(
"fetch task lock release failure "
,
e
);
}
}
}
...
...
@@ -251,16 +238,44 @@ public class FetchTaskThread implements Runnable{
}
/**
*
* get execute local path
* @return
*/
private
String
getExecLocalPath
(){
return
FileUtils
.
getProcessExecDir
(
taskInstance
.
getProcessDefine
().
getProjectId
(),
taskInstance
.
getProcessDefine
().
getId
(),
taskInstance
.
getProcessInstance
().
getId
(),
taskInstance
.
getId
());
}
/**
* check
* @param poolExecutor
* @return
*/
private
boolean
checkThreadCount
(
ThreadPoolExecutor
poolExecutor
)
{
int
activeCount
=
poolExecutor
.
getActiveCount
();
if
(
activeCount
>=
workerExecNums
)
{
logger
.
info
(
"thread insufficient , activeCount : {} , workerExecNums : {}, will sleep : {} millis for thread resource"
,
activeCount
,
workerExecNums
,
Constants
.
SLEEP_TIME_MILLIS
);
logger
.
info
(
"thread insufficient , activeCount : {} , "
+
"workerExecNums : {}, will sleep : {} millis for thread resource"
,
activeCount
,
workerExecNums
,
Constants
.
SLEEP_TIME_MILLIS
);
return
false
;
}
return
true
;
}
/**
* mainly to wait for the master insert task to succeed
* @throws Exception
*/
private
void
waitForMasterEnterQueue
()
throws
Exception
{
int
retryTimes
=
30
;
while
(
taskInstance
==
null
&&
retryTimes
>
0
)
{
Thread
.
sleep
(
Constants
.
SLEEP_TIME_MILLIS
);
taskInstance
=
processDao
.
findTaskInstanceById
(
taskInstId
);
retryTimes
--;
}
}
}
\ No newline at end of file
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java
浏览文件 @
c8c92e4b
...
...
@@ -59,13 +59,16 @@ import java.util.stream.Collectors;
/**
* task scheduler thread
*/
public
class
TaskScheduleThread
implements
Callable
<
Boolean
>
{
public
class
TaskScheduleThread
implements
Runnable
{
/**
* logger
*/
private
final
Logger
logger
=
LoggerFactory
.
getLogger
(
TaskScheduleThread
.
class
);
/**
* task prefix
*/
private
static
final
String
TASK_PREFIX
=
"TASK"
;
/**
...
...
@@ -79,7 +82,7 @@ public class TaskScheduleThread implements Callable<Boolean> {
private
final
ProcessDao
processDao
;
/**
*
execute task info
*
abstract task
*/
private
AbstractTask
task
;
...
...
@@ -89,115 +92,55 @@ public class TaskScheduleThread implements Callable<Boolean> {
}
@Override
public
Boolean
call
()
throws
Exception
{
public
void
run
()
{
// get task type
String
taskType
=
taskInstance
.
getTaskType
();
// set task state
taskInstance
.
setState
(
ExecutionStatus
.
RUNNING_EXEUTION
);
// update task state
if
(
taskType
.
equals
(
TaskType
.
SQL
.
name
())
||
taskType
.
equals
(
TaskType
.
PROCEDURE
.
name
())){
processDao
.
changeTaskState
(
taskInstance
.
getState
(),
taskInstance
.
getStartTime
(),
taskInstance
.
getHost
(),
null
,
System
.
getProperty
(
"user.dir"
)
+
"/logs/"
+
taskInstance
.
getProcessDefinitionId
()
+
"/"
+
taskInstance
.
getProcessInstanceId
()
+
"/"
+
taskInstance
.
getId
()
+
".log"
,
taskInstance
.
getId
());
}
else
{
processDao
.
changeTaskState
(
taskInstance
.
getState
(),
taskInstance
.
getStartTime
(),
taskInstance
.
getHost
(),
taskInstance
.
getExecutePath
(),
System
.
getProperty
(
"user.dir"
)
+
"/logs/"
+
taskInstance
.
getProcessDefinitionId
()
+
"/"
+
taskInstance
.
getProcessInstanceId
()
+
"/"
+
taskInstance
.
getId
()
+
".log"
,
taskInstance
.
getId
());
}
ExecutionStatus
status
=
ExecutionStatus
.
SUCCESS
;
// update task state is running according to task type
updateTaskState
(
taskInstance
.
getTaskType
());
try
{
logger
.
info
(
"script path : {}"
,
taskInstance
.
getExecutePath
());
// task node
TaskNode
taskNode
=
JSONObject
.
parseObject
(
taskInstance
.
getTaskJson
(),
TaskNode
.
class
);
// custom param str
String
customParamStr
=
taskInstance
.
getProcessInstance
().
getGlobalParams
();
Map
<
String
,
String
>
allParamMap
=
new
HashMap
<>();
if
(
customParamStr
!=
null
)
{
List
<
Property
>
customParamMap
=
JSONObject
.
parseArray
(
customParamStr
,
Property
.
class
);
Map
<
String
,
String
>
userDefinedParamMap
=
customParamMap
.
stream
().
collect
(
Collectors
.
toMap
(
Property:
:
getProp
,
Property:
:
getValue
));
allParamMap
.
putAll
(
userDefinedParamMap
);
}
logger
.
info
(
"script path : {}"
,
taskInstance
.
getExecutePath
());
TaskProps
taskProps
=
new
TaskProps
();
taskProps
.
setTaskDir
(
taskInstance
.
getExecutePath
());
String
taskJson
=
taskInstance
.
getTaskJson
();
TaskNode
taskNode
=
JSONObject
.
parseObject
(
taskJson
,
TaskNode
.
class
);
List
<
String
>
projectRes
=
createProjectResFiles
(
taskNode
);
// copy hdfs file to local
// copy hdfs/minio file to local
copyHdfsToLocal
(
processDao
,
taskInstance
.
getExecutePath
(),
projectRes
,
createProjectResFiles
(
taskNode
)
,
logger
);
// set task params
taskProps
.
setTaskParams
(
taskNode
.
getParams
());
// set tenant code , execute task linux user
ProcessInstance
processInstance
=
processDao
.
findProcessInstanceByTaskId
(
taskInstance
.
getId
());
taskProps
.
setScheduleTime
(
processInstance
.
getScheduleTime
());
taskProps
.
setNodeName
(
taskInstance
.
getName
());
taskProps
.
setTaskInstId
(
taskInstance
.
getId
());
taskProps
.
setEnvFile
(
CommonUtils
.
getSystemEnvPath
());
ProcessDefinition
processDefine
=
processDao
.
findProcessDefineById
(
processInstance
.
getProcessDefinitionId
());
// get process instance according to tak instance
ProcessInstance
processInstance
=
taskInstance
.
getProcessInstance
();
// get process define according to tak instance
ProcessDefinition
processDefine
=
taskInstance
.
getProcessDefine
();
// get tenant info
Tenant
tenant
=
processDao
.
getTenantForProcess
(
processInstance
.
getTenantId
(),
processDefine
.
getUserId
());
processDefine
.
getUserId
());
if
(
tenant
==
null
){
processInstance
.
setTenantCode
(
tenant
.
getTenantCode
());
logger
.
error
(
"cannot find the tenant, process definition id:{}, tenant id:{}, user id:{}"
,
processDefine
.
getId
(),
processDefine
.
getTenantId
(),
processDefine
.
getUserId
()
);
status
=
ExecutionStatus
.
FAILURE
;
logger
.
error
(
"cannot find the tenant, process definition id:{}, user id:{}"
,
processDefine
.
getId
(),
processDefine
.
getUserId
());
task
.
setExitStatusCode
(
Constants
.
EXIT_CODE_FAILURE
);
}
else
{
taskProps
.
setTenantCode
(
tenant
.
getTenantCode
());
String
queue
=
processDao
.
queryQueueByProcessInstanceId
(
processInstance
.
getId
());
// set queue
if
(
StringUtils
.
isEmpty
(
queue
)){
taskProps
.
setQueue
(
taskInstance
.
getProcessInstance
().
getQueue
());
}
else
{
taskProps
.
setQueue
(
tenant
.
getQueueName
());
}
taskProps
.
setTaskStartTime
(
taskInstance
.
getStartTime
());
taskProps
.
setDefinedParams
(
allParamMap
);
// set task props
TaskProps
taskProps
=
new
TaskProps
(
taskNode
.
getParams
(),
taskInstance
.
getExecutePath
(),
processInstance
.
getScheduleTime
(),
taskInstance
.
getName
(),
taskInstance
.
getTaskType
(),
taskInstance
.
getId
(),
CommonUtils
.
getSystemEnvPath
(),
tenant
.
getTenantCode
(),
tenant
.
getQueueName
(),
taskInstance
.
getStartTime
(),
getGlobalParamsMap
(),
taskInstance
.
getDependency
(),
processInstance
.
getCmdTypeIfComplement
());
// set task timeout
setTaskTimeout
(
taskProps
,
taskNode
);
taskProps
.
setDependence
(
taskInstance
.
getDependency
());
taskProps
.
setTaskAppId
(
String
.
format
(
"%s_%s_%s"
,
taskInstance
.
getProcessDefine
().
getId
(),
taskInstance
.
getProcessInstance
().
getId
(),
...
...
@@ -209,72 +152,98 @@ public class TaskScheduleThread implements Callable<Boolean> {
taskInstance
.
getProcessInstance
().
getId
(),
taskInstance
.
getId
()));
task
=
TaskManager
.
newTask
(
taskInstance
.
getTaskType
(),
taskProps
,
taskLogger
);
task
=
TaskManager
.
newTask
(
taskInstance
.
getTaskType
(),
taskProps
,
taskLogger
);
//
job
init
//
task
init
task
.
init
();
//
job
handle
//
task
handle
task
.
handle
();
logger
.
info
(
"task : {} exit status code : {}"
,
taskProps
.
getTaskAppId
(),
task
.
getExitStatusCode
());
if
(
task
.
getExitStatusCode
()
==
Constants
.
EXIT_CODE_SUCCESS
){
status
=
ExecutionStatus
.
SUCCESS
;
// task recor flat : if true , start up qianfan
if
(
TaskRecordDao
.
getTaskRecordFlag
()
&&
TaskType
.
typeIsNormalTask
(
taskInstance
.
getTaskType
())){
AbstractParameters
params
=
(
AbstractParameters
)
JSONUtils
.
parseObject
(
taskProps
.
getTaskParams
(),
getCurTaskParamsClass
());
// replace placeholder
Map
<
String
,
Property
>
paramsMap
=
ParamUtils
.
convert
(
taskProps
.
getUserDefParamsMap
(),
taskProps
.
getDefinedParams
(),
params
.
getLocalParametersMap
(),
processInstance
.
getCmdTypeIfComplement
(),
processInstance
.
getScheduleTime
());
if
(
paramsMap
!=
null
&&
!
paramsMap
.
isEmpty
()
&&
paramsMap
.
containsKey
(
"v_proc_date"
)){
String
vProcDate
=
paramsMap
.
get
(
"v_proc_date"
).
getValue
();
if
(!
StringUtils
.
isEmpty
(
vProcDate
)){
TaskRecordStatus
taskRecordState
=
TaskRecordDao
.
getTaskRecordState
(
taskInstance
.
getName
(),
vProcDate
);
logger
.
info
(
"task record status : {}"
,
taskRecordState
);
if
(
taskRecordState
==
TaskRecordStatus
.
FAILURE
){
status
=
ExecutionStatus
.
FAILURE
;
}
}
}
}
}
else
if
(
task
.
getExitStatusCode
()
==
Constants
.
EXIT_CODE_KILL
){
status
=
ExecutionStatus
.
KILL
;
}
else
{
status
=
ExecutionStatus
.
FAILURE
;
}
// task result process
task
.
after
();
}
}
catch
(
Exception
e
){
logger
.
error
(
"task escheduler failure : "
,
e
);
status
=
ExecutionStatus
.
FAILURE
;
logger
.
error
(
String
.
format
(
"task process exception, process id : %s , task : %s"
,
taskInstance
.
getProcessInstanceId
(),
taskInstance
.
getName
()),
e
);
logger
.
error
(
"task scheduler failure"
,
e
);
task
.
setExitStatusCode
(
Constants
.
EXIT_CODE_FAILURE
);
kill
();
}
logger
.
info
(
"task instance id : {},task final status : {}"
,
taskInstance
.
getId
(),
task
.
getExitStatus
());
// update task instance state
processDao
.
changeTaskState
(
status
,
processDao
.
changeTaskState
(
task
.
getExitStatus
()
,
new
Date
(),
taskInstance
.
getId
());
return
task
.
getExitStatusCode
()
>
Constants
.
EXIT_CODE_SUCCESS
;
}
/**
* set task time out
* get global paras map
* @return
*/
private
Map
<
String
,
String
>
getGlobalParamsMap
()
{
Map
<
String
,
String
>
globalParamsMap
=
new
HashMap
<>(
16
);
// global params string
String
globalParamsStr
=
taskInstance
.
getProcessInstance
().
getGlobalParams
();
if
(
globalParamsStr
!=
null
)
{
List
<
Property
>
globalParamsList
=
JSONObject
.
parseArray
(
globalParamsStr
,
Property
.
class
);
globalParamsMap
.
putAll
(
globalParamsList
.
stream
().
collect
(
Collectors
.
toMap
(
Property:
:
getProp
,
Property:
:
getValue
)));
}
return
globalParamsMap
;
}
/**
* update task state according to task type
* @param taskType
*/
private
void
updateTaskState
(
String
taskType
)
{
// update task status is running
if
(
taskType
.
equals
(
TaskType
.
SQL
.
name
())
||
taskType
.
equals
(
TaskType
.
PROCEDURE
.
name
())){
processDao
.
changeTaskState
(
ExecutionStatus
.
RUNNING_EXEUTION
,
taskInstance
.
getStartTime
(),
taskInstance
.
getHost
(),
null
,
getTaskLogPath
(),
taskInstance
.
getId
());
}
else
{
processDao
.
changeTaskState
(
ExecutionStatus
.
RUNNING_EXEUTION
,
taskInstance
.
getStartTime
(),
taskInstance
.
getHost
(),
taskInstance
.
getExecutePath
(),
getTaskLogPath
(),
taskInstance
.
getId
());
}
}
/**
* get task log path
* @return
*/
private
String
getTaskLogPath
()
{
return
System
.
getProperty
(
"user.dir"
)
+
Constants
.
SINGLE_SLASH
+
"logs"
+
Constants
.
SINGLE_SLASH
+
taskInstance
.
getProcessDefinitionId
()
+
Constants
.
SINGLE_SLASH
+
taskInstance
.
getProcessInstanceId
()
+
Constants
.
SINGLE_SLASH
+
taskInstance
.
getId
()
+
".log"
;
}
/**
* set task timeout
* @param taskProps
* @param taskNode
*/
private
void
setTaskTimeout
(
TaskProps
taskProps
,
TaskNode
taskNode
)
{
// the default timeout is the maximum value of the integer
taskProps
.
setTaskTimeout
(
Integer
.
MAX_VALUE
);
TaskTimeoutParameter
taskTimeoutParameter
=
taskNode
.
getTaskTimeoutParameter
();
if
(
taskTimeoutParameter
.
getEnable
()){
// get timeout strategy
taskProps
.
setTaskTimeoutStrategy
(
taskTimeoutParameter
.
getStrategy
());
switch
(
taskTimeoutParameter
.
getStrategy
()){
case
WARN:
...
...
@@ -298,38 +267,7 @@ public class TaskScheduleThread implements Callable<Boolean> {
}
/**
* get current task parameter class
* @return
*/
private
Class
getCurTaskParamsClass
(){
Class
paramsClass
=
null
;
TaskType
taskType
=
TaskType
.
valueOf
(
taskInstance
.
getTaskType
());
switch
(
taskType
){
case
SHELL:
paramsClass
=
ShellParameters
.
class
;
break
;
case
SQL:
paramsClass
=
SqlParameters
.
class
;
break
;
case
PROCEDURE:
paramsClass
=
ProcedureParameters
.
class
;
break
;
case
MR:
paramsClass
=
MapreduceParameters
.
class
;
break
;
case
SPARK:
paramsClass
=
SparkParameters
.
class
;
break
;
case
PYTHON:
paramsClass
=
PythonParameters
.
class
;
break
;
default
:
logger
.
error
(
"not support this task type: {}"
,
taskType
);
throw
new
IllegalArgumentException
(
"not support this task type"
);
}
return
paramsClass
;
}
/**
* kill task
...
...
@@ -376,9 +314,7 @@ public class TaskScheduleThread implements Callable<Boolean> {
File
resFile
=
new
File
(
execLocalPath
,
res
);
if
(!
resFile
.
exists
())
{
try
{
/**
* query the tenant code of the resource according to the name of the resource
*/
// query the tenant code of the resource according to the name of the resource
String
tentnCode
=
processDao
.
queryTenantCodeByResName
(
res
);
String
resHdfsPath
=
HadoopUtils
.
getHdfsFilename
(
tentnCode
,
res
);
...
...
@@ -388,7 +324,6 @@ public class TaskScheduleThread implements Callable<Boolean> {
logger
.
error
(
e
.
getMessage
(),
e
);
throw
new
RuntimeException
(
e
.
getMessage
());
}
}
else
{
logger
.
info
(
"file : {} exists "
,
resFile
.
getName
());
}
...
...
escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractCommandExecutor.java
浏览文件 @
c8c92e4b
...
...
@@ -67,6 +67,11 @@ public abstract class AbstractCommandExecutor {
*/
protected
final
String
taskAppId
;
/**
* task appId
*/
protected
final
int
taskInstId
;
/**
* tenant code , execute task linux user
*/
...
...
@@ -99,11 +104,12 @@ public abstract class AbstractCommandExecutor {
public
AbstractCommandExecutor
(
Consumer
<
List
<
String
>>
logHandler
,
String
taskDir
,
String
taskAppId
,
String
tenantCode
,
String
envFile
,
String
taskDir
,
String
taskAppId
,
int
taskInstId
,
String
tenantCode
,
String
envFile
,
Date
startTime
,
int
timeout
,
Logger
logger
){
this
.
logHandler
=
logHandler
;
this
.
taskDir
=
taskDir
;
this
.
taskAppId
=
taskAppId
;
this
.
taskInstId
=
taskInstId
;
this
.
tenantCode
=
tenantCode
;
this
.
envFile
=
envFile
;
this
.
startTime
=
startTime
;
...
...
escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractTask.java
浏览文件 @
c8c92e4b
...
...
@@ -16,10 +16,26 @@
*/
package
cn.escheduler.server.worker.task
;
import
cn.escheduler.common.Constants
;
import
cn.escheduler.common.enums.ExecutionStatus
;
import
cn.escheduler.common.enums.TaskRecordStatus
;
import
cn.escheduler.common.enums.TaskType
;
import
cn.escheduler.common.process.Property
;
import
cn.escheduler.common.task.AbstractParameters
;
import
cn.escheduler.common.task.mr.MapreduceParameters
;
import
cn.escheduler.common.task.procedure.ProcedureParameters
;
import
cn.escheduler.common.task.python.PythonParameters
;
import
cn.escheduler.common.task.shell.ShellParameters
;
import
cn.escheduler.common.task.spark.SparkParameters
;
import
cn.escheduler.common.task.sql.SqlParameters
;
import
cn.escheduler.common.utils.JSONUtils
;
import
cn.escheduler.dao.TaskRecordDao
;
import
cn.escheduler.server.utils.ParamUtils
;
import
org.apache.commons.lang.StringUtils
;
import
org.slf4j.Logger
;
import
java.util.List
;
import
java.util.Map
;
/**
* executive task
...
...
@@ -70,7 +86,7 @@ public abstract class AbstractTask {
public
void
cancelApplication
(
boolean
status
)
throws
Exception
{
cancel
=
true
;
this
.
cancel
=
status
;
}
/**
...
...
@@ -89,6 +105,9 @@ public abstract class AbstractTask {
return
exitStatusCode
;
}
public
void
setExitStatusCode
(
int
exitStatusCode
)
{
this
.
exitStatusCode
=
exitStatusCode
;
}
/**
* get task parameters
...
...
@@ -96,4 +115,96 @@ public abstract class AbstractTask {
public
abstract
AbstractParameters
getParameters
();
/**
* result processing
*/
public
void
after
(){
if
(
getExitStatusCode
()
==
Constants
.
EXIT_CODE_SUCCESS
){
// task recor flat : if true , start up qianfan
if
(
TaskRecordDao
.
getTaskRecordFlag
()
&&
TaskType
.
typeIsNormalTask
(
taskProps
.
getTaskType
())){
AbstractParameters
params
=
(
AbstractParameters
)
JSONUtils
.
parseObject
(
taskProps
.
getTaskParams
(),
getCurTaskParamsClass
());
// replace placeholder
Map
<
String
,
Property
>
paramsMap
=
ParamUtils
.
convert
(
taskProps
.
getUserDefParamsMap
(),
taskProps
.
getDefinedParams
(),
params
.
getLocalParametersMap
(),
taskProps
.
getCmdTypeIfComplement
(),
taskProps
.
getScheduleTime
());
if
(
paramsMap
!=
null
&&
!
paramsMap
.
isEmpty
()
&&
paramsMap
.
containsKey
(
"v_proc_date"
)){
String
vProcDate
=
paramsMap
.
get
(
"v_proc_date"
).
getValue
();
if
(!
StringUtils
.
isEmpty
(
vProcDate
)){
TaskRecordStatus
taskRecordState
=
TaskRecordDao
.
getTaskRecordState
(
taskProps
.
getNodeName
(),
vProcDate
);
logger
.
info
(
"task record status : {}"
,
taskRecordState
);
if
(
taskRecordState
==
TaskRecordStatus
.
FAILURE
){
setExitStatusCode
(
Constants
.
EXIT_CODE_FAILURE
);
}
}
}
}
}
else
if
(
getExitStatusCode
()
==
Constants
.
EXIT_CODE_KILL
){
setExitStatusCode
(
Constants
.
EXIT_CODE_KILL
);
}
else
{
setExitStatusCode
(
Constants
.
EXIT_CODE_FAILURE
);
}
}
/**
* get current task parameter class
* @return
*/
private
Class
getCurTaskParamsClass
(){
Class
paramsClass
=
null
;
// get task type
TaskType
taskType
=
TaskType
.
valueOf
(
taskProps
.
getTaskType
());
switch
(
taskType
){
case
SHELL:
paramsClass
=
ShellParameters
.
class
;
break
;
case
SQL:
paramsClass
=
SqlParameters
.
class
;
break
;
case
PROCEDURE:
paramsClass
=
ProcedureParameters
.
class
;
break
;
case
MR:
paramsClass
=
MapreduceParameters
.
class
;
break
;
case
SPARK:
paramsClass
=
SparkParameters
.
class
;
break
;
case
PYTHON:
paramsClass
=
PythonParameters
.
class
;
break
;
default
:
logger
.
error
(
"not support this task type: {}"
,
taskType
);
throw
new
IllegalArgumentException
(
"not support this task type"
);
}
return
paramsClass
;
}
/**
* get exit status according to exitCode
* @return
*/
public
ExecutionStatus
getExitStatus
(){
ExecutionStatus
status
;
switch
(
getExitStatusCode
()){
case
Constants
.
EXIT_CODE_SUCCESS
:
status
=
ExecutionStatus
.
SUCCESS
;
break
;
case
Constants
.
EXIT_CODE_KILL
:
status
=
ExecutionStatus
.
KILL
;
break
;
default
:
status
=
ExecutionStatus
.
FAILURE
;
break
;
}
return
status
;
}
}
\ No newline at end of file
escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractYarnTask.java
浏览文件 @
c8c92e4b
...
...
@@ -38,7 +38,7 @@ public abstract class AbstractYarnTask extends AbstractTask {
/**
* process task
*/
private
ShellCommandExecutor
processTask
;
private
ShellCommandExecutor
shellCommandExecutor
;
/**
* process database access
...
...
@@ -53,21 +53,25 @@ public abstract class AbstractYarnTask extends AbstractTask {
public
AbstractYarnTask
(
TaskProps
taskProps
,
Logger
logger
)
{
super
(
taskProps
,
logger
);
this
.
processDao
=
DaoFactory
.
getDaoInstance
(
ProcessDao
.
class
);
// find process instance by taskId
this
.
processInstance
=
processDao
.
findProcessInstanceByTaskId
(
taskProps
.
getTaskInstId
());
this
.
processTask
=
new
ShellCommandExecutor
(
this
::
logHandle
,
taskProps
.
getTaskDir
(),
taskProps
.
getTaskAppId
(),
taskProps
.
getTenantCode
(),
taskProps
.
getEnvFile
(),
taskProps
.
getTaskStartTime
(),
taskProps
.
getTaskTimeout
(),
logger
);
this
.
shellCommandExecutor
=
new
ShellCommandExecutor
(
this
::
logHandle
,
taskProps
.
getTaskDir
(),
taskProps
.
getTaskAppId
(),
taskProps
.
getTaskInstId
(),
taskProps
.
getTenantCode
(),
taskProps
.
getEnvFile
(),
taskProps
.
getTaskStartTime
(),
taskProps
.
getTaskTimeout
(),
logger
);
}
@Override
public
void
handle
()
throws
Exception
{
try
{
// construct process
exitStatusCode
=
processTask
.
run
(
buildCommand
(),
processDao
);
exitStatusCode
=
shellCommandExecutor
.
run
(
buildCommand
(),
processDao
);
}
catch
(
Exception
e
)
{
logger
.
error
(
"yarn process fail
ed : "
+
e
.
getMessage
()
,
e
);
logger
.
error
(
"yarn process fail
ure"
,
e
);
exitStatusCode
=
-
1
;
}
}
...
...
@@ -76,9 +80,8 @@ public abstract class AbstractYarnTask extends AbstractTask {
public
void
cancelApplication
(
boolean
status
)
throws
Exception
{
cancel
=
true
;
// cancel process
processTask
.
cancelApplication
();
int
taskInstId
=
taskProps
.
getTaskInstId
();
TaskInstance
taskInstance
=
processDao
.
findTaskInstanceById
(
taskInstId
);
shellCommandExecutor
.
cancelApplication
();
TaskInstance
taskInstance
=
processDao
.
findTaskInstanceById
(
taskProps
.
getTaskInstId
());
if
(
status
&&
taskInstance
!=
null
){
ProcessUtils
.
killYarnJob
(
taskInstance
);
}
...
...
escheduler-server/src/main/java/cn/escheduler/server/worker/task/PythonCommandExecutor.java
浏览文件 @
c8c92e4b
...
...
@@ -18,7 +18,6 @@ package cn.escheduler.server.worker.task;
import
cn.escheduler.common.Constants
;
import
cn.escheduler.common.utils.FileUtils
;
import
cn.escheduler.common.utils.PropertyUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
...
...
@@ -43,9 +42,15 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
public
PythonCommandExecutor
(
Consumer
<
List
<
String
>>
logHandler
,
String
taskDir
,
String
taskAppId
,
String
tenantCode
,
String
envFile
,
Date
startTime
,
int
timeout
,
Logger
logger
)
{
super
(
logHandler
,
taskDir
,
taskAppId
,
tenantCode
,
envFile
,
startTime
,
timeout
,
logger
);
String
taskDir
,
String
taskAppId
,
int
taskInstId
,
String
tenantCode
,
String
envFile
,
Date
startTime
,
int
timeout
,
Logger
logger
)
{
super
(
logHandler
,
taskDir
,
taskAppId
,
taskInstId
,
tenantCode
,
envFile
,
startTime
,
timeout
,
logger
);
}
...
...
@@ -67,7 +72,7 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
*/
@Override
protected
void
createCommandFileIfNotExists
(
String
execCommand
,
String
commandFile
)
throws
IOException
{
logger
.
info
(
"tenant
:{}, wor
k dir:{}"
,
tenantCode
,
taskDir
);
logger
.
info
(
"tenant
Code :{}, tas
k dir:{}"
,
tenantCode
,
taskDir
);
if
(!
Files
.
exists
(
Paths
.
get
(
commandFile
)))
{
logger
.
info
(
"generate command file:{}"
,
commandFile
);
...
...
@@ -80,16 +85,15 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
logger
.
info
(
sb
.
toString
());
// write data to file
FileUtils
.
writeStringToFile
(
new
File
(
commandFile
),
sb
.
toString
(),
StandardCharsets
.
UTF_8
);
FileUtils
.
writeStringToFile
(
new
File
(
commandFile
),
sb
.
toString
(),
StandardCharsets
.
UTF_8
);
}
}
@Override
protected
String
commandType
()
{
String
envPath
=
PropertyUtils
.
getString
(
Constants
.
ESCHEDULER_ENV_PATH
);
String
pythonHome
=
getPythonHome
(
envPath
);
String
pythonHome
=
getPythonHome
(
envFile
);
if
(
StringUtils
.
isEmpty
(
pythonHome
)){
return
PYTHON
;
}
...
...
@@ -108,16 +112,25 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
/**
* get python home
* get the absolute path of the Python command
* note :
* common.properties
* PYTHON_HOME configured under common.properties is Python absolute path, not PYTHON_HOME itself
*
* for example :
* your PYTHON_HOM is /opt/python3.7/
* you must set PYTHON_HOME is /opt/python3.7/python under nder common.properties
* escheduler.env.path file.
*
* @param envPath
* @return
*/
private
static
String
getPythonHome
(
String
envPath
){
BufferedReader
br
=
null
;
String
line
=
null
;
StringBuilder
sb
=
new
StringBuilder
();
try
{
br
=
new
BufferedReader
(
new
InputStreamReader
(
new
FileInputStream
(
envPath
)));
String
line
;
while
((
line
=
br
.
readLine
())
!=
null
){
if
(
line
.
contains
(
Constants
.
PYTHON_HOME
)){
sb
.
append
(
line
);
...
...
@@ -128,13 +141,13 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
if
(
org
.
apache
.
commons
.
lang
.
StringUtils
.
isEmpty
(
result
)){
return
null
;
}
String
[]
arrs
=
result
.
split
(
"="
);
String
[]
arrs
=
result
.
split
(
Constants
.
EQUAL_SIGN
);
if
(
arrs
.
length
==
2
){
return
arrs
[
1
];
}
}
catch
(
IOException
e
){
logger
.
error
(
"read file fail
ed : "
+
e
.
getMessage
()
,
e
);
logger
.
error
(
"read file fail
ure"
,
e
);
}
finally
{
try
{
if
(
br
!=
null
){
...
...
escheduler-server/src/main/java/cn/escheduler/server/worker/task/ShellCommandExecutor.java
浏览文件 @
c8c92e4b
...
...
@@ -29,9 +29,7 @@ import java.util.List;
import
java.util.function.Consumer
;
/**
* command executor
*
* 进程,真正在worker服务器上执行的任务
* shell command executor
*/
public
class
ShellCommandExecutor
extends
AbstractCommandExecutor
{
...
...
@@ -39,9 +37,15 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
public
ShellCommandExecutor
(
Consumer
<
List
<
String
>>
logHandler
,
String
taskDir
,
String
taskAppId
,
String
tenantCode
,
String
envFile
,
Date
startTime
,
int
timeout
,
Logger
logger
)
{
super
(
logHandler
,
taskDir
,
taskAppId
,
tenantCode
,
envFile
,
startTime
,
timeout
,
logger
);
String
taskDir
,
String
taskAppId
,
int
taskInstId
,
String
tenantCode
,
String
envFile
,
Date
startTime
,
int
timeout
,
Logger
logger
)
{
super
(
logHandler
,
taskDir
,
taskAppId
,
taskInstId
,
tenantCode
,
envFile
,
startTime
,
timeout
,
logger
);
}
...
...
escheduler-server/src/main/java/cn/escheduler/server/worker/task/TaskProps.java
浏览文件 @
c8c92e4b
...
...
@@ -16,6 +16,7 @@
*/
package
cn.escheduler.server.worker.task
;
import
cn.escheduler.common.enums.CommandType
;
import
cn.escheduler.common.enums.DataType
;
import
cn.escheduler.common.enums.Direct
;
import
cn.escheduler.common.enums.TaskTimeoutStrategy
;
...
...
@@ -46,6 +47,8 @@ public class TaskProps {
**/
private
String
tenantCode
;
private
String
taskType
;
/**
* task parameters
**/
...
...
@@ -101,6 +104,41 @@ public class TaskProps {
*/
private
Date
scheduleTime
;
/**
* command type is complement
*/
private
CommandType
cmdTypeIfComplement
;
public
TaskProps
(){}
public
TaskProps
(
String
taskParams
,
String
taskDir
,
Date
scheduleTime
,
String
nodeName
,
String
taskType
,
int
taskInstId
,
String
envFile
,
String
tenantCode
,
String
queue
,
Date
taskStartTime
,
Map
<
String
,
String
>
definedParams
,
String
dependence
,
CommandType
cmdTypeIfComplement
){
this
.
taskParams
=
taskParams
;
this
.
taskDir
=
taskDir
;
this
.
scheduleTime
=
scheduleTime
;
this
.
nodeName
=
nodeName
;
this
.
taskType
=
taskType
;
this
.
taskInstId
=
taskInstId
;
this
.
envFile
=
envFile
;
this
.
tenantCode
=
tenantCode
;
this
.
queue
=
queue
;
this
.
taskStartTime
=
taskStartTime
;
this
.
definedParams
=
definedParams
;
this
.
dependence
=
dependence
;
this
.
cmdTypeIfComplement
=
cmdTypeIfComplement
;
}
public
String
getTenantCode
()
{
return
tenantCode
;
...
...
@@ -200,22 +238,12 @@ public class TaskProps {
this
.
taskTimeoutStrategy
=
taskTimeoutStrategy
;
}
/**
* get parameters map
* @return
*/
public
Map
<
String
,
Property
>
getUserDefParamsMap
()
{
if
(
definedParams
!=
null
)
{
Map
<
String
,
Property
>
userDefParamsMaps
=
new
HashMap
<>();
Iterator
<
Map
.
Entry
<
String
,
String
>>
iter
=
definedParams
.
entrySet
().
iterator
();
while
(
iter
.
hasNext
()){
Map
.
Entry
<
String
,
String
>
en
=
iter
.
next
();
Property
property
=
new
Property
(
en
.
getKey
(),
Direct
.
IN
,
DataType
.
VARCHAR
,
en
.
getValue
());
userDefParamsMaps
.
put
(
property
.
getProp
(),
property
);
}
return
userDefParamsMaps
;
}
return
null
;
public
String
getTaskType
()
{
return
taskType
;
}
public
void
setTaskType
(
String
taskType
)
{
this
.
taskType
=
taskType
;
}
public
String
getDependence
()
{
...
...
@@ -233,4 +261,30 @@ public class TaskProps {
public
void
setScheduleTime
(
Date
scheduleTime
)
{
this
.
scheduleTime
=
scheduleTime
;
}
public
CommandType
getCmdTypeIfComplement
()
{
return
cmdTypeIfComplement
;
}
public
void
setCmdTypeIfComplement
(
CommandType
cmdTypeIfComplement
)
{
this
.
cmdTypeIfComplement
=
cmdTypeIfComplement
;
}
/**
* get parameters map
* @return
*/
public
Map
<
String
,
Property
>
getUserDefParamsMap
()
{
if
(
definedParams
!=
null
)
{
Map
<
String
,
Property
>
userDefParamsMaps
=
new
HashMap
<>();
Iterator
<
Map
.
Entry
<
String
,
String
>>
iter
=
definedParams
.
entrySet
().
iterator
();
while
(
iter
.
hasNext
()){
Map
.
Entry
<
String
,
String
>
en
=
iter
.
next
();
Property
property
=
new
Property
(
en
.
getKey
(),
Direct
.
IN
,
DataType
.
VARCHAR
,
en
.
getValue
());
userDefParamsMaps
.
put
(
property
.
getProp
(),
property
);
}
return
userDefParamsMaps
;
}
return
null
;
}
}
escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentExecute.java
浏览文件 @
c8c92e4b
...
...
@@ -208,6 +208,4 @@ public class DependentExecute {
return
dependResultMap
;
}
}
escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentTask.java
浏览文件 @
c8c92e4b
...
...
@@ -104,7 +104,7 @@ public class DependentTask extends AbstractTask {
Constants
.
EXIT_CODE_SUCCESS
:
Constants
.
EXIT_CODE_FAILURE
;
}
}
catch
(
Exception
e
){
logger
.
error
(
"Exception "
+
e
);
logger
.
error
(
e
.
getMessage
(),
e
);
exitStatusCode
=
-
1
;
}
}
...
...
escheduler-server/src/main/java/cn/escheduler/server/worker/task/mr/MapReduceTask.java
浏览文件 @
c8c92e4b
...
...
@@ -70,8 +70,8 @@ public class MapReduceTask extends AbstractYarnTask {
Map
<
String
,
Property
>
paramsMap
=
ParamUtils
.
convert
(
taskProps
.
getUserDefParamsMap
(),
taskProps
.
getDefinedParams
(),
mapreduceParameters
.
getLocalParametersMap
(),
processInstance
.
getCmdTypeIfComplement
(),
processInstance
.
getScheduleTime
());
taskProps
.
getCmdTypeIfComplement
(),
taskProps
.
getScheduleTime
());
if
(
paramsMap
!=
null
){
String
args
=
ParameterUtils
.
convertParameterPlaceholders
(
mapreduceParameters
.
getMainArgs
(),
ParamUtils
.
convert
(
paramsMap
));
mapreduceParameters
.
setMainArgs
(
args
);
...
...
@@ -86,7 +86,8 @@ public class MapReduceTask extends AbstractYarnTask {
protected
String
buildCommand
()
throws
Exception
{
List
<
String
>
parameterList
=
buildParameters
(
mapreduceParameters
);
String
command
=
ParameterUtils
.
convertParameterPlaceholders
(
String
.
join
(
" "
,
parameterList
),
taskProps
.
getDefinedParams
());
String
command
=
ParameterUtils
.
convertParameterPlaceholders
(
String
.
join
(
" "
,
parameterList
),
taskProps
.
getDefinedParams
());
logger
.
info
(
"mapreduce task command: {}"
,
command
);
return
command
;
...
...
escheduler-server/src/main/java/cn/escheduler/server/worker/task/processdure/ProcedureTask.java
浏览文件 @
c8c92e4b
此差异已折叠。
点击以展开。
escheduler-server/src/main/java/cn/escheduler/server/worker/task/python/PythonTask.java
浏览文件 @
c8c92e4b
...
...
@@ -20,27 +20,18 @@ package cn.escheduler.server.worker.task.python;
import
cn.escheduler.common.process.Property
;
import
cn.escheduler.common.task.AbstractParameters
;
import
cn.escheduler.common.task.python.PythonParameters
;
import
cn.escheduler.common.utils.CommonUtils
;
import
cn.escheduler.common.utils.JSONUtils
;
import
cn.escheduler.common.utils.ParameterUtils
;
import
cn.escheduler.dao.DaoFactory
;
import
cn.escheduler.dao.ProcessDao
;
import
cn.escheduler.dao.model.ProcessInstance
;
import
cn.escheduler.server.utils.ParamUtils
;
import
cn.escheduler.server.worker.task.AbstractTask
;
import
cn.escheduler.server.worker.task.PythonCommandExecutor
;
import
cn.escheduler.server.worker.task.TaskProps
;
import
org.slf4j.Logger
;
import
java.io.File
;
import
java.nio.file.Files
;
import
java.nio.file.Path
;
import
java.nio.file.StandardOpenOption
;
import
java.nio.file.attribute.FileAttribute
;
import
java.nio.file.attribute.PosixFilePermission
;
import
java.nio.file.attribute.PosixFilePermissions
;
import
java.util.Map
;
import
java.util.Set
;
/**
* python task
...
...
@@ -57,7 +48,10 @@ public class PythonTask extends AbstractTask {
*/
private
String
taskDir
;
private
PythonCommandExecutor
pythonProcessTask
;
/**
* python command executor
*/
private
PythonCommandExecutor
pythonCommandExecutor
;
/**
* process database access
...
...
@@ -70,10 +64,15 @@ public class PythonTask extends AbstractTask {
this
.
taskDir
=
taskProps
.
getTaskDir
();
this
.
pythonProcessTask
=
new
PythonCommandExecutor
(
this
::
logHandle
,
taskProps
.
getTaskDir
(),
taskProps
.
getTaskAppId
(),
taskProps
.
getTenantCode
(),
null
,
taskProps
.
getTaskStartTime
(),
taskProps
.
getTaskTimeout
(),
logger
);
this
.
pythonCommandExecutor
=
new
PythonCommandExecutor
(
this
::
logHandle
,
taskProps
.
getTaskDir
(),
taskProps
.
getTaskAppId
(),
taskProps
.
getTaskInstId
(),
taskProps
.
getTenantCode
(),
taskProps
.
getEnvFile
(),
taskProps
.
getTaskStartTime
(),
taskProps
.
getTaskTimeout
(),
logger
);
this
.
processDao
=
DaoFactory
.
getDaoInstance
(
ProcessDao
.
class
);
}
...
...
@@ -92,9 +91,9 @@ public class PythonTask extends AbstractTask {
public
void
handle
()
throws
Exception
{
try
{
// construct process
exitStatusCode
=
python
ProcessTask
.
run
(
buildCommand
(),
processDao
);
exitStatusCode
=
python
CommandExecutor
.
run
(
buildCommand
(),
processDao
);
}
catch
(
Exception
e
)
{
logger
.
error
(
"python
process exception
"
,
e
);
logger
.
error
(
"python
task failure
"
,
e
);
exitStatusCode
=
-
1
;
}
}
...
...
@@ -102,7 +101,7 @@ public class PythonTask extends AbstractTask {
@Override
public
void
cancelApplication
(
boolean
cancelApplication
)
throws
Exception
{
// cancel process
python
ProcessTask
.
cancelApplication
();
python
CommandExecutor
.
cancelApplication
();
}
/**
...
...
@@ -111,21 +110,7 @@ public class PythonTask extends AbstractTask {
* @throws Exception
*/
private
String
buildCommand
()
throws
Exception
{
// generate scripts
// String fileName = String.format("%s/py_%s_node.py", taskDir, taskProps.getTaskAppId());
// Path path = new File(fileName).toPath();
// if (Files.exists(path)) {
// return fileName;
// }
String
rawScript
=
pythonParameters
.
getRawScript
().
replaceAll
(
"\\r\\n"
,
"\n"
);
// find process instance by task id
ProcessInstance
processInstance
=
processDao
.
findProcessInstanceByTaskId
(
taskProps
.
getTaskInstId
());
String
rawPythonScript
=
pythonParameters
.
getRawScript
().
replaceAll
(
"\\r\\n"
,
"\n"
);
/**
* combining local and global parameters
...
...
@@ -133,27 +118,16 @@ public class PythonTask extends AbstractTask {
Map
<
String
,
Property
>
paramsMap
=
ParamUtils
.
convert
(
taskProps
.
getUserDefParamsMap
(),
taskProps
.
getDefinedParams
(),
pythonParameters
.
getLocalParametersMap
(),
processInstance
.
getCmdTypeIfComplement
(),
processInstance
.
getScheduleTime
());
taskProps
.
getCmdTypeIfComplement
(),
taskProps
.
getScheduleTime
());
if
(
paramsMap
!=
null
){
raw
Script
=
ParameterUtils
.
convertParameterPlaceholders
(
raw
Script
,
ParamUtils
.
convert
(
paramsMap
));
raw
PythonScript
=
ParameterUtils
.
convertParameterPlaceholders
(
rawPython
Script
,
ParamUtils
.
convert
(
paramsMap
));
}
// pythonParameters.setRawScript(rawScript);
logger
.
info
(
"raw script : {}"
,
pythonParameters
.
getRawScript
());
logger
.
info
(
"raw python script : {}"
,
pythonParameters
.
getRawScript
());
logger
.
info
(
"task dir : {}"
,
taskDir
);
// Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxr-xr-x");
// FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
//
// Files.createFile(path, attr);
//
// Files.write(path, pythonParameters.getRawScript().getBytes(), StandardOpenOption.APPEND);
//
// return fileName;
return
rawScript
;
return
rawPythonScript
;
}
@Override
...
...
escheduler-server/src/main/java/cn/escheduler/server/worker/task/shell/ShellTask.java
浏览文件 @
c8c92e4b
...
...
@@ -54,7 +54,7 @@ public class ShellTask extends AbstractTask {
*/
private
String
taskDir
;
private
ShellCommandExecutor
processTask
;
private
ShellCommandExecutor
shellCommandExecutor
;
/**
* process database access
...
...
@@ -62,15 +62,19 @@ public class ShellTask extends AbstractTask {
private
ProcessDao
processDao
;
public
ShellTask
(
TaskProps
p
rops
,
Logger
logger
)
{
super
(
p
rops
,
logger
);
public
ShellTask
(
TaskProps
taskP
rops
,
Logger
logger
)
{
super
(
taskP
rops
,
logger
);
this
.
taskDir
=
p
rops
.
getTaskDir
();
this
.
taskDir
=
taskP
rops
.
getTaskDir
();
this
.
processTask
=
new
ShellCommandExecutor
(
this
::
logHandle
,
props
.
getTaskDir
(),
props
.
getTaskAppId
(),
props
.
getTenantCode
(),
props
.
getEnvFile
(),
props
.
getTaskStartTime
(),
props
.
getTaskTimeout
(),
logger
);
this
.
shellCommandExecutor
=
new
ShellCommandExecutor
(
this
::
logHandle
,
taskProps
.
getTaskDir
(),
taskProps
.
getTaskAppId
(),
taskProps
.
getTaskInstId
(),
taskProps
.
getTenantCode
(),
taskProps
.
getEnvFile
(),
taskProps
.
getTaskStartTime
(),
taskProps
.
getTaskTimeout
(),
logger
);
this
.
processDao
=
DaoFactory
.
getDaoInstance
(
ProcessDao
.
class
);
}
...
...
@@ -89,9 +93,9 @@ public class ShellTask extends AbstractTask {
public
void
handle
()
throws
Exception
{
try
{
// construct process
exitStatusCode
=
processTask
.
run
(
buildCommand
(),
processDao
);
exitStatusCode
=
shellCommandExecutor
.
run
(
buildCommand
(),
processDao
);
}
catch
(
Exception
e
)
{
logger
.
error
(
e
.
getMessage
()
,
e
);
logger
.
error
(
"shell task failure"
,
e
);
exitStatusCode
=
-
1
;
}
}
...
...
@@ -99,7 +103,7 @@ public class ShellTask extends AbstractTask {
@Override
public
void
cancelApplication
(
boolean
cancelApplication
)
throws
Exception
{
// cancel process
processTask
.
cancelApplication
();
shellCommandExecutor
.
cancelApplication
();
}
/**
...
...
@@ -118,8 +122,6 @@ public class ShellTask extends AbstractTask {
String
script
=
shellParameters
.
getRawScript
().
replaceAll
(
"\\r\\n"
,
"\n"
);
// find process instance by task id
ProcessInstance
processInstance
=
processDao
.
findProcessInstanceByTaskId
(
taskProps
.
getTaskInstId
());
/**
* combining local and global parameters
...
...
@@ -127,8 +129,8 @@ public class ShellTask extends AbstractTask {
Map
<
String
,
Property
>
paramsMap
=
ParamUtils
.
convert
(
taskProps
.
getUserDefParamsMap
(),
taskProps
.
getDefinedParams
(),
shellParameters
.
getLocalParametersMap
(),
processInstance
.
getCmdTypeIfComplement
(),
processInstance
.
getScheduleTime
());
taskProps
.
getCmdTypeIfComplement
(),
taskProps
.
getScheduleTime
());
if
(
paramsMap
!=
null
){
script
=
ParameterUtils
.
convertParameterPlaceholders
(
script
,
ParamUtils
.
convert
(
paramsMap
));
}
...
...
escheduler-server/src/main/java/cn/escheduler/server/worker/task/spark/SparkTask.java
浏览文件 @
c8c92e4b
...
...
@@ -66,8 +66,6 @@ public class SparkTask extends AbstractYarnTask {
if
(
StringUtils
.
isNotEmpty
(
sparkParameters
.
getMainArgs
()))
{
String
args
=
sparkParameters
.
getMainArgs
();
// get process instance by task instance id
ProcessInstance
processInstance
=
processDao
.
findProcessInstanceByTaskId
(
taskProps
.
getTaskInstId
());
/**
* combining local and global parameters
...
...
@@ -75,8 +73,8 @@ public class SparkTask extends AbstractYarnTask {
Map
<
String
,
Property
>
paramsMap
=
ParamUtils
.
convert
(
taskProps
.
getUserDefParamsMap
(),
taskProps
.
getDefinedParams
(),
sparkParameters
.
getLocalParametersMap
(),
processInstance
.
getCmdTypeIfComplement
(),
processInstance
.
getScheduleTime
());
taskProps
.
getCmdTypeIfComplement
(),
taskProps
.
getScheduleTime
());
if
(
paramsMap
!=
null
){
args
=
ParameterUtils
.
convertParameterPlaceholders
(
args
,
ParamUtils
.
convert
(
paramsMap
));
}
...
...
escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java
浏览文件 @
c8c92e4b
此差异已折叠。
点击以展开。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录