diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index b0668d49f87ef166da16d0b37090c0f102772e63..b2ae46418031966791554c38d4114720df3989ec 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -49,11 +49,12 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.MessageExtBrokerInner; -public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor { +public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); protected final static int DLQ_NUMS_PER_GROUP = 1; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 76a051b924c01bcdafd039d34c6c06f844f7063f..16f944cc8a0fa24e18ca2ecb9dc8c06a09022c00 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -118,6 +118,7 @@ import org.apache.rocketmq.filter.util.BitsArray; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.RemotingCommand; @@ -132,7 +133,7 @@ import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.SelectMappedBufferResult; -public class AdminBrokerProcessor implements NettyRequestProcessor { +public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 5e0e6f44427ec9f44359b3aab11ca89b41fb6d8f..fa076aa98ea4852e5c0b26419640443a27c363ec 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -50,6 +50,7 @@ import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.sysflag.TopicSysFlag; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.netty.RemotingResponseCallback; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.PutMessageResult; @@ -77,6 +78,10 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } @Override + public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception { + asyncProcessRequest(ctx, request).thenAccept(responseCallback::callback); + } + public CompletableFuture asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final SendMessageContext mqtraceContext; diff --git a/remoting/pom.xml b/remoting/pom.xml index ffbf7de3eadbeb1208ef5eae18946680a8504dfe..61b480c1c8a07664c3a35b02d11c86d84c4cd319 100644 --- a/remoting/pom.xml +++ b/remoting/pom.xml @@ -28,8 +28,8 @@ rocketmq-remoting ${project.version} - 1.8 - 1.8 + 1.6 + 1.6 diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AsyncNettyRequestProcessor.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AsyncNettyRequestProcessor.java new file mode 100644 index 0000000000000000000000000000000000000000..453fbdbbf8c38269879ac190c94fd09f86c70571 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AsyncNettyRequestProcessor.java @@ -0,0 +1,12 @@ +package org.apache.rocketmq.remoting.netty; + +import io.netty.channel.ChannelHandlerContext; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public abstract class AsyncNettyRequestProcessor implements NettyRequestProcessor { + + public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception { + RemotingCommand response = processRequest(ctx, request); + responseCallback.callback(response); + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 435e5b2ffd225b43bebab8f4c0d459245048d776..e4a6e3b6806c40c159a16e33725055913ada435f 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -29,7 +29,6 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map.Entry; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; @@ -201,25 +200,28 @@ public abstract class NettyRemotingAbstract { public void run() { try { doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); - CompletableFuture responseFuture = pair.getObject1().asyncProcessRequest(ctx, cmd); - responseFuture.thenAccept((r) -> { - doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, r); - if (!cmd.isOnewayRPC()) { - if (r != null) { - r.setOpaque(opaque); - r.markResponseType(); - try { - ctx.writeAndFlush(r); - } catch (Throwable e) { - log.error("process request over, but response failed", e); - log.error(cmd.toString()); - log.error(r.toString()); + AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1(); + final RemotingResponseCallback callback = new RemotingResponseCallback() { + @Override + public void callback(RemotingCommand response) { + doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); + if (!cmd.isOnewayRPC()) { + if (response != null) { + response.setOpaque(opaque); + response.markResponseType(); + try { + ctx.writeAndFlush(response); + } catch (Throwable e) { + log.error("process request over, but response failed", e); + log.error(cmd.toString()); + log.error(response.toString()); + } + } else { } - } else { - } } - }); + }; + processor.asyncProcessRequest(ctx, cmd, callback); } catch (Throwable e) { log.error("process request exception", e); log.error(cmd.toString()); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java index 6fb744c1e4e17c587cce479b43f2258d22632d89..48006899c395e827ac086199585073814ec361cc 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java @@ -19,8 +19,6 @@ package org.apache.rocketmq.remoting.netty; import io.netty.channel.ChannelHandlerContext; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import java.util.concurrent.CompletableFuture; - /** * Common remoting command processor */ @@ -30,8 +28,4 @@ public interface NettyRequestProcessor { boolean rejectRequest(); - default CompletableFuture asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request) - throws Exception { - return CompletableFuture.completedFuture(processRequest(ctx, request)); - } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingResponseCallback.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingResponseCallback.java new file mode 100644 index 0000000000000000000000000000000000000000..8af148755dd84903fe4278ce2dcbdf1d5afccb4b --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingResponseCallback.java @@ -0,0 +1,7 @@ +package org.apache.rocketmq.remoting.netty; + +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public interface RemotingResponseCallback { + void callback(RemotingCommand response); +}