diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java index e66cead4121c118a92093fe898ed97363b8a49fd..1db019bec810accdf844597dc2783910f05c1e1c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java @@ -20,7 +20,9 @@ package org.apache.rocketmq.broker.plugin; import java.util.HashMap; import java.util.LinkedList; import java.util.Set; +import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.store.CommitLogDispatcher; import org.apache.rocketmq.store.ConsumeQueue; import org.apache.rocketmq.store.GetMessageResult; @@ -86,6 +88,16 @@ public abstract class AbstractPluginMessageStore implements MessageStore { return next.putMessage(msg); } + @Override + public CompletableFuture asyncPutMessage(MessageExtBrokerInner msg) { + return next.asyncPutMessage(msg); + } + + @Override + public CompletableFuture asyncPutMessages(MessageExtBatch messageExtBatch) { + return next.asyncPutMessages(messageExtBatch); + } + @Override public GetMessageResult getMessage(String group, String topic, int queueId, long offset, int maxMsgNums, final MessageFilter messageFilter) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 8035ae6f185b5c91db73eb0ceea5e0a26f43b3f3..5e0e6f44427ec9f44359b3aab11ca89b41fb6d8f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -19,6 +19,8 @@ package org.apache.rocketmq.broker.processor; import java.net.SocketAddress; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import io.netty.channel.ChannelHandlerContext; import org.apache.rocketmq.broker.BrokerController; @@ -65,28 +67,34 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - SendMessageContext mqtraceContext; + RemotingCommand response = null; + try { + response = asyncProcessRequest(ctx, request).get(); + } catch (InterruptedException | ExecutionException e) { + log.error("process SendMessage error, request : " + request.toString(), e); + } + return response; + } + + @Override + public CompletableFuture asyncProcessRequest(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { + final SendMessageContext mqtraceContext; switch (request.getCode()) { case RequestCode.CONSUMER_SEND_MSG_BACK: - return this.consumerSendMsgBack(ctx, request); + return this.asyncConsumerSendMsgBack(ctx, request); default: SendMessageRequestHeader requestHeader = parseRequestHeader(request); if (requestHeader == null) { - return null; + return CompletableFuture.completedFuture(null); } - mqtraceContext = buildMsgContext(ctx, requestHeader); this.executeSendMessageHookBefore(ctx, request, mqtraceContext); - - RemotingCommand response; if (requestHeader.isBatch()) { - response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader); + return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader); } else { - response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); + return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader); } - - this.executeSendMessageHookAfter(response, mqtraceContext); - return response; } } @@ -96,50 +104,38 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement this.brokerController.getMessageStore().isTransientStorePoolDeficient(); } - private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request) - throws RemotingCommandException { + private CompletableFuture asyncConsumerSendMsgBack(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final ConsumerSendMsgBackRequestHeader requestHeader = - (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class); - + (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class); String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup()); if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) { - - ConsumeMessageContext context = new ConsumeMessageContext(); - context.setNamespace(namespace); - context.setConsumerGroup(requestHeader.getGroup()); - context.setTopic(requestHeader.getOriginTopic()); - context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK); - context.setCommercialRcvTimes(1); - context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER)); - + ConsumeMessageContext context = buildConsumeMessageContext(namespace, requestHeader, request); this.executeConsumeMessageHookAfter(context); } - SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup()); if (null == subscriptionGroupConfig) { response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); - return response; + return CompletableFuture.completedFuture(response); } - if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden"); - return response; + return CompletableFuture.completedFuture(response); } if (subscriptionGroupConfig.getRetryQueueNums() <= 0) { response.setCode(ResponseCode.SUCCESS); response.setRemark(null); - return response; + return CompletableFuture.completedFuture(response); } String newTopic = MixAll.getRetryTopic(requestHeader.getGroup()); int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums(); - int topicSysFlag = 0; if (requestHeader.isUnitMode()) { topicSysFlag = TopicSysFlag.buildSysFlag(false, true); @@ -152,20 +148,19 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement if (null == topicConfig) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("topic[" + newTopic + "] not exist"); - return response; + return CompletableFuture.completedFuture(response); } if (!PermName.isWriteable(topicConfig.getPerm())) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic)); - return response; + return CompletableFuture.completedFuture(response); } - MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset()); if (null == msgExt) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("look message by offset failed, " + requestHeader.getOffset()); - return response; + return CompletableFuture.completedFuture(response); } final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC); @@ -181,25 +176,23 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); } - if (msgExt.getReconsumeTimes() >= maxReconsumeTimes + if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) { newTopic = MixAll.getDLQTopic(requestHeader.getGroup()); queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, - DLQ_NUMS_PER_GROUP, - PermName.PERM_WRITE, 0 - ); + DLQ_NUMS_PER_GROUP, + PermName.PERM_WRITE, 0); if (null == topicConfig) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("topic[" + newTopic + "] not exist"); - return response; + return CompletableFuture.completedFuture(response); } } else { if (0 == delayLevel) { delayLevel = 3 + msgExt.getReconsumeTimes(); } - msgExt.setDelayTimeLevel(delayLevel); } @@ -215,40 +208,102 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement msgInner.setSysFlag(msgExt.getSysFlag()); msgInner.setBornTimestamp(msgExt.getBornTimestamp()); msgInner.setBornHost(msgExt.getBornHost()); - msgInner.setStoreHost(this.getStoreHost()); + msgInner.setStoreHost(msgExt.getStoreHost()); msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1); String originMsgId = MessageAccessor.getOriginMessageId(msgExt); MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId); + CompletableFuture putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner); + return putMessageResult.thenApply((r) -> { + if (r != null) { + switch (r.getPutMessageStatus()) { + case PUT_OK: + String backTopic = msgExt.getTopic(); + String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC); + if (correctTopic != null) { + backTopic = correctTopic; + } + this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + default: + break; + } + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(r.getPutMessageStatus().name()); + return response; + } + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("putMessageResult is null"); + return response; + }); + } - PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); - if (putMessageResult != null) { - switch (putMessageResult.getPutMessageStatus()) { - case PUT_OK: - String backTopic = msgExt.getTopic(); - String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC); - if (correctTopic != null) { - backTopic = correctTopic; - } - this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic); + private CompletableFuture asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request, + SendMessageContext mqtraceContext, + SendMessageRequestHeader requestHeader) { + final RemotingCommand response = preSend(ctx, request, requestHeader); + final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader(); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); + if (response.getCode() != -1) { + return CompletableFuture.completedFuture(response); + } - return response; - default: - break; - } + final byte[] body = request.getBody(); - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(putMessageResult.getPutMessageStatus().name()); - return response; + int queueIdInt = requestHeader.getQueueId(); + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); + + if (queueIdInt < 0) { + queueIdInt = randomQueueId(topicConfig.getWriteQueueNums()); } - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("putMessageResult is null"); - return response; + MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); + msgInner.setTopic(requestHeader.getTopic()); + msgInner.setQueueId(queueIdInt); + + if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) { + return CompletableFuture.completedFuture(response); + } + + msgInner.setBody(body); + msgInner.setFlag(requestHeader.getFlag()); + msgInner.setPropertiesString(requestHeader.getProperties()); + msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); + msgInner.setBornHost(ctx.channel().remoteAddress()); + msgInner.setStoreHost(this.getStoreHost()); + msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); + CompletableFuture putMessageResult = null; + Map origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties()); + String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); + if (transFlag != null && Boolean.parseBoolean(transFlag)) { + if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { + response.setCode(ResponseCode.NO_PERMISSION); + response.setRemark( + "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + + "] sending transaction message is forbidden"); + return CompletableFuture.completedFuture(response); + } + putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner); + } else { + putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner); + } + return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt); + } + + private CompletableFuture handlePutMessageResultFuture(CompletableFuture putMessageResult, + RemotingCommand response, + RemotingCommand request, + MessageExt msgInner, + SendMessageResponseHeader responseHeader, + SendMessageContext sendMessageContext, + ChannelHandlerContext ctx, + int queueIdInt) { + return putMessageResult.thenApply((r) -> + handlePutMessageResult(r, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt) + ); } private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response, @@ -295,79 +350,6 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement return true; } - private RemotingCommand sendMessage(final ChannelHandlerContext ctx, - final RemotingCommand request, - final SendMessageContext sendMessageContext, - final SendMessageRequestHeader requestHeader) throws RemotingCommandException { - - final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); - final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader(); - - response.setOpaque(request.getOpaque()); - - response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); - response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); - - log.debug("receive SendMessage request command, {}", request); - - final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); - if (this.brokerController.getMessageStore().now() < startTimstamp) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp))); - return response; - } - - response.setCode(-1); - super.msgCheck(ctx, requestHeader, response); - if (response.getCode() != -1) { - return response; - } - - final byte[] body = request.getBody(); - - int queueIdInt = requestHeader.getQueueId(); - TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); - - if (queueIdInt < 0) { - queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); - } - - MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); - msgInner.setTopic(requestHeader.getTopic()); - msgInner.setQueueId(queueIdInt); - - if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) { - return response; - } - - msgInner.setBody(body); - msgInner.setFlag(requestHeader.getFlag()); - MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties())); - msgInner.setPropertiesString(requestHeader.getProperties()); - msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); - msgInner.setBornHost(ctx.channel().remoteAddress()); - msgInner.setStoreHost(this.getStoreHost()); - msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); - PutMessageResult putMessageResult = null; - Map oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties()); - String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); - if (traFlag != null && Boolean.parseBoolean(traFlag)) { - if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { - response.setCode(ResponseCode.NO_PERMISSION); - response.setRemark( - "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() - + "] sending transaction message is forbidden"); - return response; - } - putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner); - } else { - putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); - } - - return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt); - - } - private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response, RemotingCommand request, MessageExt msg, SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx, @@ -473,52 +455,29 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement return response; } - private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, - final RemotingCommand request, - final SendMessageContext sendMessageContext, - final SendMessageRequestHeader requestHeader) throws RemotingCommandException { - - final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); + private CompletableFuture asyncSendBatchMessage(ChannelHandlerContext ctx, RemotingCommand request, + SendMessageContext mqtraceContext, + SendMessageRequestHeader requestHeader) { + final RemotingCommand response = preSend(ctx, request, requestHeader); final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader(); - response.setOpaque(request.getOpaque()); - - response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); - response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); - - log.debug("Receive SendMessage request command {}", request); - - final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); - if (this.brokerController.getMessageStore().now() < startTimstamp) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp))); - return response; - } - - response.setCode(-1); - super.msgCheck(ctx, requestHeader, response); if (response.getCode() != -1) { - return response; + return CompletableFuture.completedFuture(response); } int queueIdInt = requestHeader.getQueueId(); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); if (queueIdInt < 0) { - queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); + queueIdInt = randomQueueId(topicConfig.getWriteQueueNums()); } if (requestHeader.getTopic().length() > Byte.MAX_VALUE) { response.setCode(ResponseCode.MESSAGE_ILLEGAL); response.setRemark("message topic length too long " + requestHeader.getTopic().length()); - return response; + return CompletableFuture.completedFuture(response); } - if (requestHeader.getTopic() != null && requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { - response.setCode(ResponseCode.MESSAGE_ILLEGAL); - response.setRemark("batch request does not support retry group " + requestHeader.getTopic()); - return response; - } MessageExtBatch messageExtBatch = new MessageExtBatch(); messageExtBatch.setTopic(requestHeader.getTopic()); messageExtBatch.setQueueId(queueIdInt); @@ -537,11 +496,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement messageExtBatch.setStoreHost(this.getStoreHost()); messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); - PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch); - - return handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, ctx, queueIdInt); + CompletableFuture putMessageResult = this.brokerController.getMessageStore().asyncPutMessages(messageExtBatch); + return handlePutMessageResultFuture(putMessageResult, response, request, messageExtBatch, responseHeader, mqtraceContext, ctx, queueIdInt); } + + public boolean hasConsumeMessageHook() { return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty(); } @@ -580,4 +540,49 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement public void registerConsumeMessageHook(List consumeMessageHookList) { this.consumeMessageHookList = consumeMessageHookList; } + + static private ConsumeMessageContext buildConsumeMessageContext(String namespace, + ConsumerSendMsgBackRequestHeader requestHeader, + RemotingCommand request) { + ConsumeMessageContext context = new ConsumeMessageContext(); + context.setNamespace(namespace); + context.setConsumerGroup(requestHeader.getGroup()); + context.setTopic(requestHeader.getOriginTopic()); + context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK); + context.setCommercialRcvTimes(1); + context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER)); + return context; + } + + private int randomQueueId(int writeQueueNums) { + return (this.random.nextInt() % 99999999) % writeQueueNums; + } + + private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request, + SendMessageRequestHeader requestHeader) { + final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); + + response.setOpaque(request.getOpaque()); + + response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); + response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); + + log.debug("Receive SendMessage request command {}", request); + + final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); + + if (this.brokerController.getMessageStore().now() < startTimestamp) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimestamp))); + return response; + } + + response.setCode(-1); + super.msgCheck(ctx, requestHeader, response); + if (response.getCode() != -1) { + return response; + } + + return response; + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageService.java index 143909fd972382c37029b594369a50aa2a97d790..c8eefd357b15273c37edf7b7bb71ef8d4880891e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageService.java @@ -20,6 +20,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.PutMessageResult; +import java.util.concurrent.CompletableFuture; public interface TransactionalMessageService { @@ -31,6 +32,14 @@ public interface TransactionalMessageService { */ PutMessageResult prepareMessage(MessageExtBrokerInner messageInner); + /** + * Process prepare message in async manner, we should put this message to storage service + * + * @param messageInner Prepare(Half) message. + * @return CompletableFuture of put result, will be completed at put success(flush and replica done) + */ + CompletableFuture asyncPrepareMessage(MessageExtBrokerInner messageInner); + /** * Delete prepare message when this message has been committed or rolled back. * diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java index 84a62761bd61a084723abeaf37fedeeb5213735c..3967ff325ee08829d712c5b03c06d98de349c11b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java @@ -47,6 +47,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; public class TransactionalMessageBridge { @@ -190,6 +191,10 @@ public class TransactionalMessageBridge { return store.putMessage(parseHalfMessageInner(messageInner)); } + public CompletableFuture asyncPutHalfMessage(MessageExtBrokerInner messageInner) { + return store.asyncPutMessage(parseHalfMessageInner(messageInner)); + } + private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java index e1549b15177e0a2de5fb4160d31d13aee9798f0e..183ce4705c644b8887ed1fd998db3b27327540df 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java @@ -40,6 +40,7 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; public class TransactionalMessageServiceImpl implements TransactionalMessageService { @@ -59,6 +60,11 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ private ConcurrentHashMap opQueueMap = new ConcurrentHashMap<>(); + @Override + public CompletableFuture asyncPrepareMessage(MessageExtBrokerInner messageInner) { + return transactionalMessageBridge.asyncPutHalfMessage(messageInner); + } + @Override public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) { return transactionalMessageBridge.putHalfMessage(messageInner); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java index 792fd0fa2c33b2515e9064e0482f755df0b76791..bdf13d4a7faa7334861df540d9e5910996ad4b0a 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java @@ -56,6 +56,7 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -93,13 +94,15 @@ public class SendMessageProcessorTest { @Test public void testProcessRequest() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); + when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)))); assertPutResult(ResponseCode.SUCCESS); } @Test public void testProcessRequest_WithHook() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); + when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)))); List sendMessageHookList = new ArrayList<>(); final SendMessageContext[] sendMessageContext = new SendMessageContext[1]; SendMessageHook sendMessageHook = new SendMessageHook() { @@ -129,55 +132,64 @@ public class SendMessageProcessorTest { @Test public void testProcessRequest_FlushTimeOut() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.FLUSH_DISK_TIMEOUT, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.FLUSH_DISK_TIMEOUT, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)))); assertPutResult(ResponseCode.FLUSH_DISK_TIMEOUT); } @Test public void testProcessRequest_MessageIllegal() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)))); assertPutResult(ResponseCode.MESSAGE_ILLEGAL); } @Test public void testProcessRequest_CreateMappedFileFailed() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)))); assertPutResult(ResponseCode.SYSTEM_ERROR); } @Test public void testProcessRequest_FlushSlaveTimeout() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)))); assertPutResult(ResponseCode.FLUSH_SLAVE_TIMEOUT); } @Test public void testProcessRequest_PageCacheBusy() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)))); assertPutResult(ResponseCode.SYSTEM_ERROR); } @Test public void testProcessRequest_PropertiesTooLong() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)))); assertPutResult(ResponseCode.MESSAGE_ILLEGAL); } @Test public void testProcessRequest_ServiceNotAvailable() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)))); assertPutResult(ResponseCode.SERVICE_NOT_AVAILABLE); } @Test public void testProcessRequest_SlaveNotAvailable() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.SLAVE_NOT_AVAILABLE, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); + when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SLAVE_NOT_AVAILABLE, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)))); assertPutResult(ResponseCode.SLAVE_NOT_AVAILABLE); } @Test public void testProcessRequest_WithMsgBack() throws RemotingCommandException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); + when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)))); final RemotingCommand request = createSendMsgBackCommand(RequestCode.CONSUMER_SEND_MSG_BACK); sendMessageProcessor = new SendMessageProcessor(brokerController); @@ -189,7 +201,8 @@ public class SendMessageProcessorTest { @Test public void testProcessRequest_Transaction() throws RemotingCommandException { brokerController.setTransactionalMessageService(transactionMsgService); - when(brokerController.getTransactionalMessageService().prepareMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); + when(brokerController.getTransactionalMessageService().asyncPrepareMessage(any(MessageExtBrokerInner.class))) + .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)))); RemotingCommand request = createSendTransactionMsgCommand(RequestCode.SEND_MESSAGE); final RemotingCommand[] response = new RemotingCommand[1]; doAnswer(new Answer() { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/util/TransactionalMessageServiceImpl.java b/broker/src/test/java/org/apache/rocketmq/broker/util/TransactionalMessageServiceImpl.java index 3fedf7722ee09fa1305b45018fb910072886c05a..281bd57bfef0224d9b0887d9ddb4aae3923366c1 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/util/TransactionalMessageServiceImpl.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/util/TransactionalMessageServiceImpl.java @@ -27,6 +27,8 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.PutMessageResult; +import java.util.concurrent.CompletableFuture; + public class TransactionalMessageServiceImpl implements TransactionalMessageService { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME); @@ -35,6 +37,11 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ return null; } + @Override + public CompletableFuture asyncPrepareMessage(MessageExtBrokerInner messageInner) { + return null; + } + @Override public boolean deletePrepareMessage(MessageExt messageExt) { return false; diff --git a/remoting/pom.xml b/remoting/pom.xml index 61b480c1c8a07664c3a35b02d11c86d84c4cd319..ffbf7de3eadbeb1208ef5eae18946680a8504dfe 100644 --- a/remoting/pom.xml +++ b/remoting/pom.xml @@ -28,8 +28,8 @@ rocketmq-remoting ${project.version} - 1.6 - 1.6 + 1.8 + 1.8 diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index d190e00f44f6d6e4b736bedce0e391db70db6a84..435e5b2ffd225b43bebab8f4c0d459245048d776 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -29,6 +29,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; @@ -36,6 +37,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; + import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.ChannelEventListener; @@ -102,7 +104,6 @@ public abstract class NettyRemotingAbstract { protected List rpcHooks = new ArrayList(); - static { NettyLogger.initNettyLogger(); } @@ -200,24 +201,25 @@ public abstract class NettyRemotingAbstract { public void run() { try { doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); - final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd); - doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); + CompletableFuture responseFuture = pair.getObject1().asyncProcessRequest(ctx, cmd); + responseFuture.thenAccept((r) -> { + doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, r); + if (!cmd.isOnewayRPC()) { + if (r != null) { + r.setOpaque(opaque); + r.markResponseType(); + try { + ctx.writeAndFlush(r); + } catch (Throwable e) { + log.error("process request over, but response failed", e); + log.error(cmd.toString()); + log.error(r.toString()); + } + } else { - if (!cmd.isOnewayRPC()) { - if (response != null) { - response.setOpaque(opaque); - response.markResponseType(); - try { - ctx.writeAndFlush(response); - } catch (Throwable e) { - log.error("process request over, but response failed", e); - log.error(cmd.toString()); - log.error(response.toString()); } - } else { - } - } + }); } catch (Throwable e) { log.error("process request exception", e); log.error(cmd.toString()); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java index 040f7684883dc6cf3f35ef69b8d61deecca2cf56..6fb744c1e4e17c587cce479b43f2258d22632d89 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java @@ -19,6 +19,8 @@ package org.apache.rocketmq.remoting.netty; import io.netty.channel.ChannelHandlerContext; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import java.util.concurrent.CompletableFuture; + /** * Common remoting command processor */ @@ -27,4 +29,9 @@ public interface NettyRequestProcessor { throws Exception; boolean rejectRequest(); + + default CompletableFuture asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request) + throws Exception { + return CompletableFuture.completedFuture(processRequest(ctx, request)); + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 2ab66e84634f119b7dcb3b08e95c3399f26aff44..6061f507690f0d7e463db33b25e7f7854f4ed696 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -21,8 +21,12 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; @@ -61,6 +65,7 @@ public class CommitLog { protected volatile long confirmOffset = -1L; private volatile long beginTimeInLock = 0; + protected final PutMessageLock putMessageLock; public CommitLog(final DefaultMessageStore defaultMessageStore) { @@ -533,6 +538,228 @@ public class CommitLog { return beginTimeInLock; } + public CompletableFuture asyncPutMessage(final MessageExtBrokerInner msg) { + // Set the storage time + msg.setStoreTimestamp(System.currentTimeMillis()); + // Set the message body BODY CRC (consider the most appropriate setting + // on the client) + msg.setBodyCRC(UtilAll.crc32(msg.getBody())); + // Back to Results + AppendMessageResult result = null; + + StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); + + String topic = msg.getTopic(); + int queueId = msg.getQueueId(); + + final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); + if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE + || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { + // Delay Delivery + if (msg.getDelayTimeLevel() > 0) { + if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { + msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); + } + + topic = ScheduleMessageService.SCHEDULE_TOPIC; + queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); + + // Backup real topic, queueId + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); + msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); + + msg.setTopic(topic); + msg.setQueueId(queueId); + } + } + + long elapsedTimeInLock = 0; + MappedFile unlockMappedFile = null; + MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); + + putMessageLock.lock(); //spin or ReentrantLock ,depending on store config + try { + long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); + this.beginTimeInLock = beginLockTimestamp; + + // Here settings are stored timestamp, in order to ensure an orderly + // global + msg.setStoreTimestamp(beginLockTimestamp); + + if (null == mappedFile || mappedFile.isFull()) { + mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise + } + if (null == mappedFile) { + log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); + beginTimeInLock = 0; + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null)); + } + + result = mappedFile.appendMessage(msg, this.appendMessageCallback); + switch (result.getStatus()) { + case PUT_OK: + break; + case END_OF_FILE: + unlockMappedFile = mappedFile; + // Create a new file, re-write the message + mappedFile = this.mappedFileQueue.getLastMappedFile(0); + if (null == mappedFile) { + // XXX: warn and notify me + log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); + beginTimeInLock = 0; + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result)); + } + result = mappedFile.appendMessage(msg, this.appendMessageCallback); + break; + case MESSAGE_SIZE_EXCEEDED: + case PROPERTIES_SIZE_EXCEEDED: + beginTimeInLock = 0; + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result)); + case UNKNOWN_ERROR: + beginTimeInLock = 0; + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); + default: + beginTimeInLock = 0; + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); + } + + elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; + beginTimeInLock = 0; + } finally { + putMessageLock.unlock(); + } + + if (elapsedTimeInLock > 500) { + log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result); + } + + if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { + this.defaultMessageStore.unlockMappedFile(unlockMappedFile); + } + + PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); + + // Statistics + storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet(); + storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes()); + + CompletableFuture flushResultFuture = submitFlushRequest(result, putMessageResult, msg); + CompletableFuture replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg); + return flushResultFuture.thenCombine(replicaResultFuture, (flushOK, replicaOK) -> { + if (!flushOK) { + putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); + } + if (!replicaOK) { + putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); + } + return putMessageResult; + }); + } + + public CompletableFuture asyncPutMessages(final MessageExtBatch messageExtBatch) { + messageExtBatch.setStoreTimestamp(System.currentTimeMillis()); + AppendMessageResult result; + + StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); + + final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag()); + + if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) { + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null)); + } + if (messageExtBatch.getDelayTimeLevel() > 0) { + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null)); + } + + long elapsedTimeInLock = 0; + MappedFile unlockMappedFile = null; + MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); + + //fine-grained lock instead of the coarse-grained + MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get(); + + messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); + + putMessageLock.lock(); + try { + long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); + this.beginTimeInLock = beginLockTimestamp; + + // Here settings are stored timestamp, in order to ensure an orderly + // global + messageExtBatch.setStoreTimestamp(beginLockTimestamp); + + if (null == mappedFile || mappedFile.isFull()) { + mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise + } + if (null == mappedFile) { + log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); + beginTimeInLock = 0; + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null)); + } + + result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback); + switch (result.getStatus()) { + case PUT_OK: + break; + case END_OF_FILE: + unlockMappedFile = mappedFile; + // Create a new file, re-write the message + mappedFile = this.mappedFileQueue.getLastMappedFile(0); + if (null == mappedFile) { + // XXX: warn and notify me + log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); + beginTimeInLock = 0; + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result)); + } + result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback); + break; + case MESSAGE_SIZE_EXCEEDED: + case PROPERTIES_SIZE_EXCEEDED: + beginTimeInLock = 0; + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result)); + case UNKNOWN_ERROR: + default: + beginTimeInLock = 0; + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); + } + + elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; + beginTimeInLock = 0; + } finally { + putMessageLock.unlock(); + } + + if (elapsedTimeInLock > 500) { + log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, messageExtBatch.getBody().length, result); + } + + if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { + this.defaultMessageStore.unlockMappedFile(unlockMappedFile); + } + + PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); + + // Statistics + storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum()); + storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes()); + + CompletableFuture flushOKFuture = submitFlushRequest(result, putMessageResult, messageExtBatch); + CompletableFuture replicaOKFuture = submitReplicaRequest(result, putMessageResult, messageExtBatch); + return flushOKFuture.thenCombine(replicaOKFuture, (flushOK, replicaOK) -> { + if (!flushOK) { + putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); + } + + if (!replicaOK) { + putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); + } + return putMessageResult; + }); + + } + public PutMessageResult putMessage(final MessageExtBrokerInner msg) { // Set the storage time msg.setStoreTimestamp(System.currentTimeMillis()); @@ -645,6 +872,53 @@ public class CommitLog { return putMessageResult; } + public CompletableFuture submitFlushRequest(AppendMessageResult result, PutMessageResult putMessageResult, + MessageExt messageExt) { + // Synchronization flush + if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { + final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; + if (messageExt.isWaitStoreMsgOK()) { + GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), + this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); + service.putRequest(request); + return request.future(); + } else { + service.wakeup(); + return CompletableFuture.completedFuture(true); + } + } + // Asynchronous flush + else { + if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { + flushCommitLogService.wakeup(); + } else { + commitLogService.wakeup(); + } + return CompletableFuture.completedFuture(true); + } + } + + public CompletableFuture submitReplicaRequest(AppendMessageResult result, PutMessageResult putMessageResult, + MessageExt messageExt) { + if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) { + HAService service = this.defaultMessageStore.getHaService(); + if (messageExt.isWaitStoreMsgOK()) { + if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) { + GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), + this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); + service.putRequest(request); + service.getWaitNotifyObject().wakeupAll(); + return request.future(); + } + else { + return CompletableFuture.completedFuture(false); + } + } + } + return CompletableFuture.completedFuture(true); + } + + public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { // Synchronization flush if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { @@ -652,7 +926,14 @@ public class CommitLog { if (messageExt.isWaitStoreMsgOK()) { GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); - boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); + CompletableFuture flushOkFuture = request.future(); + boolean flushOK = false; + try { + flushOK = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), + TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + //flushOK=false; + } if (!flushOK) { log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString()); @@ -681,8 +962,13 @@ public class CommitLog { GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); service.getWaitNotifyObject().wakeupAll(); - boolean flushOK = - request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); + boolean flushOK = false; + try { + flushOK = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), + TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + //flushOK=false; + } if (!flushOK) { log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString()); @@ -1041,26 +1327,39 @@ public class CommitLog { public static class GroupCommitRequest { private final long nextOffset; private final CountDownLatch countDownLatch = new CountDownLatch(1); - private volatile boolean flushOK = false; + private CompletableFuture flushOk = new CompletableFuture<>(); + private final long startTimestamp = System.currentTimeMillis(); + private long timeoutMillis = Long.MAX_VALUE; + + public GroupCommitRequest(long nextOffset, long timeoutMillis) { + this.nextOffset = nextOffset; + this.timeoutMillis = timeoutMillis; + } public GroupCommitRequest(long nextOffset) { this.nextOffset = nextOffset; } + public long getNextOffset() { return nextOffset; } public void wakeupCustomer(final boolean flushOK) { - this.flushOK = flushOK; + long endTimestamp = System.currentTimeMillis(); + this.flushOk.complete(flushOK && ((endTimestamp - this.startTimestamp) <= this.timeoutMillis)); this.countDownLatch.countDown(); } + public CompletableFuture future() { + return flushOk; + } + public boolean waitForFlush(long timeout) { try { this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS); - return this.flushOK; - } catch (InterruptedException e) { + return flushOk.get(); + } catch (InterruptedException | ExecutionException e) { log.error("Interrupted", e); return false; } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 971b1e75b38da514fd7e8c5f12e5ff5e3dc9c2ab..45a600d483f1f32e1d2cef80921a3532f6a5fa00 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -29,6 +29,7 @@ import java.util.LinkedList; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; @@ -351,109 +352,170 @@ public class DefaultMessageStore implements MessageStore { } } - public PutMessageResult putMessage(MessageExtBrokerInner msg) { + private PutMessageStatus checkMessage(MessageExtBrokerInner msg) { + if (msg.getTopic().length() > Byte.MAX_VALUE) { + log.warn("putMessage message topic length too long " + msg.getTopic().length()); + return PutMessageStatus.MESSAGE_ILLEGAL; + } + + if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) { + log.warn("putMessage message properties length too long " + msg.getPropertiesString().length()); + return PutMessageStatus.MESSAGE_ILLEGAL; + } + return PutMessageStatus.PUT_OK; + } + + private PutMessageStatus checkMessages(MessageExtBatch messageExtBatch) { + if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) { + log.warn("putMessage message topic length too long " + messageExtBatch.getTopic().length()); + return PutMessageStatus.MESSAGE_ILLEGAL; + } + + if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) { + log.warn("PutMessages body length too long " + messageExtBatch.getBody().length); + return PutMessageStatus.MESSAGE_ILLEGAL; + } + + return PutMessageStatus.PUT_OK; + } + + private PutMessageStatus checkStoreStatus() { if (this.shutdown) { log.warn("message store has shutdown, so putMessage is forbidden"); - return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); + return PutMessageStatus.SERVICE_NOT_AVAILABLE; } if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { - log.warn("message store is slave mode, so putMessage is forbidden "); + log.warn("message store has shutdown, so putMessage is forbidden"); } - - return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); + return PutMessageStatus.SERVICE_NOT_AVAILABLE; } if (!this.runningFlags.isWriteable()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { - log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits()); + log.warn("message store has shutdown, so putMessage is forbidden"); } - - return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); + return PutMessageStatus.SERVICE_NOT_AVAILABLE; } else { this.printTimes.set(0); } - if (msg.getTopic().length() > Byte.MAX_VALUE) { - log.warn("putMessage message topic length too long " + msg.getTopic().length()); - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + if (this.isOSPageCacheBusy()) { + return PutMessageStatus.OS_PAGECACHE_BUSY; } + return PutMessageStatus.PUT_OK; + } - if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) { - log.warn("putMessage message properties length too long " + msg.getPropertiesString().length()); - return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null); + @Override + public CompletableFuture asyncPutMessage(MessageExtBrokerInner msg) { + PutMessageStatus checkStoreStatus = this.checkStoreStatus(); + if (checkStoreStatus != PutMessageStatus.PUT_OK) { + return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null)); } - if (this.isOSPageCacheBusy()) { - return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null); + PutMessageStatus msgCheckStatus = this.checkMessage(msg); + if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) { + return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null)); } long beginTime = this.getSystemClock().now(); - PutMessageResult result = this.commitLog.putMessage(msg); + CompletableFuture putResultFuture = this.commitLog.asyncPutMessage(msg); - long elapsedTime = this.getSystemClock().now() - beginTime; - if (elapsedTime > 500) { - log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length); - } - this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); + putResultFuture.thenAccept((result) -> { + long elapsedTime = this.getSystemClock().now() - beginTime; + if (elapsedTime > 500) { + log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length); + } + this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); - if (null == result || !result.isOk()) { - this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); - } + if (null == result || !result.isOk()) { + this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); + } + }); - return result; + return putResultFuture; } - public PutMessageResult putMessages(MessageExtBatch messageExtBatch) { - if (this.shutdown) { - log.warn("DefaultMessageStore has shutdown, so putMessages is forbidden"); - return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); + public CompletableFuture asyncPutMessages(MessageExtBatch messageExtBatch) { + PutMessageStatus checkStoreStatus = this.checkStoreStatus(); + if (checkStoreStatus != PutMessageStatus.PUT_OK) { + return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null)); } - if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { - long value = this.printTimes.getAndIncrement(); - if ((value % 50000) == 0) { - log.warn("DefaultMessageStore is in slave mode, so putMessages is forbidden "); + PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch); + if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) { + return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null)); + } + + long beginTime = this.getSystemClock().now(); + CompletableFuture resultFuture = this.commitLog.asyncPutMessages(messageExtBatch); + + resultFuture.thenAccept((result) -> { + long elapsedTime = this.getSystemClock().now() - beginTime; + if (elapsedTime > 500) { + log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length); } - return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); - } + this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); - if (!this.runningFlags.isWriteable()) { - long value = this.printTimes.getAndIncrement(); - if ((value % 50000) == 0) { - log.warn("DefaultMessageStore is not writable, so putMessages is forbidden " + this.runningFlags.getFlagBits()); + if (null == result || !result.isOk()) { + this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); } + }); - return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); - } else { - this.printTimes.set(0); + return resultFuture; + } + + @Override + public PutMessageResult putMessage(MessageExtBrokerInner msg) { + PutMessageStatus checkStoreStatus = this.checkStoreStatus(); + if (checkStoreStatus != PutMessageStatus.PUT_OK) { + return new PutMessageResult(checkStoreStatus, null); } - if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) { - log.warn("PutMessages topic length too long " + messageExtBatch.getTopic().length()); - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + PutMessageStatus msgCheckStatus = this.checkMessage(msg); + if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) { + return new PutMessageResult(msgCheckStatus, null); } - if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) { - log.warn("PutMessages body length too long " + messageExtBatch.getBody().length); - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + long beginTime = this.getSystemClock().now(); + PutMessageResult result = this.commitLog.putMessage(msg); + long elapsedTime = this.getSystemClock().now() - beginTime; + if (elapsedTime > 500) { + log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length); } - if (this.isOSPageCacheBusy()) { - return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null); + this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); + + if (null == result || !result.isOk()) { + this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); + } + + return result; + } + + @Override + public PutMessageResult putMessages(MessageExtBatch messageExtBatch) { + PutMessageStatus checkStoreStatus = this.checkStoreStatus(); + if (checkStoreStatus != PutMessageStatus.PUT_OK) { + return new PutMessageResult(checkStoreStatus, null); + } + + PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch); + if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) { + return new PutMessageResult(msgCheckStatus, null); } long beginTime = this.getSystemClock().now(); PutMessageResult result = this.commitLog.putMessages(messageExtBatch); - long elapsedTime = this.getSystemClock().now() - beginTime; if (elapsedTime > 500) { log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length); } + this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); if (null == result || !result.isOk()) { diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 5a046ff181f747624f1bb7c427541dfba28509e3..64eb5250de6e59f7ce3ef19033cd40b5d5f0254f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -19,6 +19,8 @@ package org.apache.rocketmq.store; import java.util.HashMap; import java.util.LinkedList; import java.util.Set; +import java.util.concurrent.CompletableFuture; + import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.store.config.BrokerRole; @@ -53,6 +55,26 @@ public interface MessageStore { */ void destroy(); + /** Store a message into store in async manner, the processor can process the next request + * rather than wait for result + * when result is completed, notify the client in async manner + * + * @param msg MessageInstance to store + * @return a CompletableFuture for the result of store operation + */ + default CompletableFuture asyncPutMessage(final MessageExtBrokerInner msg) { + return CompletableFuture.completedFuture(putMessage(msg)); + } + + /** + * Store a batch of messages in async manner + * @param messageExtBatch the message batch + * @return a CompletableFuture for the result of store operation + */ + default CompletableFuture asyncPutMessages(final MessageExtBatch messageExtBatch) { + return CompletableFuture.completedFuture(putMessages(messageExtBatch)); + } + /** * Store a message into store. * diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index 5192e86defba1da4f0bb91080afec8fb3af5424c..572c67bbebccc6525e85cc5310e3b2b5bc21c850 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -30,6 +30,7 @@ import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult; import io.openmessaging.storage.dledger.utils.DLedgerUtils; import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageAccessor; @@ -491,7 +492,15 @@ public class DLedgerCommitLog extends CommitLog { return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); } + @Override + public CompletableFuture asyncPutMessage(MessageExtBrokerInner msg) { + return CompletableFuture.completedFuture(this.putMessage(msg)); + } + @Override + public CompletableFuture asyncPutMessages(MessageExtBatch messageExtBatch) { + return CompletableFuture.completedFuture(putMessages(messageExtBatch)); + } @Override public SelectMappedBufferResult getMessage(final long offset, final int size) { diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java b/store/src/test/java/org/apache/rocketmq/store/HATest.java index 64495c33987b32d42e1a3c3adf813a6e958068e9..cd23d8bb499dbdba9de2637a39c7a283d7699560 100644 --- a/store/src/test/java/org/apache/rocketmq/store/HATest.java +++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java @@ -19,6 +19,8 @@ package org.apache.rocketmq.store; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.MessageStoreConfig; @@ -34,12 +36,13 @@ import java.lang.reflect.Method; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.Arrays; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; /** * HATest @@ -119,6 +122,35 @@ public class HATest { } } + @Test + public void testSemiSyncReplica() throws Exception { + long totalMsgs = 5; + QUEUE_TOTAL = 1; + MessageBody = StoreMessage.getBytes(); + for (long i = 0; i < totalMsgs; i++) { + MessageExtBrokerInner msg = buildMessage(); + CompletableFuture putResultFuture = messageStore.asyncPutMessage(msg); + PutMessageResult result = putResultFuture.get(); + assertEquals(PutMessageStatus.PUT_OK, result.getPutMessageStatus()); + //message has been replicated to slave's commitLog, but maybe not dispatch to ConsumeQueue yet + //so direct read from commitLog by physical offset + MessageExt slaveMsg = slaveMessageStore.lookMessageByOffset(result.getAppendMessageResult().getWroteOffset()); + assertNotNull(slaveMsg); + assertTrue(Arrays.equals(msg.getBody(), slaveMsg.getBody())); + assertEquals(msg.getTopic(), slaveMsg.getTopic()); + assertEquals(msg.getTags(), slaveMsg.getTags()); + assertEquals(msg.getKeys(), slaveMsg.getKeys()); + } + + //shutdown slave, putMessage should return FLUSH_SLAVE_TIMEOUT + slaveMessageStore.shutdown(); + for (long i = 0; i < totalMsgs; i++) { + CompletableFuture putResultFuture = messageStore.asyncPutMessage(buildMessage()); + PutMessageResult result = putResultFuture.get(); + assertEquals(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, result.getPutMessageStatus()); + } + } + @After public void destroy() throws Exception{ Thread.sleep(5000L); @@ -156,6 +188,7 @@ public class HATest { msg.setBornTimestamp(System.currentTimeMillis()); msg.setStoreHost(StoreHost); msg.setBornHost(BornHost); + msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); return msg; }