From 1356e35f45ebfbca27930fe06e7abd659f111fb4 Mon Sep 17 00:00:00 2001 From: Jaskey Date: Tue, 27 Dec 2016 17:26:05 +0800 Subject: [PATCH] MASTER [ROCKETMQ-14] invoke callback shoule be invoked in an executor rather than in current thread, closes apache/incubator-rocketmq#2 --- .../remoting/netty/NettyRemotingAbstract.java | 67 ++++++++++--------- .../remoting/NettyConnectionTest.java | 52 ++++++++++++++ 2 files changed, 88 insertions(+), 31 deletions(-) diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java index 70ae5b51..1c3fdc55 100644 --- a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -198,35 +198,7 @@ public abstract class NettyRemotingAbstract { responseTable.remove(opaque); if (responseFuture.getInvokeCallback() != null) { - boolean runInThisThread = false; - ExecutorService executor = this.getCallbackExecutor(); - if (executor != null) { - try { - executor.submit(new Runnable() { - @Override - public void run() { - try { - responseFuture.executeInvokeCallback(); - } catch (Throwable e) { - PLOG.warn("execute callback in executor exception, and callback throw", e); - } - } - }); - } catch (Exception e) { - runInThisThread = true; - PLOG.warn("execute callback in executor exception, maybe executor busy", e); - } - } else { - runInThisThread = true; - } - - if (runInThisThread) { - try { - responseFuture.executeInvokeCallback(); - } catch (Throwable e) { - PLOG.warn("executeInvokeCallback Exception", e); - } - } + executeInvokeCallback(responseFuture); } else { responseFuture.putResponse(cmd); } @@ -236,6 +208,39 @@ public abstract class NettyRemotingAbstract { } } + //execute callback in callback executor. If callback executor is null, run directly in current thread + private void executeInvokeCallback(final ResponseFuture responseFuture) { + boolean runInThisThread = false; + ExecutorService executor = this.getCallbackExecutor(); + if (executor != null) { + try { + executor.submit(new Runnable() { + @Override + public void run() { + try { + responseFuture.executeInvokeCallback(); + } catch (Throwable e) { + PLOG.warn("execute callback in executor exception, and callback throw", e); + } + } + }); + } catch (Exception e) { + runInThisThread = true; + PLOG.warn("execute callback in executor exception, maybe executor busy", e); + } + } else { + runInThisThread = true; + } + + if (runInThisThread) { + try { + responseFuture.executeInvokeCallback(); + } catch (Throwable e) { + PLOG.warn("executeInvokeCallback Exception", e); + } + } + } + public abstract RPCHook getRPCHook(); abstract public ExecutorService getCallbackExecutor(); @@ -257,7 +262,7 @@ public abstract class NettyRemotingAbstract { for (ResponseFuture rf : rfList) { try { - rf.executeInvokeCallback(); + executeInvokeCallback(rf); } catch (Throwable e) { PLOG.warn("scanResponseTable, operationComplete Exception", e); } @@ -329,7 +334,7 @@ public abstract class NettyRemotingAbstract { responseFuture.putResponse(null); responseTable.remove(opaque); try { - responseFuture.executeInvokeCallback(); + executeInvokeCallback(responseFuture); } catch (Throwable e) { PLOG.warn("excute callback in writeAndFlush addListener, and callback throw", e); } finally { diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java index e4ff9489..755d3322 100644 --- a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java +++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java @@ -22,9 +22,15 @@ import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException; import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException; import com.alibaba.rocketmq.remoting.netty.NettyClientConfig; import com.alibaba.rocketmq.remoting.netty.NettyRemotingClient; +import com.alibaba.rocketmq.remoting.netty.ResponseFuture; import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; +import org.junit.Assert; import org.junit.Test; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + /** @@ -51,6 +57,52 @@ public class NettyConnectionTest { System.out.println("-----------------------------------------------------------------"); } + + @Test + public void test_async_timeout() throws InterruptedException, RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException { + RemotingClient client = createRemotingClient(); + final AtomicInteger ai = new AtomicInteger(0); + final CountDownLatch latch = new CountDownLatch(100); + for(int i=0;i<100;i++) { + try { + RemotingCommand request = RemotingCommand.createRequestCommand(0, null); + client.invokeAsync("localhost:8888", request, 5, new InvokeCallback() {//very easy to timeout + @Override + public void operationComplete(ResponseFuture responseFuture) { + if (responseFuture.isTimeout()) { + if(ai.getAndIncrement()==4) { + try { + System.out.println("First try timeout, blocking 10s" + Thread.currentThread().getName()); + Thread.sleep(10 * 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + else{ + System.out.println("Timeout callback execute,very short."+Thread.currentThread().getName()); + } + } + else{ + System.out.println("Success."+Thread.currentThread().getName()); + } + latch.countDown(); + + } + }); + } catch (Exception e) { + e.printStackTrace(); + } + } + + + + latch.await(1000, TimeUnit.MILLISECONDS); + Assert.assertEquals(1, latch.getCount());//only one should be blocked + client.shutdown(); + System.out.println("-----------------------------------------------------------------"); + } + public static RemotingClient createRemotingClient() { NettyClientConfig config = new NettyClientConfig(); config.setClientChannelMaxIdleTimeSeconds(15); -- GitLab