提交 1356e35f 编写于 作者: J Jaskey 提交者: yukon

MASTER [ROCKETMQ-14] invoke callback shoule be invoked in an executor rather...

MASTER [ROCKETMQ-14] invoke callback shoule be invoked in an executor rather than in current thread, closes apache/incubator-rocketmq#2
上级 0c022e05
...@@ -198,35 +198,7 @@ public abstract class NettyRemotingAbstract { ...@@ -198,35 +198,7 @@ public abstract class NettyRemotingAbstract {
responseTable.remove(opaque); responseTable.remove(opaque);
if (responseFuture.getInvokeCallback() != null) { if (responseFuture.getInvokeCallback() != null) {
boolean runInThisThread = false; executeInvokeCallback(responseFuture);
ExecutorService executor = this.getCallbackExecutor();
if (executor != null) {
try {
executor.submit(new Runnable() {
@Override
public void run() {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
PLOG.warn("execute callback in executor exception, and callback throw", e);
}
}
});
} catch (Exception e) {
runInThisThread = true;
PLOG.warn("execute callback in executor exception, maybe executor busy", e);
}
} else {
runInThisThread = true;
}
if (runInThisThread) {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
PLOG.warn("executeInvokeCallback Exception", e);
}
}
} else { } else {
responseFuture.putResponse(cmd); responseFuture.putResponse(cmd);
} }
...@@ -236,6 +208,39 @@ public abstract class NettyRemotingAbstract { ...@@ -236,6 +208,39 @@ public abstract class NettyRemotingAbstract {
} }
} }
//execute callback in callback executor. If callback executor is null, run directly in current thread
private void executeInvokeCallback(final ResponseFuture responseFuture) {
boolean runInThisThread = false;
ExecutorService executor = this.getCallbackExecutor();
if (executor != null) {
try {
executor.submit(new Runnable() {
@Override
public void run() {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
PLOG.warn("execute callback in executor exception, and callback throw", e);
}
}
});
} catch (Exception e) {
runInThisThread = true;
PLOG.warn("execute callback in executor exception, maybe executor busy", e);
}
} else {
runInThisThread = true;
}
if (runInThisThread) {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
PLOG.warn("executeInvokeCallback Exception", e);
}
}
}
public abstract RPCHook getRPCHook(); public abstract RPCHook getRPCHook();
abstract public ExecutorService getCallbackExecutor(); abstract public ExecutorService getCallbackExecutor();
...@@ -257,7 +262,7 @@ public abstract class NettyRemotingAbstract { ...@@ -257,7 +262,7 @@ public abstract class NettyRemotingAbstract {
for (ResponseFuture rf : rfList) { for (ResponseFuture rf : rfList) {
try { try {
rf.executeInvokeCallback(); executeInvokeCallback(rf);
} catch (Throwable e) { } catch (Throwable e) {
PLOG.warn("scanResponseTable, operationComplete Exception", e); PLOG.warn("scanResponseTable, operationComplete Exception", e);
} }
...@@ -329,7 +334,7 @@ public abstract class NettyRemotingAbstract { ...@@ -329,7 +334,7 @@ public abstract class NettyRemotingAbstract {
responseFuture.putResponse(null); responseFuture.putResponse(null);
responseTable.remove(opaque); responseTable.remove(opaque);
try { try {
responseFuture.executeInvokeCallback(); executeInvokeCallback(responseFuture);
} catch (Throwable e) { } catch (Throwable e) {
PLOG.warn("excute callback in writeAndFlush addListener, and callback throw", e); PLOG.warn("excute callback in writeAndFlush addListener, and callback throw", e);
} finally { } finally {
......
...@@ -22,9 +22,15 @@ import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException; ...@@ -22,9 +22,15 @@ import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException; import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
import com.alibaba.rocketmq.remoting.netty.NettyClientConfig; import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
import com.alibaba.rocketmq.remoting.netty.NettyRemotingClient; import com.alibaba.rocketmq.remoting.netty.NettyRemotingClient;
import com.alibaba.rocketmq.remoting.netty.ResponseFuture;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
...@@ -51,6 +57,52 @@ public class NettyConnectionTest { ...@@ -51,6 +57,52 @@ public class NettyConnectionTest {
System.out.println("-----------------------------------------------------------------"); System.out.println("-----------------------------------------------------------------");
} }
@Test
public void test_async_timeout() throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException {
RemotingClient client = createRemotingClient();
final AtomicInteger ai = new AtomicInteger(0);
final CountDownLatch latch = new CountDownLatch(100);
for(int i=0;i<100;i++) {
try {
RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
client.invokeAsync("localhost:8888", request, 5, new InvokeCallback() {//very easy to timeout
@Override
public void operationComplete(ResponseFuture responseFuture) {
if (responseFuture.isTimeout()) {
if(ai.getAndIncrement()==4) {
try {
System.out.println("First try timeout, blocking 10s" + Thread.currentThread().getName());
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
else{
System.out.println("Timeout callback execute,very short."+Thread.currentThread().getName());
}
}
else{
System.out.println("Success."+Thread.currentThread().getName());
}
latch.countDown();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
latch.await(1000, TimeUnit.MILLISECONDS);
Assert.assertEquals(1, latch.getCount());//only one should be blocked
client.shutdown();
System.out.println("-----------------------------------------------------------------");
}
public static RemotingClient createRemotingClient() { public static RemotingClient createRemotingClient() {
NettyClientConfig config = new NettyClientConfig(); NettyClientConfig config = new NettyClientConfig();
config.setClientChannelMaxIdleTimeSeconds(15); config.setClientChannelMaxIdleTimeSeconds(15);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册