diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java index 265c5c122fd98f95573dcd2e63c383a9b19477ab..565857a34779c126154694f0ac6b0881d60d5007 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java @@ -173,7 +173,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, replyMessageRequestHeader); request.setBody(msg.getBody()); - String senderId = msg.getProperties().get(MessageConst.PROPERTY_MESSAGE_REPLY_TO); + String senderId = msg.getProperties().get(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT); PushReplyResult pushReplyResult = new PushReplyResult(false); if (senderId != null) { @@ -207,9 +207,9 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen log.warn(pushReplyResult.getRemark()); } } else { - log.warn("REPLY_TO is null, can not reply message"); + log.warn(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT + " is null, can not reply message"); pushReplyResult.setPushOk(false); - pushReplyResult.setRemark("reply message properties[" + MessageConst.PROPERTY_MESSAGE_REPLY_TO + "] is null"); + pushReplyResult.setRemark("reply message properties[" + MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT + "] is null"); } return pushReplyResult; } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java index a28f78060168b82f3e6f9ac95163101ccad70b6c..94fd8f560d1ff0a22baf037122cfb441dd9e34ff 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java @@ -30,7 +30,6 @@ import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; -import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; @@ -58,7 +57,6 @@ import org.mockito.junit.MockitoJUnitRunner; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -126,7 +124,7 @@ public class ReplyMessageProcessorTest { requestHeader.setFlag(124); requestHeader.setReconsumeTimes(0); Map map = new HashMap(); - map.put(MessageConst.PROPERTY_MESSAGE_REPLY_TO, "127.0.0.1"); + map.put(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, "127.0.0.1"); requestHeader.setProperties(MessageDecoder.messageProperties2String(map)); return requestHeader; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index c1c37737d8bff2593d5c7cbc9a742e3e4725b0b6..f2cc605b972ce2344a8c30307d4994355ce9aa7c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -1548,7 +1548,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { String correlationId = CorrelationIdUtil.createCorrelationId(); String requestClientId = this.getmQClientFactory().getClientId(); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CORRELATION_ID, correlationId); - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_REPLY_TO, requestClientId); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, requestClientId); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_TTL, String.valueOf(timeout)); boolean hasRouteData = this.getmQClientFactory().getTopicRouteTable().containsKey(msg.getTopic()); diff --git a/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java b/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java index 626b4525bb2adf230d01da0e864822628bfc34ee..416ba44da3284eaa711a10a735b8b6edeb1e51f8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java +++ b/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java @@ -29,7 +29,7 @@ public class MessageUtil { if (requestMessage != null) { Message replyMessage = new Message(); String cluster = requestMessage.getProperty(MessageConst.PROPERTY_CLUSTER); - String replyTo = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO); + String replyTo = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT); String correlationId = requestMessage.getProperty(MessageConst.PROPERTY_CORRELATION_ID); String ttl = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_TTL); replyMessage.setBody(body); @@ -38,7 +38,7 @@ public class MessageUtil { replyMessage.setTopic(replyTopic); MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG); MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_CORRELATION_ID, correlationId); - MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_REPLY_TO, replyTo); + MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, replyTo); MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TTL, ttl); return replyMessage; @@ -48,4 +48,8 @@ public class MessageUtil { } throw new MQClientException(ClientErrorCode.CREATE_REPLY_MESSAGE_EXCEPTION, "create reply message fail, requestMessage cannot be null."); } + + public static String getReplyToClient(final Message msg) { + return msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT); + } } diff --git a/client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java b/client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java index 7847e7ee3545702ed773af4e8c4db4ff0991a649..803e596fc829bc1cd666563f70bd35c7e824cc13 100644 --- a/client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java @@ -35,7 +35,7 @@ public class MessageUtilsTest { public void testCreateReplyMessage() throws MQClientException { Message msg = MessageUtil.createReplyMessage(createReplyMessage("clusterName"), new byte[] {'a'}); assertThat(msg.getTopic()).isEqualTo("clusterName" + "_" + MixAll.REPLY_TOPIC_POSTFIX); - assertThat(msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO)).isEqualTo("127.0.0.1"); + assertThat(msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT)).isEqualTo("127.0.0.1"); assertThat(msg.getProperty(MessageConst.PROPERTY_MESSAGE_TTL)).isEqualTo("3000"); } @@ -59,10 +59,18 @@ public class MessageUtilsTest { } } + @Test + public void testGetReplyToClient() throws MQClientException { + Message msg = createReplyMessage("clusterName"); + String replyToClient = MessageUtil.getReplyToClient(msg); + assertThat(replyToClient).isNotNull(); + assertThat(replyToClient).isEqualTo("127.0.0.1"); + } + private Message createReplyMessage(String clusterName) { Message requestMessage = new Message(); Map map = new HashMap(); - map.put(MessageConst.PROPERTY_MESSAGE_REPLY_TO, "127.0.0.1"); + map.put(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, "127.0.0.1"); map.put(MessageConst.PROPERTY_CLUSTER, clusterName); map.put(MessageConst.PROPERTY_MESSAGE_TTL, "3000"); MessageAccessor.setProperties(requestMessage, map); diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java index f773c2b945fe8ad6baa4379ef42fbce7ffcee0aa..5bdc846562dc6d294f3a4e5cecc4d4bfd5a83099 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java @@ -46,7 +46,7 @@ public class MessageConst { public static final String PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS = "CHECK_IMMUNITY_TIME_IN_SECONDS"; public static final String PROPERTY_INSTANCE_ID = "INSTANCE_ID"; public static final String PROPERTY_CORRELATION_ID = "CORRELATION_ID"; - public static final String PROPERTY_MESSAGE_REPLY_TO = "REPLY_TO"; + public static final String PROPERTY_MESSAGE_REPLY_TO_CLIENT = "REPLY_TO_CLIENT"; public static final String PROPERTY_MESSAGE_TTL = "TTL"; public static final String PROPERTY_REPLY_MESSAGE_ARRIVE_TIME = "ARRIVE_TIME"; public static final String PROPERTY_PUSH_REPLY_TIME = "PUSH_REPLY_TIME"; @@ -82,7 +82,7 @@ public class MessageConst { STRING_HASH_SET.add(PROPERTY_CONSUME_START_TIMESTAMP); STRING_HASH_SET.add(PROPERTY_INSTANCE_ID); STRING_HASH_SET.add(PROPERTY_CORRELATION_ID); - STRING_HASH_SET.add(PROPERTY_MESSAGE_REPLY_TO); + STRING_HASH_SET.add(PROPERTY_MESSAGE_REPLY_TO_CLIENT); STRING_HASH_SET.add(PROPERTY_MESSAGE_TTL); STRING_HASH_SET.add(PROPERTY_REPLY_MESSAGE_ARRIVE_TIME); STRING_HASH_SET.add(PROPERTY_PUSH_REPLY_TIME); diff --git a/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java b/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java index 39012c8b3f40594e4c33ff85940c843ee5681580..5a53538b54fa36c7a9d7711b5962ae85d475e5c9 100644 --- a/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java @@ -57,7 +57,7 @@ public class ResponseConsumer { for (MessageExt msg : msgs) { try { System.out.printf("handle message: %s", msg.toString()); - String replyTo = msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO); + String replyTo = MessageUtil.getReplyToClient(msg); byte[] replyContent = "reply message contents.".getBytes(); // create reply message with given util, do not create reply message by yourself Message replyMessage = MessageUtil.createReplyMessage(msg, replyContent);