diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java index 1917d27cf155caad84d302f7efca504d65d130b8..231ac0c68f3e26d099de8318c62741d320e7cb57 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java @@ -104,6 +104,7 @@ public class PullAPIWrapper { Long.toString(pullResult.getMinOffset())); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET, Long.toString(pullResult.getMaxOffset())); + msg.setBrokerName(mq.getBrokerName()); } pullResultExt.setMsgFoundList(msgListFilterAgain); diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java index 20cb0572717ca138a26ca5d157bbe295dae98fc4..7f351725348a7f18b9c1b5b91bb18977142753a4 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java @@ -27,6 +27,8 @@ import org.apache.rocketmq.common.sysflag.MessageSysFlag; public class MessageExt extends Message { private static final long serialVersionUID = 5720810158625748049L; + private String brokerName; + private int queueId; private int storeSize; @@ -107,6 +109,14 @@ public class MessageExt extends Message { return socketAddress2ByteBuffer(this.storeHost, byteBuffer); } + public String getBrokerName() { + return brokerName; + } + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } + public int getQueueId() { return queueId; } @@ -235,7 +245,7 @@ public class MessageExt extends Message { @Override public String toString() { - return "MessageExt [queueId=" + queueId + ", storeSize=" + storeSize + ", queueOffset=" + queueOffset + return "MessageExt [brokerName=" + brokerName + ", queueId=" + queueId + ", storeSize=" + storeSize + ", queueOffset=" + queueOffset + ", sysFlag=" + sysFlag + ", bornTimestamp=" + bornTimestamp + ", bornHost=" + bornHost + ", storeTimestamp=" + storeTimestamp + ", storeHost=" + storeHost + ", msgId=" + msgId + ", commitLogOffset=" + commitLogOffset + ", bodyCRC=" + bodyCRC + ", reconsumeTimes="