diff --git a/docs/cn/Example_Batch.md b/docs/cn/Example_Batch.md new file mode 100644 index 0000000000000000000000000000000000000000..6c8897fa6bcfa790a66234ab257751fce1f34e0e --- /dev/null +++ b/docs/cn/Example_Batch.md @@ -0,0 +1,82 @@ +# 批量消息发送 +批量消息发送能够提高发送效率,提升系统吞吐量。同一批批量消息的topic、waitStoreMsgOK属性必须保持一致,批量消息不支持延迟消息。批量消息发送一次最多可以发送 4MiB 的消息,但是如果需要发送更大的消息,建议将较大的消息分成多个不超过 1MiB 的小消息。 + +### 1 发送批量消息 +如果你一次只发送不超过 4MiB 的消息,使用批处理很容易: +```java +String topic = "BatchTest"; +List messages = new ArrayList<>(); +messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes())); +messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes())); +messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes())); +try { + producer.send(messages); +} catch (Exception e) { + e.printStackTrace(); + //handle the error +} +``` +### 2 拆分 +当您发送较大的消息时,复杂性会增加,如果您不确定它是否超过 4MiB的限制。 这时候,您最好将较大的消息分成多个不超过 1MiB 的小消息: + +```java +public class ListSplitter implements Iterator> { + private final int SIZE_LIMIT = 1024 * 1024 * 4; + private final List messages; + private int currIndex; + public ListSplitter(List messages) { + this.messages = messages; + } + @Override public boolean hasNext() { + return currIndex < messages.size(); + } + @Override public List next() { + int startIndex = getStartIndex(); + int nextIndex = startIndex; + int totalSize = 0; + for (; nextIndex < messages.size(); nextIndex++) { + Message message = messages.get(nextIndex); + int tmpSize = calcMessageSize(message); + if (tmpSize + totalSize > SIZE_LIMIT) { + break; + } else { + totalSize += tmpSize; + } + } + List subList = messages.subList(startIndex, nextIndex); + currIndex = nextIndex; + return subList; + } + private int getStartIndex() { + Message currMessage = messages.get(currIndex); + int tmpSize = calcMessageSize(currMessage); + while(tmpSize > SIZE_LIMIT) { + currIndex += 1; + Message message = messages.get(curIndex); + tmpSize = calcMessageSize(message); + } + return currIndex; + } + private int calcMessageSize(Message message) { + int tmpSize = message.getTopic().length() + message.getBody().length(); + Map properties = message.getProperties(); + for (Map.Entry entry : properties.entrySet()) { + tmpSize += entry.getKey().length() + entry.getValue().length(); + } + tmpSize = tmpSize + 20; // Increase the log overhead by 20 bytes + return tmpSize; + } +} + +// then you could split the large list into small ones: +ListSplitter splitter = new ListSplitter(messages); +while (splitter.hasNext()) { + try { + List listItem = splitter.next(); + producer.send(listItem); + } catch (Exception e) { + e.printStackTrace(); + // handle the error + } +} +``` \ No newline at end of file