From d7cafdf027cd6222db6a07cb1c09f09065fe8162 Mon Sep 17 00:00:00 2001 From: "shenhui.backend" Date: Mon, 14 Oct 2019 22:42:52 +0800 Subject: [PATCH] WIP: compatible with JDK1.6 --- .../AbstractSendMessageProcessor.java | 3 +- .../processor/AdminBrokerProcessor.java | 3 +- .../processor/SendMessageProcessor.java | 5 +++ remoting/pom.xml | 4 +-- .../netty/AsyncNettyRequestProcessor.java | 12 +++++++ .../remoting/netty/NettyRemotingAbstract.java | 36 ++++++++++--------- .../remoting/netty/NettyRequestProcessor.java | 6 ---- .../netty/RemotingResponseCallback.java | 7 ++++ 8 files changed, 49 insertions(+), 27 deletions(-) create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/netty/AsyncNettyRequestProcessor.java create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingResponseCallback.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index b0668d49..b2ae4641 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -49,11 +49,12 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.MessageExtBrokerInner; -public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor { +public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); protected final static int DLQ_NUMS_PER_GROUP = 1; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 76a051b9..16f944cc 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -118,6 +118,7 @@ import org.apache.rocketmq.filter.util.BitsArray; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.RemotingCommand; @@ -132,7 +133,7 @@ import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.SelectMappedBufferResult; -public class AdminBrokerProcessor implements NettyRequestProcessor { +public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 5e0e6f44..fa076aa9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -50,6 +50,7 @@ import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.sysflag.TopicSysFlag; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.netty.RemotingResponseCallback; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.PutMessageResult; @@ -77,6 +78,10 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } @Override + public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception { + asyncProcessRequest(ctx, request).thenAccept(responseCallback::callback); + } + public CompletableFuture asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final SendMessageContext mqtraceContext; diff --git a/remoting/pom.xml b/remoting/pom.xml index ffbf7de3..61b480c1 100644 --- a/remoting/pom.xml +++ b/remoting/pom.xml @@ -28,8 +28,8 @@ rocketmq-remoting ${project.version} - 1.8 - 1.8 + 1.6 + 1.6 diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AsyncNettyRequestProcessor.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AsyncNettyRequestProcessor.java new file mode 100644 index 00000000..453fbdbb --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AsyncNettyRequestProcessor.java @@ -0,0 +1,12 @@ +package org.apache.rocketmq.remoting.netty; + +import io.netty.channel.ChannelHandlerContext; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public abstract class AsyncNettyRequestProcessor implements NettyRequestProcessor { + + public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception { + RemotingCommand response = processRequest(ctx, request); + responseCallback.callback(response); + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 435e5b2f..e4a6e3b6 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -29,7 +29,6 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map.Entry; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; @@ -201,25 +200,28 @@ public abstract class NettyRemotingAbstract { public void run() { try { doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); - CompletableFuture responseFuture = pair.getObject1().asyncProcessRequest(ctx, cmd); - responseFuture.thenAccept((r) -> { - doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, r); - if (!cmd.isOnewayRPC()) { - if (r != null) { - r.setOpaque(opaque); - r.markResponseType(); - try { - ctx.writeAndFlush(r); - } catch (Throwable e) { - log.error("process request over, but response failed", e); - log.error(cmd.toString()); - log.error(r.toString()); + AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1(); + final RemotingResponseCallback callback = new RemotingResponseCallback() { + @Override + public void callback(RemotingCommand response) { + doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); + if (!cmd.isOnewayRPC()) { + if (response != null) { + response.setOpaque(opaque); + response.markResponseType(); + try { + ctx.writeAndFlush(response); + } catch (Throwable e) { + log.error("process request over, but response failed", e); + log.error(cmd.toString()); + log.error(response.toString()); + } + } else { } - } else { - } } - }); + }; + processor.asyncProcessRequest(ctx, cmd, callback); } catch (Throwable e) { log.error("process request exception", e); log.error(cmd.toString()); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java index 6fb744c1..48006899 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java @@ -19,8 +19,6 @@ package org.apache.rocketmq.remoting.netty; import io.netty.channel.ChannelHandlerContext; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import java.util.concurrent.CompletableFuture; - /** * Common remoting command processor */ @@ -30,8 +28,4 @@ public interface NettyRequestProcessor { boolean rejectRequest(); - default CompletableFuture asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request) - throws Exception { - return CompletableFuture.completedFuture(processRequest(ctx, request)); - } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingResponseCallback.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingResponseCallback.java new file mode 100644 index 00000000..8af14875 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingResponseCallback.java @@ -0,0 +1,7 @@ +package org.apache.rocketmq.remoting.netty; + +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public interface RemotingResponseCallback { + void callback(RemotingCommand response); +} -- GitLab