未验证 提交 e4679109 编写于 作者: T Tboy 提交者: GitHub

Refactor worker (#1997)

* Refactor worker (#2)

* Refactor worker (#1993)

* Refactor worker (#1)

* add TaskResponseProcessor (#1983)

* 1, master persistent task 2. extract  master and worker communication model (#1992)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error
Co-authored-by: Nqiaozhanwei <qiaozhanwei@outlook.com>

* updates
Co-authored-by: Nqiaozhanwei <qiaozhanwei@outlook.com>

* TaskExecutionContext create modify (#1994)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify
Co-authored-by: Nqiaozhanwei <qiaozhanwei@outlook.com>

* updates
Co-authored-by: Nqiaozhanwei <qiaozhanwei@outlook.com>
上级 1b953149
......@@ -34,6 +34,6 @@ public class Stopper {
}
public static final void stop(){
signal.getAndSet(true);
signal.set(true);
}
}
......@@ -25,6 +25,7 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.exceptions.RemotingTimeoutException;
......@@ -33,7 +34,8 @@ import org.apache.dolphinscheduler.remote.future.InvokeCallback;
import org.apache.dolphinscheduler.remote.future.ReleaseSemaphore;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.handler.NettyClientHandler;
import org.apache.dolphinscheduler.remote.utils.Address;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.CallerThreadExecutePolicy;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.slf4j.Logger;
......@@ -64,7 +66,7 @@ public class NettyRemotingClient {
/**
* channels
*/
private final ConcurrentHashMap<Address, Channel> channels = new ConcurrentHashMap(128);
private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap(128);
/**
* started flag
......@@ -158,17 +160,17 @@ public class NettyRemotingClient {
/**
* async send
* @param address address
* @param host host
* @param command command
* @param timeoutMillis timeoutMillis
* @param invokeCallback callback function
* @throws InterruptedException
* @throws RemotingException
*/
public void sendAsync(final Address address, final Command command,
public void sendAsync(final Host host, final Command command,
final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException, RemotingException {
final Channel channel = getChannel(address);
final Channel channel = getChannel(host);
if (channel == null) {
throw new RemotingException("network error");
}
......@@ -214,7 +216,7 @@ public class NettyRemotingClient {
});
} catch (Throwable ex){
responseFuture.release();
throw new RemotingException(String.format("send command to address: %s failed", address), ex);
throw new RemotingException(String.format("send command to host: %s failed", host), ex);
}
} else{
String message = String.format("try to acquire async semaphore timeout: %d, waiting thread num: %d, total permits: %d",
......@@ -225,17 +227,17 @@ public class NettyRemotingClient {
/**
* sync send
* @param address address
* @param host host
* @param command command
* @param timeoutMillis timeoutMillis
* @return command
* @throws InterruptedException
* @throws RemotingException
*/
public Command sendSync(final Address address, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException {
final Channel channel = getChannel(address);
public Command sendSync(final Host host, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException {
final Channel channel = getChannel(host);
if (channel == null) {
throw new RemotingException(String.format("connect to : %s fail", address));
throw new RemotingException(String.format("connect to : %s fail", host));
}
final long opaque = command.getOpaque();
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
......@@ -250,7 +252,7 @@ public class NettyRemotingClient {
}
responseFuture.setCause(future.cause());
responseFuture.putResponse(null);
logger.error("send command {} to address {} failed", command, address);
logger.error("send command {} to host {} failed", command, host);
}
});
/**
......@@ -259,49 +261,89 @@ public class NettyRemotingClient {
Command result = responseFuture.waitResponse();
if(result == null){
if(responseFuture.isSendOK()){
throw new RemotingTimeoutException(address.toString(), timeoutMillis, responseFuture.getCause());
throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause());
} else{
throw new RemotingException(address.toString(), responseFuture.getCause());
throw new RemotingException(host.toString(), responseFuture.getCause());
}
}
return result;
}
public void send(final Host host, final Command command) throws RemotingException {
Channel channel = getChannel(host);
if (channel == null) {
throw new RemotingException(String.format("connect to : %s fail", host));
}
try {
ChannelFuture future = channel.writeAndFlush(command).await();
if (future.isSuccess()) {
logger.debug("send command : {} , to : {} successfully.", command, host.getAddress());
} else {
String msg = String.format("send command : %s , to :%s failed", command, host.getAddress());
logger.error(msg, future.cause());
throw new RemotingException(msg);
}
} catch (Exception e) {
logger.error("Send command {} to address {} encounter error.", command, host.getAddress());
throw new RemotingException(String.format("Send command : %s , to :%s encounter error", command, host.getAddress()), e);
}
}
/**
* register processor
* @param commandType command type
* @param processor processor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, null);
}
/**
* register processor
*
* @param commandType command type
* @param processor processor
* @param executor thread executor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
this.clientHandler.registerProcessor(commandType, processor, executor);
}
/**
* get channel
* @param address
* @param host
* @return
*/
public Channel getChannel(Address address) {
Channel channel = channels.get(address);
public Channel getChannel(Host host) {
Channel channel = channels.get(host);
if(channel != null && channel.isActive()){
return channel;
}
return createChannel(address, true);
return createChannel(host, true);
}
/**
* create channel
* @param address address
* @param host host
* @param isSync sync flag
* @return channel
*/
public Channel createChannel(Address address, boolean isSync) {
public Channel createChannel(Host host, boolean isSync) {
ChannelFuture future;
try {
synchronized (bootstrap){
future = bootstrap.connect(new InetSocketAddress(address.getHost(), address.getPort()));
future = bootstrap.connect(new InetSocketAddress(host.getIp(), host.getPort()));
}
if(isSync){
future.sync();
}
if (future.isSuccess()) {
Channel channel = future.channel();
channels.put(address, channel);
channels.put(host, channel);
return channel;
}
} catch (Exception ex) {
logger.info("connect to {} error {}", address, ex);
logger.info("connect to {} error {}", host, ex);
}
return null;
}
......@@ -341,10 +383,10 @@ public class NettyRemotingClient {
/**
* close channel
* @param address address
* @param host host
*/
public void closeChannel(Address address){
Channel channel = this.channels.remove(address);
public void closeChannel(Host host){
Channel channel = this.channels.remove(host);
if(channel != null){
channel.close();
}
......
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.Date; /** * execute task request command */public class ExecuteTaskAckCommand implements Serializable { private int taskInstanceId; private Date startTime; private String host; private int status; private String logPath; private String executePath; public Date getStartTime() { return startTime; } public void setStartTime(Date startTime) { this.startTime = startTime; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getLogPath() { return logPath; } public void setLogPath(String logPath) { this.logPath = logPath; } public String getExecutePath() { return executePath; } public void setExecutePath(String executePath) { this.executePath = executePath; } /** * package request command * * @return command */ public Command convert2Command(long opaque){ Command command = new Command(opaque); command.setType(CommandType.EXECUTE_TASK_ACK); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskAckCommand{" + "taskInstanceId=" + taskInstanceId + ", startTime=" + startTime + ", host='" + host + '\'' + ", status=" + status + ", logPath='" + logPath + '\'' + ", executePath='" + executePath + '\'' + '}'; }}
\ No newline at end of file
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.Date; /** * execute task request command */public class ExecuteTaskAckCommand implements Serializable { private int taskInstanceId; private Date startTime; private String host; private int status; private String logPath; private String executePath; public Date getStartTime() { return startTime; } public void setStartTime(Date startTime) { this.startTime = startTime; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getLogPath() { return logPath; } public void setLogPath(String logPath) { this.logPath = logPath; } public String getExecutePath() { return executePath; } public void setExecutePath(String executePath) { this.executePath = executePath; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_ACK); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskAckCommand{" + "taskInstanceId=" + taskInstanceId + ", startTime=" + startTime + ", host='" + host + '\'' + ", status=" + status + ", logPath='" + logPath + '\'' + ", executePath='" + executePath + '\'' + '}'; }}
\ No newline at end of file
......
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */public class ExecuteTaskRequestCommand implements Serializable { /** * task instance json */ private String taskInfoJson; public String getTaskInfoJson() { return taskInfoJson; } public void setTaskInfoJson(String taskInfoJson) { this.taskInfoJson = taskInfoJson; } public ExecuteTaskRequestCommand() { } public ExecuteTaskRequestCommand(String taskInfoJson) { this.taskInfoJson = taskInfoJson; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskRequestCommand{" + "taskInfoJson='" + taskInfoJson + '\'' + '}'; }}
\ No newline at end of file
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */public class ExecuteTaskRequestCommand implements Serializable { /** * task execution context */ private String taskExecutionContext; public String getTaskExecutionContext() { return taskExecutionContext; } public void setTaskExecutionContext(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } public ExecuteTaskRequestCommand() { } public ExecuteTaskRequestCommand(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskRequestCommand{" + "taskExecutionContext='" + taskExecutionContext + '\'' + '}'; }}
\ No newline at end of file
......
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.Date; /** * execute task response command */public class ExecuteTaskResponseCommand implements Serializable { public ExecuteTaskResponseCommand() { } public ExecuteTaskResponseCommand(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } /** * task instance id */ private int taskInstanceId; /** * status */ private int status; /** * end time */ private Date endTime; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public Date getEndTime() { return endTime; } public void setEndTime(Date endTime) { this.endTime = endTime; } /** * package response command * * @param opaque request unique identification * @return command */ public Command convert2Command(long opaque){ Command command = new Command(opaque); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } }
\ No newline at end of file
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.Date; /** * execute task response command */public class ExecuteTaskResponseCommand implements Serializable { public ExecuteTaskResponseCommand() { } public ExecuteTaskResponseCommand(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } /** * task instance id */ private int taskInstanceId; /** * status */ private int status; /** * end time */ private Date endTime; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public Date getEndTime() { return endTime; } public void setEndTime(Date endTime) { this.endTime = endTime; } /** * package response command * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } }
\ No newline at end of file
......
......@@ -26,9 +26,9 @@ import java.util.Date;
public class TaskExecutionContext implements Serializable{
/**
* task instance id
* task id
*/
private Integer taskId;
private Integer taskInstanceId;
/**
......@@ -107,12 +107,13 @@ public class TaskExecutionContext implements Serializable{
*/
private Integer projectId;
public Integer getTaskId() {
return taskId;
public Integer getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskId(Integer taskId) {
this.taskId = taskId;
public void setTaskInstanceId(Integer taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public String getTaskName() {
......@@ -230,7 +231,7 @@ public class TaskExecutionContext implements Serializable{
@Override
public String toString() {
return "TaskExecutionContext{" +
"taskId=" + taskId +
"taskInstanceId=" + taskInstanceId +
", taskName='" + taskName + '\'' +
", startTime=" + startTime +
", taskType='" + taskType + '\'' +
......
......@@ -19,12 +19,19 @@ package org.apache.dolphinscheduler.remote.handler;
import io.netty.channel.*;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
/**
* netty client request handler
......@@ -44,9 +51,20 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
*/
private final ExecutorService callbackExecutor;
/**
* processors
*/
private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors;
/**
* default executor
*/
private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor){
this.nettyRemotingClient = nettyRemotingClient;
this.callbackExecutor = callbackExecutor;
this.processors = new ConcurrentHashMap();
}
/**
......@@ -71,18 +89,43 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
processReceived((Command)msg);
processReceived(ctx.channel(), (Command)msg);
}
/**
* register processor
*
* @param commandType command type
* @param processor processor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, null);
}
/**
* register processor
*
* @param commandType command type
* @param processor processor
* @param executor thread executor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
ExecutorService executorRef = executor;
if(executorRef == null){
executorRef = defaultExecutor;
}
this.processors.putIfAbsent(commandType, new Pair<>(processor, executorRef));
}
/**
* process received logic
*
* @param responseCommand responseCommand
* @param command command
*/
private void processReceived(final Command responseCommand) {
ResponseFuture future = ResponseFuture.getFuture(responseCommand.getOpaque());
private void processReceived(final Channel channel, final Command command) {
ResponseFuture future = ResponseFuture.getFuture(command.getOpaque());
if(future != null){
future.setResponseCommand(responseCommand);
future.setResponseCommand(command);
future.release();
if(future.getInvokeCallback() != null){
this.callbackExecutor.submit(new Runnable() {
......@@ -92,10 +135,30 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
}
});
} else{
future.putResponse(responseCommand);
future.putResponse(command);
}
} else{
logger.warn("receive response {}, but not matched any request ", responseCommand);
processByCommandType(channel, command);
}
}
public void processByCommandType(final Channel channel, final Command command) {
final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(command.getType());
if (pair != null) {
Runnable run = () -> {
try {
pair.getLeft().process(channel, command);
} catch (Throwable e) {
logger.error(String.format("process command %s exception", command), e);
}
};
try {
pair.getRight().submit(run);
} catch (RejectedExecutionException e) {
logger.warn("thread pool is full, discard command {} from {}", command, ChannelUtils.getRemoteAddress(channel));
}
} else {
logger.warn("receive response {}, but not matched any request ", command);
}
}
......@@ -112,30 +175,4 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
ctx.channel().close();
}
/**
* channel write changed
*
* @param ctx channel handler context
* @throws Exception
*/
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Channel ch = ctx.channel();
ChannelConfig config = ch.config();
if (!ch.isWritable()) {
if (logger.isWarnEnabled()) {
logger.warn("{} is not writable, over high water level : {}",
new Object[]{ch, config.getWriteBufferHighWaterMark()});
}
config.setAutoRead(false);
} else {
if (logger.isWarnEnabled()) {
logger.warn("{} is writable, to low water : {}",
new Object[]{ch, config.getWriteBufferLowWaterMark()});
}
config.setAutoRead(true);
}
}
}
\ No newline at end of file
......@@ -98,7 +98,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
if(executorRef == null){
executorRef = nettyRemotingServer.getDefaultExecutor();
}
this.processors.putIfAbsent(commandType, new Pair<NettyRequestProcessor, ExecutorService>(processor, executorRef));
this.processors.putIfAbsent(commandType, new Pair<>(processor, executorRef));
}
/**
......
......@@ -49,9 +49,9 @@ public class ChannelUtils {
* @param channel channel
* @return address
*/
public static Address toAddress(Channel channel){
public static Host toAddress(Channel channel){
InetSocketAddress socketAddress = ((InetSocketAddress)channel.remoteAddress());
return new Address(socketAddress.getAddress().getHostAddress(), socketAddress.getPort());
return new Host(socketAddress.getAddress().getHostAddress(), socketAddress.getPort());
}
}
......@@ -14,14 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.utils;
package org.apache.dolphinscheduler.server.master.host;
import java.io.Serializable;
import java.util.Objects;
public class Host {
/**
* server address
*/
public class Host implements Serializable {
private String address;
......@@ -67,7 +68,7 @@ public class Host {
public static Host of(String address){
String[] parts = address.split(":");
if (parts.length != 2) {
throw new IllegalArgumentException(String.format("Address : %s illegal.", address));
throw new IllegalArgumentException(String.format("Host : %s illegal.", address));
}
Host host = new Host(parts[0], Integer.parseInt(parts[1]));
return host;
......
......@@ -27,7 +27,7 @@ import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.future.InvokeCallback;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Address;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.junit.Assert;
import org.junit.Test;
......@@ -62,7 +62,7 @@ public class NettyRemotingClientTest {
NettyRemotingClient client = new NettyRemotingClient(clientConfig);
Command commandPing = Ping.create();
try {
Command response = client.sendSync(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing, 2000);
Command response = client.sendSync(new Host("127.0.0.1", serverConfig.getListenPort()), commandPing, 2000);
Assert.assertEquals(commandPing.getOpaque(), response.getOpaque());
} catch (Exception e) {
e.printStackTrace();
......@@ -93,7 +93,7 @@ public class NettyRemotingClientTest {
Command commandPing = Ping.create();
try {
final AtomicLong opaque = new AtomicLong(0);
client.sendAsync(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing, 2000, new InvokeCallback() {
client.sendAsync(new Host("127.0.0.1", serverConfig.getListenPort()), commandPing, 2000, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
opaque.set(responseFuture.getOpaque());
......
......@@ -40,7 +40,7 @@ public class TaskExecutionContextBuilder {
* @return TaskExecutionContextBuilder
*/
public TaskExecutionContextBuilder buildTaskInstanceRelatedInfo(TaskInstance taskInstance){
taskExecutionContext.setTaskId(taskInstance.getId());
taskExecutionContext.setTaskInstanceId(taskInstance.getId());
taskExecutionContext.setTaskName(taskInstance.getName());
taskExecutionContext.setStartTime(taskInstance.getStartTime());
taskExecutionContext.setTaskType(taskInstance.getTaskType());
......
......@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
......@@ -106,7 +107,6 @@ public class MasterServer implements IStoppable {
public static void main(String[] args) {
Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
new SpringApplicationBuilder(MasterServer.class).web(WebApplicationType.NONE).run(args);
}
/**
......@@ -121,6 +121,7 @@ public class MasterServer implements IStoppable {
serverConfig.setListenPort(45678);
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor(processService));
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor(processService));
this.nettyRemotingServer.start();
//
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.dispatch.executor.ExecutorManager;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.dispatch.host.RoundRobinHostManager;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class ExecutorDispatcher implements InitializingBean {
@Autowired
private NettyExecutorManager nettyExecutorManager;
@Autowired
private RoundRobinHostManager hostManager;
private final ConcurrentHashMap<ExecutorType, ExecutorManager> executorManagers;
public ExecutorDispatcher(){
this.executorManagers = new ConcurrentHashMap<>();
}
public void dispatch(final ExecutionContext executeContext) throws ExecuteException {
ExecutorManager executorManager = this.executorManagers.get(executeContext.getExecutorType());
if(executorManager == null){
throw new ExecuteException("no ExecutorManager for type : " + executeContext.getExecutorType());
}
Host host = hostManager.select(executeContext);
if (StringUtils.isEmpty(host.getAddress())) {
throw new ExecuteException(String.format("fail to execute : %s due to no worker ", executeContext.getContext()));
}
executeContext.setHost(host);
executorManager.beforeExecute(executeContext);
try {
executorManager.execute(executeContext);
} finally {
executorManager.afterExecute(executeContext);
}
}
@Override
public void afterPropertiesSet() throws Exception {
register(ExecutorType.WORKER, nettyExecutorManager);
register(ExecutorType.CLIENT, nettyExecutorManager);
}
public void register(ExecutorType type, ExecutorManager executorManager){
executorManagers.put(type, executorManager);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.context;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
public class ExecutionContext {
private Host host;
private final Object context;
private final ExecutorType executorType;
public ExecutionContext(Object context, ExecutorType executorType) {
this.context = context;
this.executorType = executorType;
}
public ExecutorType getExecutorType() {
return executorType;
}
public Object getContext() {
return context;
}
public Host getHost() {
return host;
}
public void setHost(Host host) {
this.host = host;
}
}
......@@ -14,14 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.enums;
package org.apache.dolphinscheduler.server.master.host;
public enum ExecutorType {
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
public interface HostManager {
Host select(TaskExecutionContext context);
WORKER,
CLIENT;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.exceptions;
public class ExecuteException extends Exception{
public ExecuteException() {
super();
}
/**
* Constructs a new exception with the specified detail message. The
* cause is not initialized, and may subsequently be initialized by
* a call to {@link #initCause}.
*
* @param message the detail message. The detail message is saved for
* later retrieval by the {@link #getMessage()} method.
*/
public ExecuteException(String message) {
super(message);
}
/**
* Constructs a new exception with the specified detail message and
* cause. <p>Note that the detail message associated with
* {@code cause} is <i>not</i> automatically incorporated in
* this exception's detail message.
*
* @param message the detail message (which is saved for later retrieval
* by the {@link #getMessage()} method).
* @param cause the cause (which is saved for later retrieval by the
* {@link #getCause()} method). (A <tt>null</tt> value is
* permitted, and indicates that the cause is nonexistent or
* unknown.)
* @since 1.4
*/
public ExecuteException(String message, Throwable cause) {
super(message, cause);
}
/**
* Constructs a new exception with the specified cause and a detail
* message of <tt>(cause==null ? null : cause.toString())</tt> (which
* typically contains the class and detail message of <tt>cause</tt>).
* This constructor is useful for exceptions that are little more than
* wrappers for other throwables (for example, {@link
* java.security.PrivilegedActionException}).
*
* @param cause the cause (which is saved for later retrieval by the
* {@link #getCause()} method). (A <tt>null</tt> value is
* permitted, and indicates that the cause is nonexistent or
* unknown.)
* @since 1.4
*/
public ExecuteException(Throwable cause) {
super(cause);
}
/**
* Constructs a new exception with the specified detail message,
* cause, suppression enabled or disabled, and writable stack
* trace enabled or disabled.
*
* @param message the detail message.
* @param cause the cause. (A {@code null} value is permitted,
* and indicates that the cause is nonexistent or unknown.)
* @param enableSuppression whether or not suppression is enabled
* or disabled
* @param writableStackTrace whether or not the stack trace should
* be writable
* @since 1.7
*/
protected ExecuteException(String message, Throwable cause,
boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.executor;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
public abstract class AbstractExecutorManager implements ExecutorManager{
@Override
public void beforeExecute(ExecutionContext executeContext) throws ExecuteException {
//TODO add time monitor
}
@Override
public void afterExecute(ExecutionContext executeContext) throws ExecuteException {
//TODO add dispatch monitor
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.executor;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
public interface ExecutorManager {
void beforeExecute(ExecutionContext executeContext) throws ExecuteException;
void execute(ExecutionContext executeContext) throws ExecuteException;
void afterExecute(ExecutionContext executeContext) throws ExecuteException;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.executor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@Service
public class NettyExecutorManager extends AbstractExecutorManager{
private final Logger logger = LoggerFactory.getLogger(NettyExecutorManager.class);
@Autowired
private ZookeeperNodeManager zookeeperNodeManager;
private final NettyRemotingClient nettyRemotingClient;
public NettyExecutorManager(){
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
}
@Override
public void execute(ExecutionContext executeContext) throws ExecuteException {
Set<String> allNodes = getAllNodes(executeContext);
Set<String> failNodeSet = new HashSet<>();
//
Command command = buildCommand(executeContext);
Host host = executeContext.getHost();
boolean success = false;
//
while (!success) {
try {
doExecute(host, command);
success = true;
executeContext.setHost(host);
} catch (ExecuteException ex) {
logger.error(String.format("execute context : %s error", executeContext.getContext()), ex);
try {
failNodeSet.add(host.getAddress());
Set<String> tmpAllIps = new HashSet<>(allNodes);
Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
if (remained != null && remained.size() > 0) {
host = Host.of(remained.iterator().next());
logger.error("retry execute context : {} host : {}", executeContext.getContext(), host);
} else {
throw new ExecuteException("fail after try all nodes");
}
} catch (Throwable t) {
throw new ExecuteException("fail after try all nodes");
}
}
}
}
private Command buildCommand(ExecutionContext context) {
ExecuteTaskRequestCommand requestCommand = new ExecuteTaskRequestCommand();
ExecutorType executorType = context.getExecutorType();
switch (executorType){
case WORKER:
TaskExecutionContext taskExecutionContext = (TaskExecutionContext)context.getContext();
requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
break;
case CLIENT:
break;
default:
throw new IllegalArgumentException("invalid executor type : " + executorType);
}
return requestCommand.convert2Command();
}
private void doExecute(final Host host, final Command command) throws ExecuteException {
int retryCount = 3;
boolean success = false;
do {
try {
nettyRemotingClient.send(host, command);
success = true;
} catch (Exception ex) {
logger.error(String.format("send command : %s to %s error", command, host), ex);
retryCount--;
try {
Thread.sleep(100);
} catch (InterruptedException ignore) {}
}
} while (retryCount >= 0 && !success);
if (!success) {
throw new ExecuteException(String.format("send command : %s to %s error", command, host));
}
}
private Set<String> getAllNodes(ExecutionContext context){
Set<String> nodes = Collections.EMPTY_SET;
ExecutorType executorType = context.getExecutorType();
switch (executorType){
case WORKER:
nodes = zookeeperNodeManager.getWorkerNodes();
break;
case CLIENT:
break;
default:
throw new IllegalArgumentException("invalid executor type : " + executorType);
}
return nodes;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
public interface HostManager {
Host select(ExecutionContext context);
}
......@@ -15,11 +15,14 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.host;
package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.host.assign.RoundRobinSelector;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RoundRobinSelector;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -36,21 +39,36 @@ public class RoundRobinHostManager implements HostManager {
private final Logger logger = LoggerFactory.getLogger(RoundRobinHostManager.class);
@Autowired
private RoundRobinSelector<Host> selector;
@Autowired
private ZookeeperNodeManager zookeeperNodeManager;
private final Selector<Host> selector;
public RoundRobinHostManager(){
this.selector = new RoundRobinSelector<>();
}
@Override
public Host select(TaskExecutionContext context){
public Host select(ExecutionContext context){
Host host = new Host();
Collection<String> nodes = zookeeperNodeManager.getWorkerNodes();
Collection<String> nodes = null;
ExecutorType executorType = context.getExecutorType();
switch (executorType){
case WORKER:
nodes = zookeeperNodeManager.getWorkerNodes();
break;
case CLIENT:
break;
default:
throw new IllegalArgumentException("invalid executorType : " + executorType);
}
if(CollectionUtils.isEmpty(nodes)){
return host;
}
List<Host> candidateHosts = new ArrayList<>(nodes.size());
nodes.stream().forEach(node -> candidateHosts.add(Host.of(node)));
return selector.select(candidateHosts);
}
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.host.assign;
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import java.util.Collection;
import java.util.Random;
......
......@@ -14,12 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.host.assign;
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.springframework.stereotype.Service;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
@Service
public class RoundRobinSelector<T> implements Selector<T> {
private final AtomicInteger index = new AtomicInteger(0);
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.host.assign;
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import java.util.Collection;
......
......@@ -14,83 +14,48 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.utils;
import java.io.Serializable;
package org.apache.dolphinscheduler.server.master.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* server address
* task ack processor
*/
public class Address implements Serializable {
public class TaskAckProcessor implements NettyRequestProcessor {
/**
* host
*/
private String host;
private final Logger logger = LoggerFactory.getLogger(TaskAckProcessor.class);
/**
* port
* process service
*/
private int port;
public Address(){
//NOP
}
public Address(String host, int port){
this.host = host;
this.port = port;
}
public String getHost() {
return host;
}
private final ProcessService processService;
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
public TaskAckProcessor(ProcessService processService){
this.processService = processService;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((host == null) ? 0 : host.hashCode());
result = prime * result + port;
return result;
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.EXECUTE_TASK_ACK == command.getType(), String.format("invalid command type : %s", command.getType()));
ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskAckCommand.class);
logger.info("taskAckCommand : {}",taskAckCommand);
processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
taskAckCommand.getStartTime(),
taskAckCommand.getHost(),
taskAckCommand.getExecutePath(),
taskAckCommand.getLogPath(),
taskAckCommand.getTaskInstanceId());
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Address other = (Address) obj;
if (host == null) {
if (other.host != null) {
return false;
}
} else if (!host.equals(other.host)) {
return false;
}
return port == other.port;
}
@Override
public String toString() {
return "Address [host=" + host + ", port=" + port + "]";
}
}
......@@ -25,7 +25,6 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.master.future.TaskFuture;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -59,7 +58,6 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
logger.info("received command : {}", command);
ExecuteTaskResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskResponseCommand.class);
processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()), responseCommand.getEndTime(), responseCommand.getTaskInstanceId());
TaskFuture.notify(command);
}
......
......@@ -32,16 +32,23 @@ import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Address;
import org.apache.dolphinscheduler.remote.future.InvokeCallback;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.ITaskQueue;
import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.concurrent.Callable;
......@@ -92,9 +99,9 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
/**
* netty remoting client
* executor dispatcher
*/
private static final NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(new NettyClientConfig());
private ExecutorDispatcher dispatcher;
/**
* constructor of MasterBaseTaskExecThread
......@@ -102,13 +109,14 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
* @param processInstance process instance
*/
public MasterBaseTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){
this.processService = BeanContext.getBean(ProcessService.class);
this.alertDao = BeanContext.getBean(AlertDao.class);
this.processService = SpringApplicationContext.getBean(ProcessService.class);
this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
this.processInstance = processInstance;
this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
this.cancel = false;
this.taskInstance = taskInstance;
this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
this.dispatcher = SpringApplicationContext.getBean(ExecutorDispatcher.class);
}
/**
......@@ -126,30 +134,17 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
this.cancel = true;
}
// TODO send task to worker
public void sendToWorker(TaskInstance taskInstance){
final Address address = new Address("127.0.0.1", 12346);
ExecuteTaskRequestCommand taskRequestCommand = new ExecuteTaskRequestCommand(
FastJsonSerializer.serializeToString(getTaskExecutionContext(taskInstance)));
/**
* dispatch task to worker
* @param taskInstance
*/
public void dispatch(TaskInstance taskInstance){
TaskExecutionContext context = getTaskExecutionContext(taskInstance);
ExecutionContext executionContext = new ExecutionContext(context, ExecutorType.WORKER);
try {
Command responseCommand = nettyRemotingClient.sendSync(address,
taskRequestCommand.convert2Command(), 2000);
ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(
responseCommand.getBody(), ExecuteTaskAckCommand.class);
logger.info("taskAckCommand : {}",taskAckCommand);
processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
taskAckCommand.getStartTime(),
taskAckCommand.getHost(),
taskAckCommand.getExecutePath(),
taskAckCommand.getLogPath(),
taskInstance.getId());
} catch (InterruptedException | RemotingException ex) {
logger.error(String.format("send command to : %s error", address), ex);
dispatcher.dispatch(executionContext);
} catch (ExecuteException e) {
logger.error("execute exception", e);
}
}
......@@ -239,7 +234,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
}
if(submitDB && !submitQueue){
// submit task to queue
sendToWorker(task);
dispatch(task);
submitQueue = true;
}
if(submitDB && submitQueue){
......
......@@ -33,10 +33,6 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Address;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.AlertManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
......
......@@ -35,7 +35,7 @@ import java.util.concurrent.locks.ReentrantLock;
@Service
public abstract class ZookeeperNodeManager implements InitializingBean {
public class ZookeeperNodeManager implements InitializingBean {
private final Logger logger = LoggerFactory.getLogger(ZookeeperNodeManager.class);
......
......@@ -47,7 +47,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
@Override
public void afterPropertiesSet() throws Exception {
init();
}
public void init() {
......
......@@ -76,7 +76,7 @@ public class TaskCallbackService {
*/
public void sendAck(int taskInstanceId, ExecuteTaskAckCommand ackCommand){
CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
callbackChannel.getChannel().writeAndFlush(ackCommand.convert2Command(callbackChannel.getOpaque()));
callbackChannel.getChannel().writeAndFlush(ackCommand.convert2Command());
}
/**
......@@ -87,8 +87,7 @@ public class TaskCallbackService {
*/
public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){
CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command(
callbackChannel.getOpaque())).addListener(new ChannelFutureListener(){
callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command()).addListener(new ChannelFutureListener(){
@Override
public void operationComplete(ChannelFuture future) throws Exception {
......
......@@ -79,9 +79,9 @@ public class WorkerRequestProcessor implements NettyRequestProcessor {
ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
command.getBody(), ExecuteTaskRequestCommand.class);
String taskInstanceJson = taskRequestCommand.getTaskInfoJson();
String contextJson = taskRequestCommand.getTaskExecutionContext();
TaskExecutionContext taskExecutionContext = JSONObject.parseObject(taskInstanceJson, TaskExecutionContext.class);
TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class);
// local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext);
......@@ -92,7 +92,7 @@ public class WorkerRequestProcessor implements NettyRequestProcessor {
} catch (Exception ex){
logger.error(String.format("create execLocalPath : %s", execLocalPath), ex);
}
taskCallbackService.addCallbackChannel(taskExecutionContext.getTaskId(),
taskCallbackService.addCallbackChannel(taskExecutionContext.getTaskInstanceId(),
new CallbackChannel(channel, command.getOpaque()));
// submit task
......@@ -110,6 +110,6 @@ public class WorkerRequestProcessor implements NettyRequestProcessor {
return FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(),
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskId());
taskExecutionContext.getTaskInstanceId());
}
}
......@@ -93,12 +93,12 @@ public class TaskScheduleThread implements Runnable {
@Override
public void run() {
ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskExecutionContext.getTaskId());
ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskExecutionContext.getTaskInstanceId());
try {
// tell master that task is in executing
ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext.getTaskType());
taskInstanceCallbackService.sendAck(taskExecutionContext.getTaskId(), ackCommand);
taskInstanceCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
logger.info("script path : {}", taskExecutionContext.getExecutePath());
// task node
......@@ -118,7 +118,7 @@ public class TaskScheduleThread implements Runnable {
taskExecutionContext.getScheduleTime(),
taskExecutionContext.getTaskName(),
taskExecutionContext.getTaskType(),
taskExecutionContext.getTaskId(),
taskExecutionContext.getTaskInstanceId(),
CommonUtils.getSystemEnvPath(),
taskExecutionContext.getTenantCode(),
taskExecutionContext.getQueue(),
......@@ -132,13 +132,13 @@ public class TaskScheduleThread implements Runnable {
taskProps.setTaskAppId(String.format("%s_%s_%s",
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskId()));
taskExecutionContext.getTaskInstanceId()));
// custom logger
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskId()));
taskExecutionContext.getTaskInstanceId()));
task = TaskManager.newTask(taskExecutionContext.getTaskType(),
taskProps,
......@@ -156,14 +156,14 @@ public class TaskScheduleThread implements Runnable {
//
responseCommand.setStatus(task.getExitStatus().getCode());
responseCommand.setEndTime(new Date());
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskId(), task.getExitStatus());
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
}catch (Exception e){
logger.error("task scheduler failure", e);
kill();
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
responseCommand.setEndTime(new Date());
} finally {
taskInstanceCallbackService.sendResult(taskExecutionContext.getTaskId(), responseCommand);
taskInstanceCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand);
}
}
......@@ -213,13 +213,13 @@ public class TaskScheduleThread implements Runnable {
return baseLog + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskExecutionContext.getTaskId() + ".log";
taskExecutionContext.getTaskInstanceId() + ".log";
}
return System.getProperty("user.dir") + Constants.SINGLE_SLASH +
baseLog + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskExecutionContext.getTaskId() + ".log";
taskExecutionContext.getTaskInstanceId() + ".log";
}
/**
......@@ -325,9 +325,9 @@ public class TaskScheduleThread implements Runnable {
* @throws Exception exception
*/
private void checkDownloadPermission(List<String> projectRes) throws Exception {
int userId = taskExecutionContext.getExecutorId();
int executorId = taskExecutionContext.getExecutorId();
String[] resNames = projectRes.toArray(new String[projectRes.size()]);
PermissionCheck<String> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,userId,logger);
PermissionCheck<String> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,executorId,logger);
permissionCheck.checkPermission();
}
}
\ No newline at end of file
......@@ -16,12 +16,11 @@
*/
package org.apache.dolphinscheduler.service.log;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.log.*;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.Address;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -73,7 +72,7 @@ public class LogClientService {
logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path, skipLineNum, limit);
RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
String result = "";
final Address address = new Address(host, port);
final Host address = new Host(host, port);
try {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, logRequestTimeout);
......@@ -101,7 +100,7 @@ public class LogClientService {
logger.info("view log path {}", path);
ViewLogRequestCommand request = new ViewLogRequestCommand(path);
String result = "";
final Address address = new Address(host, port);
final Host address = new Host(host, port);
try {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, logRequestTimeout);
......@@ -129,7 +128,7 @@ public class LogClientService {
logger.info("log path {}", path);
GetLogBytesRequestCommand request = new GetLogBytesRequestCommand(path);
byte[] result = null;
final Address address = new Address(host, port);
final Host address = new Host(host, port);
try {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, logRequestTimeout);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册