diff --git a/docs/cn/Example_Batch.md b/docs/cn/Example_Batch.md new file mode 100644 index 0000000000000000000000000000000000000000..6c8897fa6bcfa790a66234ab257751fce1f34e0e --- /dev/null +++ b/docs/cn/Example_Batch.md @@ -0,0 +1,82 @@ +# 批量消息发送 +批量消息发送能够提高发送效率,提升系统吞吐量。同一批批量消息的topic、waitStoreMsgOK属性必须保持一致,批量消息不支持延迟消息。批量消息发送一次最多可以发送 4MiB 的消息,但是如果需要发送更大的消息,建议将较大的消息分成多个不超过 1MiB 的小消息。 + +### 1 发送批量消息 +如果你一次只发送不超过 4MiB 的消息,使用批处理很容易: +```java +String topic = "BatchTest"; +List messages = new ArrayList<>(); +messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes())); +messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes())); +messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes())); +try { + producer.send(messages); +} catch (Exception e) { + e.printStackTrace(); + //handle the error +} +``` +### 2 拆分 +当您发送较大的消息时,复杂性会增加,如果您不确定它是否超过 4MiB的限制。 这时候,您最好将较大的消息分成多个不超过 1MiB 的小消息: + +```java +public class ListSplitter implements Iterator> { + private final int SIZE_LIMIT = 1024 * 1024 * 4; + private final List messages; + private int currIndex; + public ListSplitter(List messages) { + this.messages = messages; + } + @Override public boolean hasNext() { + return currIndex < messages.size(); + } + @Override public List next() { + int startIndex = getStartIndex(); + int nextIndex = startIndex; + int totalSize = 0; + for (; nextIndex < messages.size(); nextIndex++) { + Message message = messages.get(nextIndex); + int tmpSize = calcMessageSize(message); + if (tmpSize + totalSize > SIZE_LIMIT) { + break; + } else { + totalSize += tmpSize; + } + } + List subList = messages.subList(startIndex, nextIndex); + currIndex = nextIndex; + return subList; + } + private int getStartIndex() { + Message currMessage = messages.get(currIndex); + int tmpSize = calcMessageSize(currMessage); + while(tmpSize > SIZE_LIMIT) { + currIndex += 1; + Message message = messages.get(curIndex); + tmpSize = calcMessageSize(message); + } + return currIndex; + } + private int calcMessageSize(Message message) { + int tmpSize = message.getTopic().length() + message.getBody().length(); + Map properties = message.getProperties(); + for (Map.Entry entry : properties.entrySet()) { + tmpSize += entry.getKey().length() + entry.getValue().length(); + } + tmpSize = tmpSize + 20; // Increase the log overhead by 20 bytes + return tmpSize; + } +} + +// then you could split the large list into small ones: +ListSplitter splitter = new ListSplitter(messages); +while (splitter.hasNext()) { + try { + List listItem = splitter.next(); + producer.send(listItem); + } catch (Exception e) { + e.printStackTrace(); + // handle the error + } +} +``` \ No newline at end of file diff --git a/docs/cn/Example_Delay.md b/docs/cn/Example_Delay.md new file mode 100644 index 0000000000000000000000000000000000000000..31df40f4485e6015bbc1bc272d05f474d2837e59 --- /dev/null +++ b/docs/cn/Example_Delay.md @@ -0,0 +1,85 @@ +# Schedule example + +### 1 启动消费者等待传入的订阅消息 + +```java +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import java.util.List; + +public class ScheduledMessageConsumer { + + public static void main(String[] args) throws Exception { + // Instantiate message consumer + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer"); + // Subscribe topics + consumer.subscribe("TestTopic", "*"); + // Register message listener + consumer.registerMessageListener(new MessageListenerConcurrently() { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List messages, ConsumeConcurrentlyContext context) { + for (MessageExt message : messages) { + // Print approximate delay time period + System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later"); + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + // Launch consumer + consumer.start(); + } +} +``` + +### 2 发送延迟消息 + +```java +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.message.Message; + +public class ScheduledMessageProducer { + + public static void main(String[] args) throws Exception { + // Instantiate a producer to send scheduled messages + DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup"); + // Launch producer + producer.start(); + int totalMessagesToSend = 100; + for (int i = 0; i < totalMessagesToSend; i++) { + Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes()); + // This message will be delivered to consumer 10 seconds later. + message.setDelayTimeLevel(3); + // Send the message + producer.send(message); + } + + // Shutdown producer after use. + producer.shutdown(); + } + +} +``` + +### 3 确认 + +您应该会看到消息在其存储时间后大约 10 秒被消耗。 + +### 4 延迟消息的使用场景 + +例如在电子商务中,如果提交订单,可以发送延迟消息,1小时后可以查看订单状态。 如果订单仍未付款,则可以取消订单并释放库存。 + +### 5 使用延迟消息的限制 + +```java +// org/apache/rocketmq/store/config/MessageStoreConfig.java + +private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; +``` + +当前 RocketMQ 不支持任意时间的延迟。 生产者发送延迟消息前需要设置几个固定的延迟级别,分别对应1s到2h的1到18个延迟级,消息消费失败会进入延迟消息队列,消息发送时间与设置的延迟级别和重试次数有关。 + + See `SendMessageProcessor.java`