未验证 提交 e39409a4 编写于 作者: S Shiwen Cheng 提交者: GitHub

[Improvement-5275][Server] Fix wrong excludeFilters and move zk/ZKMasterClient...

[Improvement-5275][Server] Fix wrong excludeFilters and move zk/ZKMasterClient and registry/ServerNodeManager into master (#5278)

* [Improvement-5275][Server] Move zk/ZKMasterClient and registry/ServerNodeManager into master

* [Improvement-5275][Server] Fix code smells

* [Improvement-5275][Server] Add ZKMasterClientTest
上级 f3dfaacc
......@@ -28,7 +28,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.server.master.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
......@@ -117,7 +117,8 @@ public class MasterServer implements IStoppable {
this.nettyRemotingServer.start();
// self tolerant
this.zkMasterClient.start(this);
this.zkMasterClient.start();
this.zkMasterClient.setStoppable(this);
// scheduler start
this.masterSchedulerService.start();
......
......@@ -29,7 +29,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.commons.collections.CollectionUtils;
......
......@@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
import org.apache.dolphinscheduler.server.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import java.util.ArrayList;
import java.util.Collection;
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.registry;
package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
......@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.service.zk.AbstractListener;
import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
......
......@@ -27,8 +27,8 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.zk.ZKMasterClient;
import org.apache.dolphinscheduler.server.utils.AlertManager;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.curator.framework.imps.CuratorFrameworkState;
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.zk;
package org.apache.dolphinscheduler.server.master.zk;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
......@@ -73,7 +73,7 @@ public class ZKMasterClient extends AbstractZKClient {
@Autowired
private MasterRegistry masterRegistry;
public void start(IStoppable stoppable) {
public void start() {
InterProcessMutex mutex = null;
try {
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/startup-masters
......@@ -83,7 +83,6 @@ public class ZKMasterClient extends AbstractZKClient {
// master registry
masterRegistry.registry();
masterRegistry.getZookeeperRegistryCenter().setStoppable(stoppable);
String registryPath = this.masterRegistry.getMasterPath();
masterRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(registryPath, ZKNodeType.MASTER, Constants.DELETE_ZK_OP);
......@@ -107,6 +106,10 @@ public class ZKMasterClient extends AbstractZKClient {
}
}
public void setStoppable(IStoppable stoppable) {
masterRegistry.getZookeeperRegistryCenter().setStoppable(stoppable);
}
@Override
public void close() {
masterRegistry.unRegistry();
......@@ -139,7 +142,7 @@ public class ZKMasterClient extends AbstractZKClient {
* @param failover is failover
*/
private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) {
logger.info("{} node deleted : {}", zkNodeType.toString(), path);
logger.info("{} node deleted : {}", zkNodeType, path);
InterProcessMutex mutex = null;
try {
String failoverPath = getFailoverLockPath(zkNodeType);
......@@ -162,7 +165,7 @@ public class ZKMasterClient extends AbstractZKClient {
failoverServerWhenDown(serverHost, zkNodeType);
}
} catch (Exception e) {
logger.error("{} server failover failed.", zkNodeType.toString());
logger.error("{} server failover failed.", zkNodeType);
logger.error("failover exception ", e);
} finally {
releaseMutex(mutex);
......@@ -174,9 +177,8 @@ public class ZKMasterClient extends AbstractZKClient {
*
* @param serverHost server host
* @param zkNodeType zookeeper node type
* @throws Exception exception
*/
private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception {
private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) {
switch (zkNodeType) {
case MASTER:
failoverMaster(serverHost);
......@@ -196,7 +198,6 @@ public class ZKMasterClient extends AbstractZKClient {
* @return fail over lock path
*/
private String getFailoverLockPath(ZKNodeType zkNodeType) {
switch (zkNodeType) {
case MASTER:
return getMasterFailoverLockPath();
......@@ -252,7 +253,7 @@ public class ZKMasterClient extends AbstractZKClient {
* @param taskInstance task instance
* @return true if task instance need fail over
*/
private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) throws Exception {
private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) {
boolean taskNeedFailover = true;
......@@ -304,9 +305,8 @@ public class ZKMasterClient extends AbstractZKClient {
*
* @param workerHost worker host
* @param needCheckWorkerAlive need check worker alive
* @throws Exception exception
*/
private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) {
logger.info("start worker[{}] failover ...", workerHost);
List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
......
......@@ -55,9 +55,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
@ComponentScan.Filter(type = FilterType.REGEX, pattern = {
"org.apache.dolphinscheduler.server.master.*",
"org.apache.dolphinscheduler.server.monitor.*",
"org.apache.dolphinscheduler.server.log.*",
"org.apache.dolphinscheduler.server.zk.ZKMasterClient",
"org.apache.dolphinscheduler.server.registry.ServerNodeManager"
"org.apache.dolphinscheduler.server.log.*"
})
})
@EnableTransactionManagement
......
......@@ -38,11 +38,11 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.master.zk.ZKMasterClient;
import org.apache.dolphinscheduler.server.registry.DependencyConfig;
import org.apache.dolphinscheduler.server.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
......
......@@ -23,8 +23,8 @@ import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.registry.DependencyConfig;
import org.apache.dolphinscheduler.server.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.utils.ExecutionContextTestUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
......
......@@ -30,8 +30,8 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.registry.DependencyConfig;
import org.apache.dolphinscheduler.server.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
......
......@@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.utils.ExecutionContextTestUtils;
import org.junit.Assert;
......
......@@ -15,17 +15,17 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.registry;
package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.registry.DependencyConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
......@@ -43,9 +43,9 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
* server node manager test
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class, MasterRegistry.class,WorkerRegistry.class,
@ContextConfiguration(classes = {DependencyConfig.class, SpringZKServer.class, MasterRegistry.class,WorkerRegistry.class,
ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class, SpringConnectionFactory.class,
ZookeeperCachedOperator.class, ZookeeperConfig.class, ServerNodeManager.class, CuratorZookeeperClient.class})
ZookeeperCachedOperator.class, ZookeeperConfig.class, ServerNodeManager.class})
public class ServerNodeManagerTest {
@Autowired
......@@ -57,9 +57,6 @@ public class ServerNodeManagerTest {
@Autowired
private WorkerRegistry workerRegistry;
@Autowired
private ZookeeperRegistryCenter zookeeperRegistryCenter;
@Autowired
private WorkerConfig workerConfig;
......@@ -67,46 +64,39 @@ public class ServerNodeManagerTest {
private MasterConfig masterConfig;
@Test
public void testGetMasterNodes(){
public void testGetMasterNodes() {
masterRegistry.registry();
try {
//let the serverNodeManager catch the registry event
Thread.sleep(2000);
} catch (InterruptedException ignore) {
//ignore
}
Set<String> masterNodes = serverNodeManager.getMasterNodes();
Assert.assertTrue(CollectionUtils.isNotEmpty(masterNodes));
Assert.assertEquals(1, masterNodes.size());
Assert.assertEquals(NetUtils.getAddr(masterConfig.getListenPort()), masterNodes.iterator().next());
workerRegistry.unRegistry();
masterRegistry.unRegistry();
}
@Test
public void testGetWorkerGroupNodes(){
public void testGetWorkerGroupNodes() {
workerRegistry.registry();
try {
//let the serverNodeManager catch the registry event
Thread.sleep(2000);
Thread.sleep(3000);
} catch (InterruptedException ignore) {
//ignore
}
Map<String, Set<String>> workerGroupNodes = serverNodeManager.getWorkerGroupNodes();
Assert.assertEquals(1, workerGroupNodes.size());
Assert.assertEquals("default".trim(), workerGroupNodes.keySet().iterator().next());
workerRegistry.unRegistry();
}
@Test
public void testGetWorkerGroupNodesWithParam(){
workerRegistry.registry();
try {
//let the serverNodeManager catch the registry event
Thread.sleep(3000);
} catch (InterruptedException ignore) {
}
Set<String> workerNodes = serverNodeManager.getWorkerGroupNodes("default");
Assert.assertTrue(CollectionUtils.isNotEmpty(workerNodes));
Assert.assertEquals(1, workerNodes.size());
Assert.assertEquals(NetUtils.getAddr(workerConfig.getListenPort()), workerNodes.iterator().next());
workerRegistry.unRegistry();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.zk;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.registry.DependencyConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.zk.RegisterOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import java.util.Set;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* zookeeper master client test
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = {DependencyConfig.class, SpringZKServer.class, MasterRegistry.class,
ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class, SpringConnectionFactory.class,
ZookeeperCachedOperator.class, ZookeeperConfig.class, ServerNodeManager.class,
ZKMasterClient.class, RegisterOperator.class})
public class ZKMasterClientTest {
@Autowired
private ZKMasterClient zkMasterClient;
@Autowired
private ServerNodeManager serverNodeManager;
@Autowired
private MasterConfig masterConfig;
@Test
public void testZKMasterClient() {
zkMasterClient.start();
try {
//let the serverNodeManager catch the registry event
Thread.sleep(2000);
} catch (InterruptedException ignore) {
//ignore
}
Set<String> masterNodes = serverNodeManager.getMasterNodes();
Assert.assertTrue(CollectionUtils.isNotEmpty(masterNodes));
Assert.assertEquals(1, masterNodes.size());
Assert.assertEquals(NetUtils.getAddr(masterConfig.getListenPort()), masterNodes.iterator().next());
zkMasterClient.close();
}
}
......@@ -41,6 +41,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager;
import org.apache.dolphinscheduler.server.master.dispatch.host.RandomHostManager;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
......@@ -81,6 +82,11 @@ public class DependencyConfig {
return Mockito.mock(MasterConfig.class);
}
@Bean
public WorkerConfig workerConfig() {
return Mockito.mock(WorkerConfig.class);
}
@Bean
public UserMapper userMapper() {
return Mockito.mock(UserMapper.class);
......
......@@ -35,7 +35,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
......
......@@ -922,6 +922,7 @@
<include>**/server/master/dispatch/host/assign/RoundRobinSelectorTest.java</include>
<include>**/server/master/dispatch/host/assign/HostWorkerTest.java</include>
<include>**/server/master/register/MasterRegistryTest.java</include>
<include>**/server/master/registry/ServerNodeManagerTest.java</include>
<include>**/server/master/dispatch/host/assign/RoundRobinHostManagerTest.java</include>
<include>**/server/master/AlertManagerTest.java</include>
<include>**/server/master/MasterCommandTest.java</include>
......@@ -933,7 +934,7 @@
<include>**/server/master/processor/TaskAckProcessorTest.java</include>
<include>**/server/master/processor/TaskKillResponseProcessorTest.java</include>
<include>**/server/master/processor/queue/TaskResponseServiceTest.java</include>
<include>**/server/register/ServerNodeManagerTest.java</include>
<include>**/server/master/zk/ZKMasterClientTest.java</include>
<include>**/server/register/ZookeeperRegistryCenterTest.java</include>
<include>**/server/utils/DataxUtilsTest.java</include>
<include>**/server/utils/ExecutionContextTestUtils.java</include>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册