diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java index a766625a22f32d0793d513ec9b497afacf69c181..825d96b2ac487beffcc60752bd52ded0d69c0db5 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java @@ -18,9 +18,7 @@ public class RemotingClientFactory { private static final String CLIENT_LOCATION = "META-INF/service/org.apache.rocketmq.remoting.RemotingClient"; static { - log.info("begin load client"); paths = ServiceProvider.loadPath(CLIENT_LOCATION); - log.info("end load client, size:{}", paths.size()); } public static RemotingClient createInstance(String protocol) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java index 125d4e05c3900879dab409e3ad9ee1020ee8a5f1..e530d7a5c2ef1feea59e1b31b88f1c4a0dcb6c85 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java @@ -19,9 +19,7 @@ public class RemotingServerFactory { private static final String SERVER_LOCATION = "META-INF/service/org.apache.rocketmq.remoting.RemotingServer"; static { - log.info("begin load server"); protocolPathMap = ServiceProvider.loadPath(SERVER_LOCATION); - log.info("end load server, size:{}", protocolPathMap.size()); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java index 88008ab643ad035ffe3646bc0ec284573fb12b6b..7cc8915ac101c5a4d828aba6b37e1c9b63695b0c 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java @@ -41,7 +41,7 @@ public class RemotingUtil { private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); private static boolean isLinuxPlatform = false; private static boolean isWindowsPlatform = false; - public static final String DEFAULT_PROTOCOL = "http2"; + public static final String DEFAULT_PROTOCOL = "rocketmq"; public static final String REMOTING_CHARSET = "UTF-8"; static { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/CodecHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/CodecHelper.java index d0e363234829a7b966ed229494df302cd1a9b25b..193cd398d3310391d2d993ede4bd1a091f26349e 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/CodecHelper.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/CodecHelper.java @@ -45,7 +45,6 @@ public class CodecHelper { byte[] headerData = new byte[headerLength]; byteBuffer.get(headerData); RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen)); - System.out.println("cmd: " + cmd); int bodyLength = length - 4 - headerLength; byte[] bodyData = null; if (bodyLength > 0) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 17053ff346fa09e3f309c479a5f46d23db7e0ca5..cae2bf4569410740527331a626ee79b771108bc7 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -99,6 +99,11 @@ public abstract class NettyRemotingAbstract { */ protected Pair defaultRequestProcessor; + /** + * Used for async execute task for aysncInvokeMethod + */ + private ExecutorService asyncExecuteService = ThreadUtils.newFixedThreadPool(5, 10000, "asyncExecute", false); + /** * SSL context via which to create {@link SslHandler}. */ @@ -445,38 +450,66 @@ public abstract class NettyRemotingAbstract { } } - public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis, + abstract protected Channel getAndCreateChannel(final String addr, long timeout) throws InterruptedException; + + public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, + final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { - long beginStartTime = System.currentTimeMillis(); - final int opaque = request.getOpaque(); - boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); + invokeAsyncImpl(null, channel, request, timeoutMillis, invokeCallback); + } + + public void invokeAsyncImpl(final String addr, final RemotingCommand request, + final long timeoutMillis, + final InvokeCallback invokeCallback) + throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { + invokeAsyncImpl(addr, null, request, timeoutMillis, invokeCallback); + } + + public void invokeAsyncImpl(final String addr, final Channel currentChannel, final RemotingCommand request, + final long timeoutMillis, + final InvokeCallback invokeCallback) + throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { + final long beginStartTime = System.currentTimeMillis(); + boolean acquired = semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { - final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync); + SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(semaphoreAsync); long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { - throw new RemotingTimeoutException("invokeAsyncImpl call timeout"); + once.release(); + throw new RemotingTimeoutException("InvokeAsyncImpl call timeout"); } - - final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once); - this.responseTable.put(opaque, responseFuture); - try { - channel.writeAndFlush(request).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture f) throws Exception { - if (f.isSuccess()) { - responseFuture.setSendRequestOK(true); - return; + final int opaque = request.getOpaque(); + final ResponseFuture responseFuture = new ResponseFuture(currentChannel, opaque, timeoutMillis, invokeCallback, once); + responseTable.put(opaque, responseFuture); + asyncExecuteService.submit(new Runnable() { + @Override + public void run() { + Channel channel = currentChannel; + final String remotingAddr = RemotingHelper.parseChannelRemoteAddr(channel); + try { + if (channel == null) { + channel = getAndCreateChannel(addr, timeoutMillis); + responseFuture.setProcessChannel(channel); } + channel.writeAndFlush(request).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture f) throws Exception { + if (f.isSuccess()) { + responseFuture.setSendRequestOK(true); + return; + } + requestFail(opaque); + log.warn("send a request command to channel <{}> failed.", remotingAddr); + } + }); + } catch (Exception ex) { + responseFuture.release(); requestFail(opaque); - log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel)); + log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", ex); } - }); - } catch (Exception e) { - responseFuture.release(); - log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e); - throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); - } + } + }); } else { if (timeoutMillis <= 0) { throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast"); @@ -527,7 +560,8 @@ public abstract class NettyRemotingAbstract { } public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) - throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { + throws + InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { request.markOnewayRPC(); boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { @@ -625,6 +659,4 @@ public abstract class NettyRemotingAbstract { } } - - } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java index 5f4c8c69502b0dbba53ae9ace301b175fabdb5f2..bffe60279a98585898e98403d55a48ca1b124127 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java @@ -26,7 +26,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; public class ResponseFuture { private final int opaque; - private final Channel processChannel; + private Channel processChannel; private final long timeoutMillis; private final InvokeCallback invokeCallback; private final long beginTimestamp = System.currentTimeMillis(); @@ -121,6 +121,10 @@ public class ResponseFuture { return processChannel; } + public void setProcessChannel(Channel processChannel) { + this.processChannel = processChannel; + } + @Override public String toString() { return "ResponseFuture [responseCommand=" + responseCommand diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java index 6dab2187ffd94d916918b1c42fa1cf5429ea50fe..d53d40b333502f3314bfed3c069b3c716f577cab 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java @@ -32,6 +32,7 @@ import java.util.Random; import java.util.Timer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -39,11 +40,17 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingUtil; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.netty.NettyEvent; import org.apache.rocketmq.remoting.netty.NettyEventType; import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.util.ThreadUtils; public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract { private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); @@ -53,6 +60,10 @@ public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract private final AtomicReference namesrvAddrChoosed = new AtomicReference(); private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex()); private final Lock lockNamesrvChannel = new ReentrantLock(); + /** + * Used for async execute task for aysncInvokeMethod + */ + private ExecutorService asyncExecuteService = ThreadUtils.newFixedThreadPool(5, 10000, "asyncExecute", false); private final Lock lockChannelTables = new ReentrantLock(); private static final long LOCK_TIMEOUT_MILLIS = 3000; @@ -169,6 +180,7 @@ public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract } } + @Override protected Channel getAndCreateChannel(final String addr, long timeout) throws InterruptedException { if (null == addr) { return getAndCreateNameserverChannel(timeout); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java index e8d847199568a88dbeaebb2737023ddef136bc27..cec0086dc45429f9db3f81e5ab20b722917d7fd6 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java @@ -1,5 +1,6 @@ package org.apache.rocketmq.remoting.transport; +import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.timeout.IdleState; @@ -90,4 +91,8 @@ public abstract class NettyRemotingServerAbstract extends NettyRemotingAbstract RemotingUtil.closeChannel(ctx.channel()); } } + + @Override protected Channel getAndCreateChannel(String addr, long timeout) throws InterruptedException { + return null; + } } diff --git a/remoting/src/main/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingServer b/remoting/src/main/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingServer index 5079c88c4cc9bf4943afaeaa008df012bd7f7ffe..9f70dce7d42f55fb8d3a1fec5d0bcc81ba2f79cc 100644 --- a/remoting/src/main/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingServer +++ b/remoting/src/main/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingServer @@ -1,2 +1,2 @@ rocketmq=org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer -http2=org.apache.rocketmq.remoting.transport.http2.Http2ServerImpl \ No newline at end of file +http2=org.apache.rocketmq.remoting.transport.http2.Http2ServerImpl