未验证 提交 d74f6e71 编写于 作者: Q qiaozhanwei 提交者: GitHub

[Fix-3616][Server] when worker akc/response master exception , async retry (#3776)

* [Fix-3616][Server] when worker akc/response master exception , async retry

* [Fix-3616][Server] when worker akc/response master exception , async retry

* [Fix-3616][Server] when worker akc/response master exception , async retry

* [Fix-3616][Server] when worker akc/response master exception , async retry

* [Fix-3616][Server] when worker akc/response master exception , async retry

* [Fix-3616][Server] when worker akc/response master exception , async retry

* [Fix-3616][Server] when worker akc/response master exception , async retry

* [Fix-3616][Server] when worker akc/response master exception , async retry

* [Fix-3616][Server] when worker akc/response master exception , async retry

* [Fix-3616][Server] when worker akc/response master exception , async retry

* [Fix-3616][Server] when worker akc/response master exception , async retry
Co-authored-by: Nqiaozhanwei <qiaozhanwei@analysys.com.cn>
上级 b2e78cbf
......@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
......@@ -69,6 +70,8 @@ public class TaskCallbackService {
public TaskCallbackService(){
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
this.nettyRemotingClient.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
}
/**
......
......@@ -41,7 +41,7 @@ public class RetryReportTaskStatusThread implements Runnable {
/**
* every 5 minutes
*/
private static long RETRY_REPORT_TASK_STATUS_TIME = 5 * 60 * 1000L;
private static long RETRY_REPORT_TASK_STATUS_INTERVAL = 5 * 60 * 1000L;
/**
* task callback service
*/
......@@ -64,6 +64,10 @@ public class RetryReportTaskStatusThread implements Runnable {
ResponceCache responceCache = ResponceCache.get();
while (Stopper.isRunning()){
// sleep 5 minutes
ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_INTERVAL);
try {
if (!responceCache.getAckCache().isEmpty()){
Map<Integer,Command> ackCache = responceCache.getAckCache();
......@@ -79,14 +83,12 @@ public class RetryReportTaskStatusThread implements Runnable {
for (Map.Entry<Integer, Command> entry : responseCache.entrySet()){
Integer taskInstanceId = entry.getKey();
Command responseCommand = entry.getValue();
taskCallbackService.sendAck(taskInstanceId,responseCommand);
taskCallbackService.sendResult(taskInstanceId,responseCommand);
}
}
}catch (Exception e){
logger.warn("retry report task status error", e);
}
ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_TIME);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册