From 92b8cf9738aa898032bf33e59065686e029779c2 Mon Sep 17 00:00:00 2001 From: MountainOne <593591605@qq.com> Date: Mon, 30 Mar 2020 17:50:24 +0800 Subject: [PATCH] [ISSUE #1505]Fix the ListSplitter of batch sample in docs --- docs/cn/RocketMQ_Example.md | 88 +++++++++++++++++++------------------ docs/en/Example_Batch.md | 59 +++++++++++++------------ 2 files changed, 78 insertions(+), 69 deletions(-) diff --git a/docs/cn/RocketMQ_Example.md b/docs/cn/RocketMQ_Example.md index 66980d57..9559a604 100644 --- a/docs/cn/RocketMQ_Example.md +++ b/docs/cn/RocketMQ_Example.md @@ -500,48 +500,52 @@ try { 复杂度只有当你发送大批量时才会增长,你可能不确定它是否超过了大小限制(4MB)。这时候你最好把你的消息列表分割一下: ```java - -public class ListSplitter implements Iterator<List<Message>> { - private final int SIZE_LIMIT = 1024 * 1024 * 4; - private final List<Message> messages; - private int currIndex; - public ListSplitter(List<Message> messages) { - this.messages = messages; - } - @Override public boolean hasNext() { - return currIndex < messages.size(); - } - @Override public List<Message> next() { - int nextIndex = currIndex; - int totalSize = 0; - for (; nextIndex < messages.size(); nextIndex++) { - Message message = messages.get(nextIndex); - int tmpSize = message.getTopic().length() + message.getBody().length; - Map<String, String> properties = message.getProperties(); - for (Map.Entry<String, String> entry : properties.entrySet()) { - tmpSize += entry.getKey().length() + entry.getValue().length(); - } - tmpSize = tmpSize + 20; // 增加日志的开销20字节 - if (tmpSize > SIZE_LIMIT) { - //单个消息超过了最大的限制 - //忽略,否则会阻塞分裂的进程 - if (nextIndex - currIndex == 0) { - //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环 - nextIndex++; - } - break; - } - if (tmpSize + totalSize > SIZE_LIMIT) { - break; - } else { - totalSize += tmpSize; - } - - } - List<Message> subList = messages.subList(currIndex, nextIndex); - currIndex = nextIndex; - return subList; - } +public class ListSplitter implements Iterator<List<Message>> { + private final int SIZE_LIMIT = 1024 * 1024 * 4; + private final List<Message> messages; + private int currIndex; + public ListSplitter(List<Message> 
messages) { + this.messages = messages; + } + @Override public boolean hasNext() { + return currIndex < messages.size(); + } + @Override public List<Message> next() { + int startIndex = getStartIndex(); + int nextIndex = startIndex; + int totalSize = 0; + for (; nextIndex < messages.size(); nextIndex++) { + Message message = messages.get(nextIndex); + int tmpSize = calcMessageSize(message); + if (tmpSize + totalSize > SIZE_LIMIT) { + break; + } else { + totalSize += tmpSize; + } + } + List<Message> subList = messages.subList(startIndex, nextIndex); + currIndex = nextIndex; + return subList; + } + private int getStartIndex() { + Message currMessage = messages.get(currIndex); + int tmpSize = calcMessageSize(currMessage); + while(tmpSize > SIZE_LIMIT) { + currIndex += 1; + Message message = messages.get(currIndex); + tmpSize = calcMessageSize(message); + } + return currIndex; + } + private int calcMessageSize(Message message) { + int tmpSize = message.getTopic().length() + message.getBody().length; + Map<String, String> properties = message.getProperties(); + for (Map.Entry<String, String> entry : properties.entrySet()) { + tmpSize += entry.getKey().length() + entry.getValue().length(); + } + tmpSize = tmpSize + 20; // 增加日志的开销20字节 + return tmpSize; + } } //把大的消息分裂成若干个小的消息 ListSplitter splitter = new ListSplitter(messages); diff --git a/docs/en/Example_Batch.md b/docs/en/Example_Batch.md index d7199ca7..11bb42ff 100644 --- a/docs/en/Example_Batch.md +++ b/docs/en/Example_Batch.md @@ -19,47 +19,52 @@ try { ### 2 Split into Lists The complexity only grow when you send large batch and you may not sure if it exceeds the size limit (4MiB). 
At this time, you’d better split the lists: ```java -public class ListSplitter implements Iterator<List<Message>> { - private final int SIZE_LIMIT = 1000 * 1000; +public class ListSplitter implements Iterator<List<Message>> { + private final int SIZE_LIMIT = 1024 * 1024 * 4; private final List<Message> messages; private int currIndex; - public ListSplitter(List<Message> messages) { - this.messages = messages; + public ListSplitter(List<Message> messages) { + this.messages = messages; } @Override public boolean hasNext() { - return currIndex < messages.size(); + return currIndex < messages.size(); } - @Override public List<Message> next() { - int nextIndex = currIndex; + @Override public List<Message> next() { + int startIndex = getStartIndex(); + int nextIndex = startIndex; int totalSize = 0; for (; nextIndex < messages.size(); nextIndex++) { - Message message = messages.get(nextIndex); - int tmpSize = message.getTopic().length() + message.getBody().length; - Map<String, String> properties = message.getProperties(); - for (Map.Entry<String, String> entry : properties.entrySet()) { - tmpSize += entry.getKey().length() + entry.getValue().length(); - } - tmpSize = tmpSize + 20; //for log overhead - if (tmpSize > SIZE_LIMIT) { - //it is unexpected that single message exceeds the SIZE_LIMIT - //here just let it go, otherwise it will block the splitting process - if (nextIndex - currIndex == 0) { - //if the next sublist has no element, add this one and then break, otherwise just break - nextIndex++; - } - break; - } + Message message = messages.get(nextIndex); + int tmpSize = calcMessageSize(message); if (tmpSize + totalSize > SIZE_LIMIT) { - break; + break; } else { - totalSize += tmpSize; + totalSize += tmpSize; } - } - List<Message> subList = messages.subList(currIndex, nextIndex); + List<Message> subList = messages.subList(startIndex, nextIndex); currIndex = nextIndex; return subList; } + private int getStartIndex() { + Message currMessage = messages.get(currIndex); + int tmpSize = calcMessageSize(currMessage); + while(tmpSize > SIZE_LIMIT) { + currIndex += 1; + Message message = 
messages.get(currIndex); + tmpSize = calcMessageSize(message); + } + return currIndex; + } + private int calcMessageSize(Message message) { + int tmpSize = message.getTopic().length() + message.getBody().length; + Map<String, String> properties = message.getProperties(); + for (Map.Entry<String, String> entry : properties.entrySet()) { + tmpSize += entry.getKey().length() + entry.getValue().length(); + } + tmpSize = tmpSize + 20; // Increase the log overhead by 20 bytes + return tmpSize; + } } // then you could split the large list into small ones: -- GitLab