提交 72f601cd 编写于 作者: B bao liang 提交者: qiaozhanwei

the process cannot be terminated while its tasks are in the SUBMITTED_SUCCESS status. (#1070)

* update english documents

* refactor zk client

* update documents

* update zkclient

* update zkclient

* update documents

* add architecture-design

* change i18n

* update i18n

* update english documents

* add architecture-design

* update english documents

* update en-US documents

* add architecture-design

* update demo site

* add mybatis plus model

* modify mybatisplus

* modify mybatisplus

* change interface by mybatisplus

* add unit test

* refactor dao interface.

* add unit test for dao...

* add unit test for dao...

* add unit test for dao...

* Merge remote-tracking branch 'upstream/dev-db' into dev-db

# Conflicts:
#	dolphinscheduler-dao/src/main/resources/cn.escheduler.dao.mapper/ProjectMapper.xml
#	dolphinscheduler-dao/src/main/resources/cn.escheduler.dao.mapper/ScheduleMapper.xml
#	escheduler-dao/src/main/resources/cn.escheduler.dao.mapper/ProcessInstanceMapper.xml
#	escheduler-dao/src/main/resources/cn.escheduler.dao.mapper/ProjectUserMapper.xml
#	escheduler-dao/src/main/resources/cn.escheduler.dao.mapper/QueueMapper.xml
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/ProcessInstanceMapperTest.java
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/ProjectUserMapperTest.java
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/QueueMapperTest.java
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/ResourceUserMapperTest.java
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/ScheduleMapperTest.java
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/SessionMapperTest.java
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/TenantMapperTest.java

* Merge remote-tracking branch 'upstream/dev-db' into dev-db

# Conflicts:
#	dolphinscheduler-dao/src/main/resources/cn.escheduler.dao.mapper/ProjectMapper.xml
#	dolphinscheduler-dao/src/main/resources/cn.escheduler.dao.mapper/ScheduleMapper.xml
#	escheduler-dao/src/main/resources/cn.escheduler.dao.mapper/ProcessInstanceMapper.xml
#	escheduler-dao/src/main/resources/cn.escheduler.dao.mapper/ProjectUserMapper.xml
#	escheduler-dao/src/main/resources/cn.escheduler.dao.mapper/QueueMapper.xml
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/ProcessInstanceMapperTest.java
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/ProjectUserMapperTest.java
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/QueueMapperTest.java
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/ResourceUserMapperTest.java
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/ScheduleMapperTest.java
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/SessionMapperTest.java
#	escheduler-dao/src/test/java/cn/escheduler/dao/mapper/TenantMapperTest.java

* Merge remote-tracking branch 'upstream/dev-db' into dev-db

# Conflicts:
#	dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml
#	dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.xml
#	dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml

* update some dao bugs

* update for some bugs

* update some bugs

* Merge remote-tracking branch 'upstream/dev-db' into dev-db

# Conflicts:
#	dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml
#	dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.xml
#	dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml

* update

* update

* add multiply settings for application.yml

* add multiply settings for application.yml

* revert

* update configuration settings in task record dao...

* change application_master to application-master

* change application_master to application-master

* update application.yml to application.properties

* revert

* revert

* add properties

* add properties

* revert

* revert

* add api start up..
add alert send try catch

* update dao info level

* fix bug: task cannot submit when recovery failover

* fix bug: task cannot submit when recovery failover

* merge from dev-db

* revert

* revert

* fix bug: get process definition list failed.

* fix bug: process instance interval is error

* revert

* revert

* update

* support stop submit success tasks

* update kill process

* update for stop process

* update for stop process

* add some logs for stop process

* update for small bug.

* add check strategy before submit task

* revert

* update

* update

* revert

* wait task instance exists if null.

* revert

* update
上级 90f092f5
......@@ -484,6 +484,7 @@ public final class Constants {
public static final String USER = "user";
public static final String PASSWORD = "password";
public static final String XXXXXX = "******";
public static final String NULL = "NULL";
public static String TASK_RECORD_TABLE_HIVE_LOG = "eamp_hive_log_hd";
......
......@@ -1023,13 +1023,18 @@ public class ProcessDao extends AbstractBaseDao {
* @param taskInstance
* @return
*/
private String taskZkInfo(TaskInstance taskInstance) {
public String taskZkInfo(TaskInstance taskInstance) {
int taskWorkerGroupId = getTaskWorkerGroupId(taskInstance);
ProcessInstance processInstance = this.findProcessInstanceById(taskInstance.getProcessInstanceId());
if(processInstance == null){
logger.error("process instance is null. please check the task info, task id: " + taskInstance.getId());
return "";
}
StringBuilder sb = new StringBuilder(100);
sb.append(taskInstance.getProcessInstancePriority().ordinal()).append(Constants.UNDERLINE)
sb.append(processInstance.getProcessInstancePriority().ordinal()).append(Constants.UNDERLINE)
.append(taskInstance.getProcessInstanceId()).append(Constants.UNDERLINE)
.append(taskInstance.getTaskInstancePriority().ordinal()).append(Constants.UNDERLINE)
.append(taskInstance.getId()).append(Constants.UNDERLINE);
......@@ -1102,7 +1107,8 @@ public class ProcessDao extends AbstractBaseDao {
// or return submit success
if( processInstanceState == ExecutionStatus.READY_PAUSE){
state = ExecutionStatus.PAUSE;
}else if(processInstanceState == ExecutionStatus.READY_STOP) {
}else if(processInstanceState == ExecutionStatus.READY_STOP
|| !checkProcessStrategy(taskInstance)) {
state = ExecutionStatus.KILL;
}else{
state = ExecutionStatus.SUBMITTED_SUCCESS;
......@@ -1110,6 +1116,22 @@ public class ProcessDao extends AbstractBaseDao {
return state;
}
/**
 * Decide whether a task may still be submitted under the process failure strategy.
 * <p>
 * With {@code FailureStrategy.CONTINUE} submission is always allowed; otherwise
 * submission is refused as soon as any valid task of the same process instance
 * has already FAILED (the caller then marks this task as KILL).
 *
 * @param taskInstance the task about to be submitted
 * @return true if the task may be submitted, false if it must be stopped
 */
private boolean checkProcessStrategy(TaskInstance taskInstance){
    int processInstanceId = taskInstance.getProcessInstanceId();
    ProcessInstance processInstance = this.findProcessInstanceById(processInstanceId);
    // "continue on failure" means sibling failures never block submission
    if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
        return true;
    }
    // strategy is "end on failure": any failed sibling vetoes this submission
    for (TaskInstance sibling : this.findValidTaskListByProcessId(processInstanceId)) {
        if (sibling.getState() == ExecutionStatus.FAILURE) {
            return false;
        }
    }
    return true;
}
/**
* check the task instance existing in queue
* @return
......@@ -1216,9 +1238,12 @@ public class ProcessDao extends AbstractBaseDao {
* @param taskInstId
* @return
*/
public TaskInstance getTaskInstanceRelationByTaskId(int taskInstId){
public TaskInstance getTaskInstanceDetailByTaskId(int taskInstId){
// get task instance
TaskInstance taskInstance = findTaskInstanceById(taskInstId);
if(taskInstance == null){
return taskInstance;
}
// get process instance
ProcessInstance processInstance = findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
// get process define
......
......@@ -90,4 +90,4 @@ task.record.datasource.username=xx
task.record.datasource.password=xx
# Logger Config
logging.level.org.apache.dolphinscheduler.dao=debug
#logging.level.org.apache.dolphinscheduler.dao=debug
......@@ -129,12 +129,16 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
* task instance add queue , waiting worker to kill
*/
private void cancelTaskInstance(){
if(alreadyKilled || taskInstance.getHost() == null){
if(alreadyKilled){
return ;
}
alreadyKilled = true;
String host = taskInstance.getHost();
if(host == null){
host = Constants.NULL;
}
String queueValue = String.format("%s-%d",
taskInstance.getHost(), taskInstance.getId());
host, taskInstance.getId());
taskQueue.sadd(DOLPHINSCHEDULER_TASKS_KILL, queueValue);
logger.info("master add kill task :{} id:{} to kill queue",
......
......@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
......@@ -29,6 +30,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.ProcessDao;
......@@ -302,24 +304,8 @@ public class WorkerServer extends AbstractServer {
// if set is null , return
if (CollectionUtils.isNotEmpty(taskInfoSet)){
for (String taskInfo : taskInfoSet){
// task info start with current host
if (taskInfo.startsWith(OSUtils.getHost())){
String[] taskInfoArr = taskInfo.split("-");
if (taskInfoArr.length != 2){
continue;
}else {
int taskInstId=Integer.parseInt(taskInfoArr[1]);
TaskInstance taskInstance = processDao.getTaskInstanceRelationByTaskId(taskInstId);
if(taskInstance.getTaskType().equals(TaskType.DEPENDENT.toString())){
taskInstance.setState(ExecutionStatus.KILL);
processDao.saveTaskInstance(taskInstance);
}else{
ProcessUtils.kill(taskInstance);
}
taskQueue.srem(Constants.DOLPHINSCHEDULER_TASKS_KILL,taskInfo);
}
}
killTask(taskInfo, processDao);
removeKillInfoFromQueue(taskInfo);
}
}
......@@ -330,5 +316,61 @@ public class WorkerServer extends AbstractServer {
return killProcessThread;
}
/**
 * Process one kill command taken from the tasks-kill queue.
 * <p>
 * The command has the form {@code host-taskInstanceId}. Three cases are handled:
 * <ul>
 *   <li>the task never reached a worker (host is the NULL placeholder and the
 *       instance has no host): remove it from the waiting queue and mark it KILL;</li>
 *   <li>a DEPENDENT task: it has no OS process, so it is only marked KILL;</li>
 *   <li>a running task: its worker-side process is killed via {@link ProcessUtils};
 *       already-finished tasks are only logged.</li>
 * </ul>
 *
 * @param taskInfo kill command in {@code host-taskInstanceId} format
 * @param pd       dao used to look up and persist the task instance
 */
private void killTask(String taskInfo, ProcessDao pd) {
    logger.info("get one kill command from tasks kill queue: {}", taskInfo);
    String[] taskInfoArray = taskInfo.split("-");
    if (taskInfoArray.length != 2) {
        logger.error("error format kill info: {}", taskInfo);
        return;
    }
    String host = taskInfoArray[0];
    // NOTE(review): assumes the id segment is numeric — a malformed entry would
    // throw NumberFormatException out of this method; confirm producers guarantee this
    int taskInstanceId = Integer.parseInt(taskInfoArray[1]);
    TaskInstance taskInstance = pd.getTaskInstanceDetailByTaskId(taskInstanceId);
    if (taskInstance == null) {
        logger.error("cannot find the kill task : {}", taskInfo);
        return;
    }

    if (host.equals(Constants.NULL) && StringUtils.isEmpty(taskInstance.getHost())) {
        // task was never dispatched to a worker: drop it from the waiting queue
        // and mark it killed directly
        deleteTaskFromQueue(taskInstance, pd);
        taskInstance.setState(ExecutionStatus.KILL);
        pd.saveTaskInstance(taskInstance);
    } else {
        if (taskInstance.getTaskType().equals(TaskType.DEPENDENT.toString())) {
            // dependent tasks have no OS process to kill; just persist the state
            taskInstance.setState(ExecutionStatus.KILL);
            pd.saveTaskInstance(taskInstance);
        } else if (!taskInstance.getState().typeIsFinished()) {
            ProcessUtils.kill(taskInstance);
        } else {
            logger.info("the task already finished: task id: {} state: {}",
                    taskInstance.getId(), taskInstance.getState());
        }
    }
}
/**
 * Remove the given task instance from the zk waiting queue.
 * <p>
 * The removal runs under a distributed worker lock
 * (path {@code /dolphinscheduler/lock/worker}) so that no FetchTaskThread can
 * claim the entry concurrently. Failures are logged, not rethrown; the lock is
 * always released in the finally block.
 *
 * @param taskInstance task whose queue entry should be removed
 * @param pd           dao used to check queue membership and build the zk key
 */
private void deleteTaskFromQueue(TaskInstance taskInstance, ProcessDao pd){
    // creating distributed locks, lock path /dolphinscheduler/lock/worker
    InterProcessMutex mutex = null;
    logger.info("delete task from tasks queue: {}", taskInstance.getId());

    try {
        mutex = zkWorkerClient.acquireZkLock(zkWorkerClient.getZkClient(),
                zkWorkerClient.getWorkerLockPath());
        if (pd.checkTaskExistsInTaskQueue(taskInstance)) {
            String taskQueueStr = pd.taskZkInfo(taskInstance);
            taskQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskQueueStr);
        }
    } catch (Exception e) {
        logger.error("remove task from queue failure", e);
    } finally {
        AbstractZKClient.releaseMutex(mutex);
    }
}
/**
 * Remove a processed kill command from the tasks-kill set in the task queue,
 * so the same kill request is not handled again.
 *
 * @param taskInfo kill command entry ({@code host-taskInstanceId}) to remove
 */
private void removeKillInfoFromQueue(String taskInfo){
taskQueue.srem(Constants.DOLPHINSCHEDULER_TASKS_KILL,taskInfo);
}
}
......@@ -104,6 +104,7 @@ public class FetchTaskThread implements Runnable{
this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Fetch-Task-Thread",workerExecNums);
this.conf = conf;
this.taskQueue = taskQueue;
this.taskInstance = null;
}
/**
......@@ -177,13 +178,15 @@ public class FetchTaskThread implements Runnable{
// get task instance id
taskInstId = getTaskInstanceId(taskQueueStr);
// get task instance relation
taskInstance = processDao.getTaskInstanceRelationByTaskId(taskInstId);
// mainly to wait for the master insert task to succeed
waitForMasterEnterQueue();
taskInstance = processDao.getTaskInstanceDetailByTaskId(taskInstId);
// verify task instance is null
if (verifyTaskInstanceIsNull(taskInstance)) {
logger.warn("remove task queue : {} due to taskInstance is null", taskQueueStr);
taskQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskQueueStr);
removeNodeFromTaskQueue(taskQueueStr);
continue;
}
......@@ -193,7 +196,7 @@ public class FetchTaskThread implements Runnable{
// verify tenant is null
if (verifyTenantIsNull(tenant)) {
logger.warn("remove task queue : {} due to tenant is null", taskQueueStr);
taskQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskQueueStr);
removeNodeFromTaskQueue(taskQueueStr);
continue;
}
......@@ -204,8 +207,6 @@ public class FetchTaskThread implements Runnable{
logger.info("worker fetch taskId : {} from queue ", taskInstId);
// mainly to wait for the master insert task to succeed
waitForMasterEnterQueue();
if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){
continue;
......@@ -230,7 +231,7 @@ public class FetchTaskThread implements Runnable{
workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
// remove node from zk
taskQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskQueueStr);
removeNodeFromTaskQueue(taskQueueStr);
}
}catch (Exception e){
......@@ -241,6 +242,10 @@ public class FetchTaskThread implements Runnable{
}
}
/**
 * Remove the given entry from the zk tasks waiting queue; shared helper for the
 * several removal sites in the fetch loop.
 *
 * @param taskQueueStr the serialized queue entry to remove
 */
private void removeNodeFromTaskQueue(String taskQueueStr){
taskQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskQueueStr);
}
/**
* verify task instance is null
* @return
......@@ -304,7 +309,6 @@ public class FetchTaskThread implements Runnable{
*/
private void waitForMasterEnterQueue()throws Exception{
int retryTimes = 30;
while (taskInstance == null && retryTimes > 0) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
taskInstance = processDao.findTaskInstanceById(taskInstId);
......
......@@ -15,7 +15,7 @@ master.task.commit.interval=100
# only less than cpu avg load, master server can work. default value : the number of cpu cores * 2
#master.max.cpuload.avg=100
master.max.cpuload.avg=100
# only larger than reserved memory, master server can work. default value : physical memory * 1/10, unit is G.
master.reserved.memory=0.1
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册