diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 8b3caa7e126a85c626e2ca3b46d9f9f283ae6b24..a76c144c08ed4109aed7b8470265750c7ee28981 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -280,18 +280,26 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement msgInner.setBody(body); msgInner.setFlag(requestHeader.getFlag()); - MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties())); - msgInner.setPropertiesString(requestHeader.getProperties()); + Map origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties()); + MessageAccessor.setProperties(msgInner, origProps); msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); msgInner.setBornHost(ctx.channel().remoteAddress()); msgInner.setStoreHost(this.getStoreHost()); msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName(); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName); - msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); + if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) { + // There is no need to store "WAIT=true", remove it from propertiesString to save 9 bytes for each message. + // It works for most case. In some cases msgInner.setPropertiesString invoked later and replace it. + String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK); + msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); + // Reput to properties, since msgInner.isWaitStoreMsgOK() will be invoked later + origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue); + } else { + msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); + } CompletableFuture putMessageResult = null; - Map origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties()); String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (transFlag != null && Boolean.parseBoolean(transFlag)) { if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java index 2936c18eac9d45ff6c34cfa394d150701ed8cee3..c94700e5feb55485105b02f23a81edfb70d90376 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java @@ -409,7 +409,23 @@ public class MessageDecoder { } public static String messageProperties2String(Map properties) { - StringBuilder sb = new StringBuilder(); + if (properties == null) { + return ""; + } + int len = 0; + for (final Map.Entry entry : properties.entrySet()) { + final String name = entry.getKey(); + final String value = entry.getValue(); + if (value == null) { + continue; + } + if (name != null) { + len += name.length(); + } + len += value.length(); + len += 2; // separator + } + StringBuilder sb = new StringBuilder(len); if (properties != null) { for (final Map.Entry entry : properties.entrySet()) { final String name = entry.getKey(); @@ -423,6 +439,9 @@ public class MessageDecoder { sb.append(value); sb.append(PROPERTY_SEPARATOR); } + if (sb.length() > 0) { + sb.deleteCharAt(sb.length() - 1); + } } return sb.toString(); } @@ -430,12 +449,22 @@ public class MessageDecoder { public static Map string2messageProperties(final String properties) { Map map = new HashMap(); if (properties != null) { - String[] items = properties.split(String.valueOf(PROPERTY_SEPARATOR)); - for (String i : items) { - String[] nv = i.split(String.valueOf(NAME_VALUE_SEPARATOR)); - if (2 == nv.length) { - map.put(nv[0], nv[1]); + int len = properties.length(); + int index = 0; + while (index < len) { + int newIndex = properties.indexOf(PROPERTY_SEPARATOR, index); + if (newIndex < 0) { + newIndex = len; + } + if (newIndex - index >= 3) { + int kvSepIndex = properties.indexOf(NAME_VALUE_SEPARATOR, index); + if (kvSepIndex > index && kvSepIndex < newIndex - 1) { + String k = properties.substring(index, kvSepIndex); + String v = properties.substring(kvSepIndex + 1, newIndex); + map.put(k, v); + } } + index = newIndex + 1; } } diff --git a/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java b/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java index fde523dba95344b177cb5426ec852e326bec84e4..b27f24669aa940ea7f97c1c8a4e7477f745b32ee 100644 --- a/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java @@ -25,6 +25,8 @@ import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.Map; +import static org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR; +import static org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR; import static org.apache.rocketmq.common.message.MessageDecoder.createMessageId; import static org.assertj.core.api.Assertions.assertThat; @@ -265,4 +267,110 @@ public class MessageDecoderTest { } } + @Test + public void testString2messageProperties() { + StringBuilder sb = new StringBuilder(); + sb.append("k1").append(NAME_VALUE_SEPARATOR).append("v1"); + Map m = MessageDecoder.string2messageProperties(sb.toString()); + assertThat(m).size().isEqualTo(1); + assertThat(m.get("k1")).isEqualTo("v1"); + + m = MessageDecoder.string2messageProperties(""); + assertThat(m).size().isEqualTo(0); + + m = MessageDecoder.string2messageProperties(" "); + assertThat(m).size().isEqualTo(0); + + m = MessageDecoder.string2messageProperties("aaa"); + assertThat(m).size().isEqualTo(0); + + sb.setLength(0); + sb.append("k1").append(NAME_VALUE_SEPARATOR); + m = MessageDecoder.string2messageProperties(sb.toString()); + assertThat(m).size().isEqualTo(0); + + sb.setLength(0); + sb.append(NAME_VALUE_SEPARATOR).append("v1"); + m = MessageDecoder.string2messageProperties(sb.toString()); + assertThat(m).size().isEqualTo(0); + + sb.setLength(0); + sb.append("k1").append(NAME_VALUE_SEPARATOR).append("v1").append(PROPERTY_SEPARATOR); + m = MessageDecoder.string2messageProperties(sb.toString()); + assertThat(m).size().isEqualTo(1); + assertThat(m.get("k1")).isEqualTo("v1"); + + sb.setLength(0); + sb.append("k1").append(NAME_VALUE_SEPARATOR).append("v1").append(PROPERTY_SEPARATOR) + .append("k2").append(NAME_VALUE_SEPARATOR).append("v2"); + m = MessageDecoder.string2messageProperties(sb.toString()); + assertThat(m).size().isEqualTo(2); + assertThat(m.get("k1")).isEqualTo("v1"); + assertThat(m.get("k2")).isEqualTo("v2"); + + sb.setLength(0); + sb.append("k1").append(NAME_VALUE_SEPARATOR).append("v1").append(PROPERTY_SEPARATOR) + .append(NAME_VALUE_SEPARATOR).append("v2"); + m = MessageDecoder.string2messageProperties(sb.toString()); + assertThat(m).size().isEqualTo(1); + assertThat(m.get("k1")).isEqualTo("v1"); + + sb.setLength(0); + sb.append("k1").append(NAME_VALUE_SEPARATOR).append("v1").append(PROPERTY_SEPARATOR) + .append("k2").append(NAME_VALUE_SEPARATOR); + m = MessageDecoder.string2messageProperties(sb.toString()); + assertThat(m).size().isEqualTo(1); + assertThat(m.get("k1")).isEqualTo("v1"); + + sb.setLength(0); + sb.append(NAME_VALUE_SEPARATOR).append("v1").append(PROPERTY_SEPARATOR) + .append("k2").append(NAME_VALUE_SEPARATOR).append("v2"); + m = MessageDecoder.string2messageProperties(sb.toString()); + assertThat(m).size().isEqualTo(1); + assertThat(m.get("k2")).isEqualTo("v2"); + + sb.setLength(0); + sb.append("k1").append(NAME_VALUE_SEPARATOR).append(PROPERTY_SEPARATOR) + .append("k2").append(NAME_VALUE_SEPARATOR).append("v2"); + m = MessageDecoder.string2messageProperties(sb.toString()); + assertThat(m).size().isEqualTo(1); + assertThat(m.get("k2")).isEqualTo("v2"); + + sb.setLength(0); + sb.append("1").append(NAME_VALUE_SEPARATOR).append("1").append(PROPERTY_SEPARATOR) + .append("2").append(NAME_VALUE_SEPARATOR).append("2"); + m = MessageDecoder.string2messageProperties(sb.toString()); + assertThat(m).size().isEqualTo(2); + assertThat(m.get("1")).isEqualTo("1"); + assertThat(m.get("2")).isEqualTo("2"); + + sb.setLength(0); + sb.append("1").append(NAME_VALUE_SEPARATOR).append(PROPERTY_SEPARATOR) + .append("2").append(NAME_VALUE_SEPARATOR).append("2"); + m = MessageDecoder.string2messageProperties(sb.toString()); + assertThat(m).size().isEqualTo(1); + assertThat(m.get("2")).isEqualTo("2"); + + sb.setLength(0); + sb.append(NAME_VALUE_SEPARATOR).append("1").append(PROPERTY_SEPARATOR) + .append("2").append(NAME_VALUE_SEPARATOR).append("2"); + m = MessageDecoder.string2messageProperties(sb.toString()); + assertThat(m).size().isEqualTo(1); + assertThat(m.get("2")).isEqualTo("2"); + + sb.setLength(0); + sb.append("1").append(NAME_VALUE_SEPARATOR).append("1").append(PROPERTY_SEPARATOR) + .append("2").append(NAME_VALUE_SEPARATOR); + m = MessageDecoder.string2messageProperties(sb.toString()); + assertThat(m).size().isEqualTo(1); + assertThat(m.get("1")).isEqualTo("1"); + + sb.setLength(0); + sb.append("1").append(NAME_VALUE_SEPARATOR).append("1").append(PROPERTY_SEPARATOR) + .append(NAME_VALUE_SEPARATOR).append("2"); + m = MessageDecoder.string2messageProperties(sb.toString()); + assertThat(m).size().isEqualTo(1); + assertThat(m.get("1")).isEqualTo("1"); + } + } \ No newline at end of file diff --git a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java index b3a7c196f94996e3d967d2efaad89a416f3eeaaa..2c1fd25f01c7ca695955c22532bd7a8025ed2b41 100644 --- a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java @@ -23,7 +23,6 @@ import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; -import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.stats.BrokerStatsManager; @@ -39,6 +38,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.rocketmq.common.message.MessageDecoder.messageProperties2String; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertTrue; @@ -230,22 +230,6 @@ public class BatchPutMessageTest { return msgLen; } - public String messageProperties2String(Map properties) { - StringBuilder sb = new StringBuilder(); - if (properties != null) { - for (final Map.Entry entry : properties.entrySet()) { - final String name = entry.getKey(); - final String value = entry.getValue(); - - sb.append(name); - sb.append(NAME_VALUE_SEPARATOR); - sb.append(value); - sb.append(PROPERTY_SEPARATOR); - } - } - return sb.toString(); - } - private class MyMessageArrivingListener implements MessageArrivingListener { @Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,