diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java new file mode 100644 index 0000000000000000000000000000000000000000..838bbf5176a4c6fd41ff43b020bbf296fef8eacf --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java @@ -0,0 +1,7 @@ +package io.openmessaging.rocketmq.domain; + +public interface RocketMQConstants { + + String STARTDELIVERTIME = "__STARTDELIVERTIME"; + +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java index 0938b831fbc594faf5baa1ea7a246687804068c8..36f0656caadb99ae19f187f69f5d9dea9c1c79fb 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java @@ -22,6 +22,7 @@ import io.openmessaging.Message.BuiltinKeys; import io.openmessaging.OMS; import io.openmessaging.producer.SendResult; import io.openmessaging.rocketmq.domain.BytesMessageImpl; +import io.openmessaging.rocketmq.domain.RocketMQConstants; import io.openmessaging.rocketmq.domain.SendResultImpl; import java.lang.reflect.Field; import java.util.Iterator; @@ -53,6 +54,13 @@ public class OMSUtil { //All destinations in RocketMQ use Topic rmqMessage.setTopic(sysHeaders.getString(BuiltinKeys.DESTINATION)); + if (sysHeaders.containsKey(BuiltinKeys.START_TIME)) { + long deliverTime = sysHeaders.getLong(BuiltinKeys.START_TIME, 0); + if (deliverTime > 0) { + rmqMessage.putUserProperty(RocketMQConstants.STARTDELIVERTIME, String.valueOf(deliverTime)); + } + } + for (String key : userHeaders.keySet()) { MessageAccessor.putProperty(rmqMessage, key, userHeaders.getString(key)); }