未验证 提交 e720ca54 编写于 作者: T Tboy 提交者: GitHub

Refactor worker (#2312)

* let quartz use the same datasource

* move master/worker config from dao.properties to each config
add master/worker registry test

* move mybatis config from application.properties to SpringConnectionFactory

* move mybatis-plus config from application.properties to SpringConnectionFactory

* refactor TaskCallbackService

* add ZookeeperNodeManagerTest

* add NettyExecutorManagerTest

* refactor TaskKillProcessor

* add RandomSelectorTest, RoundRobinSelectorTest, TaskCallbackServiceTest

* add RoundRobinHostManagerTest, ExecutorDispatcherTest
上级 03e29b93
......@@ -76,7 +76,7 @@ public abstract class CommonHostManager implements HostManager {
return select(candidateHosts);
}
public abstract Host select(Collection<Host> nodes);
protected abstract Host select(Collection<Host> nodes);
public void setZookeeperNodeManager(ZookeeperNodeManager zookeeperNodeManager) {
this.zookeeperNodeManager = zookeeperNodeManager;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.registry.DependencyConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.utils.ExecutionContextTestUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* executor dispatch test
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, WorkerRegistry.class,
NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, WorkerConfig.class,
ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class})
public class ExecutorDispatcherTest {
@Autowired
private ExecutorDispatcher executorDispatcher;
@Autowired
private WorkerRegistry workerRegistry;
@Autowired
private WorkerConfig workerConfig;
@Test(expected = ExecuteException.class)
public void testDispatchWithException() throws ExecuteException {
ExecutionContext executionContext = ExecutionContextTestUtils.getExecutionContext(10000);
executorDispatcher.dispatch(executionContext);
}
@Test
public void testDispatch() throws ExecuteException {
int port = 30000;
final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(port);
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
nettyRemotingServer.registerProcessor(org.apache.dolphinscheduler.remote.command.CommandType.TASK_EXECUTE_REQUEST, Mockito.mock(TaskExecuteProcessor.class));
nettyRemotingServer.start();
//
workerConfig.setListenPort(port);
workerRegistry.registry();
ExecutionContext executionContext = ExecutionContextTestUtils.getExecutionContext(port);
executorDispatcher.dispatch(executionContext);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.registry.DependencyConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.utils.ExecutionContextTestUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
/**
* round robin host manager test
*/
@RunWith(SpringRunner.class)
@ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class, WorkerRegistry.class, ZookeeperRegistryCenter.class, WorkerConfig.class,
ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class})
public class RoundRobinHostManagerTest {
@Autowired
private ZookeeperNodeManager zookeeperNodeManager;
@Autowired
private WorkerRegistry workerRegistry;
@Autowired
private WorkerConfig workerConfig;
@Test
public void testSelectWithEmptyResult(){
RoundRobinHostManager roundRobinHostManager = new RoundRobinHostManager();
roundRobinHostManager.setZookeeperNodeManager(zookeeperNodeManager);
ExecutionContext context = ExecutionContextTestUtils.getExecutionContext(10000);
Host emptyHost = roundRobinHostManager.select(context);
Assert.assertTrue(StringUtils.isEmpty(emptyHost.getAddress()));
}
@Test
public void testSelectWithResult(){
workerRegistry.registry();
RoundRobinHostManager roundRobinHostManager = new RoundRobinHostManager();
roundRobinHostManager.setZookeeperNodeManager(zookeeperNodeManager);
ExecutionContext context = ExecutionContextTestUtils.getExecutionContext(10000);
Host host = roundRobinHostManager.select(context);
Assert.assertTrue(StringUtils.isNotEmpty(host.getAddress()));
Assert.assertTrue(host.getAddress().equalsIgnoreCase(OSUtils.getHost() + ":" + workerConfig.getListenPort()));
}
}
......@@ -14,11 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host;
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;
import org.junit.Test;
import java.util.ArrayList;
......
......@@ -14,10 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host;
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RandomSelector;
import org.junit.Assert;
import org.junit.Test;
......
......@@ -14,10 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host;
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RoundRobinSelector;
import org.junit.Assert;
import org.junit.Test;
......
......@@ -20,6 +20,9 @@ package org.apache.dolphinscheduler.server.registry;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager;
import org.apache.dolphinscheduler.server.master.dispatch.host.RandomHostManager;
import org.apache.dolphinscheduler.server.master.manager.TaskManager;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.mockito.Mockito;
......@@ -131,4 +134,14 @@ public class DependencyConfig {
public TaskCallbackService taskCallbackService(){
return Mockito.mock(TaskCallbackService.class);
}
@Bean
public HostManager hostManager(){
return new RandomHostManager();
}
@Bean
public TaskManager taskManager(){
return Mockito.mock(TaskManager.class);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.mockito.Mockito;
/**
* for test use only
*/
public class ExecutionContextTestUtils {
public static ExecutionContext getExecutionContext(int port){
TaskInstance taskInstance = Mockito.mock(TaskInstance.class);
ProcessDefinition processDefinition = Mockito.mock(ProcessDefinition.class);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setCommandType(CommandType.COMPLEMENT_DATA);
taskInstance.setProcessInstance(processInstance);
TaskExecutionContext context = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER);
executionContext.setHost(Host.of(OSUtils.getHost() + ":" + port));
return executionContext;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册