diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java index e66cead4121c118a92093fe898ed97363b8a49fd..1db019bec810accdf844597dc2783910f05c1e1c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java @@ -20,7 +20,9 @@ package org.apache.rocketmq.broker.plugin; import java.util.HashMap; import java.util.LinkedList; import java.util.Set; +import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.store.CommitLogDispatcher; import org.apache.rocketmq.store.ConsumeQueue; import org.apache.rocketmq.store.GetMessageResult; @@ -86,6 +88,16 @@ public abstract class AbstractPluginMessageStore implements MessageStore { return next.putMessage(msg); } + @Override + public CompletableFuture asyncPutMessage(MessageExtBrokerInner msg) { + return next.asyncPutMessage(msg); + } + + @Override + public CompletableFuture asyncPutMessages(MessageExtBatch messageExtBatch) { + return next.asyncPutMessages(messageExtBatch); + } + @Override public GetMessageResult getMessage(String group, String topic, int queueId, long offset, int maxMsgNums, final MessageFilter messageFilter) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index adf027993b5fa64b65f725d6e005b73da8435452..55b939201561f3645ebb5fff332b397c5d027ed4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -49,11 +49,12 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.MessageExtBrokerInner; -public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor { +public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); protected final static int DLQ_NUMS_PER_GROUP = 1; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 2b077cbb13ac46f0f1bb0421f09f95d3c31cd31a..fb7aeceda9b1682e1562230d398309470db17064 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -122,6 +122,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.RemotingCommand; @@ -136,7 +137,7 @@ import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.SelectMappedBufferResult; -public class AdminBrokerProcessor implements NettyRequestProcessor { +public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java index 971237929ab0bbcca01237677881becee84d3c4c..aa7d0a3a2211d7fe03108cafdd6801f7b7d83e1a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java @@ -40,9 +40,10 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -public class ClientManageProcessor implements NettyRequestProcessor { +public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java index 028d21bf9765a1750badc8323090faf811296f07..77317a6ffe7cea3d3e9c4e1d27a13f15cbec04ab 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java @@ -34,10 +34,11 @@ import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHe import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -public class ConsumerManageProcessor implements NettyRequestProcessor { +public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java index 9844cae6c2335df858f3558a69aff15a1aef6562..41e7df307c9a56e2541ca172cf903efc1d957404 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java @@ -32,6 +32,7 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.MessageExtBrokerInner; @@ -41,7 +42,7 @@ import org.apache.rocketmq.store.config.BrokerRole; /** * EndTransaction processor: process commit and rollback message */ -public class EndTransactionProcessor implements NettyRequestProcessor { +public class EndTransactionProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME); private final BrokerController brokerController; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java index b0f0a0545529505eb38300460327d1d7eee122fa..cd935983465abf78e8694c5f612ce87344fce183 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java @@ -21,10 +21,11 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -public class ForwardRequestProcessor implements NettyRequestProcessor { +public class ForwardRequestProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index b4f6daa05eb8fa6a5474a57515e5145c08f91e5f..b02b5a058838a5eb6305d5f65cd856213536a805 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -57,6 +57,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.RequestTask; import org.apache.rocketmq.remoting.protocol.RemotingCommand; @@ -67,7 +68,7 @@ import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.stats.BrokerStatsManager; -public class PullMessageProcessor implements NettyRequestProcessor { +public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; private List consumeMessageHookList; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java index a5ca872a86194a877a1b24bad8dcd8fe44d483f3..0f8d64c8438506461c21e5c81c52494baf3b846b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java @@ -33,12 +33,13 @@ import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.QueryMessageResponseHeader; import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.QueryMessageResult; import org.apache.rocketmq.store.SelectMappedBufferResult; -public class QueryMessageProcessor implements NettyRequestProcessor { +public class QueryMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index f753ebba4bf4c84f690e06ce0ae5dcc8153c95a8..801d886c43be168a4bb37dabbf32d0900f97fe12 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -19,6 +19,8 @@ package org.apache.rocketmq.broker.processor; import java.net.SocketAddress; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import io.netty.channel.ChannelHandlerContext; import org.apache.rocketmq.broker.BrokerController; @@ -48,6 +50,7 @@ import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.sysflag.TopicSysFlag; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.netty.RemotingResponseCallback; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.PutMessageResult; @@ -65,28 +68,38 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - SendMessageContext mqtraceContext; + RemotingCommand response = null; + try { + response = asyncProcessRequest(ctx, request).get(); + } catch (InterruptedException | ExecutionException e) { + log.error("process SendMessage error, request : " + request.toString(), e); + } + return response; + } + + @Override + public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception { + asyncProcessRequest(ctx, request).thenAccept(responseCallback::callback); + } + + public CompletableFuture asyncProcessRequest(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { + final SendMessageContext mqtraceContext; switch (request.getCode()) { case RequestCode.CONSUMER_SEND_MSG_BACK: - return this.consumerSendMsgBack(ctx, request); + return this.asyncConsumerSendMsgBack(ctx, request); default: SendMessageRequestHeader requestHeader = parseRequestHeader(request); if (requestHeader == null) { - return null; + return CompletableFuture.completedFuture(null); } - mqtraceContext = buildMsgContext(ctx, requestHeader); this.executeSendMessageHookBefore(ctx, request, mqtraceContext); - - RemotingCommand response; if (requestHeader.isBatch()) { - response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader); + return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader); } else { - response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); + return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader); } - - this.executeSendMessageHookAfter(response, mqtraceContext); - return response; } } @@ -96,50 +109,38 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement this.brokerController.getMessageStore().isTransientStorePoolDeficient(); } - private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request) - throws RemotingCommandException { + private CompletableFuture asyncConsumerSendMsgBack(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final ConsumerSendMsgBackRequestHeader requestHeader = - (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class); - + (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class); String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup()); if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) { - - ConsumeMessageContext context = new ConsumeMessageContext(); - context.setNamespace(namespace); - context.setConsumerGroup(requestHeader.getGroup()); - context.setTopic(requestHeader.getOriginTopic()); - context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK); - context.setCommercialRcvTimes(1); - context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER)); - + ConsumeMessageContext context = buildConsumeMessageContext(namespace, requestHeader, request); this.executeConsumeMessageHookAfter(context); } - SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup()); if (null == subscriptionGroupConfig) { response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); - return response; + return CompletableFuture.completedFuture(response); } - if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden"); - return response; + return CompletableFuture.completedFuture(response); } if (subscriptionGroupConfig.getRetryQueueNums() <= 0) { response.setCode(ResponseCode.SUCCESS); response.setRemark(null); - return response; + return CompletableFuture.completedFuture(response); } String newTopic = MixAll.getRetryTopic(requestHeader.getGroup()); int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums(); - int topicSysFlag = 0; if (requestHeader.isUnitMode()) { topicSysFlag = TopicSysFlag.buildSysFlag(false, true); @@ -152,20 +153,19 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement if (null == topicConfig) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("topic[" + newTopic + "] not exist"); - return response; + return CompletableFuture.completedFuture(response); } if (!PermName.isWriteable(topicConfig.getPerm())) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic)); - return response; + return CompletableFuture.completedFuture(response); } - MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset()); if (null == msgExt) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("look message by offset failed, " + requestHeader.getOffset()); - return response; + return CompletableFuture.completedFuture(response); } final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC); @@ -181,25 +181,23 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); } - if (msgExt.getReconsumeTimes() >= maxReconsumeTimes + if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) { newTopic = MixAll.getDLQTopic(requestHeader.getGroup()); queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, - DLQ_NUMS_PER_GROUP, - PermName.PERM_WRITE, 0 - ); + DLQ_NUMS_PER_GROUP, + PermName.PERM_WRITE, 0); if (null == topicConfig) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("topic[" + newTopic + "] not exist"); - return response; + return CompletableFuture.completedFuture(response); } } else { if (0 == delayLevel) { delayLevel = 3 + msgExt.getReconsumeTimes(); } - msgExt.setDelayTimeLevel(delayLevel); } @@ -215,40 +213,103 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement msgInner.setSysFlag(msgExt.getSysFlag()); msgInner.setBornTimestamp(msgExt.getBornTimestamp()); msgInner.setBornHost(msgExt.getBornHost()); - msgInner.setStoreHost(this.getStoreHost()); + msgInner.setStoreHost(msgExt.getStoreHost()); msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1); String originMsgId = MessageAccessor.getOriginMessageId(msgExt); MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId); + CompletableFuture putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner); + return putMessageResult.thenApply((r) -> { + if (r != null) { + switch (r.getPutMessageStatus()) { + case PUT_OK: + String backTopic = msgExt.getTopic(); + String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC); + if (correctTopic != null) { + backTopic = correctTopic; + } + this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + default: + break; + } + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(r.getPutMessageStatus().name()); + return response; + } + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("putMessageResult is null"); + return response; + }); + } - PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); - if (putMessageResult != null) { - switch (putMessageResult.getPutMessageStatus()) { - case PUT_OK: - String backTopic = msgExt.getTopic(); - String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC); - if (correctTopic != null) { - backTopic = correctTopic; - } - this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic); + private CompletableFuture asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request, + SendMessageContext mqtraceContext, + SendMessageRequestHeader requestHeader) { + final RemotingCommand response = preSend(ctx, request, requestHeader); + final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader(); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); + if (response.getCode() != -1) { + return CompletableFuture.completedFuture(response); + } - return response; - default: - break; - } + final byte[] body = request.getBody(); - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(putMessageResult.getPutMessageStatus().name()); - return response; + int queueIdInt = requestHeader.getQueueId(); + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); + + if (queueIdInt < 0) { + queueIdInt = randomQueueId(topicConfig.getWriteQueueNums()); } - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("putMessageResult is null"); - return response; + MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); + msgInner.setTopic(requestHeader.getTopic()); + msgInner.setQueueId(queueIdInt); + + if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) { + return CompletableFuture.completedFuture(response); + } + + msgInner.setBody(body); + msgInner.setFlag(requestHeader.getFlag()); + MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties())); + msgInner.setPropertiesString(requestHeader.getProperties()); + msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); + msgInner.setBornHost(ctx.channel().remoteAddress()); + msgInner.setStoreHost(this.getStoreHost()); + msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); + CompletableFuture putMessageResult = null; + Map origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties()); + String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); + if (transFlag != null && Boolean.parseBoolean(transFlag)) { + if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { + response.setCode(ResponseCode.NO_PERMISSION); + response.setRemark( + "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + + "] sending transaction message is forbidden"); + return CompletableFuture.completedFuture(response); + } + putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner); + } else { + putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner); + } + return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt); + } + + private CompletableFuture handlePutMessageResultFuture(CompletableFuture putMessageResult, + RemotingCommand response, + RemotingCommand request, + MessageExt msgInner, + SendMessageResponseHeader responseHeader, + SendMessageContext sendMessageContext, + ChannelHandlerContext ctx, + int queueIdInt) { + return putMessageResult.thenApply((r) -> + handlePutMessageResult(r, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt) + ); } private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response, @@ -476,52 +537,29 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement return response; } - private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, - final RemotingCommand request, - final SendMessageContext sendMessageContext, - final SendMessageRequestHeader requestHeader) throws RemotingCommandException { - - final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); + private CompletableFuture asyncSendBatchMessage(ChannelHandlerContext ctx, RemotingCommand request, + SendMessageContext mqtraceContext, + SendMessageRequestHeader requestHeader) { + final RemotingCommand response = preSend(ctx, request, requestHeader); final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader(); - response.setOpaque(request.getOpaque()); - - response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); - response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); - - log.debug("Receive SendMessage request command {}", request); - - final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); - if (this.brokerController.getMessageStore().now() < startTimstamp) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp))); - return response; - } - - response.setCode(-1); - super.msgCheck(ctx, requestHeader, response); if (response.getCode() != -1) { - return response; + return CompletableFuture.completedFuture(response); } int queueIdInt = requestHeader.getQueueId(); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); if (queueIdInt < 0) { - queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); + queueIdInt = randomQueueId(topicConfig.getWriteQueueNums()); } if (requestHeader.getTopic().length() > Byte.MAX_VALUE) { response.setCode(ResponseCode.MESSAGE_ILLEGAL); response.setRemark("message topic length too long " + requestHeader.getTopic().length()); - return response; + return CompletableFuture.completedFuture(response); } - if (requestHeader.getTopic() != null && requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { - response.setCode(ResponseCode.MESSAGE_ILLEGAL); - response.setRemark("batch request does not support retry group " + requestHeader.getTopic()); - return response; - } MessageExtBatch messageExtBatch = new MessageExtBatch(); messageExtBatch.setTopic(requestHeader.getTopic()); messageExtBatch.setQueueId(queueIdInt); @@ -542,11 +580,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName(); MessageAccessor.putProperty(messageExtBatch, MessageConst.PROPERTY_CLUSTER, clusterName); - PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch); - - return handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, ctx, queueIdInt); + CompletableFuture putMessageResult = this.brokerController.getMessageStore().asyncPutMessages(messageExtBatch); + return handlePutMessageResultFuture(putMessageResult, response, request, messageExtBatch, responseHeader, mqtraceContext, ctx, queueIdInt); } + + public boolean hasConsumeMessageHook() { return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty(); } @@ -585,4 +624,49 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement public void registerConsumeMessageHook(List consumeMessageHookList) { this.consumeMessageHookList = consumeMessageHookList; } + + static private ConsumeMessageContext buildConsumeMessageContext(String namespace, + ConsumerSendMsgBackRequestHeader requestHeader, + RemotingCommand request) { + ConsumeMessageContext context = new ConsumeMessageContext(); + context.setNamespace(namespace); + context.setConsumerGroup(requestHeader.getGroup()); + context.setTopic(requestHeader.getOriginTopic()); + context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK); + context.setCommercialRcvTimes(1); + context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER)); + return context; + } + + private int randomQueueId(int writeQueueNums) { + return (this.random.nextInt() % 99999999) % writeQueueNums; + } + + private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request, + SendMessageRequestHeader requestHeader) { + final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); + + response.setOpaque(request.getOpaque()); + + response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); + response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); + + log.debug("Receive SendMessage request command {}", request); + + final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); + + if (this.brokerController.getMessageStore().now() < startTimestamp) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimestamp))); + return response; + } + + response.setCode(-1); + super.msgCheck(ctx, requestHeader, response); + if (response.getCode() != -1) { + return response; + } + + return response; + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageService.java index 143909fd972382c37029b594369a50aa2a97d790..c8eefd357b15273c37edf7b7bb71ef8d4880891e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageService.java @@ -20,6 +20,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.PutMessageResult; +import java.util.concurrent.CompletableFuture; public interface TransactionalMessageService { @@ -31,6 +32,14 @@ public interface TransactionalMessageService { */ PutMessageResult prepareMessage(MessageExtBrokerInner messageInner); + /** + * Process prepare message in async manner, we should put this message to storage service + * + * @param messageInner Prepare(Half) message. + * @return CompletableFuture of put result, will be completed at put success(flush and replica done) + */ + CompletableFuture asyncPrepareMessage(MessageExtBrokerInner messageInner); + /** * Delete prepare message when this message has been committed or rolled back. * diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java index 67f7a5f5628a0030dc2ffe733e84821eb03f449d..012b9def0d085e0db500e77c4423d4545be46b3f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java @@ -47,6 +47,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; public class TransactionalMessageBridge { @@ -195,6 +196,10 @@ public class TransactionalMessageBridge { return store.putMessage(parseHalfMessageInner(messageInner)); } + public CompletableFuture asyncPutHalfMessage(MessageExtBrokerInner messageInner) { + return store.asyncPutMessage(parseHalfMessageInner(messageInner)); + } + private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java index f7ba2e38a47fa09463faf1fd2b69f1bcfb2c6bab..25065ebe885f5644aa128f96e09a6583ad0ede65 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java @@ -40,6 +40,7 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; public class TransactionalMessageServiceImpl implements TransactionalMessageService { @@ -59,6 +60,11 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ private ConcurrentHashMap opQueueMap = new ConcurrentHashMap<>(); + @Override + public CompletableFuture asyncPrepareMessage(MessageExtBrokerInner messageInner) { + return transactionalMessageBridge.asyncPutHalfMessage(messageInner); + } + @Override public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) { return transactionalMessageBridge.putHalfMessage(messageInner); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java index 792fd0fa2c33b2515e9064e0482f755df0b76791..bdf13d4a7faa7334861df540d9e5910996ad4b0a 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java @@ -56,6 +56,7 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -93,13 +94,15 @@ public class SendMessageProcessorTest { @Test public void testProcessRequest() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); + when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)))); assertPutResult(ResponseCode.SUCCESS); } @Test public void testProcessRequest_WithHook() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); + when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)))); List sendMessageHookList = new ArrayList<>(); final SendMessageContext[] sendMessageContext = new SendMessageContext[1]; SendMessageHook sendMessageHook = new SendMessageHook() { @@ -129,55 +132,64 @@ public class SendMessageProcessorTest { @Test public void testProcessRequest_FlushTimeOut() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.FLUSH_DISK_TIMEOUT, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.FLUSH_DISK_TIMEOUT, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)))); assertPutResult(ResponseCode.FLUSH_DISK_TIMEOUT); } @Test public void testProcessRequest_MessageIllegal() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)))); assertPutResult(ResponseCode.MESSAGE_ILLEGAL); } @Test public void testProcessRequest_CreateMappedFileFailed() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)))); assertPutResult(ResponseCode.SYSTEM_ERROR); } @Test public void testProcessRequest_FlushSlaveTimeout() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)))); assertPutResult(ResponseCode.FLUSH_SLAVE_TIMEOUT); } @Test public void testProcessRequest_PageCacheBusy() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)))); assertPutResult(ResponseCode.SYSTEM_ERROR); } @Test public void testProcessRequest_PropertiesTooLong() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)))); assertPutResult(ResponseCode.MESSAGE_ILLEGAL); } @Test public void testProcessRequest_ServiceNotAvailable() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)))); assertPutResult(ResponseCode.SERVICE_NOT_AVAILABLE); } @Test public void testProcessRequest_SlaveNotAvailable() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.SLAVE_NOT_AVAILABLE, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SLAVE_NOT_AVAILABLE, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)))); assertPutResult(ResponseCode.SLAVE_NOT_AVAILABLE); } @Test public void testProcessRequest_WithMsgBack() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); + when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)))); final RemotingCommand request = createSendMsgBackCommand(RequestCode.CONSUMER_SEND_MSG_BACK); sendMessageProcessor = new SendMessageProcessor(brokerController); @@ -189,7 +201,8 @@ public class SendMessageProcessorTest { @Test public void testProcessRequest_Transaction() throws RemotingCommandException { brokerController.setTransactionalMessageService(transactionMsgService); - when(brokerController.getTransactionalMessageService().prepareMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); + when(brokerController.getTransactionalMessageService().asyncPrepareMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)))); RemotingCommand request = createSendTransactionMsgCommand(RequestCode.SEND_MESSAGE); final RemotingCommand[] response = new RemotingCommand[1]; doAnswer(new Answer() { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java index ebe887285f0ff63fd81c741523a6f1820017fccc..5d8c2b955d6cadac46eb8302a7d79bcaeb6951fa 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java @@ -47,6 +47,7 @@ import org.mockito.junit.MockitoJUnitRunner; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -87,6 +88,14 @@ public class TransactionalMessageBridgeTest { assertThat(result.getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK); } + @Test + public void testAsyncPutHalfMessage() throws Exception { + when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)))); + CompletableFuture result = transactionBridge.asyncPutHalfMessage(createMessageBrokerInner()); + assertThat(result.get().getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK); + } + @Test public void testFetchMessageQueues() { Set messageQueues = transactionBridge.fetchMessageQueues(MixAll.RMQ_SYS_TRANS_HALF_TOPIC); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/util/TransactionalMessageServiceImpl.java b/broker/src/test/java/org/apache/rocketmq/broker/util/TransactionalMessageServiceImpl.java index 3fedf7722ee09fa1305b45018fb910072886c05a..281bd57bfef0224d9b0887d9ddb4aae3923366c1 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/util/TransactionalMessageServiceImpl.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/util/TransactionalMessageServiceImpl.java @@ -27,6 +27,8 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.PutMessageResult; +import java.util.concurrent.CompletableFuture; + public class TransactionalMessageServiceImpl implements TransactionalMessageService { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME); @@ -35,6 +37,11 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ return null; } + @Override + public CompletableFuture asyncPrepareMessage(MessageExtBrokerInner messageInner) { + return null; + } + @Override public boolean deletePrepareMessage(MessageExt messageExt) { return false; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java index 5861bc4baa18405eded9e4c4da35ad00eb145643..3ef60af1799a9c91664581f8b596bb4046c6da1b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java @@ -52,10 +52,11 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -public class ClientRemotingProcessor implements NettyRequestProcessor { +public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private final InternalLogger log = ClientLogger.getLog(); private final MQClientInstance mqClientFactory; diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java index 467078c44f84f220ef1860fa2cfc7ea3b5cd59fc..7210246edc4344a760bb22e1eb799280b232b541 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java @@ -54,10 +54,11 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.namesrv.NamesrvController; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -public class DefaultRequestProcessor implements NettyRequestProcessor { +public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); protected final NamesrvController namesrvController; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AsyncNettyRequestProcessor.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AsyncNettyRequestProcessor.java new file mode 100644 index 0000000000000000000000000000000000000000..db333f833836a627bf49573b246a9530416f9483 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AsyncNettyRequestProcessor.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.netty; + +import io.netty.channel.ChannelHandlerContext; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public abstract class AsyncNettyRequestProcessor implements NettyRequestProcessor { + + public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception { + RemotingCommand response = processRequest(ctx, request); + responseCallback.callback(response); + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index d190e00f44f6d6e4b736bedce0e391db70db6a84..23376727ced2abcd0d51acc6b6a9f3b12d1ec962 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -36,6 +36,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.ChannelEventListener; @@ -102,7 +103,6 @@ public abstract class NettyRemotingAbstract { protected List rpcHooks = new ArrayList(); - static { NettyLogger.initNettyLogger(); } @@ -200,23 +200,34 @@ public abstract class NettyRemotingAbstract { public void run() { try { doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); - final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd); - doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); - - if (!cmd.isOnewayRPC()) { - if (response != null) { - response.setOpaque(opaque); - response.markResponseType(); - try { - ctx.writeAndFlush(response); - } catch (Throwable e) { - log.error("process request over, but response failed", e); - log.error(cmd.toString()); - log.error(response.toString()); + final RemotingResponseCallback callback = new RemotingResponseCallback() { + @Override + public void callback(RemotingCommand response) { + doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); + if (!cmd.isOnewayRPC()) { + if (response != null) { + response.setOpaque(opaque); + response.markResponseType(); + try { + ctx.writeAndFlush(response); + } catch (Throwable e) { + log.error("process request over, but response failed", e); + log.error(cmd.toString()); + log.error(response.toString()); + } + } else { + } } - } else { - } + }; + if (pair.getObject1() instanceof AsyncNettyRequestProcessor) { + AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1(); + processor.asyncProcessRequest(ctx, cmd, callback); + } else { + NettyRequestProcessor processor = pair.getObject1(); + RemotingCommand response = processor.processRequest(ctx, cmd); + doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); + callback.callback(response); } } catch (Throwable e) { log.error("process request exception", e); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java index 040f7684883dc6cf3f35ef69b8d61deecca2cf56..48006899c395e827ac086199585073814ec361cc 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java @@ -27,4 +27,5 @@ public interface NettyRequestProcessor { throws Exception; boolean rejectRequest(); + } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingResponseCallback.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingResponseCallback.java new file mode 100644 index 0000000000000000000000000000000000000000..7185f20d1d60e198b1da9c5e8a8f34787bab070b --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingResponseCallback.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.netty; + +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public interface RemotingResponseCallback { + void callback(RemotingCommand response); +} diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java index 0ecfaaa5aa007c661637b80e14a3a00ffb10aa06..e378a7bf1a69bb1e0c231081c003f2e12973ded9 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java @@ -26,12 +26,7 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; -import org.apache.rocketmq.remoting.netty.NettyClientConfig; -import org.apache.rocketmq.remoting.netty.NettyRemotingClient; -import org.apache.rocketmq.remoting.netty.NettyRemotingServer; -import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; -import org.apache.rocketmq.remoting.netty.NettyServerConfig; -import org.apache.rocketmq.remoting.netty.ResponseFuture; +import org.apache.rocketmq.remoting.netty.*; import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.junit.AfterClass; @@ -48,7 +43,7 @@ public class RemotingServerTest { public static RemotingServer createRemotingServer() throws InterruptedException { NettyServerConfig config = new NettyServerConfig(); RemotingServer remotingServer = new NettyRemotingServer(config); - remotingServer.registerProcessor(0, new NettyRequestProcessor() { + remotingServer.registerProcessor(0, new AsyncNettyRequestProcessor() { @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) { request.setRemark("Hi " + ctx.channel().remoteAddress()); diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 352585cbb5fb91c62aff9cfc002de7fde81404d7..82bd6706c929365afc2f47f138c2472dee2d4a7f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -23,8 +23,11 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; @@ -63,6 +66,7 @@ public class CommitLog { protected volatile long confirmOffset = -1L; private volatile long beginTimeInLock = 0; + protected final PutMessageLock putMessageLock; public CommitLog(final DefaultMessageStore defaultMessageStore) { @@ -550,6 +554,228 @@ public class CommitLog { return beginTimeInLock; } + public CompletableFuture asyncPutMessage(final MessageExtBrokerInner msg) { + // Set the storage time + msg.setStoreTimestamp(System.currentTimeMillis()); + // Set the message body BODY CRC (consider the most appropriate setting + // on the client) + msg.setBodyCRC(UtilAll.crc32(msg.getBody())); + // Back to Results + AppendMessageResult result = null; + + StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); + + String topic = msg.getTopic(); + int queueId = msg.getQueueId(); + + final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); + if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE + || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { + // Delay Delivery + if (msg.getDelayTimeLevel() > 0) { + if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { + msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); + } + + topic = ScheduleMessageService.SCHEDULE_TOPIC; + queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); + + // Backup real topic, queueId + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); + msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); + + msg.setTopic(topic); + msg.setQueueId(queueId); + } + } + + long elapsedTimeInLock = 0; + MappedFile unlockMappedFile = null; + MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); + + putMessageLock.lock(); //spin or ReentrantLock ,depending on store config + try { + long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); + this.beginTimeInLock = beginLockTimestamp; + + // Here settings are stored timestamp, in order to ensure an orderly + // global + msg.setStoreTimestamp(beginLockTimestamp); + + if (null == mappedFile || mappedFile.isFull()) { + mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise + } + if (null == mappedFile) { + log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); + beginTimeInLock = 0; + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null)); + } + + result = mappedFile.appendMessage(msg, this.appendMessageCallback); + switch (result.getStatus()) { + case PUT_OK: + break; + case END_OF_FILE: + unlockMappedFile = mappedFile; + // Create a new file, re-write the message + mappedFile = this.mappedFileQueue.getLastMappedFile(0); + if (null == mappedFile) { + // XXX: warn and notify me + log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); + beginTimeInLock = 0; + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result)); + } + result = mappedFile.appendMessage(msg, this.appendMessageCallback); + break; + case MESSAGE_SIZE_EXCEEDED: + case PROPERTIES_SIZE_EXCEEDED: + beginTimeInLock = 0; + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result)); + case UNKNOWN_ERROR: + beginTimeInLock = 0; + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); + default: + beginTimeInLock = 0; + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); + } + + elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; + beginTimeInLock = 0; + } finally { + putMessageLock.unlock(); + } + + if (elapsedTimeInLock > 500) { + log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result); + } + + if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { + this.defaultMessageStore.unlockMappedFile(unlockMappedFile); + } + + PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); + + // Statistics + storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet(); + storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes()); + + CompletableFuture flushResultFuture = submitFlushRequest(result, putMessageResult, msg); + CompletableFuture replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg); + return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> { + if (flushStatus != PutMessageStatus.PUT_OK) { + putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); + } + if (replicaStatus != PutMessageStatus.PUT_OK) { + putMessageResult.setPutMessageStatus(replicaStatus); + } + return putMessageResult; + }); + } + + public CompletableFuture asyncPutMessages(final MessageExtBatch messageExtBatch) { + messageExtBatch.setStoreTimestamp(System.currentTimeMillis()); + AppendMessageResult result; + + StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); + + final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag()); + + if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) { + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null)); + } + if (messageExtBatch.getDelayTimeLevel() > 0) { + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null)); + } + + long elapsedTimeInLock = 0; + MappedFile unlockMappedFile = null; + MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); + + //fine-grained lock instead of the coarse-grained + MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get(); + + messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); + + putMessageLock.lock(); + try { + long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); + this.beginTimeInLock = beginLockTimestamp; + + // Here settings are stored timestamp, in order to ensure an orderly + // global + messageExtBatch.setStoreTimestamp(beginLockTimestamp); + + if (null == mappedFile || mappedFile.isFull()) { + mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise + } + if (null == mappedFile) { + log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); + beginTimeInLock = 0; + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null)); + } + + result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback); + switch (result.getStatus()) { + case PUT_OK: + break; + case END_OF_FILE: + unlockMappedFile = mappedFile; + // Create a new file, re-write the message + mappedFile = this.mappedFileQueue.getLastMappedFile(0); + if (null == mappedFile) { + // XXX: warn and notify me + log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); + beginTimeInLock = 0; + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result)); + } + result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback); + break; + case MESSAGE_SIZE_EXCEEDED: + case PROPERTIES_SIZE_EXCEEDED: + beginTimeInLock = 0; + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result)); + case UNKNOWN_ERROR: + default: + beginTimeInLock = 0; + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); + } + + elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; + beginTimeInLock = 0; + } finally { + putMessageLock.unlock(); + } + + if (elapsedTimeInLock > 500) { + log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, messageExtBatch.getBody().length, result); + } + + if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { + this.defaultMessageStore.unlockMappedFile(unlockMappedFile); + } + + PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); + + // Statistics + storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum()); + storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes()); + + CompletableFuture flushOKFuture = submitFlushRequest(result, putMessageResult, messageExtBatch); + CompletableFuture replicaOKFuture = submitReplicaRequest(result, putMessageResult, messageExtBatch); + return flushOKFuture.thenCombine(replicaOKFuture, (flushStatus, replicaStatus) -> { + if (flushStatus != PutMessageStatus.PUT_OK) { + putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); + } + + if (replicaStatus != PutMessageStatus.PUT_OK) { + putMessageResult.setPutMessageStatus(replicaStatus); + } + return putMessageResult; + }); + + } + public PutMessageResult putMessage(final MessageExtBrokerInner msg) { // Set the storage time msg.setStoreTimestamp(System.currentTimeMillis()); @@ -673,6 +899,53 @@ public class CommitLog { return putMessageResult; } + public CompletableFuture submitFlushRequest(AppendMessageResult result, PutMessageResult putMessageResult, + MessageExt messageExt) { + // Synchronization flush + if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { + final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; + if (messageExt.isWaitStoreMsgOK()) { + GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), + this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); + service.putRequest(request); + return request.future(); + } else { + service.wakeup(); + return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); + } + } + // Asynchronous flush + else { + if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { + flushCommitLogService.wakeup(); + } else { + commitLogService.wakeup(); + } + return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); + } + } + + public CompletableFuture submitReplicaRequest(AppendMessageResult result, PutMessageResult putMessageResult, + MessageExt messageExt) { + if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) { + HAService service = this.defaultMessageStore.getHaService(); + if (messageExt.isWaitStoreMsgOK()) { + if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) { + GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), + this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); + service.putRequest(request); + service.getWaitNotifyObject().wakeupAll(); + return request.future(); + } + else { + return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE); + } + } + } + return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); + } + + public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { // Synchronization flush if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { @@ -680,8 +953,15 @@ public class CommitLog { if (messageExt.isWaitStoreMsgOK()) { GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); - boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); - if (!flushOK) { + CompletableFuture flushOkFuture = request.future(); + PutMessageStatus flushStatus = null; + try { + flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), + TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + //flushOK=false; + } + if (flushStatus != PutMessageStatus.PUT_OK) { log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); @@ -709,9 +989,13 @@ public class CommitLog { GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); service.getWaitNotifyObject().wakeupAll(); - boolean flushOK = - request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); - if (!flushOK) { + PutMessageStatus replicaStatus = null; + try { + replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), + TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + } + if (replicaStatus != PutMessageStatus.PUT_OK) { log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); @@ -1081,31 +1365,35 @@ public class CommitLog { public static class GroupCommitRequest { private final long nextOffset; - private final CountDownLatch countDownLatch = new CountDownLatch(1); - private volatile boolean flushOK = false; + private CompletableFuture flushOKFuture = new CompletableFuture<>(); + private final long startTimestamp = System.currentTimeMillis(); + private long timeoutMillis = Long.MAX_VALUE; + + public GroupCommitRequest(long nextOffset, long timeoutMillis) { + this.nextOffset = nextOffset; + this.timeoutMillis = timeoutMillis; + } public GroupCommitRequest(long nextOffset) { this.nextOffset = nextOffset; } + public long getNextOffset() { return nextOffset; } public void wakeupCustomer(final boolean flushOK) { - this.flushOK = flushOK; - this.countDownLatch.countDown(); + long endTimestamp = System.currentTimeMillis(); + PutMessageStatus result = (flushOK && ((endTimestamp - this.startTimestamp) <= this.timeoutMillis)) ? + PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT; + this.flushOKFuture.complete(result); } - public boolean waitForFlush(long timeout) { - try { - this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS); - return this.flushOK; - } catch (InterruptedException e) { - log.error("Interrupted", e); - return false; - } + public CompletableFuture future() { + return flushOKFuture; } + } /** diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index d5ba5692a928e4b65b41ec94be0356b0c81dd1c9..f59b5eb98ba59c4f026155265a53e001a08f6b14 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -31,6 +31,7 @@ import java.util.LinkedList; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; @@ -351,109 +352,170 @@ public class DefaultMessageStore implements MessageStore { } } - public PutMessageResult putMessage(MessageExtBrokerInner msg) { + private PutMessageStatus checkMessage(MessageExtBrokerInner msg) { + if (msg.getTopic().length() > Byte.MAX_VALUE) { + log.warn("putMessage message topic length too long " + msg.getTopic().length()); + return PutMessageStatus.MESSAGE_ILLEGAL; + } + + if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) { + log.warn("putMessage message properties length too long " + msg.getPropertiesString().length()); + return PutMessageStatus.MESSAGE_ILLEGAL; + } + return PutMessageStatus.PUT_OK; + } + + private PutMessageStatus checkMessages(MessageExtBatch messageExtBatch) { + if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) { + log.warn("putMessage message topic length too long " + messageExtBatch.getTopic().length()); + return PutMessageStatus.MESSAGE_ILLEGAL; + } + + if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) { + log.warn("PutMessages body length too long " + messageExtBatch.getBody().length); + return PutMessageStatus.MESSAGE_ILLEGAL; + } + + return PutMessageStatus.PUT_OK; + } + + private PutMessageStatus checkStoreStatus() { if (this.shutdown) { log.warn("message store has shutdown, so putMessage is forbidden"); - return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); + return PutMessageStatus.SERVICE_NOT_AVAILABLE; } if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { - log.warn("message store is slave mode, so putMessage is forbidden "); + log.warn("message store has shutdown, so putMessage is forbidden"); } - - return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); + return PutMessageStatus.SERVICE_NOT_AVAILABLE; } if (!this.runningFlags.isWriteable()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { - log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits()); + log.warn("message store has shutdown, so putMessage is forbidden"); } - - return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); + return PutMessageStatus.SERVICE_NOT_AVAILABLE; } else { this.printTimes.set(0); } - if (msg.getTopic().length() > Byte.MAX_VALUE) { - log.warn("putMessage message topic length too long " + msg.getTopic().length()); - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + if (this.isOSPageCacheBusy()) { + return PutMessageStatus.OS_PAGECACHE_BUSY; } + return PutMessageStatus.PUT_OK; + } - if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) { - log.warn("putMessage message properties length too long " + msg.getPropertiesString().length()); - return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null); + @Override + public CompletableFuture asyncPutMessage(MessageExtBrokerInner msg) { + PutMessageStatus checkStoreStatus = this.checkStoreStatus(); + if (checkStoreStatus != PutMessageStatus.PUT_OK) { + return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null)); } - if (this.isOSPageCacheBusy()) { - return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null); + PutMessageStatus msgCheckStatus = this.checkMessage(msg); + if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) { + return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null)); } long beginTime = this.getSystemClock().now(); - PutMessageResult result = this.commitLog.putMessage(msg); + CompletableFuture putResultFuture = this.commitLog.asyncPutMessage(msg); - long elapsedTime = this.getSystemClock().now() - beginTime; - if (elapsedTime > 500) { - log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length); - } - this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); + putResultFuture.thenAccept((result) -> { + long elapsedTime = this.getSystemClock().now() - beginTime; + if (elapsedTime > 500) { + log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length); + } + this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); - if (null == result || !result.isOk()) { - this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); - } + if (null == result || !result.isOk()) { + this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); + } + }); - return result; + return putResultFuture; } - public PutMessageResult putMessages(MessageExtBatch messageExtBatch) { - if (this.shutdown) { - log.warn("DefaultMessageStore has shutdown, so putMessages is forbidden"); - return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); + public CompletableFuture asyncPutMessages(MessageExtBatch messageExtBatch) { + PutMessageStatus checkStoreStatus = this.checkStoreStatus(); + if (checkStoreStatus != PutMessageStatus.PUT_OK) { + return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null)); } - if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { - long value = this.printTimes.getAndIncrement(); - if ((value % 50000) == 0) { - log.warn("DefaultMessageStore is in slave mode, so putMessages is forbidden "); + PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch); + if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) { + return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null)); + } + + long beginTime = this.getSystemClock().now(); + CompletableFuture resultFuture = this.commitLog.asyncPutMessages(messageExtBatch); + + resultFuture.thenAccept((result) -> { + long elapsedTime = this.getSystemClock().now() - beginTime; + if (elapsedTime > 500) { + log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length); } - return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); - } + this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); - if (!this.runningFlags.isWriteable()) { - long value = this.printTimes.getAndIncrement(); - if ((value % 50000) == 0) { - log.warn("DefaultMessageStore is not writable, so putMessages is forbidden " + this.runningFlags.getFlagBits()); + if (null == result || !result.isOk()) { + this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); } + }); - return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); - } else { - this.printTimes.set(0); + return resultFuture; + } + + @Override + public PutMessageResult putMessage(MessageExtBrokerInner msg) { + PutMessageStatus checkStoreStatus = this.checkStoreStatus(); + if (checkStoreStatus != PutMessageStatus.PUT_OK) { + return new PutMessageResult(checkStoreStatus, null); } - if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) { - log.warn("PutMessages topic length too long " + messageExtBatch.getTopic().length()); - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + PutMessageStatus msgCheckStatus = this.checkMessage(msg); + if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) { + return new PutMessageResult(msgCheckStatus, null); } - if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) { - log.warn("PutMessages body length too long " + messageExtBatch.getBody().length); - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + long beginTime = this.getSystemClock().now(); + PutMessageResult result = this.commitLog.putMessage(msg); + long elapsedTime = this.getSystemClock().now() - beginTime; + if (elapsedTime > 500) { + log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length); } - if (this.isOSPageCacheBusy()) { - return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null); + this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); + + if (null == result || !result.isOk()) { + this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); + } + + return result; + } + + @Override + public PutMessageResult putMessages(MessageExtBatch messageExtBatch) { + PutMessageStatus checkStoreStatus = this.checkStoreStatus(); + if (checkStoreStatus != PutMessageStatus.PUT_OK) { + return new PutMessageResult(checkStoreStatus, null); + } + + PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch); + if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) { + return new PutMessageResult(msgCheckStatus, null); } long beginTime = this.getSystemClock().now(); PutMessageResult result = this.commitLog.putMessages(messageExtBatch); - long elapsedTime = this.getSystemClock().now() - beginTime; if (elapsedTime > 500) { log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length); } + this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); if (null == result || !result.isOk()) { diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 5a046ff181f747624f1bb7c427541dfba28509e3..64eb5250de6e59f7ce3ef19033cd40b5d5f0254f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -19,6 +19,8 @@ package org.apache.rocketmq.store; import java.util.HashMap; import java.util.LinkedList; import java.util.Set; +import java.util.concurrent.CompletableFuture; + import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.store.config.BrokerRole; @@ -53,6 +55,26 @@ public interface MessageStore { */ void destroy(); + /** Store a message into store in async manner, the processor can process the next request + * rather than wait for result + * when result is completed, notify the client in async manner + * + * @param msg MessageInstance to store + * @return a CompletableFuture for the result of store operation + */ + default CompletableFuture asyncPutMessage(final MessageExtBrokerInner msg) { + return CompletableFuture.completedFuture(putMessage(msg)); + } + + /** + * Store a batch of messages in async manner + * @param messageExtBatch the message batch + * @return a CompletableFuture for the result of store operation + */ + default CompletableFuture asyncPutMessages(final MessageExtBatch messageExtBatch) { + return CompletableFuture.completedFuture(putMessages(messageExtBatch)); + } + /** * Store a message into store. * diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index 13da48bfe9f08408c57212ea506b674939bbb3ac..3d748bf8900b5c92ae35cc8b9513109519ce8a86 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -30,6 +30,7 @@ import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult; import io.openmessaging.storage.dledger.utils.DLedgerUtils; import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageAccessor; @@ -489,6 +490,16 @@ public class DLedgerCommitLog extends CommitLog { return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); } + @Override + public CompletableFuture asyncPutMessage(MessageExtBrokerInner msg) { + return CompletableFuture.completedFuture(this.putMessage(msg)); + } + + @Override + public CompletableFuture asyncPutMessages(MessageExtBatch messageExtBatch) { + return CompletableFuture.completedFuture(putMessages(messageExtBatch)); + } + @Override public SelectMappedBufferResult getMessage(final long offset, final int size) { if (offset < dividedCommitlogOffset) { diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java b/store/src/test/java/org/apache/rocketmq/store/HATest.java index 64495c33987b32d42e1a3c3adf813a6e958068e9..a2702a0f67afcda651198c9748ab9b40a04b6af7 100644 --- a/store/src/test/java/org/apache/rocketmq/store/HATest.java +++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java @@ -19,6 +19,8 @@ package org.apache.rocketmq.store; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.MessageStoreConfig; @@ -34,12 +36,13 @@ import java.lang.reflect.Method; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.Arrays; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; /** * HATest @@ -119,6 +122,37 @@ public class HATest { } } + @Test + public void testSemiSyncReplica() throws Exception { + long totalMsgs = 5; + QUEUE_TOTAL = 1; + MessageBody = StoreMessage.getBytes(); + for (long i = 0; i < totalMsgs; i++) { + MessageExtBrokerInner msg = buildMessage(); + CompletableFuture putResultFuture = messageStore.asyncPutMessage(msg); + PutMessageResult result = putResultFuture.get(); + assertEquals(PutMessageStatus.PUT_OK, result.getPutMessageStatus()); + //message has been replicated to slave's commitLog, but maybe not dispatch to ConsumeQueue yet + //so direct read from commitLog by physical offset + MessageExt slaveMsg = slaveMessageStore.lookMessageByOffset(result.getAppendMessageResult().getWroteOffset()); + assertNotNull(slaveMsg); + assertTrue(Arrays.equals(msg.getBody(), slaveMsg.getBody())); + assertEquals(msg.getTopic(), slaveMsg.getTopic()); + assertEquals(msg.getTags(), slaveMsg.getTags()); + assertEquals(msg.getKeys(), slaveMsg.getKeys()); + } + + //shutdown slave, putMessage should return FLUSH_SLAVE_TIMEOUT + slaveMessageStore.shutdown(); + //wait to let master clean the slave's connection + Thread.sleep(masterMessageStoreConfig.getHaHousekeepingInterval() + 500); + for (long i = 0; i < totalMsgs; i++) { + CompletableFuture putResultFuture = messageStore.asyncPutMessage(buildMessage()); + PutMessageResult result = putResultFuture.get(); + assertEquals(PutMessageStatus.SLAVE_NOT_AVAILABLE, result.getPutMessageStatus()); + } + } + @After public void destroy() throws Exception{ Thread.sleep(5000L); @@ -156,6 +190,7 @@ public class HATest { msg.setBornTimestamp(System.currentTimeMillis()); msg.setStoreHost(StoreHost); msg.setBornHost(BornHost); + msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); return msg; } diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQTransactionalProducer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQTransactionalProducer.java new file mode 100644 index 0000000000000000000000000000000000000000..dcc76b2d844e303096a25c5869f4ec9851e3eaa9 --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQTransactionalProducer.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.rmq; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.TransactionListener; +import org.apache.rocketmq.client.producer.TransactionMQProducer; +import org.apache.rocketmq.common.Pair; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.test.clientinterface.AbstractMQProducer; +import org.apache.rocketmq.test.sendresult.ResultWrapper; + +public class RMQTransactionalProducer extends AbstractMQProducer { + private static Logger logger = Logger.getLogger(RMQTransactionalProducer.class); + private TransactionMQProducer producer = null; + private String nsAddr = null; + + public RMQTransactionalProducer(String nsAddr, String topic, TransactionListener transactionListener) { + this(nsAddr, topic, false, transactionListener); + } + + public RMQTransactionalProducer(String nsAddr, String topic, boolean useTLS, TransactionListener transactionListener) { + super(topic); + this.nsAddr = nsAddr; + create(useTLS, transactionListener); + start(); + } + + protected void create(boolean useTLS, TransactionListener transactionListener) { + producer = new TransactionMQProducer(); + producer.setProducerGroup(getProducerGroupName()); + producer.setInstanceName(getProducerInstanceName()); + producer.setTransactionListener(transactionListener); + producer.setUseTLS(useTLS); + + if (nsAddr != null) { + producer.setNamesrvAddr(nsAddr); + } + } + + public void start() { + try { + producer.start(); + super.setStartSuccess(true); + } catch (MQClientException e) { + super.setStartSuccess(false); + logger.error(e); + e.printStackTrace(); + } + } + + @Override + public ResultWrapper send(Object msg, Object arg) { + boolean commitMsg = ((Pair) arg).getObject2() == LocalTransactionState.COMMIT_MESSAGE; + org.apache.rocketmq.client.producer.SendResult metaqResult = null; + Message message = (Message) msg; + try { + long start = System.currentTimeMillis(); + metaqResult = producer.sendMessageInTransaction(message, arg); + this.msgRTs.addData(System.currentTimeMillis() - start); + if (isDebug) { + logger.info(metaqResult); + } + sendResult.setMsgId(metaqResult.getMsgId()); + sendResult.setSendResult(true); + sendResult.setBrokerIp(metaqResult.getMessageQueue().getBrokerName()); + if (commitMsg) { + msgBodys.addData(new String(message.getBody())); + } + originMsgs.addData(msg); + originMsgIndex.put(new String(message.getBody()), metaqResult); + } catch (MQClientException e) { + if (isDebug) { + e.printStackTrace(); + } + + sendResult.setSendResult(false); + sendResult.setSendException(e); + errorMsgs.addData(msg); + } + return sendResult; + } + + @Override + public void shutdown() { + producer.shutdown(); + } + +} diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java index 45c6750dfdf98a370913f910ca523ea02122b201..c6a835fd67f0bf990e64da2049f3c2468adc1ac5 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java @@ -22,12 +22,14 @@ import java.util.List; import org.apache.log4j.Logger; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.namesrv.NamesrvController; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.client.rmq.RMQTransactionalProducer; import org.apache.rocketmq.test.clientinterface.AbstractMQConsumer; import org.apache.rocketmq.test.clientinterface.AbstractMQProducer; import org.apache.rocketmq.test.factory.ConsumerFactory; @@ -96,6 +98,15 @@ public class BaseConf { return producer; } + public static RMQTransactionalProducer getTransactionalProducer(String nsAddr, String topic, TransactionListener transactionListener) { + RMQTransactionalProducer producer = new RMQTransactionalProducer(nsAddr, topic, false, transactionListener); + if (debug) { + producer.setDebug(); + } + mqClients.add(producer); + return producer; + } + public static RMQNormalProducer getProducer(String nsAddr, String topic, String producerGoup, String instanceName) { RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic, producerGoup, diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/transaction/TransactionalMsgIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/transaction/TransactionalMsgIT.java new file mode 100644 index 0000000000000000000000000000000000000000..b5f46c298732e31ce1bee59b17e62a200f96eee9 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/transaction/TransactionalMsgIT.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.producer.transaction; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.TransactionListener; +import org.apache.rocketmq.common.Pair; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.client.rmq.RMQTransactionalProducer; +import org.apache.rocketmq.test.factory.MQMessageFactory; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener; +import org.apache.rocketmq.test.util.MQWait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import static com.google.common.truth.Truth.assertThat; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +public class TransactionalMsgIT extends BaseConf { + private static Logger logger = Logger.getLogger(TransactionalMsgIT.class); + private RMQTransactionalProducer producer = null; + private RMQNormalConsumer consumer = null; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("use topic: %s;", topic)); + producer = getTransactionalProducer(nsAddr, topic, new TransactionListenerImpl()); + consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); + } + + @After + public void tearDown() { + super.shutdown(); + } + + @Test + public void testMessageVisibility() throws Exception { + Thread.sleep(3000); + int msgSize = 120; + List msgs = MQMessageFactory.getMsg(topic, msgSize); + for (int i = 0; i < msgSize; i++) { + producer.send(msgs.get(i), getTransactionHandle(i)); + } + boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer.getListener()); + assertThat(recvAll).isEqualTo(true); + } + + static Pair getTransactionHandle(int msgIndex) { + switch (msgIndex % 5) { + case 0: + //commit immediately + return new Pair<>(true, LocalTransactionState.COMMIT_MESSAGE); + case 1: + //rollback immediately + return new Pair<>(true, LocalTransactionState.ROLLBACK_MESSAGE); + case 2: + //commit in check + return new Pair<>(false, LocalTransactionState.COMMIT_MESSAGE); + case 3: + //rollback in check + return new Pair<>(false, LocalTransactionState.ROLLBACK_MESSAGE); + case 4: + default: + return new Pair<>(false, LocalTransactionState.UNKNOW); + + } + } + + static private class TransactionListenerImpl implements TransactionListener { + ConcurrentHashMap checkStatus = new ConcurrentHashMap<>(); + + @Override + public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { + Pair transactionHandle = (Pair) arg; + if (transactionHandle.getObject1()) { + return transactionHandle.getObject2(); + } else { + checkStatus.put(msg.getTransactionId(), transactionHandle.getObject2()); + return LocalTransactionState.UNKNOW; + } + } + + @Override + public LocalTransactionState checkLocalTransaction(MessageExt msg) { + LocalTransactionState state = checkStatus.get(msg.getTransactionId()); + if (state == null) { + return LocalTransactionState.UNKNOW; + } else { + return state; + } + } + } +}