From deaece56941b1df36605a924ebc3e9627dc35f5c Mon Sep 17 00:00:00 2001 From: ThailandKing <38276808+ThailandKing@users.noreply.github.com> Date: Mon, 18 Feb 2019 19:37:19 +0800 Subject: [PATCH] [RIP-9] Add a delay example in RocketMQ [RIP-9] Add a delay example in RocketMQ --- docs/en/Example_Delay.md | 85 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 docs/en/Example_Delay.md diff --git a/docs/en/Example_Delay.md b/docs/en/Example_Delay.md new file mode 100644 index 00000000..d59c2c58 --- /dev/null +++ b/docs/en/Example_Delay.md @@ -0,0 +1,85 @@ +# Schedule example + +### 1、Start consumer to wait for incoming subscribed messages + +```java +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import java.util.List; + +public class ScheduledMessageConsumer { + + public static void main(String[] args) throws Exception { + // Instantiate message consumer + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer"); + // Subscribe topics + consumer.subscribe("TestTopic", "*"); + // Register message listener + consumer.registerMessageListener(new MessageListenerConcurrently() { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List messages, ConsumeConcurrentlyContext context) { + for (MessageExt message : messages) { + // Print approximate delay time period + System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later"); + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + // Launch consumer + consumer.start(); + } +} +``` + +### 2、Send scheduled messages + +```java +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.message.Message; + +public class ScheduledMessageProducer { + + public static void main(String[] args) throws Exception { + // Instantiate a producer to send scheduled messages + DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup"); + // Launch producer + producer.start(); + int totalMessagesToSend = 100; + for (int i = 0; i < totalMessagesToSend; i++) { + Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes()); + // This message will be delivered to consumer 10 seconds later. + message.setDelayTimeLevel(3); + // Send the message + producer.send(message); + } + + // Shutdown producer after use. + producer.shutdown(); + } + +} +``` + +### 3、Verification + +You should see messages are consumed about 10 seconds later than their storing time. + +### 4、Use scenarios for scheduled messages + +For example, in e-commerce, if an order is submitted, a delay message can be sent, and the status of the order can be checked after 1 hour. If the order is still unpaid, the order can be cancelled and the inventory released. + +### 5、Restrictions on the use of scheduled messages + +```java +// org/apache/rocketmq/store/config/MessageStoreConfig.java + +private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; +``` + +Nowadays RocketMq does not support any time delay. It needs to set several fixed delay levels, which correspond to level 1 to 18 from 1s to 2h. Message consumption failure will enter the delay message queue. Message sending time is related to the set delay level and the number of retries. + + See `SendMessageProcessor.java` \ No newline at end of file -- GitLab