未验证 提交 0ca30862 编写于 作者: W Wenjun Ruan 提交者: GitHub

Fix workflow instance may failover many times due to doesn't check the restart time (#11445)

上级 2240b6e7
......@@ -17,6 +17,11 @@
package org.apache.dolphinscheduler.server.master.service;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.NodeType;
......@@ -31,6 +36,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
......@@ -41,9 +47,9 @@ import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.List;
......@@ -51,14 +57,6 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import lombok.NonNull;
@Service
public class MasterFailoverService {
......@@ -70,15 +68,19 @@ public class MasterFailoverService {
private final NettyExecutorManager nettyExecutorManager;
private final ProcessInstanceExecCacheManager processInstanceExecCacheManager;
public MasterFailoverService(@NonNull RegistryClient registryClient,
@NonNull MasterConfig masterConfig,
@NonNull ProcessService processService,
@NonNull NettyExecutorManager nettyExecutorManager) {
@NonNull NettyExecutorManager nettyExecutorManager,
@NonNull ProcessInstanceExecCacheManager processInstanceExecCacheManager) {
this.registryClient = registryClient;
this.masterConfig = masterConfig;
this.processService = processService;
this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
this.nettyExecutorManager = nettyExecutorManager;
this.processInstanceExecCacheManager = processInstanceExecCacheManager;
}
......@@ -283,6 +285,15 @@ public class MasterFailoverService {
// The processInstance is newly created
return false;
}
if (processInstance.getRestartTime() != null && processInstance.getRestartTime().after(beFailoveredMasterStartupTime)) {
// the processInstance is already be failovered.
return false;
}
if (processInstanceExecCacheManager.contains(processInstance.getId())) {
// the processInstance is a running process instance in the current master
return false;
}
return true;
}
......
......@@ -86,6 +86,9 @@ public class FailoverServiceTest {
@Mock
private NettyExecutorManager nettyExecutorManager;
@Mock
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
private static int masterPort = 5678;
private static int workerPort = 1234;
......@@ -104,7 +107,7 @@ public class FailoverServiceTest {
given(masterConfig.getListenPort()).willReturn(masterPort);
MasterFailoverService masterFailoverService =
new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager);
new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager, processInstanceExecCacheManager);
WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient,
masterConfig,
processService,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册