From 3f48c174343223ec0dc5258595c19c8291d5e21d Mon Sep 17 00:00:00 2001 From: AVP42 <36760766+AVP42@users.noreply.github.com> Date: Mon, 19 Oct 2020 08:48:45 +0800 Subject: [PATCH] [ISSUE# 2330] Store the properties of MessageBatch (#2343) * [ISSUE# 2330]store the properties of MessageBatch * [ISSUE# 2330]fix style problem * [ISSUE# 2330]Undo newly added property in the MessageBatch Co-authored-by: wufc --- .../org/apache/rocketmq/store/CommitLog.java | 16 +++++++++++++--- .../rocketmq/store/BatchPutMessageTest.java | 10 +++++++++- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index b6d17daa..d489e843 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -1795,6 +1795,11 @@ public class CommitLog { ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength); ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength); + // properties from MessageExtBatch + String batchPropStr = MessageDecoder.messageProperties2String(messageExtBatch.getProperties()); + final byte[] batchPropData = batchPropStr.getBytes(MessageDecoder.CHARSET_UTF8); + final short batchPropLen = (short) batchPropData.length; + while (messagesByteBuff.hasRemaining()) { // 1 TOTALSIZE messagesByteBuff.getInt(); @@ -1818,7 +1823,8 @@ public class CommitLog { final int topicLength = topicData.length; - final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength, propertiesLen); + final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength, + propertiesLen + batchPropLen); // Exceeds the maximum message if (msgLen > this.maxMessageSize) { @@ -1871,9 +1877,13 @@ public class CommitLog { this.msgBatchMemory.put((byte) topicLength); this.msgBatchMemory.put(topicData); // 17 PROPERTIES - this.msgBatchMemory.putShort(propertiesLen); - if (propertiesLen > 0) + this.msgBatchMemory.putShort((short) (propertiesLen + batchPropLen)); + if (propertiesLen > 0) { this.msgBatchMemory.put(messagesByteBuff.array(), propertiesPos, propertiesLen); + } + if (batchPropLen > 0) { + this.msgBatchMemory.put(batchPropData, 0, batchPropLen); + } } msgBatchMemory.flip(); return msgBatchMemory; diff --git a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java index 8618dbb0..b3a7c196 100644 --- a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java @@ -35,6 +35,7 @@ import java.io.File; import java.net.InetSocketAddress; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -80,6 +81,12 @@ public class BatchPutMessageTest { @Test public void testPutMessages() throws Exception { + String batchPropK = "extraKey"; + String batchPropV = "extraValue"; + Map batchProp = new HashMap<>(1); + batchProp.put(batchPropK, batchPropV); + short batchPropLen = (short) messageProperties2String(batchProp).getBytes(MessageDecoder.CHARSET_UTF8).length; + List messages = new ArrayList<>(); String topic = "batch-write-topic"; int queue = 0; @@ -98,7 +105,7 @@ public class BatchPutMessageTest { short propertiesLength = (short) propertiesBytes.length; final byte[] topicData = msg.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); final int topicLength = topicData.length; - msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength) + msgLengthArr[j - 1]; + msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength+batchPropLen) + msgLengthArr[j - 1]; j++; } byte[] batchMessageBody = MessageDecoder.encodeMessages(messages); @@ -106,6 +113,7 @@ public class BatchPutMessageTest { messageExtBatch.setTopic(topic); messageExtBatch.setQueueId(queue); messageExtBatch.setBody(batchMessageBody); + messageExtBatch.putUserProperty(batchPropK,batchPropV); messageExtBatch.setBornTimestamp(System.currentTimeMillis()); messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 125)); messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1", 126)); -- GitLab