提交 d7cafdf0 编写于 作者: S shenhui.backend

WIP: compatible with JDK1.6

上级 f28be821
...@@ -49,11 +49,12 @@ import org.apache.rocketmq.logging.InternalLogger; ...@@ -49,11 +49,12 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageExtBrokerInner;
public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor { public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
protected final static int DLQ_NUMS_PER_GROUP = 1; protected final static int DLQ_NUMS_PER_GROUP = 1;
......
...@@ -118,6 +118,7 @@ import org.apache.rocketmq.filter.util.BitsArray; ...@@ -118,6 +118,7 @@ import org.apache.rocketmq.filter.util.BitsArray;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
...@@ -132,7 +133,7 @@ import org.apache.rocketmq.store.PutMessageResult; ...@@ -132,7 +133,7 @@ import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.SelectMappedBufferResult;
public class AdminBrokerProcessor implements NettyRequestProcessor { public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController; private final BrokerController brokerController;
......
...@@ -50,6 +50,7 @@ import org.apache.rocketmq.common.sysflag.MessageSysFlag; ...@@ -50,6 +50,7 @@ import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.TopicSysFlag; import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RemotingResponseCallback;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageResult;
...@@ -77,6 +78,10 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -77,6 +78,10 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
} }
@Override @Override
public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
asyncProcessRequest(ctx, request).thenAccept(responseCallback::callback);
}
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx, public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException { RemotingCommand request) throws RemotingCommandException {
final SendMessageContext mqtraceContext; final SendMessageContext mqtraceContext;
......
...@@ -28,8 +28,8 @@ ...@@ -28,8 +28,8 @@
<name>rocketmq-remoting ${project.version}</name> <name>rocketmq-remoting ${project.version}</name>
<properties> <properties>
<maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.source>1.6</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.target>1.6</maven.compiler.target>
</properties> </properties>
<dependencies> <dependencies>
......
package org.apache.rocketmq.remoting.netty;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public abstract class AsyncNettyRequestProcessor implements NettyRequestProcessor {
public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
RemotingCommand response = processRequest(ctx, request);
responseCallback.callback(response);
}
}
...@@ -29,7 +29,6 @@ import java.util.Iterator; ...@@ -29,7 +29,6 @@ import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
...@@ -201,25 +200,28 @@ public abstract class NettyRemotingAbstract { ...@@ -201,25 +200,28 @@ public abstract class NettyRemotingAbstract {
public void run() { public void run() {
try { try {
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
CompletableFuture<RemotingCommand> responseFuture = pair.getObject1().asyncProcessRequest(ctx, cmd); AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
responseFuture.thenAccept((r) -> { final RemotingResponseCallback callback = new RemotingResponseCallback() {
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, r); @Override
if (!cmd.isOnewayRPC()) { public void callback(RemotingCommand response) {
if (r != null) { doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
r.setOpaque(opaque); if (!cmd.isOnewayRPC()) {
r.markResponseType(); if (response != null) {
try { response.setOpaque(opaque);
ctx.writeAndFlush(r); response.markResponseType();
} catch (Throwable e) { try {
log.error("process request over, but response failed", e); ctx.writeAndFlush(response);
log.error(cmd.toString()); } catch (Throwable e) {
log.error(r.toString()); log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {
} }
} else {
} }
} }
}); };
processor.asyncProcessRequest(ctx, cmd, callback);
} catch (Throwable e) { } catch (Throwable e) {
log.error("process request exception", e); log.error("process request exception", e);
log.error(cmd.toString()); log.error(cmd.toString());
......
...@@ -19,8 +19,6 @@ package org.apache.rocketmq.remoting.netty; ...@@ -19,8 +19,6 @@ package org.apache.rocketmq.remoting.netty;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.util.concurrent.CompletableFuture;
/** /**
* Common remoting command processor * Common remoting command processor
*/ */
...@@ -30,8 +28,4 @@ public interface NettyRequestProcessor { ...@@ -30,8 +28,4 @@ public interface NettyRequestProcessor {
boolean rejectRequest(); boolean rejectRequest();
default CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws Exception {
return CompletableFuture.completedFuture(processRequest(ctx, request));
}
} }
package org.apache.rocketmq.remoting.netty;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface RemotingResponseCallback {
void callback(RemotingCommand response);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册