From 99b9c10d78bef033a658a2389640ba471ae8da29 Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Sun, 16 Feb 2020 18:12:44 +0800 Subject: [PATCH] =?UTF-8?q?1=EF=BC=8Cadd=20sendAsync=20method=202=EF=BC=8C?= =?UTF-8?q?refactor=20LoggerClient?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../remote/NettyRemotingClient.java | 98 ++++++++++++++++++- .../remote/NettyRemotingServer.java | 2 +- .../command/ExecuteTaskRequestCommand.java | 2 +- .../command/ExecuteTaskResponseCommand.java | 2 +- .../dolphinscheduler/remote/command/Ping.java | 19 +++- .../log/GetLogBytesRequestCommand.java | 6 +- .../log/RollViewLogRequestCommand.java | 14 +++ .../command/log/ViewLogRequestCommand.java | 8 ++ .../exceptions/RemotingTimeoutException.java | 3 + .../RemotingTooMuchRequestException.java | 3 + .../remote/future/InvokeCallback.java | 5 + .../remote/future/ResponseFuture.java | 36 +++++++ .../remote/handler/NettyClientHandler.java | 37 +++++++ .../remote/utils/NamedThreadFactory.java | 14 +++ .../remote/NettyRemotingClientTest.java | 5 +- .../service/log/LogPromise.java | 28 ++++++ 16 files changed, 273 insertions(+), 9 deletions(-) diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java index e07cfd640..357fd6d19 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java @@ -51,24 +51,55 @@ public class NettyRemotingClient { private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class); + /** + * client bootstrap + */ private final Bootstrap bootstrap = new Bootstrap(); + /** + * encoder + */ private final NettyEncoder encoder = new NettyEncoder(); + /** + * channels + */ private final ConcurrentHashMap channels = new ConcurrentHashMap(128); + /** + * started flag + */ private final AtomicBoolean isStarted = new AtomicBoolean(false); + /** + * worker group + */ private final NioEventLoopGroup workerGroup; + /** + * client config + */ private final NettyClientConfig clientConfig; + /** + * saync semaphore + */ private final Semaphore asyncSemaphore = new Semaphore(200, true); + /** + * callback thread executor + */ private final ExecutorService callbackExecutor; + /** + * client handler + */ private final NettyClientHandler clientHandler; + /** + * client init + * @param clientConfig client config + */ public NettyRemotingClient(final NettyClientConfig clientConfig){ this.clientConfig = clientConfig; this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { @@ -80,12 +111,16 @@ public class NettyRemotingClient { } }); this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, - new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10), new CallerThreadExecutePolicy()); + new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10), + new CallerThreadExecutePolicy()); this.clientHandler = new NettyClientHandler(this, callbackExecutor); this.start(); } + /** + * start + */ private void start(){ this.bootstrap @@ -108,16 +143,40 @@ public class NettyRemotingClient { isStarted.compareAndSet(false, true); } - public void sendAsync(final Address address, final Command command, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingException { + /** + * async send + * @param address address + * @param command command + * @param timeoutMillis timeoutMillis + * @param invokeCallback callback function + * @throws InterruptedException + * @throws RemotingException + */ + public void sendAsync(final Address address, final Command command, + final long timeoutMillis, + final InvokeCallback invokeCallback) throws InterruptedException, RemotingException { final Channel channel = getChannel(address); if (channel == null) { throw new RemotingException("network error"); } + /** + * request unique identification + */ final long opaque = command.getOpaque(); + /** + * control concurrency number + */ boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if(acquired){ final ReleaseSemaphore releaseSemaphore = new ReleaseSemaphore(this.asyncSemaphore); - final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, releaseSemaphore); + + /** + * response future + */ + final ResponseFuture responseFuture = new ResponseFuture(opaque, + timeoutMillis, + invokeCallback, + releaseSemaphore); try { channel.writeAndFlush(command).addListener(new ChannelFutureListener(){ @@ -151,6 +210,15 @@ public class NettyRemotingClient { } } + /** + * sync send + * @param address address + * @param command command + * @param timeoutMillis timeoutMillis + * @return command + * @throws InterruptedException + * @throws RemotingException + */ public Command sendSync(final Address address, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException { final Channel channel = getChannel(address); if (channel == null) { @@ -172,6 +240,9 @@ public class NettyRemotingClient { logger.error("send command {} to address {} failed", command, address); } }); + /** + * sync wait for result + */ Command result = responseFuture.waitResponse(); if(result == null){ if(responseFuture.isSendOK()){ @@ -183,6 +254,11 @@ public class NettyRemotingClient { return result; } + /** + * get channel + * @param address + * @return + */ public Channel getChannel(Address address) { Channel channel = channels.get(address); if(channel != null && channel.isActive()){ @@ -191,6 +267,12 @@ public class NettyRemotingClient { return createChannel(address, true); } + /** + * create channel + * @param address address + * @param isSync sync flag + * @return channel + */ public Channel createChannel(Address address, boolean isSync) { ChannelFuture future; try { @@ -211,6 +293,9 @@ public class NettyRemotingClient { return null; } + /** + * close + */ public void close() { if(isStarted.compareAndSet(true, false)){ try { @@ -228,6 +313,9 @@ public class NettyRemotingClient { } } + /** + * close channels + */ private void closeChannels(){ for (Channel channel : this.channels.values()) { channel.close(); @@ -235,6 +323,10 @@ public class NettyRemotingClient { this.channels.clear(); } + /** + * close channel + * @param address address + */ public void closeChannel(Address address){ Channel channel = this.channels.remove(address); if(channel != null){ diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java index c69bf0954..29b231763 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java @@ -49,7 +49,7 @@ public class NettyRemotingServer { private final Logger logger = LoggerFactory.getLogger(NettyRemotingServer.class); /** - * server bootstart + * server bootstrap */ private final ServerBootstrap serverBootstrap = new ServerBootstrap(); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java index b3801aceb..beec05540 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.List; import java.util.concurrent.atomic.AtomicLong; public class ExecuteTaskRequestCommand implements Serializable { private String taskId; private String attemptId; private String applicationName; private String groupName; private String taskName; private int connectorPort; private String description; private String className; private String methodName; private String params; private List shardItems; public List getShardItems() { return shardItems; } public void setShardItems(List shardItems) { this.shardItems = shardItems; } public String getParams() { return params; } public void setParams(String params) { this.params = params; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public String getApplicationName() { return applicationName; } public void setApplicationName(String applicationName) { this.applicationName = applicationName; } public String getGroupName() { return groupName; } public void setGroupName(String groupName) { this.groupName = groupName; } public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } public int getConnectorPort() { return connectorPort; } public void setConnectorPort(int connectorPort) { this.connectorPort = connectorPort; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.List; import java.util.concurrent.atomic.AtomicLong; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { /** * task id */ private String taskId; /** * attempt id */ private String attemptId; /** * application name */ private String applicationName; /** * group name */ private String groupName; /** * task name */ private String taskName; /** * connector port */ private int connectorPort; /** * description info */ private String description; /** * class name */ private String className; /** * method name */ private String methodName; /** * parameters */ private String params; /** * shard itemds */ private List shardItems; public List getShardItems() { return shardItems; } public void setShardItems(List shardItems) { this.shardItems = shardItems; } public String getParams() { return params; } public void setParams(String params) { this.params = params; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public String getApplicationName() { return applicationName; } public void setApplicationName(String applicationName) { this.applicationName = applicationName; } public String getGroupName() { return groupName; } public void setGroupName(String groupName) { this.groupName = groupName; } public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } public int getConnectorPort() { return connectorPort; } public void setConnectorPort(int connectorPort) { this.connectorPort = connectorPort; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java index aeb5f7d85..7e35fa6e7 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.concurrent.atomic.AtomicLong; public class ExecuteTaskResponseCommand implements Serializable { private String taskId; private String attemptId; private Object result; private long receivedTime; private int executeCount; private long executeTime; public String getAttemptId() { return attemptId; } public void setAttemptId(String attemptId) { this.attemptId = attemptId; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; } public long getReceivedTime() { return receivedTime; } public void setReceivedTime(long receivedTime) { this.receivedTime = receivedTime; } public int getExecuteCount() { return executeCount; } public void setExecuteCount(int executeCount) { this.executeCount = executeCount; } public long getExecuteTime() { return executeTime; } public void setExecuteTime(long executeTime) { this.executeTime = executeTime; } public Command convert2Command(long opaque){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.concurrent.atomic.AtomicLong; /** * execute task response command */ public class ExecuteTaskResponseCommand implements Serializable { /** * task id */ private String taskId; /** * attempt id */ private String attemptId; /** * return result */ private Object result; /** * received time */ private long receivedTime; /** * execute count */ private int executeCount; /** * execute time */ private long executeTime; public String getAttemptId() { return attemptId; } public void setAttemptId(String attemptId) { this.attemptId = attemptId; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; } public long getReceivedTime() { return receivedTime; } public void setReceivedTime(long receivedTime) { this.receivedTime = receivedTime; } public int getExecuteCount() { return executeCount; } public void setExecuteCount(int executeCount) { this.executeCount = executeCount; } public long getExecuteTime() { return executeTime; } public void setExecuteTime(long executeTime) { this.executeTime = executeTime; } public Command convert2Command(long opaque){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java index 38a8b1487..c50413e98 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java @@ -23,11 +23,19 @@ import io.netty.buffer.Unpooled; import java.io.Serializable; import java.util.concurrent.atomic.AtomicLong; - +/** + * ping machine + */ public class Ping implements Serializable { + /** + * ping body + */ protected static ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER; + /** + * request command body + */ private static byte[] EMPTY_BODY_ARRAY = new byte[0]; private static final ByteBuf PING_BUF; @@ -42,10 +50,19 @@ public class Ping implements Serializable { PING_BUF = Unpooled.unreleasableBuffer(ping).asReadOnly(); } + /** + * ping content + * @return result + */ public static ByteBuf pingContent(){ return PING_BUF.duplicate(); } + /** + * create ping command + * + * @return command + */ public static Command create(){ Command command = new Command(); command.setType(CommandType.PING); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java index 088bdd674..4cc32ed42 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java @@ -29,6 +29,9 @@ import java.util.concurrent.atomic.AtomicLong; */ public class GetLogBytesRequestCommand implements Serializable { + /** + * log path + */ private String path; public GetLogBytesRequestCommand() { @@ -47,8 +50,9 @@ public class GetLogBytesRequestCommand implements Serializable { } /** + * package request command * - * @return + * @return command */ public Command convert2Command(){ Command command = new Command(); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java index 339c1a7f9..621d35a80 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java @@ -29,10 +29,19 @@ import java.util.concurrent.atomic.AtomicLong; */ public class RollViewLogRequestCommand implements Serializable { + /** + * log path + */ private String path; + /** + * skip line number + */ private int skipLineNum; + /** + * query line number + */ private int limit; public RollViewLogRequestCommand() { @@ -68,6 +77,11 @@ public class RollViewLogRequestCommand implements Serializable { this.limit = limit; } + /** + * package request command + * + * @return command + */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.ROLL_VIEW_LOG_REQUEST); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java index 69f600925..8835348ee 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java @@ -29,6 +29,9 @@ import java.util.concurrent.atomic.AtomicLong; */ public class ViewLogRequestCommand implements Serializable { + /** + * log path + */ private String path; public ViewLogRequestCommand() { @@ -46,6 +49,11 @@ public class ViewLogRequestCommand implements Serializable { this.path = path; } + /** + * package request command + * + * @return command + */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTimeoutException.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTimeoutException.java index aaf917078..3d91ba57f 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTimeoutException.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTimeoutException.java @@ -18,6 +18,9 @@ package org.apache.dolphinscheduler.remote.exceptions; +/** + * timeout exception + */ public class RemotingTimeoutException extends RemotingException{ public RemotingTimeoutException(String message) { diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTooMuchRequestException.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTooMuchRequestException.java index 5ee11a04a..82cc3f4db 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTooMuchRequestException.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTooMuchRequestException.java @@ -16,6 +16,9 @@ */ package org.apache.dolphinscheduler.remote.exceptions; +/** + * too much request exception + */ public class RemotingTooMuchRequestException extends RemotingException{ public RemotingTooMuchRequestException(String message) { diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java index 7cf875b00..84cdae867 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java @@ -21,6 +21,11 @@ package org.apache.dolphinscheduler.remote.future; */ public interface InvokeCallback { + /** + * operation + * + * @param responseFuture responseFuture + */ void operationComplete(final ResponseFuture responseFuture); } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java index a9bdb39ad..caff34236 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java @@ -28,18 +28,33 @@ public class ResponseFuture { private final static ConcurrentHashMap FUTURE_TABLE = new ConcurrentHashMap<>(256); + /** + * request unique identification + */ private final long opaque; + /** + * timeout + */ private final long timeoutMillis; + /** + * invokeCallback function + */ private final InvokeCallback invokeCallback; + /** + * releaseSemaphore + */ private final ReleaseSemaphore releaseSemaphore; private final CountDownLatch latch = new CountDownLatch(1); private final long beginTimestamp = System.currentTimeMillis(); + /** + * response command + */ private volatile Command responseCommand; private volatile boolean sendOk = true; @@ -54,11 +69,22 @@ public class ResponseFuture { FUTURE_TABLE.put(opaque, this); } + /** + * wait for response + * + * @return command + * @throws InterruptedException + */ public Command waitResponse() throws InterruptedException { this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS); return this.responseCommand; } + /** + * put response + * + * @param responseCommand responseCommand + */ public void putResponse(final Command responseCommand) { this.responseCommand = responseCommand; this.latch.countDown(); @@ -69,11 +95,18 @@ public class ResponseFuture { return FUTURE_TABLE.get(opaque); } + /** + * whether timeout + * @return timeout + */ public boolean isTimeout() { long diff = System.currentTimeMillis() - this.beginTimestamp; return diff > this.timeoutMillis; } + /** + * execute invoke callback + */ public void executeInvokeCallback() { if (invokeCallback != null) { invokeCallback.operationComplete(this); @@ -120,6 +153,9 @@ public class ResponseFuture { return invokeCallback; } + /** + * release + */ public void release() { if(this.releaseSemaphore != null){ this.releaseSemaphore.release(); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java index 97f6632fb..d5d0d4df8 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java @@ -34,8 +34,14 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class); + /** + * netty client + */ private final NettyRemotingClient nettyRemotingClient; + /** + * callback thread executor + */ private final ExecutorService callbackExecutor; public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor){ @@ -43,17 +49,36 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { this.callbackExecutor = callbackExecutor; } + /** + * When the current channel is not active, + * the current channel has reached the end of its life cycle + * + * @param ctx channel handler context + * @throws Exception + */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel())); ctx.channel().close(); } + /** + * The current channel reads data from the remote + * + * @param ctx channel handler context + * @param msg message + * @throws Exception + */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { processReceived((Command)msg); } + /** + * process received logic + * + * @param responseCommand responseCommand + */ private void processReceived(final Command responseCommand) { ResponseFuture future = ResponseFuture.getFuture(responseCommand.getOpaque()); if(future != null){ @@ -74,6 +99,12 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { } } + /** + * caught exception + * @param ctx channel handler context + * @param cause cause + * @throws Exception + */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.error("exceptionCaught : {}", cause); @@ -81,6 +112,12 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { ctx.channel().close(); } + /** + * channel write changed + * + * @param ctx channel handler context + * @throws Exception + */ @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { Channel ch = ctx.channel(); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NamedThreadFactory.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NamedThreadFactory.java index bef64c7dc..2f0d05ebd 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NamedThreadFactory.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NamedThreadFactory.java @@ -19,12 +19,21 @@ package org.apache.dolphinscheduler.remote.utils; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +/** + * thread factory + */ public class NamedThreadFactory implements ThreadFactory { private final AtomicInteger increment = new AtomicInteger(1); + /** + * name + */ private final String name; + /** + * count + */ private final int count; public NamedThreadFactory(String name){ @@ -36,6 +45,11 @@ public class NamedThreadFactory implements ThreadFactory { this.count = count; } + /** + * create thread + * @param r runnable + * @return thread + */ @Override public Thread newThread(Runnable r) { final String threadName = count > 0 ? String.format(name + "_%d_%d", count, increment.getAndIncrement()) diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java index 1ad6734b4..b6f8e2a8d 100644 --- a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java +++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java @@ -41,7 +41,7 @@ public class NettyRemotingClientTest { /** - * test ping + * test sned sync */ @Test public void testSendSync(){ @@ -69,6 +69,9 @@ public class NettyRemotingClientTest { } } + /** + * test sned async + */ @Test public void testSendAsync(){ NettyServerConfig serverConfig = new NettyServerConfig(); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java index 8920b8a52..98ee3fdbb 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java @@ -28,14 +28,29 @@ public class LogPromise { private static final ConcurrentHashMap PROMISES = new ConcurrentHashMap<>(); + /** + * request unique identification + */ private long opaque; + /** + * start timemillis + */ private final long start; + /** + * timeout + */ private final long timeout; + /** + * latch + */ private final CountDownLatch latch; + /** + * result + */ private Object result; public LogPromise(long opaque, long timeout){ @@ -59,15 +74,28 @@ public class LogPromise { } } + /** + * countdown + * + * @param result result + */ private void doCountDown(Object result){ this.result = result; this.latch.countDown(); } + /** + * whether timeout + * @return timeout + */ public boolean isTimeout(){ return System.currentTimeMillis() - start > timeout; } + /** + * get result + * @return + */ public Object getResult(){ try { latch.await(timeout, TimeUnit.MILLISECONDS); -- GitLab