diff --git a/docs/cn/RocketMQ_Example.md b/docs/cn/RocketMQ_Example.md index 66980d577af0528229793ce2e589f690bcc1689e..9559a6049bd84b1cb27592e7c8c4751e9e4a5fa5 100644 --- a/docs/cn/RocketMQ_Example.md +++ b/docs/cn/RocketMQ_Example.md @@ -500,48 +500,52 @@ try { 复杂度只有当你发送大批量时才会增长,你可能不确定它是否超过了大小限制(4MB)。这时候你最好把你的消息列表分割一下: ```java - -public class ListSplitter implements Iterator> { - private final int SIZE_LIMIT = 1024 * 1024 * 4; - private final List messages; - private int currIndex; - public ListSplitter(List messages) { - this.messages = messages; - } - @Override public boolean hasNext() { - return currIndex < messages.size(); - } - @Override public List next() { - int nextIndex = currIndex; - int totalSize = 0; - for (; nextIndex < messages.size(); nextIndex++) { - Message message = messages.get(nextIndex); - int tmpSize = message.getTopic().length() + message.getBody().length; - Map properties = message.getProperties(); - for (Map.Entry entry : properties.entrySet()) { - tmpSize += entry.getKey().length() + entry.getValue().length(); - } - tmpSize = tmpSize + 20; // 增加日志的开销20字节 - if (tmpSize > SIZE_LIMIT) { - //单个消息超过了最大的限制 - //忽略,否则会阻塞分裂的进程 - if (nextIndex - currIndex == 0) { - //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环 - nextIndex++; - } - break; - } - if (tmpSize + totalSize > SIZE_LIMIT) { - break; - } else { - totalSize += tmpSize; - } - - } - List subList = messages.subList(currIndex, nextIndex); - currIndex = nextIndex; - return subList; - } +public class ListSplitter implements Iterator> { + private final int SIZE_LIMIT = 1024 * 1024 * 4; + private final List messages; + private int currIndex; + public ListSplitter(List messages) { + this.messages = messages; + } + @Override public boolean hasNext() { + return currIndex < messages.size(); + } + @Override public List next() { + int startIndex = getStartIndex(); + int nextIndex = startIndex; + int totalSize = 0; + for (; nextIndex < messages.size(); nextIndex++) { + Message message = messages.get(nextIndex); + int tmpSize = calcMessageSize(message); + if (tmpSize + totalSize > SIZE_LIMIT) { + break; + } else { + totalSize += tmpSize; + } + } + List subList = messages.subList(startIndex, nextIndex); + currIndex = nextIndex; + return subList; + } + private int getStartIndex() { + Message currMessage = messages.get(currIndex); + int tmpSize = calcMessageSize(currMessage); + while(tmpSize > SIZE_LIMIT) { + currIndex += 1; + Message message = messages.get(curIndex); + tmpSize = calcMessageSize(message); + } + return currIndex; + } + private int calcMessageSize(Message message) { + int tmpSize = message.getTopic().length() + message.getBody().length(); + Map properties = message.getProperties(); + for (Map.Entry entry : properties.entrySet()) { + tmpSize += entry.getKey().length() + entry.getValue().length(); + } + tmpSize = tmpSize + 20; // 增加⽇日志的开销20字节 + return tmpSize; + } } //把大的消息分裂成若干个小的消息 ListSplitter splitter = new ListSplitter(messages); diff --git a/docs/en/Example_Batch.md b/docs/en/Example_Batch.md index d7199ca7c78c03b02df56935ba8dcc31a7f02dfd..11bb42ffbe27b28046795892c30b7adcf14f8213 100644 --- a/docs/en/Example_Batch.md +++ b/docs/en/Example_Batch.md @@ -19,47 +19,52 @@ try { ### 2 Split into Lists The complexity only grow when you send large batch and you may not sure if it exceeds the size limit (4MiB). At this time, you’d better split the lists: ```java -public class ListSplitter implements Iterator> { - private final int SIZE_LIMIT = 1000 * 1000; +public class ListSplitter implements Iterator> { + private final int SIZE_LIMIT = 1024 * 1024 * 4; private final List messages; private int currIndex; - public ListSplitter(List messages) { - this.messages = messages; + public ListSplitter(List messages) { + this.messages = messages; } @Override public boolean hasNext() { - return currIndex < messages.size(); + return currIndex < messages.size(); } - @Override public List next() { - int nextIndex = currIndex; + @Override public List next() { + int startIndex = getStartIndex(); + int nextIndex = startIndex; int totalSize = 0; for (; nextIndex < messages.size(); nextIndex++) { - Message message = messages.get(nextIndex); - int tmpSize = message.getTopic().length() + message.getBody().length; - Map properties = message.getProperties(); - for (Map.Entry entry : properties.entrySet()) { - tmpSize += entry.getKey().length() + entry.getValue().length(); - } - tmpSize = tmpSize + 20; //for log overhead - if (tmpSize > SIZE_LIMIT) { - //it is unexpected that single message exceeds the SIZE_LIMIT - //here just let it go, otherwise it will block the splitting process - if (nextIndex - currIndex == 0) { - //if the next sublist has no element, add this one and then break, otherwise just break - nextIndex++; - } - break; - } + Message message = messages.get(nextIndex); + int tmpSize = calcMessageSize(message); if (tmpSize + totalSize > SIZE_LIMIT) { - break; + break; } else { - totalSize += tmpSize; + totalSize += tmpSize; } - } - List subList = messages.subList(currIndex, nextIndex); + List subList = messages.subList(startIndex, nextIndex); currIndex = nextIndex; return subList; } + private int getStartIndex() { + Message currMessage = messages.get(currIndex); + int tmpSize = calcMessageSize(currMessage); + while(tmpSize > SIZE_LIMIT) { + currIndex += 1; + Message message = messages.get(curIndex); + tmpSize = calcMessageSize(message); + } + return currIndex; + } + private int calcMessageSize(Message message) { + int tmpSize = message.getTopic().length() + message.getBody().length(); + Map properties = message.getProperties(); + for (Map.Entry entry : properties.entrySet()) { + tmpSize += entry.getKey().length() + entry.getValue().length(); + } + tmpSize = tmpSize + 20; // Increase the log overhead by 20 bytes + return tmpSize; + } } // then you could split the large list into small ones: