diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 6c05554d8b5420a9e4a626a4cad8830edcbcd18d..016c8c980ec58691ca0da505c920cc85ec10aafb 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -28,7 +28,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService; -import org.apache.dolphinscheduler.server.zk.ZKMasterClient; +import org.apache.dolphinscheduler.server.master.zk.ZKMasterClient; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.quartz.QuartzExecutors; @@ -117,7 +117,8 @@ public class MasterServer implements IStoppable { this.nettyRemotingServer.start(); // self tolerant - this.zkMasterClient.start(this); + this.zkMasterClient.start(); + this.zkMasterClient.setStoppable(this); // scheduler start this.masterSchedulerService.start(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java index 5a0647fd670792be7f426c57c6a40b973a5c87b4..bb1e314f6e3ff83d453f95753a22557bcdc58975 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java @@ -29,7 +29,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; -import org.apache.dolphinscheduler.server.registry.ServerNodeManager; +import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.commons.collections.CollectionUtils; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java index 95f97deb5b7e2e3329f3d40841af3bb8d40635a8..8f73dd101337bc8525f34394a441ad3f07f0a142 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java @@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker; -import org.apache.dolphinscheduler.server.registry.ServerNodeManager; +import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import java.util.ArrayList; import java.util.Collection; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ServerNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java similarity index 98% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ServerNodeManager.java rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java index a487c21218b184b190a1203abe64e5c3d0c7e4c7..d713c8366ff53803fd80adad55acab88a34010c8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ServerNodeManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.registry; +package org.apache.dolphinscheduler.server.master.registry; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ZKNodeType; @@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; +import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.service.zk.AbstractListener; import org.apache.dolphinscheduler.service.zk.AbstractZKClient; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 6a3cc60ad24613f9a0a6a709ca4ef2f984fefdb6..1112bcacd553154addd3cce49621d3214adbb6cb 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -27,8 +27,8 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.zk.ZKMasterClient; import org.apache.dolphinscheduler.server.utils.AlertManager; -import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.curator.framework.imps.CuratorFrameworkState; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClient.java similarity index 96% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClient.java index f19bfa2208a8788a764bc53385f456887b196f2f..976aa22b93e3c2718ba4fa5aaa3907af25d5dfac 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClient.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.zk; +package org.apache.dolphinscheduler.server.master.zk; import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; @@ -73,7 +73,7 @@ public class ZKMasterClient extends AbstractZKClient { @Autowired private MasterRegistry masterRegistry; - public void start(IStoppable stoppable) { + public void start() { InterProcessMutex mutex = null; try { // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/startup-masters @@ -83,7 +83,6 @@ public class ZKMasterClient extends AbstractZKClient { // master registry masterRegistry.registry(); - masterRegistry.getZookeeperRegistryCenter().setStoppable(stoppable); String registryPath = this.masterRegistry.getMasterPath(); masterRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(registryPath, ZKNodeType.MASTER, Constants.DELETE_ZK_OP); @@ -107,6 +106,10 @@ public class ZKMasterClient extends AbstractZKClient { } } + public void setStoppable(IStoppable stoppable) { + masterRegistry.getZookeeperRegistryCenter().setStoppable(stoppable); + } + @Override public void close() { masterRegistry.unRegistry(); @@ -139,7 +142,7 @@ public class ZKMasterClient extends AbstractZKClient { * @param failover is failover */ private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) { - logger.info("{} node deleted : {}", zkNodeType.toString(), path); + logger.info("{} node deleted : {}", zkNodeType, path); InterProcessMutex mutex = null; try { String failoverPath = getFailoverLockPath(zkNodeType); @@ -162,7 +165,7 @@ public class ZKMasterClient extends AbstractZKClient { failoverServerWhenDown(serverHost, zkNodeType); } } catch (Exception e) { - logger.error("{} server failover failed.", zkNodeType.toString()); + logger.error("{} server failover failed.", zkNodeType); logger.error("failover exception ", e); } finally { releaseMutex(mutex); @@ -174,9 +177,8 @@ public class ZKMasterClient extends AbstractZKClient { * * @param serverHost server host * @param zkNodeType zookeeper node type - * @throws Exception exception */ - private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception { + private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) { switch (zkNodeType) { case MASTER: failoverMaster(serverHost); @@ -196,7 +198,6 @@ public class ZKMasterClient extends AbstractZKClient { * @return fail over lock path */ private String getFailoverLockPath(ZKNodeType zkNodeType) { - switch (zkNodeType) { case MASTER: return getMasterFailoverLockPath(); @@ -252,7 +253,7 @@ public class ZKMasterClient extends AbstractZKClient { * @param taskInstance task instance * @return true if task instance need fail over */ - private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) throws Exception { + private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) { boolean taskNeedFailover = true; @@ -304,9 +305,8 @@ public class ZKMasterClient extends AbstractZKClient { * * @param workerHost worker host * @param needCheckWorkerAlive need check worker alive - * @throws Exception exception */ - private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception { + private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) { logger.info("start worker[{}] failover ...", workerHost); List needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost); for (TaskInstance taskInstance : needFailoverTaskInstanceList) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 67e1ee7b0a43ed2cef13d95097c972e7a2bffd5f..caa3db0c8e50e7d80606b56810d2f1df9afbd0e8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -55,9 +55,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement; @ComponentScan.Filter(type = FilterType.REGEX, pattern = { "org.apache.dolphinscheduler.server.master.*", "org.apache.dolphinscheduler.server.monitor.*", - "org.apache.dolphinscheduler.server.log.*", - "org.apache.dolphinscheduler.server.zk.ZKMasterClient", - "org.apache.dolphinscheduler.server.registry.ServerNodeManager" + "org.apache.dolphinscheduler.server.log.*" }) }) @EnableTransactionManagement diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java index 60c9053b0968590c729a9fa952aa88b213ae22a9..5997722d2a2001625fb37446027cdd38d85d42f3 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java @@ -38,11 +38,11 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; +import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; +import org.apache.dolphinscheduler.server.master.zk.ZKMasterClient; import org.apache.dolphinscheduler.server.registry.DependencyConfig; -import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.zk.SpringZKServer; -import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.TaskPriority; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java index ecbfed21b1deb2be529ea8c7edb252c5d0a8078b..d10fd6fe88fff288672140179fb1ee3fd034312b 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java @@ -23,8 +23,8 @@ import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; +import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.DependencyConfig; -import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.utils.ExecutionContextTestUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java index 8c53560804b5d7b464d034cfbc5fba6a59315b82..c512f4ef582896458d9c99b73084b4d32cd0c28c 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java @@ -30,8 +30,8 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; +import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.DependencyConfig; -import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java index 661e329778fbf9b467b9dc400768d2148192e4c1..8c2ac01ba711d67aee247089fa7b967f1ffeec73 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java @@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.master.dispatch.host; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; -import org.apache.dolphinscheduler.server.registry.ServerNodeManager; +import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.utils.ExecutionContextTestUtils; import org.junit.Assert; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ServerNodeManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManagerTest.java similarity index 80% rename from dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ServerNodeManagerTest.java rename to dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManagerTest.java index 4c8e4cffea3cb4318a36bb17eda6074fdf03f525..1b94174ea632e86919b01831662f2ce37e5fbf0b 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ServerNodeManagerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManagerTest.java @@ -15,17 +15,17 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.registry; +package org.apache.dolphinscheduler.server.master.registry; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory; import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; +import org.apache.dolphinscheduler.server.registry.DependencyConfig; +import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; import org.apache.dolphinscheduler.server.zk.SpringZKServer; -import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; @@ -43,9 +43,9 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; * server node manager test */ @RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class, MasterRegistry.class,WorkerRegistry.class, +@ContextConfiguration(classes = {DependencyConfig.class, SpringZKServer.class, MasterRegistry.class,WorkerRegistry.class, ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class, SpringConnectionFactory.class, - ZookeeperCachedOperator.class, ZookeeperConfig.class, ServerNodeManager.class, CuratorZookeeperClient.class}) + ZookeeperCachedOperator.class, ZookeeperConfig.class, ServerNodeManager.class}) public class ServerNodeManagerTest { @Autowired @@ -57,9 +57,6 @@ public class ServerNodeManagerTest { @Autowired private WorkerRegistry workerRegistry; - @Autowired - private ZookeeperRegistryCenter zookeeperRegistryCenter; - @Autowired private WorkerConfig workerConfig; @@ -67,46 +64,39 @@ public class ServerNodeManagerTest { private MasterConfig masterConfig; @Test - public void testGetMasterNodes(){ + public void testGetMasterNodes() { masterRegistry.registry(); try { //let the serverNodeManager catch the registry event Thread.sleep(2000); } catch (InterruptedException ignore) { + //ignore } Set masterNodes = serverNodeManager.getMasterNodes(); Assert.assertTrue(CollectionUtils.isNotEmpty(masterNodes)); Assert.assertEquals(1, masterNodes.size()); Assert.assertEquals(NetUtils.getAddr(masterConfig.getListenPort()), masterNodes.iterator().next()); - workerRegistry.unRegistry(); + masterRegistry.unRegistry(); } @Test - public void testGetWorkerGroupNodes(){ + public void testGetWorkerGroupNodes() { workerRegistry.registry(); try { //let the serverNodeManager catch the registry event - Thread.sleep(2000); + Thread.sleep(3000); } catch (InterruptedException ignore) { + //ignore } Map> workerGroupNodes = serverNodeManager.getWorkerGroupNodes(); Assert.assertEquals(1, workerGroupNodes.size()); Assert.assertEquals("default".trim(), workerGroupNodes.keySet().iterator().next()); - workerRegistry.unRegistry(); - } - @Test - public void testGetWorkerGroupNodesWithParam(){ - workerRegistry.registry(); - try { - //let the serverNodeManager catch the registry event - Thread.sleep(3000); - } catch (InterruptedException ignore) { - } Set workerNodes = serverNodeManager.getWorkerGroupNodes("default"); Assert.assertTrue(CollectionUtils.isNotEmpty(workerNodes)); Assert.assertEquals(1, workerNodes.size()); Assert.assertEquals(NetUtils.getAddr(workerConfig.getListenPort()), workerNodes.iterator().next()); workerRegistry.unRegistry(); } + } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClientTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClientTest.java new file mode 100644 index 0000000000000000000000000000000000000000..3ff6daa60659dd3edd2e636c10c4ec03d9e76f3b --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClientTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.zk; + +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; +import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; +import org.apache.dolphinscheduler.server.registry.DependencyConfig; +import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.zk.SpringZKServer; +import org.apache.dolphinscheduler.service.zk.RegisterOperator; +import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; +import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; + +import java.util.Set; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +/** + * zookeeper master client test + */ +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = {DependencyConfig.class, SpringZKServer.class, MasterRegistry.class, + ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class, SpringConnectionFactory.class, + ZookeeperCachedOperator.class, ZookeeperConfig.class, ServerNodeManager.class, + ZKMasterClient.class, RegisterOperator.class}) +public class ZKMasterClientTest { + + @Autowired + private ZKMasterClient zkMasterClient; + + @Autowired + private ServerNodeManager serverNodeManager; + + @Autowired + private MasterConfig masterConfig; + + @Test + public void testZKMasterClient() { + zkMasterClient.start(); + try { + //let the serverNodeManager catch the registry event + Thread.sleep(2000); + } catch (InterruptedException ignore) { + //ignore + } + Set masterNodes = serverNodeManager.getMasterNodes(); + Assert.assertTrue(CollectionUtils.isNotEmpty(masterNodes)); + Assert.assertEquals(1, masterNodes.size()); + Assert.assertEquals(NetUtils.getAddr(masterConfig.getListenPort()), masterNodes.iterator().next()); + zkMasterClient.close(); + } + +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java index f237bca11234c0e632331d38880cd60e03c2d7f8..4429e7cd72d5ed157d9508027f17d588566f88f2 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java @@ -41,6 +41,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager; import org.apache.dolphinscheduler.server.master.dispatch.host.RandomHostManager; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; @@ -81,6 +82,11 @@ public class DependencyConfig { return Mockito.mock(MasterConfig.class); } + @Bean + public WorkerConfig workerConfig() { + return Mockito.mock(WorkerConfig.class); + } + @Bean public UserMapper userMapper() { return Mockito.mock(UserMapper.class); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java index ac340e92ed4edafabd679d76d685dcbc314587ba..71af1f8aec0a288e4ecca64b3c768ed4ff06f856 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java @@ -35,7 +35,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; -import org.apache.dolphinscheduler.server.registry.ServerNodeManager; +import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; diff --git a/pom.xml b/pom.xml index ee15f71f3a2c883ab42cb7516a4378e03e5127a9..fe9baba7f2f89dc2dcf4b9fca656ebb0f29189af 100644 --- a/pom.xml +++ b/pom.xml @@ -922,6 +922,7 @@ **/server/master/dispatch/host/assign/RoundRobinSelectorTest.java **/server/master/dispatch/host/assign/HostWorkerTest.java **/server/master/register/MasterRegistryTest.java + **/server/master/registry/ServerNodeManagerTest.java **/server/master/dispatch/host/assign/RoundRobinHostManagerTest.java **/server/master/AlertManagerTest.java **/server/master/MasterCommandTest.java @@ -933,7 +934,7 @@ **/server/master/processor/TaskAckProcessorTest.java **/server/master/processor/TaskKillResponseProcessorTest.java **/server/master/processor/queue/TaskResponseServiceTest.java - **/server/register/ServerNodeManagerTest.java + **/server/master/zk/ZKMasterClientTest.java **/server/register/ZookeeperRegistryCenterTest.java **/server/utils/DataxUtilsTest.java **/server/utils/ExecutionContextTestUtils.java