未验证 提交 bc629f8f 编写于 作者: W Wenjun Ruan 提交者: GitHub

[3.0.1-prepare]cherry-pick Fix kill task failed will cause the taskGroup...

[3.0.1-prepare]cherry-pick Fix kill task failed will cause the taskGroup cannot release and add taskGroup log (#11469) (#12013)

* Fix kill task failed will cause the taskGroup cannot release and add taskGroup log (#11469)

(cherry picked from commit 4d13a510)
上级 c10deeed
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler</artifactId>
<version>dev-SNAPSHOT</version>
<version>3.0.1-SNAPSHOT</version>
</parent>
<artifactId>dolphinscheduler-bom</artifactId>
<name>${project.artifactId}</name>
......
......@@ -65,8 +65,6 @@
<version>${clickhouse.jdbc.version}</version>
</dependency>
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
......
......@@ -48,6 +48,13 @@
<artifactId>dolphinscheduler-datasource-hive</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-bom</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project>
......@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.runner.task;
import com.google.auto.service.AutoService;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
......@@ -34,8 +35,6 @@ import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
import java.util.Date;
import com.google.auto.service.AutoService;
/**
* common task processor
*/
......
......@@ -136,7 +136,6 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
......@@ -2903,8 +2902,10 @@ public class ProcessServiceImpl implements ProcessService {
//try to get taskGroup
int count = taskGroupMapper.selectAvailableCountById(groupId);
if (count == 1 && robTaskGroupResource(taskGroupQueue)) {
logger.info("Success acquire taskGroup, taskInstanceId: {}, taskGroupId: {}", taskId, groupId);
return true;
}
logger.info("Failed to acquire taskGroup, taskInstanceId: {}, taskGroupId: {}", taskId, groupId);
this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
return false;
}
......@@ -2919,11 +2920,13 @@ public class ProcessServiceImpl implements ProcessService {
taskGroupQueue.getId(),
TaskGroupQueueStatus.WAIT_QUEUE.getCode());
if (affectedCount > 0) {
logger.info("Success rob taskGroup, taskInstanceId: {}, taskGroupId: {}", taskGroupQueue.getTaskId(), taskGroupQueue.getId());
taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
this.taskGroupQueueMapper.updateById(taskGroupQueue);
this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
return true;
}
logger.info("Failed to rob taskGroup, taskInstanceId: {}, taskGroupId: {}", taskGroupQueue.getTaskId(), taskGroupQueue.getId());
return false;
}
......@@ -2945,26 +2948,31 @@ public class ProcessServiceImpl implements ProcessService {
TaskGroup taskGroup;
TaskGroupQueue thisTaskGroupQueue;
logger.info("Begin to release task group: {}", taskInstance.getTaskGroupId());
try {
do {
taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId());
if (taskGroup == null) {
logger.error("The taskGroup is null, taskGroupId: {}", taskInstance.getTaskGroupId());
return null;
}
thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) {
logger.info("The taskGroupQueue's status is release, taskInstanceId: {}", taskInstance.getId());
return null;
}
} while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode()
&& taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(),
taskGroup.getUseSize(),
thisTaskGroupQueue.getId(),
TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1);
&& taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(),
taskGroup.getUseSize(),
thisTaskGroupQueue.getId(),
TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1);
} catch (Exception e) {
logger.error("release the task group error", e);
return null;
}
logger.info("updateTask:{}", taskInstance.getName());
logger.info("Finished to release task group, taskGroupId: {}", taskInstance.getTaskGroupId());
logger.info("Begin to release task group queue, taskGroupId: {}", taskInstance.getTaskGroupId());
changeTaskGroupQueueStatus(taskInstance.getId(), TaskGroupQueueStatus.RELEASE);
TaskGroupQueue taskGroupQueue;
do {
......@@ -2973,11 +2981,13 @@ public class ProcessServiceImpl implements ProcessService {
Flag.NO.getCode(),
Flag.NO.getCode());
if (taskGroupQueue == null) {
logger.info("The taskGroupQueue is null, taskGroup: {}", taskGroup.getId());
return null;
}
} while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(),
Flag.YES.getCode(),
taskGroupQueue.getId()) != 1);
Flag.YES.getCode(),
taskGroupQueue.getId()) != 1);
logger.info("Finished to release task group queue: taskGroupId: {}", taskInstance.getTaskGroupId());
return this.taskInstanceMapper.selectById(taskGroupQueue.getTaskId());
}
......
......@@ -1240,6 +1240,7 @@
</dependencies>
<modules>
<module>dolphinscheduler-bom</module>
<module>dolphinscheduler-alert</module>
<module>dolphinscheduler-spi</module>
<module>dolphinscheduler-registry</module>
......
......@@ -32,7 +32,6 @@ commons-compiler-3.1.6.jar
commons-compress-1.21.jar
commons-configuration-1.10.jar
commons-daemon-1.0.13.jar
commons-beanutils-1.9.4.jar
commons-dbcp-1.4.jar
commons-httpclient-3.0.1.jar
commons-io-2.11.0.jar
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册