diff --git a/docs/en/Example_Orderly.md b/docs/en/Example_Orderly.md new file mode 100644 index 0000000000000000000000000000000000000000..dbb02cf6680e1ae4f092a8abd084ad06893a112f --- /dev/null +++ b/docs/en/Example_Orderly.md @@ -0,0 +1,232 @@ +# 2 Example for ordered messages + +RocketMQ provides ordered messages using FIFO order. All related messages need to be sent into the same message queue in an orderly manner. + +The following demonstrates ordered messages by ensuring order of create, pay, send and finish steps of sales order process. + +## 2.1 produce ordered messages +``` +package org.apache.rocketmq.example.order2 + +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; + +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +/* +* ordered messages producer +*/ +public class Producer { + + public static void main(String[] args) throws Exception { + DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); + producer.setNamesrvAddr("127.0.0.1:9876"); + producer.start(); + String[] tags = new String[]{"TagA", "TagC", "TagD"}; + // sales orders list + List orderList = new Producer().buildOrders(); + + Date date = new Date(); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String dateStr = sdf.format(date); + + for (int i = 0; i < 10; i++) { + // generate message timestamp + String body = dateStr + " Hello RocketMQ " + orderList.get(i); + Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes()); + + SendResult sendResult = producer.send(msg, new MessageQueueSelector() { + @Override + public MessageQueue select(List mqs, Message msg, Object arg) { + Long id = (Long) arg; //message queue is selected by #salesOrderID + long index = id % mqs.size(); + return mqs.get((int) index); + } + }, orderList.get(i).getOrderId()); + + System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s", + sendResult.getSendStatus(), + sendResult.getMessageQueue().getQueueId(), + body)); + } + + producer.shutdown(); + } + + /** + * each sales order step + */ + private static class OrderStep { + private long orderId; + private String desc; + + public long getOrderId() { + return orderId; + } + + public void setOrderId(long orderId) { + this.orderId = orderId; + } + + public String getDesc() { + return desc; + } + + public void setDesc(String desc) { + this.desc = desc; + } + + @Override + public String toString() { + return "OrderStep{" + + "orderId=" + orderId + + ", desc='" + desc + '\'' + + '}'; + } + } + + /** + * to generate ten OrderStep objects for three sales orders: + * #SalesOrder "15103111039L": create, pay, send, finish; + * #SalesOrder "15103111065L": create, pay, finish; + * #SalesOrder "15103117235L": create, pay, finish; + */ + private List buildOrders() { + + List orderList = new ArrayList(); + + //create sales order with orderid="15103111039L" + OrderStep orderDemo = new OrderStep(); + orderDemo.setOrderId(15103111039L); + orderDemo.setDesc("create"); + orderList.add(orderDemo); + + //create sales order with orderid="15103111065L" + orderDemo = new OrderStep(); + orderDemo.setOrderId(15103111065L); + orderDemo.setDesc("create"); + orderList.add(orderDemo); + + //pay sales order #"15103111039L" + orderDemo = new OrderStep(); + orderDemo.setOrderId(15103111039L); + orderDemo.setDesc("pay"); + orderList.add(orderDemo); + + //create sales order with orderid="15103117235L" + orderDemo = new OrderStep(); + orderDemo.setOrderId(15103117235L); + orderDemo.setDesc("create"); + orderList.add(orderDemo); + + //pay sales order #"15103111065L" + orderDemo = new OrderStep(); + orderDemo.setOrderId(15103111065L); + orderDemo.setDesc("pay"); + orderList.add(orderDemo); + + //pay sales order #"15103117235L" + orderDemo = new OrderStep(); + orderDemo.setOrderId(15103117235L); + orderDemo.setDesc("pay"); + orderList.add(orderDemo); + + //mark sales order #"15103111065L" as "finish" + orderDemo = new OrderStep(); + orderDemo.setOrderId(15103111065L); + orderDemo.setDesc("finish"); + orderList.add(orderDemo); + + //mark mark sales order #"15103111039L" as "send" + orderDemo = new OrderStep(); + orderDemo.setOrderId(15103111039L); + orderDemo.setDesc("send"); + orderList.add(orderDemo); + + ////mark sales order #"15103117235L" as "finish" + orderDemo = new OrderStep(); + orderDemo.setOrderId(15103117235L); + orderDemo.setDesc("finish"); + orderList.add(orderDemo); + + //mark sales order #"15103111039L" as "finish" + orderDemo = new OrderStep(); + orderDemo.setOrderId(15103111039L); + orderDemo.setDesc("finish"); + orderList.add(orderDemo); + + return orderList; + } +} + +``` + +## 2.2 Consume ordered messages + +``` + +package org.apache.rocketmq.example.order2; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageExt; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +/** + * consume messages in order + */ +public class ConsumerInOrder { + + public static void main(String[] args) throws Exception { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3"); + consumer.setNamesrvAddr("127.0.0.1:9876"); + /** + * when the consumer is first run, the start point of message queue where it can get messages will be set. + * or if it is restarted, it will continue from the last place to get messages. + */ + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + + consumer.subscribe("TopicTest", "TagA || TagC || TagD"); + + consumer.registerMessageListener(new MessageListenerOrderly() { + + Random random = new Random(); + + @Override + public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { + context.setAutoCommit(true); + for (MessageExt msg : msgs) { + // one consumer for each message queue, and messages order are kept in a single message queue. + System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody())); + } + + try { + TimeUnit.SECONDS.sleep(random.nextInt(10)); + } catch (Exception e) { + e.printStackTrace(); + } + return ConsumeOrderlyStatus.SUCCESS; + } + }); + + consumer.start(); + + System.out.println("Consumer Started."); + } +} + +``` + +