提交 e42970b0 编写于 作者: D duhengforever

Polish async invoke implementation, prevent the block occurred during create channel

上级 c668abb8
......@@ -18,9 +18,7 @@ public class RemotingClientFactory {
private static final String CLIENT_LOCATION = "META-INF/service/org.apache.rocketmq.remoting.RemotingClient";
static {
log.info("begin load client");
paths = ServiceProvider.loadPath(CLIENT_LOCATION);
log.info("end load client, size:{}", paths.size());
}
public static RemotingClient createInstance(String protocol) {
......
......@@ -19,9 +19,7 @@ public class RemotingServerFactory {
private static final String SERVER_LOCATION = "META-INF/service/org.apache.rocketmq.remoting.RemotingServer";
static {
log.info("begin load server");
protocolPathMap = ServiceProvider.loadPath(SERVER_LOCATION);
log.info("end load server, size:{}", protocolPathMap.size());
}
......
......@@ -41,7 +41,7 @@ public class RemotingUtil {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private static boolean isLinuxPlatform = false;
private static boolean isWindowsPlatform = false;
public static final String DEFAULT_PROTOCOL = "http2";
public static final String DEFAULT_PROTOCOL = "rocketmq";
public static final String REMOTING_CHARSET = "UTF-8";
static {
......
......@@ -45,7 +45,6 @@ public class CodecHelper {
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
System.out.println("cmd: " + cmd);
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
......
......@@ -99,6 +99,11 @@ public abstract class NettyRemotingAbstract {
*/
protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
/**
* Used for async execute task for aysncInvokeMethod
*/
private ExecutorService asyncExecuteService = ThreadUtils.newFixedThreadPool(5, 10000, "asyncExecute", false);
/**
* SSL context via which to create {@link SslHandler}.
*/
......@@ -445,38 +450,66 @@ public abstract class NettyRemotingAbstract {
}
}
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
abstract protected Channel getAndCreateChannel(final String addr, long timeout) throws InterruptedException;
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis,
final InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
long beginStartTime = System.currentTimeMillis();
final int opaque = request.getOpaque();
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
invokeAsyncImpl(null, channel, request, timeoutMillis, invokeCallback);
}
public void invokeAsyncImpl(final String addr, final RemotingCommand request,
final long timeoutMillis,
final InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
invokeAsyncImpl(addr, null, request, timeoutMillis, invokeCallback);
}
public void invokeAsyncImpl(final String addr, final Channel currentChannel, final RemotingCommand request,
final long timeoutMillis,
final InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
final long beginStartTime = System.currentTimeMillis();
boolean acquired = semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(semaphoreAsync);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
once.release();
throw new RemotingTimeoutException("InvokeAsyncImpl call timeout");
}
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
this.responseTable.put(opaque, responseFuture);
try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
final int opaque = request.getOpaque();
final ResponseFuture responseFuture = new ResponseFuture(currentChannel, opaque, timeoutMillis, invokeCallback, once);
responseTable.put(opaque, responseFuture);
asyncExecuteService.submit(new Runnable() {
@Override
public void run() {
Channel channel = currentChannel;
final String remotingAddr = RemotingHelper.parseChannelRemoteAddr(channel);
try {
if (channel == null) {
channel = getAndCreateChannel(addr, timeoutMillis);
responseFuture.setProcessChannel(channel);
}
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
}
requestFail(opaque);
log.warn("send a request command to channel <{}> failed.", remotingAddr);
}
});
} catch (Exception ex) {
responseFuture.release();
requestFail(opaque);
log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", ex);
}
});
} catch (Exception e) {
responseFuture.release();
log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
}
});
} else {
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
......@@ -527,7 +560,8 @@ public abstract class NettyRemotingAbstract {
}
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
throws
InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
request.markOnewayRPC();
boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
......@@ -625,6 +659,4 @@ public abstract class NettyRemotingAbstract {
}
}
}
......@@ -26,7 +26,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class ResponseFuture {
private final int opaque;
private final Channel processChannel;
private Channel processChannel;
private final long timeoutMillis;
private final InvokeCallback invokeCallback;
private final long beginTimestamp = System.currentTimeMillis();
......@@ -121,6 +121,10 @@ public class ResponseFuture {
return processChannel;
}
public void setProcessChannel(Channel processChannel) {
this.processChannel = processChannel;
}
@Override
public String toString() {
return "ResponseFuture [responseCommand=" + responseCommand
......
......@@ -32,6 +32,7 @@ import java.util.Random;
import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
......@@ -39,11 +40,17 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyEvent;
import org.apache.rocketmq.remoting.netty.NettyEventType;
import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.util.ThreadUtils;
public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
......@@ -53,6 +60,10 @@ public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract
private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();
private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
private final Lock lockNamesrvChannel = new ReentrantLock();
/**
* Used for async execute task for aysncInvokeMethod
*/
private ExecutorService asyncExecuteService = ThreadUtils.newFixedThreadPool(5, 10000, "asyncExecute", false);
private final Lock lockChannelTables = new ReentrantLock();
private static final long LOCK_TIMEOUT_MILLIS = 3000;
......@@ -169,6 +180,7 @@ public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract
}
}
@Override
protected Channel getAndCreateChannel(final String addr, long timeout) throws InterruptedException {
if (null == addr) {
return getAndCreateNameserverChannel(timeout);
......
package org.apache.rocketmq.remoting.transport;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
......@@ -90,4 +91,8 @@ public abstract class NettyRemotingServerAbstract extends NettyRemotingAbstract
RemotingUtil.closeChannel(ctx.channel());
}
}
@Override protected Channel getAndCreateChannel(String addr, long timeout) throws InterruptedException {
return null;
}
}
rocketmq=org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer
http2=org.apache.rocketmq.remoting.transport.http2.Http2ServerImpl
\ No newline at end of file
http2=org.apache.rocketmq.remoting.transport.http2.Http2ServerImpl
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册