From f5285b008ada78413123ae6da1ec8d3b506f0f62 Mon Sep 17 00:00:00 2001 From: dongeforever Date: Wed, 10 Nov 2021 20:48:39 +0800 Subject: [PATCH] Polish and add some test --- .../processor/PullMessageProcessor.java | 24 +++++------ .../processor/SendMessageProcessor.java | 2 +- .../common/LogicQueueMappingItem.java | 40 +++++++---------- .../common/TopicQueueMappingDetail.java | 43 +++++-------------- .../common/TopicQueueMappingInfo.java | 4 +- .../common/TopicQueueMappingTest.java | 42 ++++++++++++++++++ .../apache/rocketmq/common/UtilAllTest.java | 3 ++ 7 files changed, 84 insertions(+), 74 deletions(-) create mode 100644 common/src/test/java/org/apache/rocketmq/common/TopicQueueMappingTest.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 5ab3c01a..8d7758a7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -36,9 +36,7 @@ import org.apache.rocketmq.broker.longpolling.PullRequest; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook; import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer; -import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.client.exception.MQBrokerException; -import org.apache.rocketmq.client.impl.consumer.PullResultExt; import org.apache.rocketmq.common.LogicQueueMappingItem; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; @@ -56,8 +54,6 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; -import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; -import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData; @@ -126,7 +122,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements Integer globalId = requestHeader.getQueueId(); Long globalOffset = requestHeader.getQueueOffset(); - LogicQueueMappingItem mappingItem = mappingDetail.getLogicQueueMappingItem(globalId, globalOffset); + LogicQueueMappingItem mappingItem = mappingDetail.findLogicQueueMappingItem(globalId, globalOffset); return new TopicQueueMappingContext(topic, globalId, globalOffset, mappingDetail, mappingItem); } @@ -153,10 +149,10 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements //below are physical info String bname = mappingItem.getBname(); Integer phyQueueId = mappingItem.getQueueId(); - Long phyQueueOffset = mappingItem.convertToPhysicalQueueOffset(globalOffset); + Long phyQueueOffset = mappingItem.computePhysicalQueueOffset(globalOffset); requestHeader.setQueueId(phyQueueId); requestHeader.setQueueOffset(phyQueueOffset); - if (mappingItem.isEndOffsetDecided() + if (mappingItem.checkIfEndOffsetDecided() && requestHeader.getMaxMsgNums() != null) { requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums())); } @@ -205,24 +201,24 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements long nextBeginOffset = responseHeader.getNextBeginOffset(); assert nextBeginOffset >= requestHeader.getQueueOffset(); //the next begin offset should no more than the end offset - if (mappingItem.isEndOffsetDecided() + if (mappingItem.checkIfEndOffsetDecided() && nextBeginOffset >= mappingItem.getEndOffset()) { nextBeginOffset = mappingItem.getEndOffset(); } - responseHeader.setNextBeginOffset(mappingItem.convertToStaticQueueOffset(nextBeginOffset)); + responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffset(nextBeginOffset)); } //handle min offset - responseHeader.setMinOffset(mappingItem.convertToStaticQueueOffset(Math.max(mappingItem.getStartOffset(), responseHeader.getMinOffset()))); + responseHeader.setMinOffset(mappingItem.computeStaticQueueOffset(Math.max(mappingItem.getStartOffset(), responseHeader.getMinOffset()))); //handle max offset { - if (mappingItem.isEndOffsetDecided()) { - responseHeader.setMaxOffset(Math.max(mappingItem.convertToMaxStaticQueueOffset(), mappingDetail.getMaxOffsetFromMapping(mappingContext.getGlobalId()))); + if (mappingItem.checkIfEndOffsetDecided()) { + responseHeader.setMaxOffset(Math.max(mappingItem.computeMaxStaticQueueOffset(), mappingDetail.computeMaxOffsetFromMapping(mappingContext.getGlobalId()))); } else { - responseHeader.setMaxOffset(mappingItem.convertToStaticQueueOffset(responseHeader.getMaxOffset())); + responseHeader.setMaxOffset(mappingItem.computeStaticQueueOffset(responseHeader.getMaxOffset())); } } //set the offsetDelta - responseHeader.setOffsetDelta(mappingItem.convertOffsetDelta()); + responseHeader.setOffsetDelta(mappingItem.computeOffsetDelta()); } catch (Throwable t) { return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 62e28280..52508a4c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -174,7 +174,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); - long staticLogicOffset = mappingDetail.convertToLogicOffset(mappingContext.getGlobalId(), responseHeader.getQueueOffset()); + long staticLogicOffset = mappingDetail.computeStaticQueueOffset(mappingContext.getGlobalId(), responseHeader.getQueueOffset()); if (staticLogicOffset < 0) { return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName())); } diff --git a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java index 0a9ee96f..8d1b164c 100644 --- a/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/LogicQueueMappingItem.java @@ -8,42 +8,45 @@ public class LogicQueueMappingItem { private long logicOffset; // the start of the logic offset private long startOffset; // the start of the physical offset private long endOffset; // the end of the physical offset - private long timeOfStart = -1; //mutable + private long timeOfStart = -1; // mutable + private long timeOfEnd = -1; // mutable - public LogicQueueMappingItem(int gen, int queueId, String bname, long logicOffset, long startOffset, long timeOfStart) { + public LogicQueueMappingItem(int gen, int queueId, String bname, long logicOffset, long startOffset, long endOffset, long timeOfStart, long timeOfEnd) { this.gen = gen; this.queueId = queueId; this.bname = bname; this.logicOffset = logicOffset; this.startOffset = startOffset; + this.endOffset = endOffset; this.timeOfStart = timeOfStart; + this.timeOfEnd = timeOfEnd; } - public long convertToStaticQueueOffset(long physicalQueueOffset) { + public long computeStaticQueueOffset(long physicalQueueOffset) { return logicOffset + (physicalQueueOffset - startOffset); } - public long convertToPhysicalQueueOffset(long staticQueueOffset) { + public long computePhysicalQueueOffset(long staticQueueOffset) { return (staticQueueOffset - logicOffset) + startOffset; } - public long convertToMaxStaticQueueOffset() { + public long computeMaxStaticQueueOffset() { if (endOffset >= startOffset) { return logicOffset + endOffset - startOffset; } else { return logicOffset; } } - public boolean isShouldDeleted() { + public boolean checkIfShouldDeleted() { return endOffset == startOffset; } - public boolean isEndOffsetDecided() { + public boolean checkIfEndOffsetDecided() { //if the endOffset == startOffset, then the item should be deleted return endOffset > startOffset; } - public long convertOffsetDelta() { + public long computeOffsetDelta() { return logicOffset - startOffset; } @@ -51,20 +54,6 @@ public class LogicQueueMappingItem { return gen; } - public void setGen(int gen) { - this.gen = gen; - } - - - public long getTimeOfStart() { - return timeOfStart; - } - - public void setTimeOfStart(long timeOfStart) { - this.timeOfStart = timeOfStart; - } - - public int getQueueId() { return queueId; } @@ -85,8 +74,11 @@ public class LogicQueueMappingItem { return endOffset; } + public long getTimeOfStart() { + return timeOfStart; + } - public void setEndOffset(long endOffset) { - this.endOffset = endOffset; + public long getTimeOfEnd() { + return timeOfEnd; } } diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java index 8c2aad9f..8d5c8e8e 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java @@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentMap; public class TopicQueueMappingDetail extends TopicQueueMappingInfo { // the mapping info in current broker, do not register to nameserver - ConcurrentMap> hostedQueues = new ConcurrentHashMap>(); + private ConcurrentMap> hostedQueues = new ConcurrentHashMap>(); public TopicQueueMappingDetail(String topic, int totalQueues, String bname) { super(topic, totalQueues, bname); @@ -47,16 +47,6 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { this.prevIdMap = buildIdMap(LEVEL_1); } - public ConcurrentMap revert(ConcurrentMap original) { - if (original == null || original.isEmpty()) { - return new ConcurrentHashMap(); - } - ConcurrentMap tmpIdMap = new ConcurrentHashMap(); - for (Map.Entry entry: tmpIdMap.entrySet()) { - tmpIdMap.put(entry.getValue(), entry.getKey()); - } - return tmpIdMap; - } public ConcurrentMap buildIdMap(int level) { //level 0 means current leader in this broker @@ -92,24 +82,24 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { } - public long convertToLogicOffset(Integer globalId, long physicalLogicOffset) { + public long computeStaticQueueOffset(Integer globalId, long physicalLogicOffset) { List mappingItems = getMappingInfo(globalId); if (mappingItems == null || mappingItems.isEmpty()) { return -1; } if (bname.equals(mappingItems.get(mappingItems.size() - 1).getBname())) { - return mappingItems.get(mappingItems.size() - 1).convertToStaticQueueOffset(physicalLogicOffset); + return mappingItems.get(mappingItems.size() - 1).computeStaticQueueOffset(physicalLogicOffset); } //Consider the "switch" process, reduce the error if (mappingItems.size() >= 2 && bname.equals(mappingItems.get(mappingItems.size() - 2).getBname())) { - return mappingItems.get(mappingItems.size() - 2).convertToStaticQueueOffset(physicalLogicOffset); + return mappingItems.get(mappingItems.size() - 2).computeStaticQueueOffset(physicalLogicOffset); } return -1; } - public LogicQueueMappingItem getLogicQueueMappingItem(Integer globalId, long logicOffset) { + public LogicQueueMappingItem findLogicQueueMappingItem(Integer globalId, long logicOffset) { List mappingItems = getMappingInfo(globalId); if (mappingItems == null || mappingItems.isEmpty()) { @@ -124,21 +114,21 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { } //if not found, maybe out of range, return the first one for (int i = 0; i < mappingItems.size(); i++) { - if (!mappingItems.get(i).isShouldDeleted()) { + if (!mappingItems.get(i).checkIfShouldDeleted()) { return mappingItems.get(i); } } return null; } - public long getMaxOffsetFromMapping(Integer globalId) { + public long computeMaxOffsetFromMapping(Integer globalId) { List mappingItems = getMappingInfo(globalId); if (mappingItems == null || mappingItems.isEmpty()) { return -1; } LogicQueueMappingItem item = mappingItems.get(mappingItems.size() - 1); - return item.convertToMaxStaticQueueOffset(); + return item.computeMaxStaticQueueOffset(); } @@ -150,20 +140,7 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { return topicQueueMappingInfo; } - - public int getTotalQueues() { - return totalQueues; - } - - public void setTotalQueues(int totalQueues) { - this.totalQueues = totalQueues; - } - - public String getBname() { - return bname; - } - - public String getTopic() { - return topic; + public ConcurrentMap> getHostedQueues() { + return hostedQueues; } } diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java index 2d76365d..b4a92f38 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java @@ -29,9 +29,9 @@ public class TopicQueueMappingInfo extends RemotingSerializable { int totalQueues; String bname; //identify the hosted broker name //register to broker to construct the route - ConcurrentMap currIdMap = new ConcurrentHashMap(); + transient ConcurrentMap currIdMap = new ConcurrentHashMap(); //register to broker to help detect remapping failure - ConcurrentMap prevIdMap = new ConcurrentHashMap(); + transient ConcurrentMap prevIdMap = new ConcurrentHashMap(); public TopicQueueMappingInfo(String topic, int totalQueues, String bname) { this.topic = topic; diff --git a/common/src/test/java/org/apache/rocketmq/common/TopicQueueMappingTest.java b/common/src/test/java/org/apache/rocketmq/common/TopicQueueMappingTest.java new file mode 100644 index 00000000..8e125ea1 --- /dev/null +++ b/common/src/test/java/org/apache/rocketmq/common/TopicQueueMappingTest.java @@ -0,0 +1,42 @@ +package org.apache.rocketmq.common; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.google.common.collect.ImmutableList; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +public class TopicQueueMappingTest { + + @Test + public void testJsonSerialize() { + LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(1, 2, "broker01", 33333333333333333L, 44444444444444444L, 555555555555555555L, 6666666666666666L, 77777777777777777L); + String mappingItemJson = JSON.toJSONString(mappingItem) ; + System.out.println(mappingItemJson); + + Map mappingItemMap = JSON.parseObject(mappingItemJson, Map.class); + Assert.assertEquals(8, mappingItemMap.size()); + Assert.assertEquals(mappingItemMap.get("bname"), mappingItem.getBname()); + Assert.assertEquals(mappingItemMap.get("gen"), mappingItem.getGen()); + Assert.assertEquals(mappingItemMap.get("logicOffset"), mappingItem.getLogicOffset()); + Assert.assertEquals(mappingItemMap.get("queueId"), mappingItem.getQueueId()); + Assert.assertEquals(mappingItemMap.get("startOffset"), mappingItem.getStartOffset()); + Assert.assertEquals(mappingItemMap.get("endOffset"), mappingItem.getEndOffset()); + Assert.assertEquals(mappingItemMap.get("timeOfStart"), mappingItem.getTimeOfStart()); + Assert.assertEquals(mappingItemMap.get("timeOfEnd"), mappingItem.getTimeOfEnd()); + + TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail("test", 1, "broker01"); + mappingDetail.putMappingInfo(0, ImmutableList.of(mappingItem)); + + String mappingDetailJson = JSON.toJSONString(mappingDetail); + Map mappingDetailMap = JSON.parseObject(mappingDetailJson); + Assert.assertFalse(mappingDetailMap.containsKey("prevIdMap")); + Assert.assertFalse(mappingDetailMap.containsKey("currIdMap")); + Assert.assertEquals(4, mappingDetailMap.size()); + Assert.assertEquals(1, ((JSONObject) mappingDetailMap.get("hostedQueues")).size()); + Assert.assertEquals(1, ((JSONArray)((JSONObject) mappingDetailMap.get("hostedQueues")).get("0")).size()); + } +} diff --git a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java index 32d2ba6c..7ef7eaca 100644 --- a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java @@ -23,6 +23,9 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Properties; + +import com.alibaba.fastjson.JSON; +import com.google.common.collect.ImmutableList; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; -- GitLab