未验证 提交 6bf98146 编写于 作者: T Tboy 提交者: GitHub

Merge pull request #1981 from qiaozhanwei/refactor-worker

master/worker basic communication
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.Date; /** * execute task request command */public class ExecuteTaskAckCommand implements Serializable { private int taskInstanceId; private Date startTime; private String host; private int status; private String logPath; private String executePath; public Date getStartTime() { return startTime; } public void setStartTime(Date startTime) { this.startTime = startTime; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getLogPath() { return logPath; } public void setLogPath(String logPath) { this.logPath = logPath; } public String getExecutePath() { return executePath; } public void setExecutePath(String executePath) { this.executePath = executePath; } /** * package request command * * @return command */ public Command convert2Command(long opaque){ Command command = new Command(opaque); command.setType(CommandType.EXECUTE_TASK_ACK); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; }}
\ No newline at end of file
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.Date; /** * execute task request command */public class ExecuteTaskAckCommand implements Serializable { private int taskInstanceId; private Date startTime; private String host; private int status; private String logPath; private String executePath; public Date getStartTime() { return startTime; } public void setStartTime(Date startTime) { this.startTime = startTime; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getLogPath() { return logPath; } public void setLogPath(String logPath) { this.logPath = logPath; } public String getExecutePath() { return executePath; } public void setExecutePath(String executePath) { this.executePath = executePath; } /** * package request command * * @return command */ public Command convert2Command(long opaque){ Command command = new Command(opaque); command.setType(CommandType.EXECUTE_TASK_ACK); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskAckCommand{" + "taskInstanceId=" + taskInstanceId + ", startTime=" + startTime + ", host='" + host + '\'' + ", status=" + status + ", logPath='" + logPath + '\'' + ", executePath='" + executePath + '\'' + '}'; }}
\ No newline at end of file
......
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */public class ExecuteTaskRequestCommand implements Serializable { /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; }}
\ No newline at end of file
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */public class ExecuteTaskRequestCommand implements Serializable { /** * task instance json */ private String taskInstanceJson; public String getTaskInstanceJson() { return taskInstanceJson; } public void setTaskInstanceJson(String taskInstanceJson) { this.taskInstanceJson = taskInstanceJson; } public ExecuteTaskRequestCommand() { } public ExecuteTaskRequestCommand(String taskInstanceJson) { this.taskInstanceJson = taskInstanceJson; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskRequestCommand{" + "taskInstanceJson='" + taskInstanceJson + '\'' + '}'; }}
\ No newline at end of file
......
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.Date; /** * execute task response command */public class ExecuteTaskResponseCommand implements Serializable { public ExecuteTaskResponseCommand() { } public ExecuteTaskResponseCommand(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } /** * task instance id */ private int taskInstanceId; /** * status */ private int status; /** * end time */ private Date endTime; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public Date getEndTime() { return endTime; } public void setEndTime(Date endTime) { this.endTime = endTime; } public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } }
\ No newline at end of file
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.Date; /** * execute task response command */public class ExecuteTaskResponseCommand implements Serializable { public ExecuteTaskResponseCommand() { } public ExecuteTaskResponseCommand(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } /** * task instance id */ private int taskInstanceId; /** * status */ private int status; /** * end time */ private Date endTime; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public Date getEndTime() { return endTime; } public void setEndTime(Date endTime) { this.endTime = endTime; } /** * package response command * * @param opaque request unique identification * @return command */ public Command convert2Command(long opaque){ Command command = new Command(opaque); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } }
\ No newline at end of file
......
......@@ -59,14 +59,14 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
*/
final CommandType commandType = command.getType();
switch (commandType){
case GET_LOG_BYTES_REQUEST:
GetLogBytesRequestCommand getLogRequest = FastJsonSerializer.deserialize(
command.getBody(), GetLogBytesRequestCommand.class);
byte[] bytes = getFileContentBytes(getLogRequest.getPath());
GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes);
channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque()));
break;
case VIEW_WHOLE_LOG_REQUEST:
case GET_LOG_BYTES_REQUEST:
GetLogBytesRequestCommand getLogRequest = FastJsonSerializer.deserialize(
command.getBody(), GetLogBytesRequestCommand.class);
byte[] bytes = getFileContentBytes(getLogRequest.getPath());
GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes);
channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque()));
break;
case VIEW_WHOLE_LOG_REQUEST:
ViewLogRequestCommand viewLogRequest = FastJsonSerializer.deserialize(
command.getBody(), ViewLogRequestCommand.class);
String msg = readWholeFileContent(viewLogRequest.getPath());
......
......@@ -16,10 +16,18 @@
*/
package org.apache.dolphinscheduler.server.master.runner;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.BeanContext;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.*;
import org.apache.dolphinscheduler.remote.command.log.RollViewLogRequestCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Address;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
......@@ -75,6 +83,12 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
*/
private MasterConfig masterConfig;
/**
* netty remoting client
*/
private static final NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(new NettyClientConfig());
/**
* constructor of MasterBaseTaskExecThread
* @param taskInstance task instance
......@@ -105,6 +119,38 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
this.cancel = true;
}
// TODO send task to worker
public void sendToWorker(String taskInstanceJson){
final Address address = new Address("127.0.0.1", 12346);
ExecuteTaskRequestCommand taskRequestCommand = new ExecuteTaskRequestCommand(taskInstanceJson);
try {
Command responseCommand = nettyRemotingClient.sendSync(address,
taskRequestCommand.convert2Command(), Integer.MAX_VALUE);
logger.info("receive command : {}", responseCommand);
final CommandType commandType = responseCommand.getType();
switch (commandType){
case EXECUTE_TASK_ACK:
ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(
responseCommand.getBody(), ExecuteTaskAckCommand.class);
logger.info("taskAckCommand : {}",taskAckCommand);
break;
case EXECUTE_TASK_RESPONSE:
ExecuteTaskResponseCommand taskResponseCommand = FastJsonSerializer.deserialize(
responseCommand.getBody(), ExecuteTaskResponseCommand.class);
logger.info("taskResponseCommand : {}",taskResponseCommand);
break;
default:
throw new IllegalArgumentException("unknown commandType");
}
logger.info("response result : {}",responseCommand);
} catch (InterruptedException | RemotingException ex) {
logger.error(String.format("send command to : %s error", address), ex);
}
}
/**
* submit master base task exec thread
* @return TaskInstance
......@@ -128,7 +174,8 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
}
if(submitDB && !submitQueue){
// submit task to queue
submitQueue = processService.submitTaskToQueue(task);
sendToWorker(JSONObject.toJSONString(task));
submitQueue = true;
}
if(submitDB && submitQueue){
return task;
......
......@@ -160,20 +160,7 @@ public class MasterExecThread implements Runnable {
this.nettyRemotingClient = nettyRemotingClient;
}
//TODO
/**端口,默认是123456
* 需要构造ExecuteTaskRequestCommand,里面就是TaskInstance的属性。
*/
private void sendToWorker(){
final Address address = new Address("localhost", 12346);
ExecuteTaskRequestCommand command = new ExecuteTaskRequestCommand();
try {
Command response = nettyRemotingClient.sendSync(address, command.convert2Command(), 5000);
//结果可能为空,所以不用管,能发过去,就行。
} catch (InterruptedException | RemotingException ex) {
logger.error(String.format("send command to : %s error", address), ex);
}
}
@Override
......
......@@ -35,9 +35,8 @@ import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.WorkerNettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.processor.WorkerRequestProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread;
import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
......@@ -169,12 +168,12 @@ public class WorkerServer implements IStoppable {
//init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerNettyRequestProcessor(processService));
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerRequestProcessor(processService));
this.nettyRemotingServer.start();
//worker registry
this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort());
this.workerRegistry.registry();
// TODO ,because there is a heartbeat, you can reuse the heartbeat logic,worker registry
// this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort());
// this.workerRegistry.registry();
this.zkWorkerClient.init();
......
......@@ -19,11 +19,19 @@ package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
/**
* callback channel
*/
public class CallbackChannel {
/**
* channel
*/
private Channel channel;
/**
* equest unique identification
*/
private long opaque;
public CallbackChannel(Channel channel, long opaque) {
......
......@@ -26,14 +26,30 @@ import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import java.util.concurrent.ConcurrentHashMap;
public class TaskInstanceCallbackService {
/**
* taks callback service
*/
public class TaskCallbackService {
/**
* callback channels
*/
private static final ConcurrentHashMap<Integer, CallbackChannel> CALL_BACK_CHANNELS = new ConcurrentHashMap<>();
/**
* add callback channel
* @param taskInstanceId taskInstanceId
* @param channel channel
*/
public void addCallbackChannel(int taskInstanceId, CallbackChannel channel){
CALL_BACK_CHANNELS.put(taskInstanceId, channel);
}
/**
* get callback channel
* @param taskInstanceId taskInstanceId
* @return callback channel
*/
public CallbackChannel getCallbackChannel(int taskInstanceId){
CallbackChannel callbackChannel = CALL_BACK_CHANNELS.get(taskInstanceId);
if(callbackChannel.getChannel().isActive()){
......@@ -45,18 +61,34 @@ public class TaskInstanceCallbackService {
return callbackChannel;
}
/**
* remove callback channels
* @param taskInstanceId taskInstanceId
*/
public void remove(int taskInstanceId){
CALL_BACK_CHANNELS.remove(taskInstanceId);
}
/**
* send ack
* @param taskInstanceId taskInstanceId
* @param ackCommand ackCommand
*/
public void sendAck(int taskInstanceId, ExecuteTaskAckCommand ackCommand){
CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
callbackChannel.getChannel().writeAndFlush(ackCommand.convert2Command(callbackChannel.getOpaque()));
}
/**
* send result
*
* @param taskInstanceId taskInstanceId
* @param responseCommand responseCommand
*/
public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){
CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command()).addListener(new ChannelFutureListener(){
callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command(
callbackChannel.getOpaque())).addListener(new ChannelFutureListener(){
@Override
public void operationComplete(ChannelFuture future) throws Exception {
......@@ -68,7 +100,7 @@ public class TaskInstanceCallbackService {
});
}
//TODO
// TODO
private Channel createChannel(){
return null;
}
......
......@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.processor;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
......@@ -28,6 +29,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
......@@ -40,32 +43,56 @@ import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.concurrent.ExecutorService;
/**
* worker request processor
*/
public class WorkerRequestProcessor implements NettyRequestProcessor {
public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(WorkerNettyRequestProcessor.class);
private final Logger logger = LoggerFactory.getLogger(WorkerRequestProcessor.class);
/**
* process service
*/
private final ProcessService processService;
/**
* thread executor service
*/
private final ExecutorService workerExecService;
/**
* worker config
*/
private final WorkerConfig workerConfig;
private final TaskInstanceCallbackService taskInstanceCallbackService;
/**
* task callback service
*/
private final TaskCallbackService taskCallbackService;
public WorkerNettyRequestProcessor(ProcessService processService){
public WorkerRequestProcessor(ProcessService processService){
this.processService = processService;
this.taskInstanceCallbackService = new TaskInstanceCallbackService();
this.taskCallbackService = new TaskCallbackService();
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
}
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
logger.debug("received command : {}", command);
TaskInstance taskInstance = FastJsonSerializer.deserialize(command.getBody(), TaskInstance.class);
//TODO 需要干掉,然后移到master里面。
Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType()));
logger.info("received command : {}", command);
ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
command.getBody(), ExecuteTaskRequestCommand.class);
String taskInstanceJson = taskRequestCommand.getTaskInstanceJson();
TaskInstance taskInstance = JSONObject.parseObject(taskInstanceJson, TaskInstance.class);
taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId());
//TODO this logic need add to master
int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
// verify tenant is null
......@@ -77,7 +104,8 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
//TODO 到这里。
//TODO end
// local execute path
String execLocalPath = getExecLocalPath(taskInstance);
logger.info("task instance local execute path : {} ", execLocalPath);
......@@ -88,11 +116,21 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
} catch (Exception ex){
logger.error(String.format("create execLocalPath : %s", execLocalPath), ex);
}
taskCallbackService.addCallbackChannel(taskInstance.getId(),
new CallbackChannel(channel, command.getOpaque()));
// submit task
taskInstanceCallbackService.addCallbackChannel(taskInstance.getId(), new CallbackChannel(channel, command.getOpaque()));
workerExecService.submit(new TaskScheduleThread(taskInstance, processService, taskInstanceCallbackService));
workerExecService.submit(new TaskScheduleThread(taskInstance,
processService, taskCallbackService));
}
/**
* whehter tenant is null
* @param tenant tenant
* @param taskInstance taskInstance
* @return result
*/
private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
if(tenant == null){
logger.error("tenant not exists,process instance id : {},task instance id : {}",
......@@ -103,6 +141,11 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
return false;
}
/**
* get execute local path
* @param taskInstance taskInstance
* @return execute local path
*/
private String getExecLocalPath(TaskInstance taskInstance){
return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
taskInstance.getProcessDefine().getId(),
......
......@@ -24,20 +24,36 @@ import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* worker registry
*/
public class WorkerRegistry {
private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class);
/**
* zookeeper registry center
*/
private final ZookeeperRegistryCenter zookeeperRegistryCenter;
/**
* port
*/
private final int port;
/**
* construct
* @param zookeeperRegistryCenter zookeeperRegistryCenter
* @param port port
*/
public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port){
this.zookeeperRegistryCenter = zookeeperRegistryCenter;
this.port = port;
}
/**
* registry
*/
public void registry() {
String address = Constants.LOCAL_ADDRESS;
String localNodePath = getWorkerPath();
......@@ -58,6 +74,9 @@ public class WorkerRegistry {
logger.info("scheduler node : {} registry to ZK successfully.", address);
}
/**
* remove registry info
*/
public void unRegistry() {
String address = getLocalAddress();
String localNodePath = getWorkerPath();
......@@ -65,12 +84,20 @@ public class WorkerRegistry {
logger.info("worker node : {} unRegistry to ZK.", address);
}
/**
* get worker path
* @return
*/
private String getWorkerPath() {
String address = getLocalAddress();
String localNodePath = this.zookeeperRegistryCenter.getWorkerPath() + "/" + address;
return localNodePath;
}
/**
* get local address
* @return
*/
private String getLocalAddress(){
return Constants.LOCAL_ADDRESS + ":" + port;
}
......
......@@ -235,7 +235,7 @@ public class FetchTaskThread implements Runnable{
logger.info("task : {} ready to submit to task scheduler thread",taskInstId);
// submit task
workerExecService.submit(new TaskScheduleThread(taskInstance, processService));
// workerExecService.submit(new TaskScheduleThread(taskInstance, processService));
// remove node from zk
removeNodeFromTaskQueue(taskQueueStr);
......
......@@ -35,10 +35,9 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.server.worker.processor.TaskInstanceCallbackService;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
......@@ -80,7 +79,7 @@ public class TaskScheduleThread implements Runnable {
/**
* task instance callback service
*/
private TaskInstanceCallbackService taskInstanceCallbackService;
private TaskCallbackService taskInstanceCallbackService;
/**
* constructor
......@@ -88,7 +87,7 @@ public class TaskScheduleThread implements Runnable {
* @param taskInstance task instance
* @param processService process dao
*/
public TaskScheduleThread(TaskInstance taskInstance, ProcessService processService, TaskInstanceCallbackService taskInstanceCallbackService){
public TaskScheduleThread(TaskInstance taskInstance, ProcessService processService, TaskCallbackService taskInstanceCallbackService){
this.processService = processService;
this.taskInstance = taskInstance;
this.taskInstanceCallbackService = taskInstanceCallbackService;
......@@ -97,6 +96,9 @@ public class TaskScheduleThread implements Runnable {
@Override
public void run() {
// TODO Need to be removed and kept temporarily update task instance state
updateTaskState(taskInstance.getTaskType());
ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskInstance.getId());
try {
......@@ -169,6 +171,12 @@ public class TaskScheduleThread implements Runnable {
}catch (Exception e){
logger.error("task scheduler failure", e);
kill();
//TODO Need to be removed and kept temporarily update task instance state
processService.changeTaskState(ExecutionStatus.FAILURE,
new Date(),
taskInstance.getId());
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
responseCommand.setEndTime(new Date());
......@@ -176,6 +184,14 @@ public class TaskScheduleThread implements Runnable {
taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand);
}
logger.info("task instance id : {},task final status : {}",
taskInstance.getId(),
task.getExitStatus());
// update task instance state
processService.changeTaskState(task.getExitStatus(),
new Date(),
taskInstance.getId());
}
/**
......@@ -342,4 +358,28 @@ public class TaskScheduleThread implements Runnable {
PermissionCheck<String> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,userId,logger);
permissionCheck.checkPermission();
}
/**
* update task state according to task type
* @param taskType
*/
private void updateTaskState(String taskType) {
// update task status is running
if(taskType.equals(TaskType.SQL.name()) ||
taskType.equals(TaskType.PROCEDURE.name())){
processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
taskInstance.getStartTime(),
taskInstance.getHost(),
null,
getTaskLogPath(),
taskInstance.getId());
}else{
processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
taskInstance.getStartTime(),
taskInstance.getHost(),
taskInstance.getExecutePath(),
getTaskLogPath(),
taskInstance.getId());
}
}
}
\ No newline at end of file
......@@ -91,7 +91,7 @@ public class MasterExecThreadTest {
processDefinition.setGlobalParamList(Collections.EMPTY_LIST);
Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
masterExecThread = PowerMockito.spy(new MasterExecThread(processInstance, processService));
masterExecThread = PowerMockito.spy(new MasterExecThread(processInstance, processService,null));
// prepareProcess init dag
Field dag = MasterExecThread.class.getDeclaredField("dag");
dag.setAccessible(true);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册