提交 b1f954df 编写于 作者: 武汉红喜's avatar 武汉红喜

RocketMQTemplate send delay message

上级 f0d8f6b8
......@@ -120,6 +120,37 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
return syncSend(destination, payload, producer.getSendMsgTimeout());
}
/**
* Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
* @param message {@link org.springframework.messaging.Message}
* @param timeout send timeout with millis
* @param delayLevel level for the delay message
* @return {@link SendResult}
*/
public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
log.error("syncSend failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
}
try {
long now = System.currentTimeMillis();
org.apache.rocketmq.common.message.Message rocketMsg = convertToRocketMsg(destination, message);
if (delayLevel > 0) {
rocketMsg.setDelayTimeLevel(delayLevel);
}
SendResult sendResult = producer.send(rocketMsg, timeout);
long costTime = System.currentTimeMillis() - now;
log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
return sendResult;
} catch (Exception e) {
log.error("syncSend failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
}
/**
* Same to {@link #syncSend(String, Object)} with send timeout specified in addition.
*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册