提交 6ae619c4 编写于 作者: J Jaskey 提交者: von gosling

Invoke callback at once when channel is close (#95)

上级 b75af6b0
...@@ -162,7 +162,7 @@ public class MQClientAPIImplTest { ...@@ -162,7 +162,7 @@ public class MQClientAPIImplTest {
public Object answer(InvocationOnMock mock) throws Throwable { public Object answer(InvocationOnMock mock) throws Throwable {
InvokeCallback callback = mock.getArgument(3); InvokeCallback callback = mock.getArgument(3);
RemotingCommand request = mock.getArgument(1); RemotingCommand request = mock.getArgument(1);
ResponseFuture responseFuture = new ResponseFuture(request.getOpaque(), 3 * 1000, null, null); ResponseFuture responseFuture = new ResponseFuture(null,request.getOpaque(), 3 * 1000, null, null);
responseFuture.setResponseCommand(createSuccessResponse(request)); responseFuture.setResponseCommand(createSuccessResponse(request));
callback.operationComplete(responseFuture); callback.operationComplete(responseFuture);
return null; return null;
......
...@@ -364,7 +364,7 @@ public abstract class NettyRemotingAbstract { ...@@ -364,7 +364,7 @@ public abstract class NettyRemotingAbstract {
final int opaque = request.getOpaque(); final int opaque = request.getOpaque();
try { try {
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null); final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture); this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress(); final SocketAddress addr = channel.remoteAddress();
channel.writeAndFlush(request).addListener(new ChannelFutureListener() { channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
...@@ -407,8 +407,7 @@ public abstract class NettyRemotingAbstract { ...@@ -407,8 +407,7 @@ public abstract class NettyRemotingAbstract {
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) { if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync); final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, invokeCallback, once);
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
this.responseTable.put(opaque, responseFuture); this.responseTable.put(opaque, responseFuture);
try { try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() { channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
...@@ -417,20 +416,8 @@ public abstract class NettyRemotingAbstract { ...@@ -417,20 +416,8 @@ public abstract class NettyRemotingAbstract {
if (f.isSuccess()) { if (f.isSuccess()) {
responseFuture.setSendRequestOK(true); responseFuture.setSendRequestOK(true);
return; return;
} else {
responseFuture.setSendRequestOK(false);
} }
requestFail(opaque);
responseFuture.putResponse(null);
responseTable.remove(opaque);
try {
executeInvokeCallback(responseFuture);
} catch (Throwable e) {
log.warn("excute callback in writeAndFlush addListener, and callback throw", e);
} finally {
responseFuture.release();
}
log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel)); log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
} }
}); });
...@@ -455,6 +442,38 @@ public abstract class NettyRemotingAbstract { ...@@ -455,6 +442,38 @@ public abstract class NettyRemotingAbstract {
} }
} }
private void requestFail(final int opaque) {
ResponseFuture responseFuture = responseTable.remove(opaque);
if (responseFuture != null) {
responseFuture.setSendRequestOK(false);
responseFuture.putResponse(null);
try {
executeInvokeCallback(responseFuture);
} catch (Throwable e) {
log.warn("execute callback in requestFail, and callback throw", e);
} finally {
responseFuture.release();
}
}
}
/**
* mark the request of the specified channel as fail and to invoke fail callback immediately
* @param channel the channel which is close already
*/
protected void failFast(final Channel channel) {
Iterator<Entry<Integer, ResponseFuture>> it = responseTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, ResponseFuture> entry = it.next();
if (entry.getValue().getProcessChannel() == channel) {
Integer opaque = entry.getKey();
if (opaque != null) {
requestFail(opaque);
}
}
}
}
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
request.markOnewayRPC(); request.markOnewayRPC();
......
...@@ -660,7 +660,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -660,7 +660,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress); log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
closeChannel(ctx.channel()); closeChannel(ctx.channel());
super.close(ctx, promise); super.close(ctx, promise);
NettyRemotingClient.this.failFast(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) { if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel())); NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
} }
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
*/ */
package org.apache.rocketmq.remoting.netty; package org.apache.rocketmq.remoting.netty;
import io.netty.channel.Channel;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
...@@ -25,6 +26,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; ...@@ -25,6 +26,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class ResponseFuture { public class ResponseFuture {
private final int opaque; private final int opaque;
private final Channel processChannel;
private final long timeoutMillis; private final long timeoutMillis;
private final InvokeCallback invokeCallback; private final InvokeCallback invokeCallback;
private final long beginTimestamp = System.currentTimeMillis(); private final long beginTimestamp = System.currentTimeMillis();
...@@ -37,9 +39,10 @@ public class ResponseFuture { ...@@ -37,9 +39,10 @@ public class ResponseFuture {
private volatile boolean sendRequestOK = true; private volatile boolean sendRequestOK = true;
private volatile Throwable cause; private volatile Throwable cause;
public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback, public ResponseFuture(Channel channel, int opaque, long timeoutMillis, InvokeCallback invokeCallback,
SemaphoreReleaseOnlyOnce once) { SemaphoreReleaseOnlyOnce once) {
this.opaque = opaque; this.opaque = opaque;
this.processChannel = channel;
this.timeoutMillis = timeoutMillis; this.timeoutMillis = timeoutMillis;
this.invokeCallback = invokeCallback; this.invokeCallback = invokeCallback;
this.once = once; this.once = once;
...@@ -114,11 +117,20 @@ public class ResponseFuture { ...@@ -114,11 +117,20 @@ public class ResponseFuture {
return opaque; return opaque;
} }
public Channel getProcessChannel() {
return processChannel;
}
@Override @Override
public String toString() { public String toString() {
return "ResponseFuture [responseCommand=" + responseCommand + ", sendRequestOK=" + sendRequestOK return "ResponseFuture [responseCommand=" + responseCommand
+ ", cause=" + cause + ", opaque=" + opaque + ", timeoutMillis=" + timeoutMillis + ", sendRequestOK=" + sendRequestOK
+ ", invokeCallback=" + invokeCallback + ", beginTimestamp=" + beginTimestamp + ", cause=" + cause
+ ", opaque=" + opaque
+ ", processChannel=" + processChannel
+ ", timeoutMillis=" + timeoutMillis
+ ", invokeCallback=" + invokeCallback
+ ", beginTimestamp=" + beginTimestamp
+ ", countDownLatch=" + countDownLatch + "]"; + ", countDownLatch=" + countDownLatch + "]";
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册