提交 de0be119 编写于 作者: S shutian.lzh

Support scheduled message

上级 75358894
package io.openmessaging.rocketmq.domain;
public interface RocketMQConstants {
String STARTDELIVERTIME = "__STARTDELIVERTIME";
}
......@@ -22,6 +22,7 @@ import io.openmessaging.Message.BuiltinKeys;
import io.openmessaging.OMS;
import io.openmessaging.producer.SendResult;
import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.domain.RocketMQConstants;
import io.openmessaging.rocketmq.domain.SendResultImpl;
import java.lang.reflect.Field;
import java.util.Iterator;
......@@ -53,6 +54,13 @@ public class OMSUtil {
//All destinations in RocketMQ use Topic
rmqMessage.setTopic(sysHeaders.getString(BuiltinKeys.DESTINATION));
if (sysHeaders.containsKey(BuiltinKeys.START_TIME)) {
long deliverTime = sysHeaders.getLong(BuiltinKeys.START_TIME, 0);
if (deliverTime > 0) {
rmqMessage.putUserProperty(RocketMQConstants.STARTDELIVERTIME, String.valueOf(deliverTime));
}
}
for (String key : userHeaders.keySet()) {
MessageAccessor.putProperty(rmqMessage, key, userHeaders.getString(key));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册