未验证 提交 3f48c174 编写于 作者: A AVP42 提交者: GitHub

[ISSUE# 2330] Store the properties of MessageBatch (#2343)

* [ISSUE# 2330]store the properties of MessageBatch

* [ISSUE# 2330]fix style problem

* [ISSUE# 2330]Undo newly added property in the MessageBatch
Co-authored-by: Nwufc <wufc@viomi.com>
上级 d5cb67ff
......@@ -1795,6 +1795,11 @@ public class CommitLog {
ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
// properties from MessageExtBatch
String batchPropStr = MessageDecoder.messageProperties2String(messageExtBatch.getProperties());
final byte[] batchPropData = batchPropStr.getBytes(MessageDecoder.CHARSET_UTF8);
final short batchPropLen = (short) batchPropData.length;
while (messagesByteBuff.hasRemaining()) {
// 1 TOTALSIZE
messagesByteBuff.getInt();
......@@ -1818,7 +1823,8 @@ public class CommitLog {
final int topicLength = topicData.length;
final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength, propertiesLen);
final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength,
propertiesLen + batchPropLen);
// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
......@@ -1871,9 +1877,13 @@ public class CommitLog {
this.msgBatchMemory.put((byte) topicLength);
this.msgBatchMemory.put(topicData);
// 17 PROPERTIES
this.msgBatchMemory.putShort(propertiesLen);
if (propertiesLen > 0)
this.msgBatchMemory.putShort((short) (propertiesLen + batchPropLen));
if (propertiesLen > 0) {
this.msgBatchMemory.put(messagesByteBuff.array(), propertiesPos, propertiesLen);
}
if (batchPropLen > 0) {
this.msgBatchMemory.put(batchPropData, 0, batchPropLen);
}
}
msgBatchMemory.flip();
return msgBatchMemory;
......
......@@ -35,6 +35,7 @@ import java.io.File;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -80,6 +81,12 @@ public class BatchPutMessageTest {
@Test
public void testPutMessages() throws Exception {
String batchPropK = "extraKey";
String batchPropV = "extraValue";
Map<String, String> batchProp = new HashMap<>(1);
batchProp.put(batchPropK, batchPropV);
short batchPropLen = (short) messageProperties2String(batchProp).getBytes(MessageDecoder.CHARSET_UTF8).length;
List<Message> messages = new ArrayList<>();
String topic = "batch-write-topic";
int queue = 0;
......@@ -98,7 +105,7 @@ public class BatchPutMessageTest {
short propertiesLength = (short) propertiesBytes.length;
final byte[] topicData = msg.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;
msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength) + msgLengthArr[j - 1];
msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength+batchPropLen) + msgLengthArr[j - 1];
j++;
}
byte[] batchMessageBody = MessageDecoder.encodeMessages(messages);
......@@ -106,6 +113,7 @@ public class BatchPutMessageTest {
messageExtBatch.setTopic(topic);
messageExtBatch.setQueueId(queue);
messageExtBatch.setBody(batchMessageBody);
messageExtBatch.putUserProperty(batchPropK,batchPropV);
messageExtBatch.setBornTimestamp(System.currentTimeMillis());
messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 125));
messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1", 126));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册