diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java index bf019618d9c897e474ac10c1882f5c5fb37d2551..c13e75c206c793b2c1bc23a593de89e21f9ac10e 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@ -162,7 +162,7 @@ public class MQClientAPIImplTest { public Object answer(InvocationOnMock mock) throws Throwable { InvokeCallback callback = mock.getArgument(3); RemotingCommand request = mock.getArgument(1); - ResponseFuture responseFuture = new ResponseFuture(request.getOpaque(), 3 * 1000, null, null); + ResponseFuture responseFuture = new ResponseFuture(null,request.getOpaque(), 3 * 1000, null, null); responseFuture.setResponseCommand(createSuccessResponse(request)); callback.operationComplete(responseFuture); return null; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 7c414e9dd651bc9329e17fa8f4eab1d85858e80c..45ca7304c8f096f1628b2d3774974bb62ef094f0 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -364,7 +364,7 @@ public abstract class NettyRemotingAbstract { final int opaque = request.getOpaque(); try { - final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null); + final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); this.responseTable.put(opaque, responseFuture); final SocketAddress addr = channel.remoteAddress(); channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @@ -407,8 +407,7 @@ public abstract class NettyRemotingAbstract { boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync); - - final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once); + final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, invokeCallback, once); this.responseTable.put(opaque, responseFuture); try { channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @@ -417,20 +416,8 @@ public abstract class NettyRemotingAbstract { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; - } else { - responseFuture.setSendRequestOK(false); - } - - responseFuture.putResponse(null); - responseTable.remove(opaque); - try { - executeInvokeCallback(responseFuture); - } catch (Throwable e) { - log.warn("excute callback in writeAndFlush addListener, and callback throw", e); - } finally { - responseFuture.release(); } - + requestFail(opaque); log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel)); } }); @@ -455,6 +442,38 @@ public abstract class NettyRemotingAbstract { } } + private void requestFail(final int opaque) { + ResponseFuture responseFuture = responseTable.remove(opaque); + if (responseFuture != null) { + responseFuture.setSendRequestOK(false); + responseFuture.putResponse(null); + try { + executeInvokeCallback(responseFuture); + } catch (Throwable e) { + log.warn("execute callback in requestFail, and callback throw", e); + } finally { + responseFuture.release(); + } + } + } + + /** + * mark the request of the specified channel as fail and to invoke fail callback immediately + * @param channel the channel which is close already + */ + protected void failFast(final Channel channel) { + Iterator> it = responseTable.entrySet().iterator(); + while (it.hasNext()) { + Entry entry = it.next(); + if (entry.getValue().getProcessChannel() == channel) { + Integer opaque = entry.getKey(); + if (opaque != null) { + requestFail(opaque); + } + } + } + } + public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { request.markOnewayRPC(); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 7cdfb80accbee2d50db929c1972d825f6097c526..241f2b07ad1abc292a75c3432376a22a678c71ce 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -660,7 +660,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress); closeChannel(ctx.channel()); super.close(ctx, promise); - + NettyRemotingClient.this.failFast(ctx.channel()); if (NettyRemotingClient.this.channelEventListener != null) { NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel())); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java index 1157c450283e7d5af6bee2afd15369948ad5d677..5f4c8c69502b0dbba53ae9ace301b175fabdb5f2 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.remoting.netty; +import io.netty.channel.Channel; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -25,6 +26,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; public class ResponseFuture { private final int opaque; + private final Channel processChannel; private final long timeoutMillis; private final InvokeCallback invokeCallback; private final long beginTimestamp = System.currentTimeMillis(); @@ -37,9 +39,10 @@ public class ResponseFuture { private volatile boolean sendRequestOK = true; private volatile Throwable cause; - public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback, + public ResponseFuture(Channel channel, int opaque, long timeoutMillis, InvokeCallback invokeCallback, SemaphoreReleaseOnlyOnce once) { this.opaque = opaque; + this.processChannel = channel; this.timeoutMillis = timeoutMillis; this.invokeCallback = invokeCallback; this.once = once; @@ -114,11 +117,20 @@ public class ResponseFuture { return opaque; } + public Channel getProcessChannel() { + return processChannel; + } + @Override public String toString() { - return "ResponseFuture [responseCommand=" + responseCommand + ", sendRequestOK=" + sendRequestOK - + ", cause=" + cause + ", opaque=" + opaque + ", timeoutMillis=" + timeoutMillis - + ", invokeCallback=" + invokeCallback + ", beginTimestamp=" + beginTimestamp + return "ResponseFuture [responseCommand=" + responseCommand + + ", sendRequestOK=" + sendRequestOK + + ", cause=" + cause + + ", opaque=" + opaque + + ", processChannel=" + processChannel + + ", timeoutMillis=" + timeoutMillis + + ", invokeCallback=" + invokeCallback + + ", beginTimestamp=" + beginTimestamp + ", countDownLatch=" + countDownLatch + "]"; } }