提交 1ac1d582 编写于 作者: Q qiaozhanwei

remote module add comment

上级 3ff7b34a
......@@ -50,22 +50,51 @@ public class NettyRemotingClient {
private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class);
/**
* bootstrap
*/
private final Bootstrap bootstrap = new Bootstrap();
/**
* encoder
*/
private final NettyEncoder encoder = new NettyEncoder();
/**
* channels
*/
private final ConcurrentHashMap<Address, Channel> channels = new ConcurrentHashMap();
/**
* default executor
*/
private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
/**
* started flag
*/
private final AtomicBoolean isStarted = new AtomicBoolean(false);
/**
* worker group
*/
private final NioEventLoopGroup workerGroup;
/**
* client handler
*/
private final NettyClientHandler clientHandler = new NettyClientHandler(this);
/**
* netty client config
*/
private final NettyClientConfig clientConfig;
/**
* netty client init
*
* @param clientConfig client config
*/
public NettyRemotingClient(final NettyClientConfig clientConfig){
this.clientConfig = clientConfig;
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
......@@ -79,6 +108,9 @@ public class NettyRemotingClient {
this.start();
}
/**
* netty server start
*/
private void start(){
this.bootstrap
......@@ -97,18 +129,36 @@ public class NettyRemotingClient {
encoder);
}
});
//
isStarted.compareAndSet(false, true);
}
/**
* register processor
*
* @param commandType command type
* @param processor processor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
registerProcessor(commandType, processor, null);
}
/**
* register processor
*
* @param commandType command type
* @param processor processor
* @param executor thread executor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
this.clientHandler.registerProcessor(commandType, processor, executor);
}
/**
* send connect
* @param address address
* @param command command
* @throws RemotingException
*/
public void send(final Address address, final Command command) throws RemotingException {
final Channel channel = getChannel(address);
if (channel == null) {
......@@ -132,6 +182,11 @@ public class NettyRemotingClient {
}
}
/**
* get channel
* @param address address
* @return channel
*/
public Channel getChannel(Address address) {
Channel channel = channels.get(address);
if(channel != null && channel.isActive()){
......@@ -140,6 +195,12 @@ public class NettyRemotingClient {
return createChannel(address, true);
}
/**
* create channel
* @param address address
* @param isSync is sync
* @return channel
*/
public Channel createChannel(Address address, boolean isSync) {
ChannelFuture future;
try {
......@@ -160,10 +221,17 @@ public class NettyRemotingClient {
return null;
}
/**
* get default thread executor
* @return thread executor
*/
public ExecutorService getDefaultExecutor() {
return defaultExecutor;
}
/**
* close client
*/
public void close() {
if(isStarted.compareAndSet(true, false)){
try {
......@@ -181,6 +249,9 @@ public class NettyRemotingClient {
}
}
/**
* close channel
*/
private void closeChannels(){
for (Channel channel : this.channels.values()) {
channel.close();
......@@ -188,6 +259,10 @@ public class NettyRemotingClient {
this.channels.clear();
}
/**
* remove channel
* @param address address
*/
public void removeChannel(Address address){
Channel channel = this.channels.remove(address);
if(channel != null){
......
......@@ -48,28 +48,58 @@ public class NettyRemotingServer {
private final Logger logger = LoggerFactory.getLogger(NettyRemotingServer.class);
/**
* server bootstart
*/
private final ServerBootstrap serverBootstrap = new ServerBootstrap();
/**
* encoder
*/
private final NettyEncoder encoder = new NettyEncoder();
/**
* default executor
*/
private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
/**
* boss group
*/
private final NioEventLoopGroup bossGroup;
/**
* worker group
*/
private final NioEventLoopGroup workGroup;
/**
* server config
*/
private final NettyServerConfig serverConfig;
/**
* server handler
*/
private final NettyServerHandler serverHandler = new NettyServerHandler(this);
/**
* started flag
*/
private final AtomicBoolean isStarted = new AtomicBoolean(false);
/**
* server init
*
* @param serverConfig server config
*/
public NettyRemotingServer(final NettyServerConfig serverConfig){
this.serverConfig = serverConfig;
this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet()));
}
......@@ -78,12 +108,16 @@ public class NettyRemotingServer {
this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet()));
}
});
}
/**
* server start
*/
public void start(){
if(this.isStarted.get()){
......@@ -125,6 +159,11 @@ public class NettyRemotingServer {
isStarted.compareAndSet(false, true);
}
/**
* init netty channel
* @param ch socket channel
* @throws Exception
*/
private void initNettyChannel(NioSocketChannel ch) throws Exception{
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder", encoder);
......@@ -132,14 +171,30 @@ public class NettyRemotingServer {
pipeline.addLast("handler", serverHandler);
}
/**
* register processor
* @param commandType command type
* @param processor processor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, null);
}
/**
* register processor
*
* @param commandType command type
* @param processor processor
* @param executor thread executor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
this.serverHandler.registerProcessor(commandType, processor, executor);
}
/**
* get default thread executor
* @return thread executor
*/
public ExecutorService getDefaultExecutor() {
return defaultExecutor;
}
......
......@@ -38,6 +38,14 @@ public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
private final CommandHeader commandHeader = new CommandHeader();
/**
* decode
*
* @param ctx channel handler context
* @param in byte buffer
* @param out out content
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
switch (state()){
......@@ -67,6 +75,11 @@ public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
}
}
/**
* get command type
* @param type type
* @return
*/
private CommandType commandType(byte type){
for(CommandType ct : CommandType.values()){
if(ct.ordinal() == type){
......@@ -76,6 +89,10 @@ public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
return null;
}
/**
* check magic
* @param magic magic
*/
private void checkMagic(byte magic) {
if (magic != Command.MAGIC) {
throw new IllegalArgumentException("illegal packet [magic]" + magic);
......
......@@ -28,6 +28,14 @@ import org.apache.dolphinscheduler.remote.command.Command;
@Sharable
public class NettyEncoder extends MessageToByteEncoder<Command> {
/**
* encode
*
* @param ctx channel handler context
* @param msg command
* @param out byte buffer
* @throws Exception
*/
@Override
protected void encode(ChannelHandlerContext ctx, Command msg, ByteBuf out) throws Exception {
if(msg == null){
......
......@@ -36,7 +36,7 @@ public class Command implements Serializable {
}
/**
* comman type
* command type
*/
private CommandType type;
......@@ -45,6 +45,9 @@ public class Command implements Serializable {
*/
private long opaque;
/**
* data body
*/
private byte[] body;
public CommandType getType() {
......
......@@ -23,10 +23,19 @@ import java.io.Serializable;
*/
public class CommandHeader implements Serializable {
/**
* type
*/
private byte type;
/**
* request unique identification
*/
private long opaque;
/**
* body length
*/
private int bodyLength;
public int getBodyLength() {
......
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ EXECUTE_TASK_REQUEST, /** * execute task response */ EXECUTE_TASK_RESPONSE, PING, PONG;}
\ No newline at end of file
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ EXECUTE_TASK_REQUEST, /** * execute task response */ EXECUTE_TASK_RESPONSE, /** * ping */ PING, /** * pong */ PONG;}
\ No newline at end of file
......
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.List;import java.util.concurrent.atomic.AtomicLong; public class ExecuteTaskRequestCommand implements Serializable { private static final AtomicLong REQUEST = new AtomicLong(1); private String taskId; private String attemptId; private String applicationName; private String groupName; private String taskName; private int connectorPort; private String description; private String className; private String methodName; private String params; private List<Integer> shardItems; public List<Integer> getShardItems() { return shardItems; } public void setShardItems(List<Integer> shardItems) { this.shardItems = shardItems; } public String getParams() { return params; } public void setParams(String params) { this.params = params; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public String getApplicationName() { return applicationName; } public void setApplicationName(String applicationName) { this.applicationName = applicationName; } public String getGroupName() { return groupName; } public void setGroupName(String groupName) { this.groupName = groupName; } public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } public int getConnectorPort() { return connectorPort; } public void setConnectorPort(int connectorPort) { this.connectorPort = connectorPort; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Command convert2Command(){ Command command = new Command(REQUEST.getAndIncrement()); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; }}
\ No newline at end of file
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.List;import java.util.concurrent.atomic.AtomicLong; /** * execute task request command */public class ExecuteTaskRequestCommand implements Serializable { private static final AtomicLong REQUEST = new AtomicLong(1); /** * task id */ private String taskId; /** * attempt id */ private String attemptId; /** * application name */ private String applicationName; /** * group name */ private String groupName; /** * task name */ private String taskName; /** * connect port */ private int connectorPort; /** * description info */ private String description; /** * class name */ private String className; /** * method name */ private String methodName; /** * params */ private String params; /** * shard items */ private List<Integer> shardItems; public List<Integer> getShardItems() { return shardItems; } public void setShardItems(List<Integer> shardItems) { this.shardItems = shardItems; } public String getParams() { return params; } public void setParams(String params) { this.params = params; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public String getApplicationName() { return applicationName; } public void setApplicationName(String applicationName) { this.applicationName = applicationName; } public String getGroupName() { return groupName; } public void setGroupName(String groupName) { this.groupName = groupName; } public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } public int getConnectorPort() { return connectorPort; } public void setConnectorPort(int connectorPort) { this.connectorPort = connectorPort; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(REQUEST.getAndIncrement()); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; }}
\ No newline at end of file
......
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.concurrent.atomic.AtomicLong; public class ExecuteTaskResponseCommand implements Serializable { private static final AtomicLong REQUEST = new AtomicLong(1); private String taskId; private String attemptId; private Object result; private long receivedTime; private int executeCount; private long executeTime; public String getAttemptId() { return attemptId; } public void setAttemptId(String attemptId) { this.attemptId = attemptId; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; } public long getReceivedTime() { return receivedTime; } public void setReceivedTime(long receivedTime) { this.receivedTime = receivedTime; } public int getExecuteCount() { return executeCount; } public void setExecuteCount(int executeCount) { this.executeCount = executeCount; } public long getExecuteTime() { return executeTime; } public void setExecuteTime(long executeTime) { this.executeTime = executeTime; } public Command convert2Command(){ Command command = new Command(REQUEST.getAndIncrement()); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; }}
\ No newline at end of file
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.concurrent.atomic.AtomicLong; /** * execute taks response command */public class ExecuteTaskResponseCommand implements Serializable { private static final AtomicLong REQUEST = new AtomicLong(1); /** * task id */ private String taskId; /** * attempt id */ private String attemptId; /** * result info */ private Object result; /** * receive time */ private long receivedTime; /** * execute count */ private int executeCount; /** * execute time */ private long executeTime; public String getAttemptId() { return attemptId; } public void setAttemptId(String attemptId) { this.attemptId = attemptId; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; } public long getReceivedTime() { return receivedTime; } public void setReceivedTime(long receivedTime) { this.receivedTime = receivedTime; } public int getExecuteCount() { return executeCount; } public void setExecuteCount(int executeCount) { this.executeCount = executeCount; } public long getExecuteTime() { return executeTime; } public void setExecuteTime(long executeTime) { this.executeTime = executeTime; } /** * package response command * * @return command */ public Command convert2Command(){ Command command = new Command(REQUEST.getAndIncrement()); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; }}
\ No newline at end of file
......
......@@ -23,13 +23,21 @@ import io.netty.buffer.Unpooled;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
/**
* ping machine
*/
public class Ping implements Serializable {
private static final AtomicLong ID = new AtomicLong(1);
/**
* ping body
*/
protected static ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER;
/**
* request command body
*/
private static byte[] EMPTY_BODY_ARRAY = new byte[0];
private static final ByteBuf PING_BUF;
......@@ -44,10 +52,19 @@ public class Ping implements Serializable {
PING_BUF = Unpooled.unreleasableBuffer(ping).asReadOnly();
}
/**
* ping connect
* @return result
*/
public static ByteBuf pingContent(){
return PING_BUF.duplicate();
}
/**
* package ping command
*
* @return command
*/
public static Command create(){
Command command = new Command(ID.getAndIncrement());
command.setType(CommandType.PING);
......
......@@ -22,13 +22,24 @@ import io.netty.buffer.Unpooled;
import java.io.Serializable;
/**
* Pong return after ping
*/
public class Pong implements Serializable {
/**
* pong body
*/
protected static ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER;
/**
* pong command body
*/
private static byte[] EMPTY_BODY_ARRAY = new byte[0];
/**
* ping byte buffer
*/
private static final ByteBuf PONG_BUF;
static {
......@@ -41,10 +52,20 @@ public class Pong implements Serializable {
PONG_BUF = Unpooled.unreleasableBuffer(ping).asReadOnly();
}
/**
* ping content
* @return result
*/
public static ByteBuf pingContent(){
return PONG_BUF.duplicate();
}
/**
* package pong command
*
* @param opaque request unique identification
* @return command
*/
public static Command create(long opaque){
Command command = new Command(opaque);
command.setType(CommandType.PONG);
......
......@@ -29,8 +29,14 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class GetLogBytesRequestCommand implements Serializable {
/**
* request id
*/
private static final AtomicLong REQUEST = new AtomicLong(1);
/**
* log path
*/
private String path;
public GetLogBytesRequestCommand() {
......@@ -49,8 +55,9 @@ public class GetLogBytesRequestCommand implements Serializable {
}
/**
* package request command
*
* @return
* @return command
*/
public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement());
......
......@@ -28,6 +28,9 @@ import java.io.Serializable;
*/
public class GetLogBytesResponseCommand implements Serializable {
/**
* log byte data
*/
private byte[] data;
public GetLogBytesResponseCommand() {
......@@ -45,6 +48,12 @@ public class GetLogBytesResponseCommand implements Serializable {
this.data = data;
}
/**
* package response command
*
* @param opaque request unique identification
* @return command
*/
public Command convert2Command(long opaque){
Command command = new Command(opaque);
command.setType(CommandType.GET_LOG_BYTES_RESPONSE);
......
......@@ -29,12 +29,24 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class RollViewLogRequestCommand implements Serializable {
/**
* request id
*/
private static final AtomicLong REQUEST = new AtomicLong(1);
/**
* log path
*/
private String path;
/**
* skip line number
*/
private int skipLineNum;
/**
* query log line number limit
*/
private int limit;
public RollViewLogRequestCommand() {
......@@ -70,6 +82,11 @@ public class RollViewLogRequestCommand implements Serializable {
this.limit = limit;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement());
command.setType(CommandType.ROLL_VIEW_LOG_REQUEST);
......
......@@ -28,6 +28,9 @@ import java.io.Serializable;
*/
public class RollViewLogResponseCommand implements Serializable {
/**
* response data
*/
private String msg;
public RollViewLogResponseCommand() {
......@@ -45,6 +48,12 @@ public class RollViewLogResponseCommand implements Serializable {
this.msg = msg;
}
/**
* package response command
*
* @param opaque request unique identification
* @return command
*/
public Command convert2Command(long opaque){
Command command = new Command(opaque);
command.setType(CommandType.ROLL_VIEW_LOG_RESPONSE);
......
......@@ -29,6 +29,9 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class ViewLogRequestCommand implements Serializable {
/**
* request id
*/
private static final AtomicLong REQUEST = new AtomicLong(1);
private String path;
......@@ -48,6 +51,11 @@ public class ViewLogRequestCommand implements Serializable {
this.path = path;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement());
command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST);
......
......@@ -28,6 +28,9 @@ import java.io.Serializable;
*/
public class ViewLogResponseCommand implements Serializable {
/**
* response data
*/
private String msg;
public ViewLogResponseCommand() {
......@@ -45,6 +48,12 @@ public class ViewLogResponseCommand implements Serializable {
this.msg = msg;
}
/**
* package response command
*
* @param opaque request unique identification
* @return command
*/
public Command convert2Command(long opaque){
Command command = new Command(opaque);
command.setType(CommandType.VIEW_WHOLE_LOG_RESPONSE);
......
......@@ -23,14 +23,29 @@ import org.apache.dolphinscheduler.remote.utils.Constants;
*/
public class NettyClientConfig {
/**
* worker threads,default get machine cpus
*/
private int workerThreads = Constants.CPUS;
/**
* whether tpc delay
*/
private boolean tcpNoDelay = true;
/**
* whether keep alive
*/
private boolean soKeepalive = true;
/**
* send buffer size
*/
private int sendBufferSize = 65535;
/**
* receive buffer size
*/
private int receiveBufferSize = 65535;
public int getWorkerThreads() {
......
......@@ -23,18 +23,39 @@ import org.apache.dolphinscheduler.remote.utils.Constants;
*/
public class NettyServerConfig {
/**
* init the server connectable queue
*/
private int soBacklog = 1024;
/**
* whether tpc delay
*/
private boolean tcpNoDelay = true;
/**
* whether keep alive
*/
private boolean soKeepalive = true;
/**
* send buffer size
*/
private int sendBufferSize = 65535;
/**
* receive buffer size
*/
private int receiveBufferSize = 65535;
/**
* worker threads,default get machine cpus
*/
private int workerThread = Constants.CPUS;
/**
* listen port
*/
private int listenPort = 12346;
public int getListenPort() {
......
......@@ -38,29 +38,62 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
/**
* netty remote client
*/
private final NettyRemotingClient nettyRemotingClient;
/**
* client processors queue
*/
private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap();
public NettyClientHandler(NettyRemotingClient nettyRemotingClient){
this.nettyRemotingClient = nettyRemotingClient;
}
/**
* When the current channel is not active,
* the current channel has reached the end of its life cycle
*
* @param ctx channel handler context
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel()));
ctx.channel().close();
}
/**
* The current channel reads data from the remote
*
* @param ctx channel handler context
* @param msg message
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
processReceived(ctx.channel(), (Command)msg);
}
/**
* register processor
*
* @param commandType command type
* @param processor processor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, nettyRemotingClient.getDefaultExecutor());
}
/**
* register processor
*
* @param commandType command type
* @param processor processor
* @param executor thread executor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
ExecutorService executorRef = executor;
if(executorRef == null){
......@@ -69,6 +102,12 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
this.processors.putIfAbsent(commandType, new Pair<NettyRequestProcessor, ExecutorService>(processor, executorRef));
}
/**
* process received logic
*
* @param channel channel
* @param msg message
*/
private void processReceived(final Channel channel, final Command msg) {
final CommandType commandType = msg.getType();
final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType);
......@@ -93,6 +132,13 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
}
}
/**
* caught exception
*
* @param ctx channel handler context
* @param cause cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("exceptionCaught : {}", cause);
......@@ -100,6 +146,11 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
ctx.channel().close();
}
/**
* channel write changed
* @param ctx channel handler context
* @throws Exception
*/
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Channel ch = ctx.channel();
......
......@@ -39,28 +39,60 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
/**
* netty remote server
*/
private final NettyRemotingServer nettyRemotingServer;
/**
* server processors queue
*/
private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap();
public NettyServerHandler(NettyRemotingServer nettyRemotingServer){
this.nettyRemotingServer = nettyRemotingServer;
}
/**
* When the current channel is not active,
* the current channel has reached the end of its life cycle
* @param ctx channel handler context
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().close();
}
/**
* The current channel reads data from the remote end
*
* @param ctx channel handler context
* @param msg message
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
processReceived(ctx.channel(), (Command)msg);
}
/**
* register processor
*
* @param commandType command type
* @param processor processor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, null);
}
/**
* register processor
*
* @param commandType command type
* @param processor processor
* @param executor thread executor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
ExecutorService executorRef = executor;
if(executorRef == null){
......@@ -69,11 +101,18 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
this.processors.putIfAbsent(commandType, new Pair<NettyRequestProcessor, ExecutorService>(processor, executorRef));
}
/**
* process received logic
* @param channel channel
* @param msg message
*/
private void processReceived(final Channel channel, final Command msg) {
final CommandType commandType = msg.getType();
final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType);
if (pair != null) {
Runnable r = new Runnable() {
@Override
public void run() {
try {
pair.getLeft().process(channel, msg);
......@@ -92,12 +131,25 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
}
}
/**
* caught exception
*
* @param ctx channel handler context
* @param cause cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("exceptionCaught : {}", cause);
ctx.channel().close();
}
/**
* channel write changed
*
* @param ctx channel handler context
* @throws Exception
*/
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Channel ch = ctx.channel();
......
......@@ -24,5 +24,10 @@ import org.apache.dolphinscheduler.remote.command.Command;
*/
public interface NettyRequestProcessor {
/**
* process logic
* @param channel channel
* @param command command
*/
void process(final Channel channel, final Command command);
}
......@@ -23,8 +23,14 @@ import java.io.Serializable;
*/
public class Address implements Serializable {
/**
* host
*/
private String host;
/**
* port
*/
private int port;
public Address(){
......
......@@ -25,14 +25,30 @@ import java.net.InetSocketAddress;
*/
public class ChannelUtils {
/**
* get local address
*
* @param channel channel
* @return local address
*/
public static String getLocalAddress(Channel channel){
return ((InetSocketAddress)channel.localAddress()).getAddress().getHostAddress();
}
/**
* get remote address
* @param channel channel
* @return remote address
*/
public static String getRemoteAddress(Channel channel){
return ((InetSocketAddress)channel.remoteAddress()).getAddress().getHostAddress();
}
/**
* channel to address
* @param channel channel
* @return address
*/
public static Address toAddress(Channel channel){
InetSocketAddress socketAddress = ((InetSocketAddress)channel.remoteAddress());
return new Address(socketAddress.getAddress().getHostAddress(), socketAddress.getPort());
......
......@@ -18,14 +18,24 @@ package org.apache.dolphinscheduler.remote.utils;
import java.nio.charset.Charset;
/**
* constant
*/
public class Constants {
public static final String COMMA = ",";
public static final String SLASH = "/";
/**
* charset
*/
public static final Charset UTF8 = Charset.forName("UTF-8");
/**
* cpus
*/
public static final int CPUS = Runtime.getRuntime().availableProcessors();
}
......@@ -23,15 +23,36 @@ import com.alibaba.fastjson.JSON;
*/
public class FastJsonSerializer {
/**
* serialize to byte
*
* @param obj object
* @param <T> object type
* @return byte array
*/
public static <T> byte[] serialize(T obj) {
String json = JSON.toJSONString(obj);
return json.getBytes(Constants.UTF8);
}
/**
* serialize to string
* @param obj object
* @param <T> object type
* @return string
*/
public static <T> String serializeToString(T obj) {
return JSON.toJSONString(obj);
}
/**
* deserialize
*
* @param src byte array
* @param clazz class
* @param <T> deserialize type
* @return deserialize type
*/
public static <T> T deserialize(byte[] src, Class<T> clazz) {
return JSON.parseObject(new String(src, Constants.UTF8), clazz);
}
......
......@@ -18,6 +18,12 @@
package org.apache.dolphinscheduler.remote.utils;
/**
* key value pair
*
* @param <L> L generic type
* @param <R> R generic type
*/
public class Pair<L, R> {
private L left;
......
......@@ -32,9 +32,15 @@ import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
/**
* netty remote client test
*/
public class NettyRemotingClientTest {
/**
* test ping
*/
@Test
public void testSend(){
NettyServerConfig serverConfig = new NettyServerConfig();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册