未验证 提交 d617f9df 编写于 作者: T Tboy 提交者: GitHub

refactor worker registry (#2107)

上级 5c621009
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.master; package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors; import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
...@@ -26,19 +25,16 @@ import org.apache.dolphinscheduler.remote.NettyRemotingServer; ...@@ -26,19 +25,16 @@ import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.consumer.TaskUpdateQueueConsumer;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob; import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutors; import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl;
import org.quartz.SchedulerException; import org.quartz.SchedulerException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -84,12 +80,6 @@ public class MasterServer { ...@@ -84,12 +80,6 @@ public class MasterServer {
@Autowired @Autowired
private MasterConfig masterConfig; private MasterConfig masterConfig;
/**
* zookeeper registry center
*/
@Autowired
private ZookeeperRegistryCenter zookeeperRegistryCenter;
/** /**
* spring application context * spring application context
* only use it for initialization * only use it for initialization
...@@ -105,6 +95,7 @@ public class MasterServer { ...@@ -105,6 +95,7 @@ public class MasterServer {
/** /**
* master registry * master registry
*/ */
@Autowired
private MasterRegistry masterRegistry; private MasterRegistry masterRegistry;
/** /**
...@@ -126,7 +117,7 @@ public class MasterServer { ...@@ -126,7 +117,7 @@ public class MasterServer {
//init remoting server //init remoting server
NettyServerConfig serverConfig = new NettyServerConfig(); NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(45678); serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
...@@ -134,7 +125,6 @@ public class MasterServer { ...@@ -134,7 +125,6 @@ public class MasterServer {
this.nettyRemotingServer.start(); this.nettyRemotingServer.start();
// //
this.masterRegistry = new MasterRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), masterConfig.getMasterHeartbeatInterval());
this.masterRegistry.registry(); this.masterRegistry.registry();
// //
...@@ -166,8 +156,6 @@ public class MasterServer { ...@@ -166,8 +156,6 @@ public class MasterServer {
logger.error("start Quartz failed", e); logger.error("start Quartz failed", e);
} }
TaskUpdateQueueConsumer taskUpdateQueueConsumer = SpringApplicationContext.getBean(TaskUpdateQueueConsumer.class);
taskUpdateQueueConsumer.start();
/** /**
* register hooks, which are called before the process exits * register hooks, which are called before the process exits
*/ */
......
...@@ -46,6 +46,17 @@ public class MasterConfig { ...@@ -46,6 +46,17 @@ public class MasterConfig {
@Value("${master.host.selector:lowerWeight}") @Value("${master.host.selector:lowerWeight}")
private String hostSelector; private String hostSelector;
@Value("${master.listen.port:45678}")
private int listenPort;
public int getListenPort() {
return listenPort;
}
public void setListenPort(int listenPort) {
this.listenPort = listenPort;
}
public String getHostSelector() { public String getHostSelector() {
return hostSelector; return hostSelector;
} }
......
...@@ -37,6 +37,8 @@ import org.slf4j.LoggerFactory; ...@@ -37,6 +37,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/** /**
* TaskUpdateQueue consumer * TaskUpdateQueue consumer
*/ */
...@@ -66,6 +68,12 @@ public class TaskUpdateQueueConsumer extends Thread{ ...@@ -66,6 +68,12 @@ public class TaskUpdateQueueConsumer extends Thread{
@Autowired @Autowired
private ExecutorDispatcher dispatcher; private ExecutorDispatcher dispatcher;
@PostConstruct
public void init(){
super.setName("TaskUpdateQueueConsumerThread");
super.start();
}
@Override @Override
public void run() { public void run() {
while (Stopper.isRunning()){ while (Stopper.isRunning()){
......
...@@ -23,10 +23,14 @@ import org.apache.dolphinscheduler.common.utils.DateUtils; ...@@ -23,10 +23,14 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Date; import java.util.Date;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
...@@ -37,6 +41,7 @@ import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA; ...@@ -37,6 +41,7 @@ import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
/** /**
* master registry * master registry
*/ */
@Service
public class MasterRegistry { public class MasterRegistry {
private final Logger logger = LoggerFactory.getLogger(MasterRegistry.class); private final Logger logger = LoggerFactory.getLogger(MasterRegistry.class);
...@@ -44,38 +49,28 @@ public class MasterRegistry { ...@@ -44,38 +49,28 @@ public class MasterRegistry {
/** /**
* zookeeper registry center * zookeeper registry center
*/ */
private final ZookeeperRegistryCenter zookeeperRegistryCenter; @Autowired
private ZookeeperRegistryCenter zookeeperRegistryCenter;
/** /**
* port * master config
*/ */
private final int port; @Autowired
private MasterConfig masterConfig;
/**
* heartbeat interval
*/
private final long heartBeatInterval;
/** /**
* heartbeat executor * heartbeat executor
*/ */
private final ScheduledExecutorService heartBeatExecutor; private ScheduledExecutorService heartBeatExecutor;
/** /**
* worker start time * worker start time
*/ */
private final String startTime; private String startTime;
/**
* construct @PostConstruct
* @param zookeeperRegistryCenter zookeeperRegistryCenter public void init(){
* @param port port
* @param heartBeatInterval heartBeatInterval
*/
public MasterRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){
this.zookeeperRegistryCenter = zookeeperRegistryCenter;
this.port = port;
this.heartBeatInterval = heartBeatInterval;
this.startTime = DateUtils.dateToString(new Date()); this.startTime = DateUtils.dateToString(new Date());
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
} }
...@@ -100,8 +95,9 @@ public class MasterRegistry { ...@@ -100,8 +95,9 @@ public class MasterRegistry {
} }
} }
}); });
this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), heartBeatInterval, heartBeatInterval, TimeUnit.SECONDS); int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval();
logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, heartBeatInterval); this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);
} }
/** /**
...@@ -129,7 +125,7 @@ public class MasterRegistry { ...@@ -129,7 +125,7 @@ public class MasterRegistry {
* @return * @return
*/ */
private String getLocalAddress(){ private String getLocalAddress(){
return Constants.LOCAL_ADDRESS + ":" + port; return Constants.LOCAL_ADDRESS + ":" + masterConfig.getListenPort();
} }
/** /**
......
...@@ -19,11 +19,9 @@ package org.apache.dolphinscheduler.server.worker; ...@@ -19,11 +19,9 @@ package org.apache.dolphinscheduler.server.worker;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors; import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
...@@ -37,8 +35,6 @@ import org.springframework.boot.builder.SpringApplicationBuilder; ...@@ -37,8 +35,6 @@ import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.ComponentScan;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
/** /**
* worker server * worker server
...@@ -51,18 +47,6 @@ public class WorkerServer { ...@@ -51,18 +47,6 @@ public class WorkerServer {
*/ */
private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class); private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class);
/**
* worker config
*/
@Autowired
private WorkerConfig workerConfig;
/**
* zookeeper registry center
*/
@Autowired
private ZookeeperRegistryCenter zookeeperRegistryCenter;
/** /**
* netty remote server * netty remote server
*/ */
...@@ -71,8 +55,15 @@ public class WorkerServer { ...@@ -71,8 +55,15 @@ public class WorkerServer {
/** /**
* worker registry * worker registry
*/ */
@Autowired
private WorkerRegistry workerRegistry; private WorkerRegistry workerRegistry;
/**
* worker config
*/
@Autowired
private WorkerConfig workerConfig;
/** /**
* spring application context * spring application context
* only use it for initialization * only use it for initialization
...@@ -87,6 +78,7 @@ public class WorkerServer { ...@@ -87,6 +78,7 @@ public class WorkerServer {
* @param args arguments * @param args arguments
*/ */
public static void main(String[] args) { public static void main(String[] args) {
System.setProperty("spring.profiles.active","worker");
Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER); Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER);
new SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args); new SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args);
} }
...@@ -101,12 +93,13 @@ public class WorkerServer { ...@@ -101,12 +93,13 @@ public class WorkerServer {
//init remoting server //init remoting server
NettyServerConfig serverConfig = new NettyServerConfig(); NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(workerConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
this.nettyRemotingServer.start(); this.nettyRemotingServer.start();
this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval(), workerConfig.getWorkerGroup()); //
this.workerRegistry.registry(); this.workerRegistry.registry();
/** /**
......
...@@ -40,6 +40,17 @@ public class WorkerConfig { ...@@ -40,6 +40,17 @@ public class WorkerConfig {
@Value("${worker.group: default}") @Value("${worker.group: default}")
private String workerGroup; private String workerGroup;
@Value("${worker.listen.port: 12345}")
private int listenPort;
public int getListenPort() {
return listenPort;
}
public void setListenPort(int listenPort) {
this.listenPort = listenPort;
}
public String getWorkerGroup() { public String getWorkerGroup() {
return workerGroup; return workerGroup;
} }
......
...@@ -25,9 +25,13 @@ import org.apache.dolphinscheduler.common.utils.StringUtils; ...@@ -25,9 +25,13 @@ import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Date; import java.util.Date;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
...@@ -41,6 +45,7 @@ import static org.apache.dolphinscheduler.common.Constants.SLASH; ...@@ -41,6 +45,7 @@ import static org.apache.dolphinscheduler.common.Constants.SLASH;
/** /**
* worker registry * worker registry
*/ */
@Service
public class WorkerRegistry { public class WorkerRegistry {
private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class); private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class);
...@@ -48,54 +53,31 @@ public class WorkerRegistry { ...@@ -48,54 +53,31 @@ public class WorkerRegistry {
/** /**
* zookeeper registry center * zookeeper registry center
*/ */
private final ZookeeperRegistryCenter zookeeperRegistryCenter; @Autowired
private ZookeeperRegistryCenter zookeeperRegistryCenter;
/** /**
* port * worker config
*/ */
private final int port; @Autowired
private WorkerConfig workerConfig;
/**
* heartbeat interval
*/
private final long heartBeatInterval;
/** /**
* heartbeat executor * heartbeat executor
*/ */
private final ScheduledExecutorService heartBeatExecutor; private ScheduledExecutorService heartBeatExecutor;
/** /**
* worker start time * worker start time
*/ */
private final String startTime; private String startTime;
/**
* worker group
*/
private String workerGroup;
/** private String workerGroup;
* construct
*
* @param zookeeperRegistryCenter zookeeperRegistryCenter
* @param port port
* @param heartBeatInterval heartBeatInterval
*/
public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){
this(zookeeperRegistryCenter, port, heartBeatInterval, DEFAULT_WORKER_GROUP);
}
/** @PostConstruct
* construct public void init(){
* @param zookeeperRegistryCenter zookeeperRegistryCenter this.workerGroup = workerConfig.getWorkerGroup();
* @param port port
*/
public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval, String workerGroup){
this.zookeeperRegistryCenter = zookeeperRegistryCenter;
this.port = port;
this.heartBeatInterval = heartBeatInterval;
this.workerGroup = workerGroup;
this.startTime = DateUtils.dateToString(new Date()); this.startTime = DateUtils.dateToString(new Date());
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
} }
...@@ -120,8 +102,9 @@ public class WorkerRegistry { ...@@ -120,8 +102,9 @@ public class WorkerRegistry {
} }
} }
}); });
this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), heartBeatInterval, heartBeatInterval, TimeUnit.SECONDS); int workerHeartbeatInterval = workerConfig.getWorkerHeartbeatInterval();
logger.info("worker node : {} registry to ZK successfully with heartBeatInterval : {}s", address, heartBeatInterval); this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
logger.info("worker node : {} registry to ZK successfully with heartBeatInterval : {}s", address, workerHeartbeatInterval);
} }
...@@ -159,7 +142,7 @@ public class WorkerRegistry { ...@@ -159,7 +142,7 @@ public class WorkerRegistry {
* @return * @return
*/ */
private String getLocalAddress(){ private String getLocalAddress(){
return Constants.LOCAL_ADDRESS + ":" + port; return Constants.LOCAL_ADDRESS + ":" + workerConfig.getListenPort();
} }
/** /**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册