提交 2ee02595 编写于 作者: 武汉红喜's avatar 武汉红喜

delay message

上级 11726b5a
......@@ -32,6 +32,7 @@ import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.starter.enums.MessageDelayLevel;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.messaging.Message;
......@@ -120,7 +121,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
return syncSend(destination, payload, producer.getSendMsgTimeout());
}
public SendResult sendDelayed(String destination, Message<?> message, int delayLevel) {
public SendResult sendDelayed(String destination, Message<?> message, MessageDelayLevel delayLevel) {
return sendDelayed(destination, message, producer.getSendMsgTimeout(), delayLevel);
}
......@@ -130,27 +131,27 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* @param destination formats: `topicName:tags`
* @param message {@link org.springframework.messaging.Message}
* @param timeout send timeout with millis
* @param delayLevel level for the delay message
* @param delayLevel level for the delay message {@link MessageDelayLevel}
* @return {@link SendResult}
*/
public SendResult sendDelayed(String destination, Message<?> message, long timeout, int delayLevel) {
public SendResult sendDelayed(String destination, Message<?> message, long timeout, MessageDelayLevel delayLevel) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
log.error("syncSend failed. destination:{}, message is null ", destination);
log.error("sendDelayed failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
}
try {
long now = System.currentTimeMillis();
org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message);
if (delayLevel > 0) {
rocketMsg.setDelayTimeLevel(delayLevel);
if (delayLevel.level > 0) {
rocketMsg.setDelayTimeLevel(delayLevel.level);
}
SendResult sendResult = producer.send(rocketMsg, timeout);
long costTime = System.currentTimeMillis() - now;
log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
return sendResult;
} catch (Exception e) {
log.error("syncSend failed. destination:{}, message:{} ", destination, message);
log.error("sendDelayed failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
}
......
package org.apache.rocketmq.spring.starter.enums;
public enum MessageDelayLevel {
TIME_1S(1),
TIME_5S(2),
TIME_10S(3),
TIME_30S(4),
TIME_1M(5),
TIME_2M(6),
TIME_3M(7),
TIME_4M(8),
TIME_5M(9),
TIME_6M(10),
TIME_7M(11),
TIME_8M(12),
TIME_9M(13),
TIME_10M(14),
TIME_20M(15),
TIME_30M(16),
TIME_1H(17),
TIME_2H(18),
// 新增延迟类型需要在broker配置增加messageDelayLevel
TIME_6H(19),
TIME_12H(20);
public int level;
MessageDelayLevel(int level) {
this.level = level;
}
}
\ No newline at end of file
package org.hongxi.whatsmars.rocketmq.boot.producer;
import org.apache.rocketmq.spring.starter.core.RocketMQTemplate;
import org.apache.rocketmq.spring.starter.enums.MessageDelayLevel;
import org.hongxi.whatsmars.rocketmq.boot.OrderPaidEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
......@@ -25,7 +26,7 @@ public class ProducerApplication implements CommandLineRunner {
rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");
rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
rocketMQTemplate.convertAndSend("test-topic-2", new OrderPaidEvent("T_001", new BigDecimal("88.00")));
rocketMQTemplate.sendDelayed("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build(), 2);
rocketMQTemplate.sendDelayed("test-topic-1", MessageBuilder.withPayload("I'm delayed message").build(), MessageDelayLevel.TIME_1M.level);
} catch (Exception e) {
e.printStackTrace();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册