未验证 提交 2509b0ed 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #2961 from areyouok/dev_speed_F

[ISSUE #2883] [Part F] Improve produce performance in M/S mode.
...@@ -280,18 +280,26 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -280,18 +280,26 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
msgInner.setBody(body); msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag()); msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties())); Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
msgInner.setPropertiesString(requestHeader.getProperties()); MessageAccessor.setProperties(msgInner, origProps);
msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress()); msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost()); msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName(); String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {
// There is no need to store "WAIT=true", remove it from propertiesString to save 9 bytes for each message.
// It works for most case. In some cases msgInner.setPropertiesString invoked later and replace it.
String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
// Reput to properties, since msgInner.isWaitStoreMsgOK() will be invoked later
origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);
} else {
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
}
CompletableFuture<PutMessageResult> putMessageResult = null; CompletableFuture<PutMessageResult> putMessageResult = null;
Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (transFlag != null && Boolean.parseBoolean(transFlag)) { if (transFlag != null && Boolean.parseBoolean(transFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
......
...@@ -409,7 +409,23 @@ public class MessageDecoder { ...@@ -409,7 +409,23 @@ public class MessageDecoder {
} }
public static String messageProperties2String(Map<String, String> properties) { public static String messageProperties2String(Map<String, String> properties) {
StringBuilder sb = new StringBuilder(); if (properties == null) {
return "";
}
int len = 0;
for (final Map.Entry<String, String> entry : properties.entrySet()) {
final String name = entry.getKey();
final String value = entry.getValue();
if (value == null) {
continue;
}
if (name != null) {
len += name.length();
}
len += value.length();
len += 2; // separator
}
StringBuilder sb = new StringBuilder(len);
if (properties != null) { if (properties != null) {
for (final Map.Entry<String, String> entry : properties.entrySet()) { for (final Map.Entry<String, String> entry : properties.entrySet()) {
final String name = entry.getKey(); final String name = entry.getKey();
...@@ -423,6 +439,9 @@ public class MessageDecoder { ...@@ -423,6 +439,9 @@ public class MessageDecoder {
sb.append(value); sb.append(value);
sb.append(PROPERTY_SEPARATOR); sb.append(PROPERTY_SEPARATOR);
} }
if (sb.length() > 0) {
sb.deleteCharAt(sb.length() - 1);
}
} }
return sb.toString(); return sb.toString();
} }
...@@ -430,12 +449,22 @@ public class MessageDecoder { ...@@ -430,12 +449,22 @@ public class MessageDecoder {
public static Map<String, String> string2messageProperties(final String properties) { public static Map<String, String> string2messageProperties(final String properties) {
Map<String, String> map = new HashMap<String, String>(); Map<String, String> map = new HashMap<String, String>();
if (properties != null) { if (properties != null) {
String[] items = properties.split(String.valueOf(PROPERTY_SEPARATOR)); int len = properties.length();
for (String i : items) { int index = 0;
String[] nv = i.split(String.valueOf(NAME_VALUE_SEPARATOR)); while (index < len) {
if (2 == nv.length) { int newIndex = properties.indexOf(PROPERTY_SEPARATOR, index);
map.put(nv[0], nv[1]); if (newIndex < 0) {
newIndex = len;
}
if (newIndex - index >= 3) {
int kvSepIndex = properties.indexOf(NAME_VALUE_SEPARATOR, index);
if (kvSepIndex > index && kvSepIndex < newIndex - 1) {
String k = properties.substring(index, kvSepIndex);
String v = properties.substring(kvSepIndex + 1, newIndex);
map.put(k, v);
}
} }
index = newIndex + 1;
} }
} }
......
...@@ -25,6 +25,8 @@ import java.net.UnknownHostException; ...@@ -25,6 +25,8 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Map; import java.util.Map;
import static org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR;
import static org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR;
import static org.apache.rocketmq.common.message.MessageDecoder.createMessageId; import static org.apache.rocketmq.common.message.MessageDecoder.createMessageId;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
...@@ -265,4 +267,110 @@ public class MessageDecoderTest { ...@@ -265,4 +267,110 @@ public class MessageDecoderTest {
} }
} }
@Test
public void testString2messageProperties() {
StringBuilder sb = new StringBuilder();
sb.append("k1").append(NAME_VALUE_SEPARATOR).append("v1");
Map<String,String> m = MessageDecoder.string2messageProperties(sb.toString());
assertThat(m).size().isEqualTo(1);
assertThat(m.get("k1")).isEqualTo("v1");
m = MessageDecoder.string2messageProperties("");
assertThat(m).size().isEqualTo(0);
m = MessageDecoder.string2messageProperties(" ");
assertThat(m).size().isEqualTo(0);
m = MessageDecoder.string2messageProperties("aaa");
assertThat(m).size().isEqualTo(0);
sb.setLength(0);
sb.append("k1").append(NAME_VALUE_SEPARATOR);
m = MessageDecoder.string2messageProperties(sb.toString());
assertThat(m).size().isEqualTo(0);
sb.setLength(0);
sb.append(NAME_VALUE_SEPARATOR).append("v1");
m = MessageDecoder.string2messageProperties(sb.toString());
assertThat(m).size().isEqualTo(0);
sb.setLength(0);
sb.append("k1").append(NAME_VALUE_SEPARATOR).append("v1").append(PROPERTY_SEPARATOR);
m = MessageDecoder.string2messageProperties(sb.toString());
assertThat(m).size().isEqualTo(1);
assertThat(m.get("k1")).isEqualTo("v1");
sb.setLength(0);
sb.append("k1").append(NAME_VALUE_SEPARATOR).append("v1").append(PROPERTY_SEPARATOR)
.append("k2").append(NAME_VALUE_SEPARATOR).append("v2");
m = MessageDecoder.string2messageProperties(sb.toString());
assertThat(m).size().isEqualTo(2);
assertThat(m.get("k1")).isEqualTo("v1");
assertThat(m.get("k2")).isEqualTo("v2");
sb.setLength(0);
sb.append("k1").append(NAME_VALUE_SEPARATOR).append("v1").append(PROPERTY_SEPARATOR)
.append(NAME_VALUE_SEPARATOR).append("v2");
m = MessageDecoder.string2messageProperties(sb.toString());
assertThat(m).size().isEqualTo(1);
assertThat(m.get("k1")).isEqualTo("v1");
sb.setLength(0);
sb.append("k1").append(NAME_VALUE_SEPARATOR).append("v1").append(PROPERTY_SEPARATOR)
.append("k2").append(NAME_VALUE_SEPARATOR);
m = MessageDecoder.string2messageProperties(sb.toString());
assertThat(m).size().isEqualTo(1);
assertThat(m.get("k1")).isEqualTo("v1");
sb.setLength(0);
sb.append(NAME_VALUE_SEPARATOR).append("v1").append(PROPERTY_SEPARATOR)
.append("k2").append(NAME_VALUE_SEPARATOR).append("v2");
m = MessageDecoder.string2messageProperties(sb.toString());
assertThat(m).size().isEqualTo(1);
assertThat(m.get("k2")).isEqualTo("v2");
sb.setLength(0);
sb.append("k1").append(NAME_VALUE_SEPARATOR).append(PROPERTY_SEPARATOR)
.append("k2").append(NAME_VALUE_SEPARATOR).append("v2");
m = MessageDecoder.string2messageProperties(sb.toString());
assertThat(m).size().isEqualTo(1);
assertThat(m.get("k2")).isEqualTo("v2");
sb.setLength(0);
sb.append("1").append(NAME_VALUE_SEPARATOR).append("1").append(PROPERTY_SEPARATOR)
.append("2").append(NAME_VALUE_SEPARATOR).append("2");
m = MessageDecoder.string2messageProperties(sb.toString());
assertThat(m).size().isEqualTo(2);
assertThat(m.get("1")).isEqualTo("1");
assertThat(m.get("2")).isEqualTo("2");
sb.setLength(0);
sb.append("1").append(NAME_VALUE_SEPARATOR).append(PROPERTY_SEPARATOR)
.append("2").append(NAME_VALUE_SEPARATOR).append("2");
m = MessageDecoder.string2messageProperties(sb.toString());
assertThat(m).size().isEqualTo(1);
assertThat(m.get("2")).isEqualTo("2");
sb.setLength(0);
sb.append(NAME_VALUE_SEPARATOR).append("1").append(PROPERTY_SEPARATOR)
.append("2").append(NAME_VALUE_SEPARATOR).append("2");
m = MessageDecoder.string2messageProperties(sb.toString());
assertThat(m).size().isEqualTo(1);
assertThat(m.get("2")).isEqualTo("2");
sb.setLength(0);
sb.append("1").append(NAME_VALUE_SEPARATOR).append("1").append(PROPERTY_SEPARATOR)
.append("2").append(NAME_VALUE_SEPARATOR);
m = MessageDecoder.string2messageProperties(sb.toString());
assertThat(m).size().isEqualTo(1);
assertThat(m.get("1")).isEqualTo("1");
sb.setLength(0);
sb.append("1").append(NAME_VALUE_SEPARATOR).append("1").append(PROPERTY_SEPARATOR)
.append(NAME_VALUE_SEPARATOR).append("2");
m = MessageDecoder.string2messageProperties(sb.toString());
assertThat(m).size().isEqualTo(1);
assertThat(m.get("1")).isEqualTo("1");
}
} }
\ No newline at end of file
...@@ -23,7 +23,6 @@ import org.apache.rocketmq.common.message.Message; ...@@ -23,7 +23,6 @@ import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.stats.BrokerStatsManager;
...@@ -39,6 +38,7 @@ import java.util.HashMap; ...@@ -39,6 +38,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static org.apache.rocketmq.common.message.MessageDecoder.messageProperties2String;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
...@@ -230,22 +230,6 @@ public class BatchPutMessageTest { ...@@ -230,22 +230,6 @@ public class BatchPutMessageTest {
return msgLen; return msgLen;
} }
public String messageProperties2String(Map<String, String> properties) {
StringBuilder sb = new StringBuilder();
if (properties != null) {
for (final Map.Entry<String, String> entry : properties.entrySet()) {
final String name = entry.getKey();
final String value = entry.getValue();
sb.append(name);
sb.append(NAME_VALUE_SEPARATOR);
sb.append(value);
sb.append(PROPERTY_SEPARATOR);
}
}
return sb.toString();
}
private class MyMessageArrivingListener implements MessageArrivingListener { private class MyMessageArrivingListener implements MessageArrivingListener {
@Override @Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime, public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册