提交 f28be821 编写于 作者: S shenhui.backend

optimization : change SYNC_MASTER flushSlave/flushDisk to pipeline to reduce...

optimization : change SYNC_MASTER flushSlave/flushDisk to pipeline to reduce produce latency and improve throughput
上级 247e0bfb
...@@ -20,7 +20,9 @@ package org.apache.rocketmq.broker.plugin; ...@@ -20,7 +20,9 @@ package org.apache.rocketmq.broker.plugin;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.store.CommitLogDispatcher; import org.apache.rocketmq.store.CommitLogDispatcher;
import org.apache.rocketmq.store.ConsumeQueue; import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageResult;
...@@ -86,6 +88,16 @@ public abstract class AbstractPluginMessageStore implements MessageStore { ...@@ -86,6 +88,16 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
return next.putMessage(msg); return next.putMessage(msg);
} }
@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
return next.asyncPutMessage(msg);
}
@Override
public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
return next.asyncPutMessages(messageExtBatch);
}
@Override @Override
public GetMessageResult getMessage(String group, String topic, int queueId, long offset, public GetMessageResult getMessage(String group, String topic, int queueId, long offset,
int maxMsgNums, final MessageFilter messageFilter) { int maxMsgNums, final MessageFilter messageFilter) {
......
...@@ -19,6 +19,8 @@ package org.apache.rocketmq.broker.processor; ...@@ -19,6 +19,8 @@ package org.apache.rocketmq.broker.processor;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
...@@ -65,28 +67,34 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -65,28 +67,34 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
@Override @Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException { RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext; RemotingCommand response = null;
try {
response = asyncProcessRequest(ctx, request).get();
} catch (InterruptedException | ExecutionException e) {
log.error("process SendMessage error, request : " + request.toString(), e);
}
return response;
}
@Override
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final SendMessageContext mqtraceContext;
switch (request.getCode()) { switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK: case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.consumerSendMsgBack(ctx, request); return this.asyncConsumerSendMsgBack(ctx, request);
default: default:
SendMessageRequestHeader requestHeader = parseRequestHeader(request); SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) { if (requestHeader == null) {
return null; return CompletableFuture.completedFuture(null);
} }
mqtraceContext = buildMsgContext(ctx, requestHeader); mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext); this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
RemotingCommand response;
if (requestHeader.isBatch()) { if (requestHeader.isBatch()) {
response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader); return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else { } else {
response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
} }
this.executeSendMessageHookAfter(response, mqtraceContext);
return response;
} }
} }
...@@ -96,50 +104,38 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -96,50 +104,38 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
this.brokerController.getMessageStore().isTransientStorePoolDeficient(); this.brokerController.getMessageStore().isTransientStorePoolDeficient();
} }
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request) private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext ctx,
throws RemotingCommandException { RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null); final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ConsumerSendMsgBackRequestHeader requestHeader = final ConsumerSendMsgBackRequestHeader requestHeader =
(ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class); (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup()); String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());
if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) { if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
ConsumeMessageContext context = buildConsumeMessageContext(namespace, requestHeader, request);
ConsumeMessageContext context = new ConsumeMessageContext();
context.setNamespace(namespace);
context.setConsumerGroup(requestHeader.getGroup());
context.setTopic(requestHeader.getOriginTopic());
context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
context.setCommercialRcvTimes(1);
context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));
this.executeConsumeMessageHookAfter(context); this.executeConsumeMessageHookAfter(context);
} }
SubscriptionGroupConfig subscriptionGroupConfig = SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup()); this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
if (null == subscriptionGroupConfig) { if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " " response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
+ FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return response; return CompletableFuture.completedFuture(response);
} }
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) { if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION); response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden"); response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
return response; return CompletableFuture.completedFuture(response);
} }
if (subscriptionGroupConfig.getRetryQueueNums() <= 0) { if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
response.setCode(ResponseCode.SUCCESS); response.setCode(ResponseCode.SUCCESS);
response.setRemark(null); response.setRemark(null);
return response; return CompletableFuture.completedFuture(response);
} }
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup()); String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums(); int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
int topicSysFlag = 0; int topicSysFlag = 0;
if (requestHeader.isUnitMode()) { if (requestHeader.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true); topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
...@@ -152,20 +148,19 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -152,20 +148,19 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
if (null == topicConfig) { if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR); response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist"); response.setRemark("topic[" + newTopic + "] not exist");
return response; return CompletableFuture.completedFuture(response);
} }
if (!PermName.isWriteable(topicConfig.getPerm())) { if (!PermName.isWriteable(topicConfig.getPerm())) {
response.setCode(ResponseCode.NO_PERMISSION); response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic)); response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
return response; return CompletableFuture.completedFuture(response);
} }
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset()); MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) { if (null == msgExt) {
response.setCode(ResponseCode.SYSTEM_ERROR); response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed, " + requestHeader.getOffset()); response.setRemark("look message by offset failed, " + requestHeader.getOffset());
return response; return CompletableFuture.completedFuture(response);
} }
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC); final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
...@@ -181,25 +176,23 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -181,25 +176,23 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
} }
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) { || delayLevel < 0) {
newTopic = MixAll.getDLQTopic(requestHeader.getGroup()); newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP, DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0 PermName.PERM_WRITE, 0);
);
if (null == topicConfig) { if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR); response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist"); response.setRemark("topic[" + newTopic + "] not exist");
return response; return CompletableFuture.completedFuture(response);
} }
} else { } else {
if (0 == delayLevel) { if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes(); delayLevel = 3 + msgExt.getReconsumeTimes();
} }
msgExt.setDelayTimeLevel(delayLevel); msgExt.setDelayTimeLevel(delayLevel);
} }
...@@ -215,40 +208,102 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -215,40 +208,102 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
msgInner.setSysFlag(msgExt.getSysFlag()); msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp()); msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost()); msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(this.getStoreHost()); msgInner.setStoreHost(msgExt.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1); msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
String originMsgId = MessageAccessor.getOriginMessageId(msgExt); String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId); MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
return putMessageResult.thenApply((r) -> {
if (r != null) {
switch (r.getPutMessageStatus()) {
case PUT_OK:
String backTopic = msgExt.getTopic();
String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (correctTopic != null) {
backTopic = correctTopic;
}
this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
default:
break;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(r.getPutMessageStatus().name());
return response;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("putMessageResult is null");
return response;
});
}
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
if (putMessageResult != null) {
switch (putMessageResult.getPutMessageStatus()) {
case PUT_OK:
String backTopic = msgExt.getTopic();
String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (correctTopic != null) {
backTopic = correctTopic;
}
this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic); private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader) {
final RemotingCommand response = preSend(ctx, request, requestHeader);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
response.setCode(ResponseCode.SUCCESS); if (response.getCode() != -1) {
response.setRemark(null); return CompletableFuture.completedFuture(response);
}
return response; final byte[] body = request.getBody();
default:
break;
}
response.setCode(ResponseCode.SYSTEM_ERROR); int queueIdInt = requestHeader.getQueueId();
response.setRemark(putMessageResult.getPutMessageStatus().name()); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
return response;
if (queueIdInt < 0) {
queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
} }
response.setCode(ResponseCode.SYSTEM_ERROR); MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
response.setRemark("putMessageResult is null"); msgInner.setTopic(requestHeader.getTopic());
return response; msgInner.setQueueId(queueIdInt);
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
return CompletableFuture.completedFuture(response);
}
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
CompletableFuture<PutMessageResult> putMessageResult = null;
Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
}
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}
private CompletableFuture<RemotingCommand> handlePutMessageResultFuture(CompletableFuture<PutMessageResult> putMessageResult,
RemotingCommand response,
RemotingCommand request,
MessageExt msgInner,
SendMessageResponseHeader responseHeader,
SendMessageContext sendMessageContext,
ChannelHandlerContext ctx,
int queueIdInt) {
return putMessageResult.thenApply((r) ->
handlePutMessageResult(r, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt)
);
} }
private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response, private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
...@@ -295,79 +350,6 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -295,79 +350,6 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
return true; return true;
} }
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
response.setOpaque(request.getOpaque());
response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
log.debug("receive SendMessage request command, {}", request);
final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
if (this.brokerController.getMessageStore().now() < startTimstamp) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
return response;
}
response.setCode(-1);
super.msgCheck(ctx, requestHeader, response);
if (response.getCode() != -1) {
return response;
}
final byte[] body = request.getBody();
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) {
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
}
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
return response;
}
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
PutMessageResult putMessageResult = null;
Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
}
private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response, private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
RemotingCommand request, MessageExt msg, RemotingCommand request, MessageExt msg,
SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx, SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
...@@ -473,52 +455,29 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -473,52 +455,29 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
return response; return response;
} }
private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, private CompletableFuture<RemotingCommand> asyncSendBatchMessage(ChannelHandlerContext ctx, RemotingCommand request,
final RemotingCommand request, SendMessageContext mqtraceContext,
final SendMessageContext sendMessageContext, SendMessageRequestHeader requestHeader) {
final SendMessageRequestHeader requestHeader) throws RemotingCommandException { final RemotingCommand response = preSend(ctx, request, requestHeader);
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader(); final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
response.setOpaque(request.getOpaque());
response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
log.debug("Receive SendMessage request command {}", request);
final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
if (this.brokerController.getMessageStore().now() < startTimstamp) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
return response;
}
response.setCode(-1);
super.msgCheck(ctx, requestHeader, response);
if (response.getCode() != -1) { if (response.getCode() != -1) {
return response; return CompletableFuture.completedFuture(response);
} }
int queueIdInt = requestHeader.getQueueId(); int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) { if (queueIdInt < 0) {
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
} }
if (requestHeader.getTopic().length() > Byte.MAX_VALUE) { if (requestHeader.getTopic().length() > Byte.MAX_VALUE) {
response.setCode(ResponseCode.MESSAGE_ILLEGAL); response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark("message topic length too long " + requestHeader.getTopic().length()); response.setRemark("message topic length too long " + requestHeader.getTopic().length());
return response; return CompletableFuture.completedFuture(response);
} }
if (requestHeader.getTopic() != null && requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark("batch request does not support retry group " + requestHeader.getTopic());
return response;
}
MessageExtBatch messageExtBatch = new MessageExtBatch(); MessageExtBatch messageExtBatch = new MessageExtBatch();
messageExtBatch.setTopic(requestHeader.getTopic()); messageExtBatch.setTopic(requestHeader.getTopic());
messageExtBatch.setQueueId(queueIdInt); messageExtBatch.setQueueId(queueIdInt);
...@@ -537,11 +496,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -537,11 +496,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
messageExtBatch.setStoreHost(this.getStoreHost()); messageExtBatch.setStoreHost(this.getStoreHost());
messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch); CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessages(messageExtBatch);
return handlePutMessageResultFuture(putMessageResult, response, request, messageExtBatch, responseHeader, mqtraceContext, ctx, queueIdInt);
return handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, ctx, queueIdInt);
} }
public boolean hasConsumeMessageHook() { public boolean hasConsumeMessageHook() {
return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty(); return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty();
} }
...@@ -580,4 +540,49 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -580,4 +540,49 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
public void registerConsumeMessageHook(List<ConsumeMessageHook> consumeMessageHookList) { public void registerConsumeMessageHook(List<ConsumeMessageHook> consumeMessageHookList) {
this.consumeMessageHookList = consumeMessageHookList; this.consumeMessageHookList = consumeMessageHookList;
} }
static private ConsumeMessageContext buildConsumeMessageContext(String namespace,
ConsumerSendMsgBackRequestHeader requestHeader,
RemotingCommand request) {
ConsumeMessageContext context = new ConsumeMessageContext();
context.setNamespace(namespace);
context.setConsumerGroup(requestHeader.getGroup());
context.setTopic(requestHeader.getOriginTopic());
context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
context.setCommercialRcvTimes(1);
context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));
return context;
}
private int randomQueueId(int writeQueueNums) {
return (this.random.nextInt() % 99999999) % writeQueueNums;
}
private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageRequestHeader requestHeader) {
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
response.setOpaque(request.getOpaque());
response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
log.debug("Receive SendMessage request command {}", request);
final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
if (this.brokerController.getMessageStore().now() < startTimestamp) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimestamp)));
return response;
}
response.setCode(-1);
super.msgCheck(ctx, requestHeader, response);
if (response.getCode() != -1) {
return response;
}
return response;
}
} }
...@@ -20,6 +20,7 @@ import org.apache.rocketmq.common.message.MessageExt; ...@@ -20,6 +20,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageResult;
import java.util.concurrent.CompletableFuture;
public interface TransactionalMessageService { public interface TransactionalMessageService {
...@@ -31,6 +32,14 @@ public interface TransactionalMessageService { ...@@ -31,6 +32,14 @@ public interface TransactionalMessageService {
*/ */
PutMessageResult prepareMessage(MessageExtBrokerInner messageInner); PutMessageResult prepareMessage(MessageExtBrokerInner messageInner);
/**
* Process prepare message in async manner, we should put this message to storage service
*
* @param messageInner Prepare(Half) message.
* @return CompletableFuture of put result, will be completed at put success(flush and replica done)
*/
CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner);
/** /**
* Delete prepare message when this message has been committed or rolled back. * Delete prepare message when this message has been committed or rolled back.
* *
......
...@@ -47,6 +47,7 @@ import java.util.ArrayList; ...@@ -47,6 +47,7 @@ import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
public class TransactionalMessageBridge { public class TransactionalMessageBridge {
...@@ -190,6 +191,10 @@ public class TransactionalMessageBridge { ...@@ -190,6 +191,10 @@ public class TransactionalMessageBridge {
return store.putMessage(parseHalfMessageInner(messageInner)); return store.putMessage(parseHalfMessageInner(messageInner));
} }
public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
......
...@@ -40,6 +40,7 @@ import java.util.Date; ...@@ -40,6 +40,7 @@ import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
public class TransactionalMessageServiceImpl implements TransactionalMessageService { public class TransactionalMessageServiceImpl implements TransactionalMessageService {
...@@ -59,6 +60,11 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ ...@@ -59,6 +60,11 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
private ConcurrentHashMap<MessageQueue, MessageQueue> opQueueMap = new ConcurrentHashMap<>(); private ConcurrentHashMap<MessageQueue, MessageQueue> opQueueMap = new ConcurrentHashMap<>();
@Override
public CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner) {
return transactionalMessageBridge.asyncPutHalfMessage(messageInner);
}
@Override @Override
public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) { public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
return transactionalMessageBridge.putHalfMessage(messageInner); return transactionalMessageBridge.putHalfMessage(messageInner);
......
...@@ -56,6 +56,7 @@ import java.net.InetSocketAddress; ...@@ -56,6 +56,7 @@ import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
...@@ -93,13 +94,15 @@ public class SendMessageProcessorTest { ...@@ -93,13 +94,15 @@ public class SendMessageProcessorTest {
@Test @Test
public void testProcessRequest() throws RemotingCommandException { public void testProcessRequest() throws RemotingCommandException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))));
assertPutResult(ResponseCode.SUCCESS); assertPutResult(ResponseCode.SUCCESS);
} }
@Test @Test
public void testProcessRequest_WithHook() throws RemotingCommandException { public void testProcessRequest_WithHook() throws RemotingCommandException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))));
List<SendMessageHook> sendMessageHookList = new ArrayList<>(); List<SendMessageHook> sendMessageHookList = new ArrayList<>();
final SendMessageContext[] sendMessageContext = new SendMessageContext[1]; final SendMessageContext[] sendMessageContext = new SendMessageContext[1];
SendMessageHook sendMessageHook = new SendMessageHook() { SendMessageHook sendMessageHook = new SendMessageHook() {
...@@ -129,55 +132,64 @@ public class SendMessageProcessorTest { ...@@ -129,55 +132,64 @@ public class SendMessageProcessorTest {
@Test @Test
public void testProcessRequest_FlushTimeOut() throws RemotingCommandException { public void testProcessRequest_FlushTimeOut() throws RemotingCommandException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.FLUSH_DISK_TIMEOUT, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.FLUSH_DISK_TIMEOUT, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))));
assertPutResult(ResponseCode.FLUSH_DISK_TIMEOUT); assertPutResult(ResponseCode.FLUSH_DISK_TIMEOUT);
} }
@Test @Test
public void testProcessRequest_MessageIllegal() throws RemotingCommandException { public void testProcessRequest_MessageIllegal() throws RemotingCommandException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))));
assertPutResult(ResponseCode.MESSAGE_ILLEGAL); assertPutResult(ResponseCode.MESSAGE_ILLEGAL);
} }
@Test @Test
public void testProcessRequest_CreateMappedFileFailed() throws RemotingCommandException { public void testProcessRequest_CreateMappedFileFailed() throws RemotingCommandException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))));
assertPutResult(ResponseCode.SYSTEM_ERROR); assertPutResult(ResponseCode.SYSTEM_ERROR);
} }
@Test @Test
public void testProcessRequest_FlushSlaveTimeout() throws RemotingCommandException { public void testProcessRequest_FlushSlaveTimeout() throws RemotingCommandException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))));
assertPutResult(ResponseCode.FLUSH_SLAVE_TIMEOUT); assertPutResult(ResponseCode.FLUSH_SLAVE_TIMEOUT);
} }
@Test @Test
public void testProcessRequest_PageCacheBusy() throws RemotingCommandException { public void testProcessRequest_PageCacheBusy() throws RemotingCommandException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))));
assertPutResult(ResponseCode.SYSTEM_ERROR); assertPutResult(ResponseCode.SYSTEM_ERROR);
} }
@Test @Test
public void testProcessRequest_PropertiesTooLong() throws RemotingCommandException { public void testProcessRequest_PropertiesTooLong() throws RemotingCommandException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))));
assertPutResult(ResponseCode.MESSAGE_ILLEGAL); assertPutResult(ResponseCode.MESSAGE_ILLEGAL);
} }
@Test @Test
public void testProcessRequest_ServiceNotAvailable() throws RemotingCommandException { public void testProcessRequest_ServiceNotAvailable() throws RemotingCommandException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))));
assertPutResult(ResponseCode.SERVICE_NOT_AVAILABLE); assertPutResult(ResponseCode.SERVICE_NOT_AVAILABLE);
} }
@Test @Test
public void testProcessRequest_SlaveNotAvailable() throws RemotingCommandException { public void testProcessRequest_SlaveNotAvailable() throws RemotingCommandException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.SLAVE_NOT_AVAILABLE, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SLAVE_NOT_AVAILABLE, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))));
assertPutResult(ResponseCode.SLAVE_NOT_AVAILABLE); assertPutResult(ResponseCode.SLAVE_NOT_AVAILABLE);
} }
@Test @Test
public void testProcessRequest_WithMsgBack() throws RemotingCommandException { public void testProcessRequest_WithMsgBack() throws RemotingCommandException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))));
final RemotingCommand request = createSendMsgBackCommand(RequestCode.CONSUMER_SEND_MSG_BACK); final RemotingCommand request = createSendMsgBackCommand(RequestCode.CONSUMER_SEND_MSG_BACK);
sendMessageProcessor = new SendMessageProcessor(brokerController); sendMessageProcessor = new SendMessageProcessor(brokerController);
...@@ -189,7 +201,8 @@ public class SendMessageProcessorTest { ...@@ -189,7 +201,8 @@ public class SendMessageProcessorTest {
@Test @Test
public void testProcessRequest_Transaction() throws RemotingCommandException { public void testProcessRequest_Transaction() throws RemotingCommandException {
brokerController.setTransactionalMessageService(transactionMsgService); brokerController.setTransactionalMessageService(transactionMsgService);
when(brokerController.getTransactionalMessageService().prepareMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); when(brokerController.getTransactionalMessageService().asyncPrepareMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))));
RemotingCommand request = createSendTransactionMsgCommand(RequestCode.SEND_MESSAGE); RemotingCommand request = createSendTransactionMsgCommand(RequestCode.SEND_MESSAGE);
final RemotingCommand[] response = new RemotingCommand[1]; final RemotingCommand[] response = new RemotingCommand[1];
doAnswer(new Answer() { doAnswer(new Answer() {
......
...@@ -27,6 +27,8 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; ...@@ -27,6 +27,8 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageResult;
import java.util.concurrent.CompletableFuture;
public class TransactionalMessageServiceImpl implements TransactionalMessageService { public class TransactionalMessageServiceImpl implements TransactionalMessageService {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
...@@ -35,6 +37,11 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ ...@@ -35,6 +37,11 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
return null; return null;
} }
@Override
public CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner) {
return null;
}
@Override @Override
public boolean deletePrepareMessage(MessageExt messageExt) { public boolean deletePrepareMessage(MessageExt messageExt) {
return false; return false;
......
...@@ -28,8 +28,8 @@ ...@@ -28,8 +28,8 @@
<name>rocketmq-remoting ${project.version}</name> <name>rocketmq-remoting ${project.version}</name>
<properties> <properties>
<maven.compiler.source>1.6</maven.compiler.source> <maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.6</maven.compiler.target> <maven.compiler.target>1.8</maven.compiler.target>
</properties> </properties>
<dependencies> <dependencies>
......
...@@ -29,6 +29,7 @@ import java.util.Iterator; ...@@ -29,6 +29,7 @@ import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
...@@ -36,6 +37,7 @@ import java.util.concurrent.LinkedBlockingQueue; ...@@ -36,6 +37,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.ChannelEventListener;
...@@ -102,7 +104,6 @@ public abstract class NettyRemotingAbstract { ...@@ -102,7 +104,6 @@ public abstract class NettyRemotingAbstract {
protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>(); protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();
static { static {
NettyLogger.initNettyLogger(); NettyLogger.initNettyLogger();
} }
...@@ -200,24 +201,25 @@ public abstract class NettyRemotingAbstract { ...@@ -200,24 +201,25 @@ public abstract class NettyRemotingAbstract {
public void run() { public void run() {
try { try {
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd); CompletableFuture<RemotingCommand> responseFuture = pair.getObject1().asyncProcessRequest(ctx, cmd);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); responseFuture.thenAccept((r) -> {
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, r);
if (!cmd.isOnewayRPC()) {
if (r != null) {
r.setOpaque(opaque);
r.markResponseType();
try {
ctx.writeAndFlush(r);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(r.toString());
}
} else {
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
} }
} else {
} }
} });
} catch (Throwable e) { } catch (Throwable e) {
log.error("process request exception", e); log.error("process request exception", e);
log.error(cmd.toString()); log.error(cmd.toString());
......
...@@ -19,6 +19,8 @@ package org.apache.rocketmq.remoting.netty; ...@@ -19,6 +19,8 @@ package org.apache.rocketmq.remoting.netty;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.util.concurrent.CompletableFuture;
/** /**
* Common remoting command processor * Common remoting command processor
*/ */
...@@ -27,4 +29,9 @@ public interface NettyRequestProcessor { ...@@ -27,4 +29,9 @@ public interface NettyRequestProcessor {
throws Exception; throws Exception;
boolean rejectRequest(); boolean rejectRequest();
default CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws Exception {
return CompletableFuture.completedFuture(processRequest(ctx, request));
}
} }
...@@ -21,8 +21,12 @@ import java.util.ArrayList; ...@@ -21,8 +21,12 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
...@@ -61,6 +65,7 @@ public class CommitLog { ...@@ -61,6 +65,7 @@ public class CommitLog {
protected volatile long confirmOffset = -1L; protected volatile long confirmOffset = -1L;
private volatile long beginTimeInLock = 0; private volatile long beginTimeInLock = 0;
protected final PutMessageLock putMessageLock; protected final PutMessageLock putMessageLock;
public CommitLog(final DefaultMessageStore defaultMessageStore) { public CommitLog(final DefaultMessageStore defaultMessageStore) {
...@@ -533,6 +538,228 @@ public class CommitLog { ...@@ -533,6 +538,228 @@ public class CommitLog {
return beginTimeInLock; return beginTimeInLock;
} }
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
default:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
}
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
}
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
CompletableFuture<Boolean> flushResultFuture = submitFlushRequest(result, putMessageResult, msg);
CompletableFuture<Boolean> replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg);
return flushResultFuture.thenCombine(replicaResultFuture, (flushOK, replicaOK) -> {
if (!flushOK) {
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
if (!replicaOK) {
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
return putMessageResult;
});
}
public CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatch messageExtBatch) {
messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
AppendMessageResult result;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
}
if (messageExtBatch.getDelayTimeLevel() > 0) {
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
}
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
//fine-grained lock instead of the coarse-grained
MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get();
messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
putMessageLock.lock();
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
messageExtBatch.setStoreTimestamp(beginLockTimestamp);
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
}
result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
}
result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
case UNKNOWN_ERROR:
default:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
}
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, messageExtBatch.getBody().length, result);
}
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum());
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes());
CompletableFuture<Boolean> flushOKFuture = submitFlushRequest(result, putMessageResult, messageExtBatch);
CompletableFuture<Boolean> replicaOKFuture = submitReplicaRequest(result, putMessageResult, messageExtBatch);
return flushOKFuture.thenCombine(replicaOKFuture, (flushOK, replicaOK) -> {
if (!flushOK) {
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
if (!replicaOK) {
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
return putMessageResult;
});
}
public PutMessageResult putMessage(final MessageExtBrokerInner msg) { public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Set the storage time // Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis()); msg.setStoreTimestamp(System.currentTimeMillis());
...@@ -645,6 +872,53 @@ public class CommitLog { ...@@ -645,6 +872,53 @@ public class CommitLog {
return putMessageResult; return putMessageResult;
} }
public CompletableFuture<Boolean> submitFlushRequest(AppendMessageResult result, PutMessageResult putMessageResult,
MessageExt messageExt) {
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
service.putRequest(request);
return request.future();
} else {
service.wakeup();
return CompletableFuture.completedFuture(true);
}
}
// Asynchronous flush
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
return CompletableFuture.completedFuture(true);
}
}
public CompletableFuture<Boolean> submitReplicaRequest(AppendMessageResult result, PutMessageResult putMessageResult,
MessageExt messageExt) {
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) {
if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
return request.future();
}
else {
return CompletableFuture.completedFuture(false);
}
}
}
return CompletableFuture.completedFuture(true);
}
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// Synchronization flush // Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
...@@ -652,7 +926,14 @@ public class CommitLog { ...@@ -652,7 +926,14 @@ public class CommitLog {
if (messageExt.isWaitStoreMsgOK()) { if (messageExt.isWaitStoreMsgOK()) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request); service.putRequest(request);
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); CompletableFuture<Boolean> flushOkFuture = request.future();
boolean flushOK = false;
try {
flushOK = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
//flushOK=false;
}
if (!flushOK) { if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString()); + " client address: " + messageExt.getBornHostString());
...@@ -681,8 +962,13 @@ public class CommitLog { ...@@ -681,8 +962,13 @@ public class CommitLog {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request); service.putRequest(request);
service.getWaitNotifyObject().wakeupAll(); service.getWaitNotifyObject().wakeupAll();
boolean flushOK = boolean flushOK = false;
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); try {
flushOK = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
//flushOK=false;
}
if (!flushOK) { if (!flushOK) {
log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: " log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
+ messageExt.getTags() + " client address: " + messageExt.getBornHostNameString()); + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
...@@ -1041,26 +1327,39 @@ public class CommitLog { ...@@ -1041,26 +1327,39 @@ public class CommitLog {
public static class GroupCommitRequest { public static class GroupCommitRequest {
private final long nextOffset; private final long nextOffset;
private final CountDownLatch countDownLatch = new CountDownLatch(1); private final CountDownLatch countDownLatch = new CountDownLatch(1);
private volatile boolean flushOK = false; private CompletableFuture<Boolean> flushOk = new CompletableFuture<>();
private final long startTimestamp = System.currentTimeMillis();
private long timeoutMillis = Long.MAX_VALUE;
public GroupCommitRequest(long nextOffset, long timeoutMillis) {
this.nextOffset = nextOffset;
this.timeoutMillis = timeoutMillis;
}
public GroupCommitRequest(long nextOffset) { public GroupCommitRequest(long nextOffset) {
this.nextOffset = nextOffset; this.nextOffset = nextOffset;
} }
public long getNextOffset() { public long getNextOffset() {
return nextOffset; return nextOffset;
} }
public void wakeupCustomer(final boolean flushOK) { public void wakeupCustomer(final boolean flushOK) {
this.flushOK = flushOK; long endTimestamp = System.currentTimeMillis();
this.flushOk.complete(flushOK && ((endTimestamp - this.startTimestamp) <= this.timeoutMillis));
this.countDownLatch.countDown(); this.countDownLatch.countDown();
} }
public CompletableFuture<Boolean> future() {
return flushOk;
}
public boolean waitForFlush(long timeout) { public boolean waitForFlush(long timeout) {
try { try {
this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS); this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
return this.flushOK; return flushOk.get();
} catch (InterruptedException e) { } catch (InterruptedException | ExecutionException e) {
log.error("Interrupted", e); log.error("Interrupted", e);
return false; return false;
} }
......
...@@ -29,6 +29,7 @@ import java.util.LinkedList; ...@@ -29,6 +29,7 @@ import java.util.LinkedList;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
...@@ -351,109 +352,170 @@ public class DefaultMessageStore implements MessageStore { ...@@ -351,109 +352,170 @@ public class DefaultMessageStore implements MessageStore {
} }
} }
public PutMessageResult putMessage(MessageExtBrokerInner msg) { private PutMessageStatus checkMessage(MessageExtBrokerInner msg) {
if (msg.getTopic().length() > Byte.MAX_VALUE) {
log.warn("putMessage message topic length too long " + msg.getTopic().length());
return PutMessageStatus.MESSAGE_ILLEGAL;
}
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
return PutMessageStatus.MESSAGE_ILLEGAL;
}
return PutMessageStatus.PUT_OK;
}
private PutMessageStatus checkMessages(MessageExtBatch messageExtBatch) {
if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) {
log.warn("putMessage message topic length too long " + messageExtBatch.getTopic().length());
return PutMessageStatus.MESSAGE_ILLEGAL;
}
if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) {
log.warn("PutMessages body length too long " + messageExtBatch.getBody().length);
return PutMessageStatus.MESSAGE_ILLEGAL;
}
return PutMessageStatus.PUT_OK;
}
private PutMessageStatus checkStoreStatus() {
if (this.shutdown) { if (this.shutdown) {
log.warn("message store has shutdown, so putMessage is forbidden"); log.warn("message store has shutdown, so putMessage is forbidden");
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); return PutMessageStatus.SERVICE_NOT_AVAILABLE;
} }
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
long value = this.printTimes.getAndIncrement(); long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) { if ((value % 50000) == 0) {
log.warn("message store is slave mode, so putMessage is forbidden "); log.warn("message store has shutdown, so putMessage is forbidden");
} }
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
} }
if (!this.runningFlags.isWriteable()) { if (!this.runningFlags.isWriteable()) {
long value = this.printTimes.getAndIncrement(); long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) { if ((value % 50000) == 0) {
log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits()); log.warn("message store has shutdown, so putMessage is forbidden");
} }
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
} else { } else {
this.printTimes.set(0); this.printTimes.set(0);
} }
if (msg.getTopic().length() > Byte.MAX_VALUE) { if (this.isOSPageCacheBusy()) {
log.warn("putMessage message topic length too long " + msg.getTopic().length()); return PutMessageStatus.OS_PAGECACHE_BUSY;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
} }
return PutMessageStatus.PUT_OK;
}
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) { @Override
log.warn("putMessage message properties length too long " + msg.getPropertiesString().length()); public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null); PutMessageStatus checkStoreStatus = this.checkStoreStatus();
if (checkStoreStatus != PutMessageStatus.PUT_OK) {
return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
} }
if (this.isOSPageCacheBusy()) { PutMessageStatus msgCheckStatus = this.checkMessage(msg);
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null); if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
} }
long beginTime = this.getSystemClock().now(); long beginTime = this.getSystemClock().now();
PutMessageResult result = this.commitLog.putMessage(msg); CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
long elapsedTime = this.getSystemClock().now() - beginTime; putResultFuture.thenAccept((result) -> {
if (elapsedTime > 500) { long elapsedTime = this.getSystemClock().now() - beginTime;
log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length); if (elapsedTime > 500) {
} log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); }
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
if (null == result || !result.isOk()) { if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
} }
});
return result; return putResultFuture;
} }
public PutMessageResult putMessages(MessageExtBatch messageExtBatch) { public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
if (this.shutdown) { PutMessageStatus checkStoreStatus = this.checkStoreStatus();
log.warn("DefaultMessageStore has shutdown, so putMessages is forbidden"); if (checkStoreStatus != PutMessageStatus.PUT_OK) {
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
} }
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);
long value = this.printTimes.getAndIncrement(); if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
if ((value % 50000) == 0) { return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
log.warn("DefaultMessageStore is in slave mode, so putMessages is forbidden "); }
long beginTime = this.getSystemClock().now();
CompletableFuture<PutMessageResult> resultFuture = this.commitLog.asyncPutMessages(messageExtBatch);
resultFuture.thenAccept((result) -> {
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) {
log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);
} }
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
}
if (!this.runningFlags.isWriteable()) { if (null == result || !result.isOk()) {
long value = this.printTimes.getAndIncrement(); this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
if ((value % 50000) == 0) {
log.warn("DefaultMessageStore is not writable, so putMessages is forbidden " + this.runningFlags.getFlagBits());
} }
});
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); return resultFuture;
} else { }
this.printTimes.set(0);
@Override
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
PutMessageStatus checkStoreStatus = this.checkStoreStatus();
if (checkStoreStatus != PutMessageStatus.PUT_OK) {
return new PutMessageResult(checkStoreStatus, null);
} }
if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) { PutMessageStatus msgCheckStatus = this.checkMessage(msg);
log.warn("PutMessages topic length too long " + messageExtBatch.getTopic().length()); if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); return new PutMessageResult(msgCheckStatus, null);
} }
if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) { long beginTime = this.getSystemClock().now();
log.warn("PutMessages body length too long " + messageExtBatch.getBody().length); PutMessageResult result = this.commitLog.putMessage(msg);
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) {
log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
} }
if (this.isOSPageCacheBusy()) { this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
}
return result;
}
@Override
public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
PutMessageStatus checkStoreStatus = this.checkStoreStatus();
if (checkStoreStatus != PutMessageStatus.PUT_OK) {
return new PutMessageResult(checkStoreStatus, null);
}
PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);
if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
return new PutMessageResult(msgCheckStatus, null);
} }
long beginTime = this.getSystemClock().now(); long beginTime = this.getSystemClock().now();
PutMessageResult result = this.commitLog.putMessages(messageExtBatch); PutMessageResult result = this.commitLog.putMessages(messageExtBatch);
long elapsedTime = this.getSystemClock().now() - beginTime; long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) { if (elapsedTime > 500) {
log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length); log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);
} }
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
if (null == result || !result.isOk()) { if (null == result || !result.isOk()) {
......
...@@ -19,6 +19,8 @@ package org.apache.rocketmq.store; ...@@ -19,6 +19,8 @@ package org.apache.rocketmq.store;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.BrokerRole;
...@@ -53,6 +55,26 @@ public interface MessageStore { ...@@ -53,6 +55,26 @@ public interface MessageStore {
*/ */
void destroy(); void destroy();
/** Store a message into store in async manner, the processor can process the next request
* rather than wait for result
* when result is completed, notify the client in async manner
*
* @param msg MessageInstance to store
* @return a CompletableFuture for the result of store operation
*/
default CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
return CompletableFuture.completedFuture(putMessage(msg));
}
/**
* Store a batch of messages in async manner
* @param messageExtBatch the message batch
* @return a CompletableFuture for the result of store operation
*/
default CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatch messageExtBatch) {
return CompletableFuture.completedFuture(putMessages(messageExtBatch));
}
/** /**
* Store a message into store. * Store a message into store.
* *
......
...@@ -30,6 +30,7 @@ import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult; ...@@ -30,6 +30,7 @@ import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult;
import io.openmessaging.storage.dledger.utils.DLedgerUtils; import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageAccessor;
...@@ -491,7 +492,15 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -491,7 +492,15 @@ public class DLedgerCommitLog extends CommitLog {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
} }
@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
return CompletableFuture.completedFuture(this.putMessage(msg));
}
@Override
public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
return CompletableFuture.completedFuture(putMessages(messageExtBatch));
}
@Override @Override
public SelectMappedBufferResult getMessage(final long offset, final int size) { public SelectMappedBufferResult getMessage(final long offset, final int size) {
......
...@@ -19,6 +19,8 @@ package org.apache.rocketmq.store; ...@@ -19,6 +19,8 @@ package org.apache.rocketmq.store;
import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.MessageStoreConfig;
...@@ -34,12 +36,13 @@ import java.lang.reflect.Method; ...@@ -34,12 +36,13 @@ import java.lang.reflect.Method;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Arrays;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.*;
import static org.junit.Assert.assertTrue;
/** /**
* HATest * HATest
...@@ -119,6 +122,35 @@ public class HATest { ...@@ -119,6 +122,35 @@ public class HATest {
} }
} }
@Test
public void testSemiSyncReplica() throws Exception {
long totalMsgs = 5;
QUEUE_TOTAL = 1;
MessageBody = StoreMessage.getBytes();
for (long i = 0; i < totalMsgs; i++) {
MessageExtBrokerInner msg = buildMessage();
CompletableFuture<PutMessageResult> putResultFuture = messageStore.asyncPutMessage(msg);
PutMessageResult result = putResultFuture.get();
assertEquals(PutMessageStatus.PUT_OK, result.getPutMessageStatus());
//message has been replicated to slave's commitLog, but maybe not dispatch to ConsumeQueue yet
//so direct read from commitLog by physical offset
MessageExt slaveMsg = slaveMessageStore.lookMessageByOffset(result.getAppendMessageResult().getWroteOffset());
assertNotNull(slaveMsg);
assertTrue(Arrays.equals(msg.getBody(), slaveMsg.getBody()));
assertEquals(msg.getTopic(), slaveMsg.getTopic());
assertEquals(msg.getTags(), slaveMsg.getTags());
assertEquals(msg.getKeys(), slaveMsg.getKeys());
}
//shutdown slave, putMessage should return FLUSH_SLAVE_TIMEOUT
slaveMessageStore.shutdown();
for (long i = 0; i < totalMsgs; i++) {
CompletableFuture<PutMessageResult> putResultFuture = messageStore.asyncPutMessage(buildMessage());
PutMessageResult result = putResultFuture.get();
assertEquals(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, result.getPutMessageStatus());
}
}
@After @After
public void destroy() throws Exception{ public void destroy() throws Exception{
Thread.sleep(5000L); Thread.sleep(5000L);
...@@ -156,6 +188,7 @@ public class HATest { ...@@ -156,6 +188,7 @@ public class HATest {
msg.setBornTimestamp(System.currentTimeMillis()); msg.setBornTimestamp(System.currentTimeMillis());
msg.setStoreHost(StoreHost); msg.setStoreHost(StoreHost);
msg.setBornHost(BornHost); msg.setBornHost(BornHost);
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
return msg; return msg;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册