diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index 173140792b0176968343bbc4c124ccb0c082aad2..c308f3b23deb7b818c1ee89fbca5756fc970246f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -102,20 +102,23 @@ public class TaskCallbackService { while (Stopper.isRunning()) { masterNodes = zookeeperRegistryCenter.getMasterNodesDirectly(); if (CollectionUtils.isEmpty(masterNodes)) { + masterNodes = null; ThreadUtils.sleep(SLEEP_TIME_MILLIS); - }else { - logger.error("find {} masters for task : {}.", - masterNodes.size(), - taskInstanceId); - break; + continue; } - } - for(String masterNode : masterNodes){ - newChannel = nettyRemotingClient.getChannel(Host.of(masterNode)); - if(newChannel != null){ - return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId); + logger.info("find {} masters for task : {}.", + masterNodes.size(), + taskInstanceId); + for (String masterNode : masterNodes) { + newChannel = nettyRemotingClient.getChannel(Host.of(masterNode)); + if (newChannel != null) { + return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId); + } } + masterNodes = null; + ThreadUtils.sleep(SLEEP_TIME_MILLIS); } + throw new IllegalStateException(String.format("all available master nodes : %s are not reachable for task: {}", masterNodes, taskInstanceId)); }