提交 99b9c10d 编写于 作者: Q qiaozhanwei

1,add sendAsync method

2,refactor LoggerClient
上级 cfa882c4
......@@ -51,24 +51,55 @@ public class NettyRemotingClient {
private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class);
/**
* client bootstrap
*/
private final Bootstrap bootstrap = new Bootstrap();
/**
* encoder
*/
private final NettyEncoder encoder = new NettyEncoder();
/**
* channels
*/
private final ConcurrentHashMap<Address, Channel> channels = new ConcurrentHashMap(128);
/**
* started flag
*/
private final AtomicBoolean isStarted = new AtomicBoolean(false);
/**
* worker group
*/
private final NioEventLoopGroup workerGroup;
/**
* client config
*/
private final NettyClientConfig clientConfig;
/**
* saync semaphore
*/
private final Semaphore asyncSemaphore = new Semaphore(200, true);
/**
* callback thread executor
*/
private final ExecutorService callbackExecutor;
/**
* client handler
*/
private final NettyClientHandler clientHandler;
/**
* client init
* @param clientConfig client config
*/
public NettyRemotingClient(final NettyClientConfig clientConfig){
this.clientConfig = clientConfig;
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
......@@ -80,12 +111,16 @@ public class NettyRemotingClient {
}
});
this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10), new CallerThreadExecutePolicy());
new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10),
new CallerThreadExecutePolicy());
this.clientHandler = new NettyClientHandler(this, callbackExecutor);
this.start();
}
/**
* start
*/
private void start(){
this.bootstrap
......@@ -108,16 +143,40 @@ public class NettyRemotingClient {
isStarted.compareAndSet(false, true);
}
public void sendAsync(final Address address, final Command command, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingException {
/**
* async send
* @param address address
* @param command command
* @param timeoutMillis timeoutMillis
* @param invokeCallback callback function
* @throws InterruptedException
* @throws RemotingException
*/
public void sendAsync(final Address address, final Command command,
final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException, RemotingException {
final Channel channel = getChannel(address);
if (channel == null) {
throw new RemotingException("network error");
}
/**
* request unique identification
*/
final long opaque = command.getOpaque();
/**
* control concurrency number
*/
boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if(acquired){
final ReleaseSemaphore releaseSemaphore = new ReleaseSemaphore(this.asyncSemaphore);
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, releaseSemaphore);
/**
* response future
*/
final ResponseFuture responseFuture = new ResponseFuture(opaque,
timeoutMillis,
invokeCallback,
releaseSemaphore);
try {
channel.writeAndFlush(command).addListener(new ChannelFutureListener(){
......@@ -151,6 +210,15 @@ public class NettyRemotingClient {
}
}
/**
* sync send
* @param address address
* @param command command
* @param timeoutMillis timeoutMillis
* @return command
* @throws InterruptedException
* @throws RemotingException
*/
public Command sendSync(final Address address, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException {
final Channel channel = getChannel(address);
if (channel == null) {
......@@ -172,6 +240,9 @@ public class NettyRemotingClient {
logger.error("send command {} to address {} failed", command, address);
}
});
/**
* sync wait for result
*/
Command result = responseFuture.waitResponse();
if(result == null){
if(responseFuture.isSendOK()){
......@@ -183,6 +254,11 @@ public class NettyRemotingClient {
return result;
}
/**
* get channel
* @param address
* @return
*/
public Channel getChannel(Address address) {
Channel channel = channels.get(address);
if(channel != null && channel.isActive()){
......@@ -191,6 +267,12 @@ public class NettyRemotingClient {
return createChannel(address, true);
}
/**
* create channel
* @param address address
* @param isSync sync flag
* @return channel
*/
public Channel createChannel(Address address, boolean isSync) {
ChannelFuture future;
try {
......@@ -211,6 +293,9 @@ public class NettyRemotingClient {
return null;
}
/**
* close
*/
public void close() {
if(isStarted.compareAndSet(true, false)){
try {
......@@ -228,6 +313,9 @@ public class NettyRemotingClient {
}
}
/**
* close channels
*/
private void closeChannels(){
for (Channel channel : this.channels.values()) {
channel.close();
......@@ -235,6 +323,10 @@ public class NettyRemotingClient {
this.channels.clear();
}
/**
* close channel
* @param address address
*/
public void closeChannel(Address address){
Channel channel = this.channels.remove(address);
if(channel != null){
......
......@@ -49,7 +49,7 @@ public class NettyRemotingServer {
private final Logger logger = LoggerFactory.getLogger(NettyRemotingServer.class);
/**
* server bootstart
* server bootstrap
*/
private final ServerBootstrap serverBootstrap = new ServerBootstrap();
......
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.List;import java.util.concurrent.atomic.AtomicLong; public class ExecuteTaskRequestCommand implements Serializable { private String taskId; private String attemptId; private String applicationName; private String groupName; private String taskName; private int connectorPort; private String description; private String className; private String methodName; private String params; private List<Integer> shardItems; public List<Integer> getShardItems() { return shardItems; } public void setShardItems(List<Integer> shardItems) { this.shardItems = shardItems; } public String getParams() { return params; } public void setParams(String params) { this.params = params; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public String getApplicationName() { return applicationName; } public void setApplicationName(String applicationName) { this.applicationName = applicationName; } public String getGroupName() { return groupName; } public void setGroupName(String groupName) { this.groupName = groupName; } public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } public int getConnectorPort() { return connectorPort; } public void setConnectorPort(int connectorPort) { this.connectorPort = connectorPort; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; }}
\ No newline at end of file
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.List;import java.util.concurrent.atomic.AtomicLong; /** * execute task request command */public class ExecuteTaskRequestCommand implements Serializable { /** * task id */ private String taskId; /** * attempt id */ private String attemptId; /** * application name */ private String applicationName; /** * group name */ private String groupName; /** * task name */ private String taskName; /** * connector port */ private int connectorPort; /** * description info */ private String description; /** * class name */ private String className; /** * method name */ private String methodName; /** * parameters */ private String params; /** * shard itemds */ private List<Integer> shardItems; public List<Integer> getShardItems() { return shardItems; } public void setShardItems(List<Integer> shardItems) { this.shardItems = shardItems; } public String getParams() { return params; } public void setParams(String params) { this.params = params; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public String getApplicationName() { return applicationName; } public void setApplicationName(String applicationName) { this.applicationName = applicationName; } public String getGroupName() { return groupName; } public void setGroupName(String groupName) { this.groupName = groupName; } public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } public int getConnectorPort() { return connectorPort; } public void setConnectorPort(int connectorPort) { this.connectorPort = connectorPort; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; }}
\ No newline at end of file
......
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.concurrent.atomic.AtomicLong; public class ExecuteTaskResponseCommand implements Serializable { private String taskId; private String attemptId; private Object result; private long receivedTime; private int executeCount; private long executeTime; public String getAttemptId() { return attemptId; } public void setAttemptId(String attemptId) { this.attemptId = attemptId; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; } public long getReceivedTime() { return receivedTime; } public void setReceivedTime(long receivedTime) { this.receivedTime = receivedTime; } public int getExecuteCount() { return executeCount; } public void setExecuteCount(int executeCount) { this.executeCount = executeCount; } public long getExecuteTime() { return executeTime; } public void setExecuteTime(long executeTime) { this.executeTime = executeTime; } public Command convert2Command(long opaque){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; }}
\ No newline at end of file
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.concurrent.atomic.AtomicLong; /** * execute task response command */public class ExecuteTaskResponseCommand implements Serializable { /** * task id */ private String taskId; /** * attempt id */ private String attemptId; /** * return result */ private Object result; /** * received time */ private long receivedTime; /** * execute count */ private int executeCount; /** * execute time */ private long executeTime; public String getAttemptId() { return attemptId; } public void setAttemptId(String attemptId) { this.attemptId = attemptId; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; } public long getReceivedTime() { return receivedTime; } public void setReceivedTime(long receivedTime) { this.receivedTime = receivedTime; } public int getExecuteCount() { return executeCount; } public void setExecuteCount(int executeCount) { this.executeCount = executeCount; } public long getExecuteTime() { return executeTime; } public void setExecuteTime(long executeTime) { this.executeTime = executeTime; } public Command convert2Command(long opaque){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; }}
\ No newline at end of file
......
......@@ -23,11 +23,19 @@ import io.netty.buffer.Unpooled;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
/**
* ping machine
*/
public class Ping implements Serializable {
/**
* ping body
*/
protected static ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER;
/**
* request command body
*/
private static byte[] EMPTY_BODY_ARRAY = new byte[0];
private static final ByteBuf PING_BUF;
......@@ -42,10 +50,19 @@ public class Ping implements Serializable {
PING_BUF = Unpooled.unreleasableBuffer(ping).asReadOnly();
}
/**
* ping content
* @return result
*/
public static ByteBuf pingContent(){
return PING_BUF.duplicate();
}
/**
* create ping command
*
* @return command
*/
public static Command create(){
Command command = new Command();
command.setType(CommandType.PING);
......
......@@ -29,6 +29,9 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class GetLogBytesRequestCommand implements Serializable {
/**
* log path
*/
private String path;
public GetLogBytesRequestCommand() {
......@@ -47,8 +50,9 @@ public class GetLogBytesRequestCommand implements Serializable {
}
/**
* package request command
*
* @return
* @return command
*/
public Command convert2Command(){
Command command = new Command();
......
......@@ -29,10 +29,19 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class RollViewLogRequestCommand implements Serializable {
/**
* log path
*/
private String path;
/**
* skip line number
*/
private int skipLineNum;
/**
* query line number
*/
private int limit;
public RollViewLogRequestCommand() {
......@@ -68,6 +77,11 @@ public class RollViewLogRequestCommand implements Serializable {
this.limit = limit;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.ROLL_VIEW_LOG_REQUEST);
......
......@@ -29,6 +29,9 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class ViewLogRequestCommand implements Serializable {
/**
* log path
*/
private String path;
public ViewLogRequestCommand() {
......@@ -46,6 +49,11 @@ public class ViewLogRequestCommand implements Serializable {
this.path = path;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST);
......
......@@ -18,6 +18,9 @@
package org.apache.dolphinscheduler.remote.exceptions;
/**
* timeout exception
*/
public class RemotingTimeoutException extends RemotingException{
public RemotingTimeoutException(String message) {
......
......@@ -16,6 +16,9 @@
*/
package org.apache.dolphinscheduler.remote.exceptions;
/**
* too much request exception
*/
public class RemotingTooMuchRequestException extends RemotingException{
public RemotingTooMuchRequestException(String message) {
......
......@@ -21,6 +21,11 @@ package org.apache.dolphinscheduler.remote.future;
*/
public interface InvokeCallback {
/**
* operation
*
* @param responseFuture responseFuture
*/
void operationComplete(final ResponseFuture responseFuture);
}
......@@ -28,18 +28,33 @@ public class ResponseFuture {
private final static ConcurrentHashMap<Long,ResponseFuture> FUTURE_TABLE = new ConcurrentHashMap<>(256);
/**
* request unique identification
*/
private final long opaque;
/**
* timeout
*/
private final long timeoutMillis;
/**
* invokeCallback function
*/
private final InvokeCallback invokeCallback;
/**
* releaseSemaphore
*/
private final ReleaseSemaphore releaseSemaphore;
private final CountDownLatch latch = new CountDownLatch(1);
private final long beginTimestamp = System.currentTimeMillis();
/**
* response command
*/
private volatile Command responseCommand;
private volatile boolean sendOk = true;
......@@ -54,11 +69,22 @@ public class ResponseFuture {
FUTURE_TABLE.put(opaque, this);
}
/**
* wait for response
*
* @return command
* @throws InterruptedException
*/
public Command waitResponse() throws InterruptedException {
this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return this.responseCommand;
}
/**
* put response
*
* @param responseCommand responseCommand
*/
public void putResponse(final Command responseCommand) {
this.responseCommand = responseCommand;
this.latch.countDown();
......@@ -69,11 +95,18 @@ public class ResponseFuture {
return FUTURE_TABLE.get(opaque);
}
/**
* whether timeout
* @return timeout
*/
public boolean isTimeout() {
long diff = System.currentTimeMillis() - this.beginTimestamp;
return diff > this.timeoutMillis;
}
/**
* execute invoke callback
*/
public void executeInvokeCallback() {
if (invokeCallback != null) {
invokeCallback.operationComplete(this);
......@@ -120,6 +153,9 @@ public class ResponseFuture {
return invokeCallback;
}
/**
* release
*/
public void release() {
if(this.releaseSemaphore != null){
this.releaseSemaphore.release();
......
......@@ -34,8 +34,14 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
/**
* netty client
*/
private final NettyRemotingClient nettyRemotingClient;
/**
* callback thread executor
*/
private final ExecutorService callbackExecutor;
public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor){
......@@ -43,17 +49,36 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
this.callbackExecutor = callbackExecutor;
}
/**
* When the current channel is not active,
* the current channel has reached the end of its life cycle
*
* @param ctx channel handler context
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel()));
ctx.channel().close();
}
/**
* The current channel reads data from the remote
*
* @param ctx channel handler context
* @param msg message
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
processReceived((Command)msg);
}
/**
* process received logic
*
* @param responseCommand responseCommand
*/
private void processReceived(final Command responseCommand) {
ResponseFuture future = ResponseFuture.getFuture(responseCommand.getOpaque());
if(future != null){
......@@ -74,6 +99,12 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
}
}
/**
* caught exception
* @param ctx channel handler context
* @param cause cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("exceptionCaught : {}", cause);
......@@ -81,6 +112,12 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
ctx.channel().close();
}
/**
* channel write changed
*
* @param ctx channel handler context
* @throws Exception
*/
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Channel ch = ctx.channel();
......
......@@ -19,12 +19,21 @@ package org.apache.dolphinscheduler.remote.utils;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* thread factory
*/
public class NamedThreadFactory implements ThreadFactory {
private final AtomicInteger increment = new AtomicInteger(1);
/**
* name
*/
private final String name;
/**
* count
*/
private final int count;
public NamedThreadFactory(String name){
......@@ -36,6 +45,11 @@ public class NamedThreadFactory implements ThreadFactory {
this.count = count;
}
/**
* create thread
* @param r runnable
* @return thread
*/
@Override
public Thread newThread(Runnable r) {
final String threadName = count > 0 ? String.format(name + "_%d_%d", count, increment.getAndIncrement())
......
......@@ -41,7 +41,7 @@ public class NettyRemotingClientTest {
/**
* test ping
* test sned sync
*/
@Test
public void testSendSync(){
......@@ -69,6 +69,9 @@ public class NettyRemotingClientTest {
}
}
/**
* test sned async
*/
@Test
public void testSendAsync(){
NettyServerConfig serverConfig = new NettyServerConfig();
......
......@@ -28,14 +28,29 @@ public class LogPromise {
private static final ConcurrentHashMap<Long, LogPromise> PROMISES = new ConcurrentHashMap<>();
/**
* request unique identification
*/
private long opaque;
/**
* start timemillis
*/
private final long start;
/**
* timeout
*/
private final long timeout;
/**
* latch
*/
private final CountDownLatch latch;
/**
* result
*/
private Object result;
public LogPromise(long opaque, long timeout){
......@@ -59,15 +74,28 @@ public class LogPromise {
}
}
/**
* countdown
*
* @param result result
*/
private void doCountDown(Object result){
this.result = result;
this.latch.countDown();
}
/**
* whether timeout
* @return timeout
*/
public boolean isTimeout(){
return System.currentTimeMillis() - start > timeout;
}
/**
* get result
* @return
*/
public Object getResult(){
try {
latch.await(timeout, TimeUnit.MILLISECONDS);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册