From de0be119f4116798c163b484efe4583464973507 Mon Sep 17 00:00:00 2001 From: "shutian.lzh" Date: Tue, 24 Apr 2018 11:10:12 +0800 Subject: [PATCH] Support scheduled message --- .../openmessaging/rocketmq/domain/RocketMQConstants.java | 7 +++++++ .../java/io/openmessaging/rocketmq/utils/OMSUtil.java | 8 ++++++++ 2 files changed, 15 insertions(+) create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java new file mode 100644 index 00000000..838bbf51 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java @@ -0,0 +1,7 @@ +package io.openmessaging.rocketmq.domain; + +public interface RocketMQConstants { + + String STARTDELIVERTIME = "__STARTDELIVERTIME"; + +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java index 0938b831..36f0656c 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java @@ -22,6 +22,7 @@ import io.openmessaging.Message.BuiltinKeys; import io.openmessaging.OMS; import io.openmessaging.producer.SendResult; import io.openmessaging.rocketmq.domain.BytesMessageImpl; +import io.openmessaging.rocketmq.domain.RocketMQConstants; import io.openmessaging.rocketmq.domain.SendResultImpl; import java.lang.reflect.Field; import java.util.Iterator; @@ -53,6 +54,13 @@ public class OMSUtil { //All destinations in RocketMQ use Topic rmqMessage.setTopic(sysHeaders.getString(BuiltinKeys.DESTINATION)); + if (sysHeaders.containsKey(BuiltinKeys.START_TIME)) { + long deliverTime = sysHeaders.getLong(BuiltinKeys.START_TIME, 0); + if (deliverTime > 0) { + rmqMessage.putUserProperty(RocketMQConstants.STARTDELIVERTIME, String.valueOf(deliverTime)); + } + } + for (String key : userHeaders.keySet()) { MessageAccessor.putProperty(rmqMessage, key, userHeaders.getString(key)); } -- GitLab