提交 11653ce2 编写于 作者: D dongeforever 提交者: dongeforever

ROCKETMQ-80 Add batch feature closes apache/incubator-rocketmq#53

上级 087d989f
...@@ -374,9 +374,11 @@ public class BrokerController { ...@@ -374,9 +374,11 @@ public class BrokerController {
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
/** /**
* PullMessageProcessor * PullMessageProcessor
......
...@@ -17,11 +17,6 @@ ...@@ -17,11 +17,6 @@
package org.apache.rocketmq.broker.processor; package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext; import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook; import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
...@@ -51,6 +46,12 @@ import org.apache.rocketmq.store.MessageExtBrokerInner; ...@@ -51,6 +46,12 @@ import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Random;
public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor { public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor {
protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
...@@ -279,6 +280,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces ...@@ -279,6 +280,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
SendMessageRequestHeaderV2 requestHeaderV2 = null; SendMessageRequestHeaderV2 requestHeaderV2 = null;
SendMessageRequestHeader requestHeader = null; SendMessageRequestHeader requestHeader = null;
switch (request.getCode()) { switch (request.getCode()) {
case RequestCode.SEND_BATCH_MESSAGE:
case RequestCode.SEND_MESSAGE_V2: case RequestCode.SEND_MESSAGE_V2:
requestHeaderV2 = requestHeaderV2 =
(SendMessageRequestHeaderV2) request (SendMessageRequestHeaderV2) request
......
...@@ -95,6 +95,7 @@ public class Validators { ...@@ -95,6 +95,7 @@ public class Validators {
} }
// topic // topic
Validators.checkTopic(msg.getTopic()); Validators.checkTopic(msg.getTopic());
// body // body
if (null == msg.getBody()) { if (null == msg.getBody()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null"); throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
......
...@@ -18,14 +18,14 @@ package org.apache.rocketmq.client.impl; ...@@ -18,14 +18,14 @@ package org.apache.rocketmq.client.impl;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.Iterator;
import java.util.Collections;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.PullCallback; import org.apache.rocketmq.client.consumer.PullCallback;
...@@ -50,10 +50,11 @@ import org.apache.rocketmq.common.admin.ConsumeStats; ...@@ -50,10 +50,11 @@ import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.namesrv.TopAddressing; import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
...@@ -147,6 +148,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; ...@@ -147,6 +148,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.slf4j.Logger; import org.slf4j.Logger;
public class MQClientAPIImpl { public class MQClientAPIImpl {
private final static Logger log = ClientLogger.getLog(); private final static Logger log = ClientLogger.getLog();
...@@ -278,14 +280,14 @@ public class MQClientAPIImpl { ...@@ -278,14 +280,14 @@ public class MQClientAPIImpl {
} }
public SendResult sendMessage(// public SendResult sendMessage(//
final String addr, // 1 final String addr, // 1
final String brokerName, // 2 final String brokerName, // 2
final Message msg, // 3 final Message msg, // 3
final SendMessageRequestHeader requestHeader, // 4 final SendMessageRequestHeader requestHeader, // 4
final long timeoutMillis, // 5 final long timeoutMillis, // 5
final CommunicationMode communicationMode, // 6 final CommunicationMode communicationMode, // 6
final SendMessageContext context, // 7 final SendMessageContext context, // 7
final DefaultMQProducerImpl producer // 8 final DefaultMQProducerImpl producer // 8
) throws RemotingException, MQBrokerException, InterruptedException { ) throws RemotingException, MQBrokerException, InterruptedException {
return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer); return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer);
} }
...@@ -305,9 +307,9 @@ public class MQClientAPIImpl { ...@@ -305,9 +307,9 @@ public class MQClientAPIImpl {
final DefaultMQProducerImpl producer // 12 final DefaultMQProducerImpl producer // 12
) throws RemotingException, MQBrokerException, InterruptedException { ) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = null; RemotingCommand request = null;
if (sendSmartMsg) { if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2); request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else { } else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader); request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
} }
...@@ -334,11 +336,11 @@ public class MQClientAPIImpl { ...@@ -334,11 +336,11 @@ public class MQClientAPIImpl {
} }
private SendResult sendMessageSync(// private SendResult sendMessageSync(//
final String addr, // final String addr, //
final String brokerName, // final String brokerName, //
final Message msg, // final Message msg, //
final long timeoutMillis, // final long timeoutMillis, //
final RemotingCommand request// final RemotingCommand request//
) throws RemotingException, MQBrokerException, InterruptedException { ) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null; assert response != null;
...@@ -507,8 +509,16 @@ public class MQClientAPIImpl { ...@@ -507,8 +509,16 @@ public class MQClientAPIImpl {
MessageQueue messageQueue = new MessageQueue(msg.getTopic(), brokerName, responseHeader.getQueueId()); MessageQueue messageQueue = new MessageQueue(msg.getTopic(), brokerName, responseHeader.getQueueId());
String uniqMsgId = MessageClientIDSetter.getUniqID(msg);
if (msg instanceof MessageBatch) {
StringBuilder sb = new StringBuilder();
for (Message message : (MessageBatch) msg) {
sb.append(sb.length() == 0 ? "" : ",").append(MessageClientIDSetter.getUniqID(message));
}
uniqMsgId = sb.toString();
}
SendResult sendResult = new SendResult(sendStatus, SendResult sendResult = new SendResult(sendStatus,
MessageClientIDSetter.getUniqID(msg), uniqMsgId,
responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset()); responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
sendResult.setTransactionId(responseHeader.getTransactionId()); sendResult.setTransactionId(responseHeader.getTransactionId());
String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION); String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
...@@ -1452,7 +1462,7 @@ public class MQClientAPIImpl { ...@@ -1452,7 +1462,7 @@ public class MQClientAPIImpl {
} }
public Map<String, Map<MessageQueue, Long>> invokeBrokerToGetConsumerStatus(final String addr, final String topic, final String group, public Map<String, Map<MessageQueue, Long>> invokeBrokerToGetConsumerStatus(final String addr, final String topic, final String group,
final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader(); GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader();
requestHeader.setTopic(topic); requestHeader.setTopic(topic);
requestHeader.setGroup(group); requestHeader.setGroup(group);
......
...@@ -30,6 +30,16 @@ import java.util.concurrent.ExecutorService; ...@@ -30,6 +30,16 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageType;
import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.common.ClientErrorCode; import org.apache.rocketmq.client.common.ClientErrorCode;
...@@ -58,15 +68,6 @@ import org.apache.rocketmq.common.MixAll; ...@@ -58,15 +68,6 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageType;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
...@@ -595,8 +596,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -595,8 +596,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
byte[] prevBody = msg.getBody(); byte[] prevBody = msg.getBody();
try { try {
//for MessageBatch,ID has been set in the generating process
MessageClientIDSetter.setUniqID(msg); if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
int sysFlag = 0; int sysFlag = 0;
if (this.tryToCompressMessage(msg)) { if (this.tryToCompressMessage(msg)) {
...@@ -652,6 +655,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -652,6 +655,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0); requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode()); requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) { if (reconsumeTimes != null) {
...@@ -737,6 +741,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -737,6 +741,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
} }
private boolean tryToCompressMessage(final Message msg) { private boolean tryToCompressMessage(final Message msg) {
if (msg instanceof MessageBatch) {
//batch dose not support compressing right now
return false;
}
byte[] body = msg.getBody(); byte[] body = msg.getBody();
if (body != null) { if (body != null) {
if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) { if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
......
...@@ -16,14 +16,18 @@ ...@@ -16,14 +16,18 @@
*/ */
package org.apache.rocketmq.client.producer; package org.apache.rocketmq.client.producer;
import java.util.Collection;
import java.util.List; import java.util.List;
import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.common.message.MessageId;
...@@ -577,6 +581,40 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -577,6 +581,40 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
return this.defaultMQProducerImpl.queryMessageByUniqKey(topic, msgId); return this.defaultMQProducerImpl.queryMessageByUniqKey(topic, msgId);
} }
@Override
public SendResult send(Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(batch(msgs));
}
@Override
public SendResult send(Collection<Message> msgs, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(batch(msgs), timeout);
}
@Override
public SendResult send(Collection<Message> msgs, MessageQueue messageQueue) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(batch(msgs), messageQueue);
}
@Override
public SendResult send(Collection<Message> msgs, MessageQueue messageQueue, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(batch(msgs), messageQueue, timeout);
}
private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
MessageBatch msgBatch;
try {
msgBatch = MessageBatch.generateFromList(msgs);
for (Message message : msgBatch) {
Validators.checkMessage(message, this);
MessageClientIDSetter.setUniqID(message);
}
msgBatch.setBody(msgBatch.encode());
} catch (Exception e) {
throw new MQClientException("Failed to initiate the MessageBatch", e);
}
return msgBatch;
}
public String getProducerGroup() { public String getProducerGroup() {
return producerGroup; return producerGroup;
} }
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
*/ */
package org.apache.rocketmq.client.producer; package org.apache.rocketmq.client.producer;
import java.util.Collection;
import java.util.List; import java.util.List;
import org.apache.rocketmq.client.MQAdmin; import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
...@@ -81,4 +82,17 @@ public interface MQProducer extends MQAdmin { ...@@ -81,4 +82,17 @@ public interface MQProducer extends MQAdmin {
TransactionSendResult sendMessageInTransaction(final Message msg, TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException; final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
//for batch
SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
SendResult send(final Collection<Message> msgs, final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
SendResult send(final Collection<Message> msgs, final MessageQueue mq) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
} }
...@@ -19,4 +19,5 @@ package org.apache.rocketmq.common; ...@@ -19,4 +19,5 @@ package org.apache.rocketmq.common;
public enum TopicFilterType { public enum TopicFilterType {
SINGLE_TAG, SINGLE_TAG,
MULTI_TAG MULTI_TAG
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.message;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.rocketmq.common.MixAll;
public class MessageBatch extends Message implements Iterable<Message> {
private static final long serialVersionUID = 621335151046335557L;
private final List<Message> messages;
private MessageBatch(List<Message> messages) {
this.messages = messages;
}
public byte[] encode() {
return MessageDecoder.encodeMessages(messages);
}
public Iterator<Message> iterator() {
return messages.iterator();
}
public static MessageBatch generateFromList(Collection<Message> messages) {
assert messages != null;
assert messages.size() > 0;
List<Message> messageList = new ArrayList<Message>(messages.size());
Message first = null;
for (Message message : messages) {
if (message.getDelayTimeLevel() > 0) {
throw new UnsupportedOperationException("TimeDelayLevel in not supported for batching");
}
if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
throw new UnsupportedOperationException("Retry Group is not supported for batching");
}
if (first == null) {
first = message;
} else {
if (!first.getTopic().equals(message.getTopic())) {
throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
}
if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {
throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");
}
}
messageList.add(message);
}
MessageBatch messageBatch = new MessageBatch(messageList);
messageBatch.setTopic(first.getTopic());
messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK());
return messageBatch;
}
}
...@@ -200,6 +200,8 @@ public class MessageDecoder { ...@@ -200,6 +200,8 @@ public class MessageDecoder {
return byteBuffer.array(); return byteBuffer.array();
} }
public static MessageExt decode( public static MessageExt decode(
java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody) { java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody) {
return decode(byteBuffer, readBody, deCompressBody, false); return decode(byteBuffer, readBody, deCompressBody, false);
...@@ -372,4 +374,105 @@ public class MessageDecoder { ...@@ -372,4 +374,105 @@ public class MessageDecoder {
return map; return map;
} }
public static byte[] encodeMessage(Message message) {
//only need flag, body, properties
byte[] body = message.getBody();
int bodyLen = body.length;
String properties = messageProperties2String(message.getProperties());
byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
//note properties length must not more than Short.MAX
short propertiesLength = (short) propertiesBytes.length;
int sysFlag = message.getFlag();
int storeSize = 4 // 1 TOTALSIZE
+ 4 // 2 MAGICCOD
+ 4 // 3 BODYCRC
+ 4 // 4 FLAG
+ 4 + bodyLen // 4 BODY
+ 2 + propertiesLength;
ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
// 1 TOTALSIZE
byteBuffer.putInt(storeSize);
// 2 MAGICCODE
byteBuffer.putInt(0);
// 3 BODYCRC
byteBuffer.putInt(0);
// 4 FLAG
int flag = message.getFlag();
byteBuffer.putInt(flag);
// 5 BODY
byteBuffer.putInt(bodyLen);
byteBuffer.put(body);
// 6 properties
byteBuffer.putShort(propertiesLength);
byteBuffer.put(propertiesBytes);
return byteBuffer.array();
}
public static Message decodeMessage(ByteBuffer byteBuffer) throws Exception {
Message message = new Message();
// 1 TOTALSIZE
byteBuffer.getInt();
// 2 MAGICCODE
byteBuffer.getInt();
// 3 BODYCRC
byteBuffer.getInt();
// 4 FLAG
int flag = byteBuffer.getInt();
message.setFlag(flag);
// 5 BODY
int bodyLen = byteBuffer.getInt();
byte[] body = new byte[bodyLen];
byteBuffer.get(body);
message.setBody(body);
// 6 properties
short propertiesLen = byteBuffer.getShort();
byte[] propertiesBytes = new byte[propertiesLen];
byteBuffer.get(propertiesBytes);
message.setProperties(string2messageProperties(new String(propertiesBytes, CHARSET_UTF8)));
return message;
}
public static byte[] encodeMessages(List<Message> messages) {
//TO DO refactor, accumulate in one buffer, avoid copies
List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());
int allSize = 0;
for (Message message: messages) {
byte[] tmp = encodeMessage(message);
encodedMessages.add(tmp);
allSize += tmp.length;
}
byte[] allBytes = new byte[allSize];
int pos = 0;
for (byte[] bytes : encodedMessages) {
System.arraycopy(bytes, 0, allBytes, pos, bytes.length);
pos += bytes.length;
}
return allBytes;
}
public static List<Message> decodeMessages(ByteBuffer byteBuffer) throws Exception {
//TO DO add a callback for processing, avoid creating lists
List<Message> msgs = new ArrayList<Message>();
while (byteBuffer.hasRemaining()) {
Message msg = decodeMessage(byteBuffer);
msgs.add(msg);
}
return msgs;
}
} }
...@@ -64,7 +64,7 @@ public class MessageExt extends Message { ...@@ -64,7 +64,7 @@ public class MessageExt extends Message {
return TopicFilterType.SINGLE_TAG; return TopicFilterType.SINGLE_TAG;
} }
private static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) { public static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) {
InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4); byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4);
byteBuffer.putInt(inetSocketAddress.getPort()); byteBuffer.putInt(inetSocketAddress.getPort());
......
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.message;
import java.nio.ByteBuffer;
public class MessageExtBatch extends MessageExt {
private static final long serialVersionUID = -2353110995348498537L;
public ByteBuffer wrap() {
assert getBody() != null;
return ByteBuffer.wrap(getBody(), 0, getBody().length);
}
private ByteBuffer encodedBuff;
public ByteBuffer getEncodedBuff() {
return encodedBuff;
}
public void setEncodedBuff(ByteBuffer encodedBuff) {
this.encodedBuff = encodedBuff;
}
}
...@@ -159,4 +159,7 @@ public class RequestCode { ...@@ -159,4 +159,7 @@ public class RequestCode {
* get config from name server * get config from name server
*/ */
public static final int GET_NAMESRV_CONFIG = 319; public static final int GET_NAMESRV_CONFIG = 319;
public static final int SEND_BATCH_MESSAGE = 320;
} }
...@@ -48,6 +48,8 @@ public class SendMessageRequestHeader implements CommandCustomHeader { ...@@ -48,6 +48,8 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
private Integer reconsumeTimes; private Integer reconsumeTimes;
@CFNullable @CFNullable
private boolean unitMode = false; private boolean unitMode = false;
@CFNullable
private boolean batch = false;
private Integer maxReconsumeTimes; private Integer maxReconsumeTimes;
@Override @Override
...@@ -149,4 +151,12 @@ public class SendMessageRequestHeader implements CommandCustomHeader { ...@@ -149,4 +151,12 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
public void setMaxReconsumeTimes(final Integer maxReconsumeTimes) { public void setMaxReconsumeTimes(final Integer maxReconsumeTimes) {
this.maxReconsumeTimes = maxReconsumeTimes; this.maxReconsumeTimes = maxReconsumeTimes;
} }
public boolean isBatch() {
return batch;
}
public void setBatch(boolean batch) {
this.batch = batch;
}
} }
...@@ -51,6 +51,10 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader { ...@@ -51,6 +51,10 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
private Integer l; // consumeRetryTimes private Integer l; // consumeRetryTimes
@CFNullable
private boolean m; //batch
public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) { public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) {
SendMessageRequestHeader v1 = new SendMessageRequestHeader(); SendMessageRequestHeader v1 = new SendMessageRequestHeader();
v1.setProducerGroup(v2.a); v1.setProducerGroup(v2.a);
...@@ -65,6 +69,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader { ...@@ -65,6 +69,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
v1.setReconsumeTimes(v2.j); v1.setReconsumeTimes(v2.j);
v1.setUnitMode(v2.k); v1.setUnitMode(v2.k);
v1.setMaxReconsumeTimes(v2.l); v1.setMaxReconsumeTimes(v2.l);
v1.setBatch(v2.m);
return v1; return v1;
} }
...@@ -82,6 +87,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader { ...@@ -82,6 +87,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
v2.j = v1.getReconsumeTimes(); v2.j = v1.getReconsumeTimes();
v2.k = v1.isUnitMode(); v2.k = v1.isUnitMode();
v2.l = v1.getMaxReconsumeTimes(); v2.l = v1.getMaxReconsumeTimes();
v2.m = v1.isBatch();
return v2; return v2;
} }
...@@ -184,4 +190,12 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader { ...@@ -184,4 +190,12 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
public void setL(final Integer l) { public void setL(final Integer l) {
this.l = l; this.l = l;
} }
public boolean isM() {
return m;
}
public void setM(boolean m) {
this.m = m;
}
} }
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.junit.Test;
public class MessageBatchTest {
public List<Message> generateMessages() {
List<Message> messages = new ArrayList<Message>();
Message message1 = new Message("topic1", "body".getBytes());
Message message2 = new Message("topic1", "body".getBytes());
messages.add(message1);
messages.add(message2);
return messages;
}
@Test
public void testGenerate_OK() throws Exception{
List<Message> messages = generateMessages();
MessageBatch.generateFromList(messages);
}
@Test(expected = UnsupportedOperationException.class)
public void testGenerate_DiffTopic() throws Exception{
List<Message> messages = generateMessages();
messages.get(1).setTopic("topic2");
MessageBatch.generateFromList(messages);
}
@Test(expected = UnsupportedOperationException.class)
public void testGenerate_DiffWaitOK() throws Exception{
List<Message> messages = generateMessages();
messages.get(1).setWaitStoreMsgOK(false);
MessageBatch.generateFromList(messages);
}
@Test(expected = UnsupportedOperationException.class)
public void testGenerate_Delay() throws Exception{
List<Message> messages = generateMessages();
messages.get(1).setDelayTimeLevel(1);
MessageBatch.generateFromList(messages);
}
@Test(expected = UnsupportedOperationException.class)
public void testGenerate_Retry() throws Exception{
List<Message> messages = generateMessages();
messages.get(1).setTopic(MixAll.RETRY_GROUP_TOPIC_PREFIX + "topic");
MessageBatch.generateFromList(messages);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertTrue;
/**
* Created by liuzhendong on 16/12/21.
*/
public class MessageEncodeDecodeTest {
@Test
public void testEncodeDecodeSingle() throws Exception{
Message message = new Message("topic", "body".getBytes());
message.setFlag(12);
message.putUserProperty("key","value");
byte[] bytes = MessageDecoder.encodeMessage(message);
ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.put(bytes);
buffer.flip();
Message newMessage = MessageDecoder.decodeMessage(buffer);
assertTrue(message.getFlag() == newMessage.getFlag());
assertTrue(newMessage.getProperty("key").equals(newMessage.getProperty("key")));
assertTrue(Arrays.equals(newMessage.getBody(), message.getBody()));
}
@Test
public void testEncodeDecodeList() throws Exception {
List<Message> messages = new ArrayList<Message>(128);
for (int i = 0; i < 100; i++) {
Message message = new Message("topic", ("body" + i).getBytes());
message.setFlag(i);
message.putUserProperty("key", "value" + i);
messages.add(message);
}
byte[] bytes = MessageDecoder.encodeMessages(messages);
ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.put(bytes);
buffer.flip();
List<Message> newMsgs = MessageDecoder.decodeMessages(buffer);
assertTrue(newMsgs.size() == messages.size());
for (int i = 0; i < newMsgs.size(); i++) {
Message message = messages.get(i);
Message newMessage = newMsgs.get(i);
assertTrue(message.getFlag() == newMessage.getFlag());
assertTrue(newMessage.getProperty("key").equals(newMessage.getProperty("key")));
assertTrue(Arrays.equals(newMessage.getBody(), message.getBody()));
}
}
}
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package org.apache.rocketmq.store; package org.apache.rocketmq.store;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.apache.rocketmq.common.message.MessageExtBatch;
/** /**
* Write messages callback interface * Write messages callback interface
...@@ -32,5 +33,17 @@ public interface AppendMessageCallback { ...@@ -32,5 +33,17 @@ public interface AppendMessageCallback {
* @return How many bytes to write * @return How many bytes to write
*/ */
AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,
final int maxBlank, final MessageExtBrokerInner msg); final int maxBlank, final MessageExtBrokerInner msg);
/**
* After batched message serialization, write MapedByteBuffer
*
* @param byteBuffer
* @param maxBlank
* @param messageExtBatch, backed up by a byte array
*
* @return How many bytes to write
*/
AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,
final int maxBlank, final MessageExtBatch messageExtBatch);
} }
...@@ -34,6 +34,8 @@ public class AppendMessageResult { ...@@ -34,6 +34,8 @@ public class AppendMessageResult {
private long logicsOffset; private long logicsOffset;
private long pagecacheRT = 0; private long pagecacheRT = 0;
private int msgNum = 1;
public AppendMessageResult(AppendMessageStatus status) { public AppendMessageResult(AppendMessageStatus status) {
this(status, 0, 0, "", 0, 0, 0); this(status, 0, 0, "", 0, 0, 0);
} }
...@@ -109,6 +111,14 @@ public class AppendMessageResult { ...@@ -109,6 +111,14 @@ public class AppendMessageResult {
this.logicsOffset = logicsOffset; this.logicsOffset = logicsOffset;
} }
public int getMsgNum() {
return msgNum;
}
public void setMsgNum(int msgNum) {
this.msgNum = msgNum;
}
@Override @Override
public String toString() { public String toString() {
return "AppendMessageResult{" + return "AppendMessageResult{" +
...@@ -119,6 +129,7 @@ public class AppendMessageResult { ...@@ -119,6 +129,7 @@ public class AppendMessageResult {
", storeTimestamp=" + storeTimestamp + ", storeTimestamp=" + storeTimestamp +
", logicsOffset=" + logicsOffset + ", logicsOffset=" + logicsOffset +
", pagecacheRT=" + pagecacheRT + ", pagecacheRT=" + pagecacheRT +
", msgNum=" + msgNum +
'}'; '}';
} }
} }
...@@ -331,7 +331,7 @@ public class ConsumeQueue { ...@@ -331,7 +331,7 @@ public class ConsumeQueue {
public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp, public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp,
long logicOffset) { long logicOffset) {
final int maxRetries = 30; final int maxRetries = 30;
boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable(); boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
for (int i = 0; i < maxRetries && canWrite; i++) { for (int i = 0; i < maxRetries && canWrite; i++) {
boolean result = this.putMessagePositionInfo(offset, size, tagsCode, logicOffset); boolean result = this.putMessagePositionInfo(offset, size, tagsCode, logicOffset);
if (result) { if (result) {
......
...@@ -40,6 +40,7 @@ import org.apache.rocketmq.common.UtilAll; ...@@ -40,6 +40,7 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.running.RunningStats; import org.apache.rocketmq.common.running.RunningStats;
import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.sysflag.MessageSysFlag;
...@@ -325,6 +326,62 @@ public class DefaultMessageStore implements MessageStore { ...@@ -325,6 +326,62 @@ public class DefaultMessageStore implements MessageStore {
return result; return result;
} }
public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
if (this.shutdown) {
log.warn("DefaultMessageStore has shutdown, so putMessages is forbidden");
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("DefaultMessageStore is in slave mode, so putMessages is forbidden ");
}
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}
if (!this.runningFlags.isWriteable()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("DefaultMessageStore is not writable, so putMessages is forbidden " + this.runningFlags.getFlagBits());
}
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
} else {
this.printTimes.set(0);
}
if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) {
log.warn("PutMessages topic length too long " + messageExtBatch.getTopic().length());
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) {
log.warn("PutMessages body length too long " + messageExtBatch.getBody().length);
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
if (this.isOSPageCacheBusy()) {
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}
long beginTime = this.getSystemClock().now();
PutMessageResult result = this.commitLog.putMessages(messageExtBatch);
long eclipseTime = this.getSystemClock().now() - beginTime;
if (eclipseTime > 500) {
log.warn("not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, messageExtBatch.getBody().length);
}
this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
}
return result;
}
@Override @Override
public boolean isOSPageCacheBusy() { public boolean isOSPageCacheBusy() {
long begin = this.getCommitLog().getBeginTimeInLock(); long begin = this.getCommitLog().getBeginTimeInLock();
......
...@@ -33,6 +33,8 @@ import java.util.concurrent.atomic.AtomicInteger; ...@@ -33,6 +33,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.util.LibC; import org.apache.rocketmq.store.util.LibC;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -187,7 +189,15 @@ public class MappedFile extends ReferenceResource { ...@@ -187,7 +189,15 @@ public class MappedFile extends ReferenceResource {
} }
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) { public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
assert msg != null; return appendMessagesInner(msg, cb);
}
public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb) {
return appendMessagesInner(messageExtBatch, cb);
}
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
assert messageExt != null;
assert cb != null; assert cb != null;
int currentPos = this.wrotePosition.get(); int currentPos = this.wrotePosition.get();
...@@ -195,30 +205,28 @@ public class MappedFile extends ReferenceResource { ...@@ -195,30 +205,28 @@ public class MappedFile extends ReferenceResource {
if (currentPos < this.fileSize) { if (currentPos < this.fileSize) {
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos); byteBuffer.position(currentPos);
AppendMessageResult result = AppendMessageResult result = null;
cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg); if (messageExt instanceof MessageExtBrokerInner) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch)messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
this.wrotePosition.addAndGet(result.getWroteBytes()); this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp(); this.storeTimestamp = result.getStoreTimestamp();
return result; return result;
} }
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
log.error("MappedFile.appendMessage return null, wrotePosition: " + currentPos + " fileSize: "
+ this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
} }
/**
*/
public long getFileFromOffset() { public long getFileFromOffset() {
return this.fileFromOffset; return this.fileFromOffset;
} }
/**
*
*/
public boolean appendMessage(final byte[] data) { public boolean appendMessage(final byte[] data) {
int currentPos = this.wrotePosition.get(); int currentPos = this.wrotePosition.get();
......
...@@ -19,6 +19,7 @@ package org.apache.rocketmq.store; ...@@ -19,6 +19,7 @@ package org.apache.rocketmq.store;
import java.util.HashMap; import java.util.HashMap;
import java.util.Set; import java.util.Set;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
public interface MessageStore { public interface MessageStore {
...@@ -33,6 +34,8 @@ public interface MessageStore { ...@@ -33,6 +34,8 @@ public interface MessageStore {
PutMessageResult putMessage(final MessageExtBrokerInner msg); PutMessageResult putMessage(final MessageExtBrokerInner msg);
PutMessageResult putMessages(final MessageExtBatch messageExtBatch);
GetMessageResult getMessage(final String group, final String topic, final int queueId, GetMessageResult getMessage(final String group, final String topic, final int queueId,
final long offset, final int maxMsgNums, final SubscriptionData subscriptionData); final long offset, final int maxMsgNums, final SubscriptionData subscriptionData);
......
...@@ -27,6 +27,8 @@ public class RunningFlags { ...@@ -27,6 +27,8 @@ public class RunningFlags {
private static final int WRITE_INDEX_FILE_ERROR_BIT = 1 << 3; private static final int WRITE_INDEX_FILE_ERROR_BIT = 1 << 3;
private static final int DISK_FULL_BIT = 1 << 4; private static final int DISK_FULL_BIT = 1 << 4;
private volatile int flagBits = 0; private volatile int flagBits = 0;
public RunningFlags() { public RunningFlags() {
...@@ -76,6 +78,15 @@ public class RunningFlags { ...@@ -76,6 +78,15 @@ public class RunningFlags {
return false; return false;
} }
//for consume queue, just ignore the DISK_FULL_BIT
public boolean isCQWriteable() {
if ((this.flagBits & (NOT_WRITEABLE_BIT | WRITE_LOGICS_QUEUE_ERROR_BIT | WRITE_INDEX_FILE_ERROR_BIT)) == 0) {
return true;
}
return false;
}
public boolean getAndMakeNotWriteable() { public boolean getAndMakeNotWriteable() {
boolean result = this.isWriteable(); boolean result = this.isWriteable();
if (result) { if (result) {
......
...@@ -132,6 +132,7 @@ public class MessageStoreConfig { ...@@ -132,6 +132,7 @@ public class MessageStoreConfig {
private int transientStorePoolSize = 5; private int transientStorePoolSize = 5;
private boolean fastFailIfNoBufferInStorePool = false; private boolean fastFailIfNoBufferInStorePool = false;
public boolean isDebugLockEnable() { public boolean isDebugLockEnable() {
return debugLockEnable; return debugLockEnable;
} }
...@@ -629,4 +630,5 @@ public class MessageStoreConfig { ...@@ -629,4 +630,5 @@ public class MessageStoreConfig {
public void setCommitCommitLogThoroughInterval(final int commitCommitLogThoroughInterval) { public void setCommitCommitLogThoroughInterval(final int commitCommitLogThoroughInterval) {
this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval; this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval;
} }
} }
...@@ -123,7 +123,9 @@ public class BrokerStatsManager { ...@@ -123,7 +123,9 @@ public class BrokerStatsManager {
public void incTopicPutNums(final String topic) { public void incTopicPutNums(final String topic) {
this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, 1, 1); this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, 1, 1);
} }
public void incTopicPutNums(final String topic, int num, int times) {
this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, num, times);
}
public void incTopicPutSize(final String topic, final int size) { public void incTopicPutSize(final String topic, final int size) {
this.statsTable.get(TOPIC_PUT_SIZE).addValue(topic, size, 1); this.statsTable.get(TOPIC_PUT_SIZE).addValue(topic, size, 1);
} }
...@@ -154,7 +156,9 @@ public class BrokerStatsManager { ...@@ -154,7 +156,9 @@ public class BrokerStatsManager {
public void incBrokerPutNums() { public void incBrokerPutNums() {
this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().incrementAndGet(); this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().incrementAndGet();
} }
public void incBrokerPutNums(final int incValue) {
this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue);
}
public void incBrokerGetNums(final int incValue) { public void incBrokerGetNums(final int incValue) {
this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue); this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue);
} }
......
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;
import java.io.File;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class AppendCallbackTest {
AppendMessageCallback callback;
CommitLog.MessageExtBatchEncoder batchEncoder = new CommitLog.MessageExtBatchEncoder(10 * 1024 * 1024);
@Before
public void init() throws Exception{
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8);
messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4);
messageStoreConfig.setMaxHashSlotNum(100);
messageStoreConfig.setMaxIndexNum(100 * 10);
messageStoreConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "unitteststore");
messageStoreConfig.setStorePathCommitLog(System.getProperty("user.home") + File.separator + "unitteststore" + File.separator + "commitlog");
//too much reference
DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, null);
CommitLog commitLog = new CommitLog(messageStore);
callback = commitLog.new DefaultAppendMessageCallback(1024);
}
@Test
public void testAppendMessageBatchEndOfFile() throws Exception{
List<Message> messages = new ArrayList<>();
String topic = "test-topic";
int queue= 0;
for (int i = 0; i < 10; i++) {
Message msg = new Message();
msg.setBody("body".getBytes());
msg.setTopic(topic);
msg.setTags("abc");
messages.add(msg);
}
MessageExtBatch messageExtBatch = new MessageExtBatch();
messageExtBatch.setTopic(topic);
messageExtBatch.setQueueId(queue);
messageExtBatch.setBornTimestamp(System.currentTimeMillis());
messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1",123));
messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1",124));
messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
//encounter end of file when append half of the data
AppendMessageResult result = callback.doAppend(0, buff, 1000, messageExtBatch);
assertEquals(AppendMessageStatus.END_OF_FILE, result.getStatus());
assertEquals(0, result.getWroteOffset());
assertEquals(0, result.getLogicsOffset());
assertEquals(1000, result.getWroteBytes());
assertEquals(8, buff.position()); //write blank size and magic value
assertTrue(result.getMsgId().length() > 0); //should have already constructed some message ids
}
@Test
public void testAppendMessageBatchSucc() throws Exception {
List<Message> messages = new ArrayList<>();
String topic = "test-topic";
int queue= 0;
for (int i = 0; i < 10; i++) {
Message msg = new Message();
msg.setBody("body".getBytes());
msg.setTopic(topic);
msg.setTags("abc");
messages.add(msg);
}
MessageExtBatch messageExtBatch = new MessageExtBatch();
messageExtBatch.setTopic(topic);
messageExtBatch.setQueueId(queue);
messageExtBatch.setBornTimestamp(System.currentTimeMillis());
messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1",123));
messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1",124));
messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
AppendMessageResult allresult = callback.doAppend(0, buff, 1024 * 10, messageExtBatch);
assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus());
assertEquals(0, allresult.getWroteOffset());
assertEquals(0, allresult.getLogicsOffset());
assertEquals(buff.position(), allresult.getWroteBytes());
assertEquals(messages.size(), allresult.getMsgNum());
Set<String> msgIds = new HashSet<>();
for (String msgId: allresult.getMsgId().split(",")) {
assertEquals(32, msgId.length());
msgIds.add(msgId);
}
assertEquals(messages.size(), msgIds.size());
List<MessageExt> decodeMsgs = MessageDecoder.decodes((ByteBuffer) buff.flip());
assertEquals(decodeMsgs.size(), decodeMsgs.size());
long queueOffset = decodeMsgs.get(0).getQueueOffset();
long storeTimeStamp = decodeMsgs.get(0).getStoreTimestamp();
for (int i = 0; i < messages.size(); i++) {
assertEquals(messages.get(i).getTopic(), decodeMsgs.get(i).getTopic());
assertEquals(new String(messages.get(i).getBody()), new String(decodeMsgs.get(i).getBody()));
assertEquals(messages.get(i).getTags(), decodeMsgs.get(i).getTags());
assertEquals(messageExtBatch.getBornHostNameString(), decodeMsgs.get(i).getBornHostNameString());
assertEquals(messageExtBatch.getBornTimestamp(), decodeMsgs.get(i).getBornTimestamp());
assertEquals(storeTimeStamp, decodeMsgs.get(i).getStoreTimestamp());
assertEquals(queueOffset++, decodeMsgs.get(i).getQueueOffset());
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.client.producer.batch;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.log4j.Logger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
import org.apache.rocketmq.test.factory.ProducerFactory;
import org.apache.rocketmq.test.util.RandomUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class BatchSendIT extends BaseConf {
private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class);
private String topic = null;
private Random random = new Random();
@Before
public void setUp() {
topic = initTopic();
logger.info(String.format("user topic[%s]!", topic));
}
@After
public void tearDown() {
super.shutDown();
}
@Test
public void testBatchSend_ViewMessage() throws Exception {
List<Message> messageList = new ArrayList<>();
int batchNum = 100;
for (int i = 0; i < batchNum; i++) {
messageList.add(new Message(topic, RandomUtils.getStringByUUID().getBytes()));
}
DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
SendResult sendResult = producer.send(messageList);
Assert.assertEquals(SendStatus.SEND_OK, sendResult.getSendStatus());
String[] offsetIds = sendResult.getOffsetMsgId().split(",");
String[] msgIds = sendResult.getMsgId().split(",");
Assert.assertEquals(messageList.size(), offsetIds.length);
Assert.assertEquals(messageList.size(), msgIds.length);
Thread.sleep(2000);
for (int i = 0; i < 3; i++) {
producer.viewMessage(offsetIds[random.nextInt(batchNum)]);
}
for (int i = 0; i < 3; i++) {
producer.viewMessage(topic, msgIds[random.nextInt(batchNum)]);
}
}
@Test
public void testBatchSend_CheckProperties() throws Exception {
List<Message> messageList = new ArrayList<>();
Message message = new Message();
message.setTopic(topic);
message.setKeys("keys123");
message.setTags("tags123");
message.setWaitStoreMsgOK(false);
message.setBuyerId("buyerid123");
message.setFlag(123);
message.setBody("body".getBytes());
messageList.add(message);
DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
SendResult sendResult = producer.send(messageList);
Assert.assertEquals(SendStatus.SEND_OK, sendResult.getSendStatus());
String[] offsetIds = sendResult.getOffsetMsgId().split(",");
String[] msgIds = sendResult.getMsgId().split(",");
Assert.assertEquals(messageList.size(), offsetIds.length);
Assert.assertEquals(messageList.size(), msgIds.length);
Thread.sleep(2000);
Message messageByOffset = producer.viewMessage(offsetIds[0]);
Message messageByMsgId = producer.viewMessage(topic, msgIds[0]);
System.out.println(messageByOffset);
System.out.println(messageByMsgId);
Assert.assertEquals(message.getTopic(), messageByMsgId.getTopic());
Assert.assertEquals(message.getTopic(), messageByOffset.getTopic());
Assert.assertEquals(message.getKeys(), messageByOffset.getKeys());
Assert.assertEquals(message.getKeys(), messageByMsgId.getKeys());
Assert.assertEquals(message.getTags(), messageByOffset.getTags());
Assert.assertEquals(message.getTags(), messageByMsgId.getTags());
Assert.assertEquals(message.isWaitStoreMsgOK(), messageByOffset.isWaitStoreMsgOK());
Assert.assertEquals(message.isWaitStoreMsgOK(), messageByMsgId.isWaitStoreMsgOK());
Assert.assertEquals(message.getBuyerId(), messageByOffset.getBuyerId());
Assert.assertEquals(message.getBuyerId(), messageByMsgId.getBuyerId());
Assert.assertEquals(message.getFlag(), messageByOffset.getFlag());
Assert.assertEquals(message.getFlag(), messageByMsgId.getFlag());
}
}
...@@ -61,7 +61,7 @@ public class MessageExceptionIT extends BaseConf { ...@@ -61,7 +61,7 @@ public class MessageExceptionIT extends BaseConf {
@Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class)
public void testSynSendNullMessage() throws Exception { public void testSynSendNullMessage() throws Exception {
producer.send(null); producer.send((Message) null);
} }
@Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册