diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 801d886c43be168a4bb37dabbf32d0900f97fe12..dac4c27856c7064863c20cdaada265ed37012fe0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -281,6 +281,10 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement msgInner.setBornHost(ctx.channel().remoteAddress()); msgInner.setStoreHost(this.getStoreHost()); msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); + String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName(); + MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName); + msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); + CompletableFuture putMessageResult = null; Map origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties()); String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);