提交 f5285b00 编写于 作者: D dongeforever

Polish and add some test

上级 5e04a278
...@@ -36,9 +36,7 @@ import org.apache.rocketmq.broker.longpolling.PullRequest; ...@@ -36,9 +36,7 @@ import org.apache.rocketmq.broker.longpolling.PullRequest;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer; import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.common.LogicQueueMappingItem; import org.apache.rocketmq.common.LogicQueueMappingItem;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
...@@ -56,8 +54,6 @@ import org.apache.rocketmq.common.message.MessageQueue; ...@@ -56,8 +54,6 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData; import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
...@@ -126,7 +122,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -126,7 +122,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
Integer globalId = requestHeader.getQueueId(); Integer globalId = requestHeader.getQueueId();
Long globalOffset = requestHeader.getQueueOffset(); Long globalOffset = requestHeader.getQueueOffset();
LogicQueueMappingItem mappingItem = mappingDetail.getLogicQueueMappingItem(globalId, globalOffset); LogicQueueMappingItem mappingItem = mappingDetail.findLogicQueueMappingItem(globalId, globalOffset);
return new TopicQueueMappingContext(topic, globalId, globalOffset, mappingDetail, mappingItem); return new TopicQueueMappingContext(topic, globalId, globalOffset, mappingDetail, mappingItem);
} }
...@@ -153,10 +149,10 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -153,10 +149,10 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
//below are physical info //below are physical info
String bname = mappingItem.getBname(); String bname = mappingItem.getBname();
Integer phyQueueId = mappingItem.getQueueId(); Integer phyQueueId = mappingItem.getQueueId();
Long phyQueueOffset = mappingItem.convertToPhysicalQueueOffset(globalOffset); Long phyQueueOffset = mappingItem.computePhysicalQueueOffset(globalOffset);
requestHeader.setQueueId(phyQueueId); requestHeader.setQueueId(phyQueueId);
requestHeader.setQueueOffset(phyQueueOffset); requestHeader.setQueueOffset(phyQueueOffset);
if (mappingItem.isEndOffsetDecided() if (mappingItem.checkIfEndOffsetDecided()
&& requestHeader.getMaxMsgNums() != null) { && requestHeader.getMaxMsgNums() != null) {
requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums())); requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums()));
} }
...@@ -205,24 +201,24 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -205,24 +201,24 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
long nextBeginOffset = responseHeader.getNextBeginOffset(); long nextBeginOffset = responseHeader.getNextBeginOffset();
assert nextBeginOffset >= requestHeader.getQueueOffset(); assert nextBeginOffset >= requestHeader.getQueueOffset();
//the next begin offset should no more than the end offset //the next begin offset should no more than the end offset
if (mappingItem.isEndOffsetDecided() if (mappingItem.checkIfEndOffsetDecided()
&& nextBeginOffset >= mappingItem.getEndOffset()) { && nextBeginOffset >= mappingItem.getEndOffset()) {
nextBeginOffset = mappingItem.getEndOffset(); nextBeginOffset = mappingItem.getEndOffset();
} }
responseHeader.setNextBeginOffset(mappingItem.convertToStaticQueueOffset(nextBeginOffset)); responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffset(nextBeginOffset));
} }
//handle min offset //handle min offset
responseHeader.setMinOffset(mappingItem.convertToStaticQueueOffset(Math.max(mappingItem.getStartOffset(), responseHeader.getMinOffset()))); responseHeader.setMinOffset(mappingItem.computeStaticQueueOffset(Math.max(mappingItem.getStartOffset(), responseHeader.getMinOffset())));
//handle max offset //handle max offset
{ {
if (mappingItem.isEndOffsetDecided()) { if (mappingItem.checkIfEndOffsetDecided()) {
responseHeader.setMaxOffset(Math.max(mappingItem.convertToMaxStaticQueueOffset(), mappingDetail.getMaxOffsetFromMapping(mappingContext.getGlobalId()))); responseHeader.setMaxOffset(Math.max(mappingItem.computeMaxStaticQueueOffset(), mappingDetail.computeMaxOffsetFromMapping(mappingContext.getGlobalId())));
} else { } else {
responseHeader.setMaxOffset(mappingItem.convertToStaticQueueOffset(responseHeader.getMaxOffset())); responseHeader.setMaxOffset(mappingItem.computeStaticQueueOffset(responseHeader.getMaxOffset()));
} }
} }
//set the offsetDelta //set the offsetDelta
responseHeader.setOffsetDelta(mappingItem.convertOffsetDelta()); responseHeader.setOffsetDelta(mappingItem.computeOffsetDelta());
} catch (Throwable t) { } catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
} }
......
...@@ -174,7 +174,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -174,7 +174,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
} }
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
long staticLogicOffset = mappingDetail.convertToLogicOffset(mappingContext.getGlobalId(), responseHeader.getQueueOffset()); long staticLogicOffset = mappingDetail.computeStaticQueueOffset(mappingContext.getGlobalId(), responseHeader.getQueueOffset());
if (staticLogicOffset < 0) { if (staticLogicOffset < 0) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName())); return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
} }
......
...@@ -8,42 +8,45 @@ public class LogicQueueMappingItem { ...@@ -8,42 +8,45 @@ public class LogicQueueMappingItem {
private long logicOffset; // the start of the logic offset private long logicOffset; // the start of the logic offset
private long startOffset; // the start of the physical offset private long startOffset; // the start of the physical offset
private long endOffset; // the end of the physical offset private long endOffset; // the end of the physical offset
private long timeOfStart = -1; //mutable private long timeOfStart = -1; // mutable
private long timeOfEnd = -1; // mutable
public LogicQueueMappingItem(int gen, int queueId, String bname, long logicOffset, long startOffset, long timeOfStart) { public LogicQueueMappingItem(int gen, int queueId, String bname, long logicOffset, long startOffset, long endOffset, long timeOfStart, long timeOfEnd) {
this.gen = gen; this.gen = gen;
this.queueId = queueId; this.queueId = queueId;
this.bname = bname; this.bname = bname;
this.logicOffset = logicOffset; this.logicOffset = logicOffset;
this.startOffset = startOffset; this.startOffset = startOffset;
this.endOffset = endOffset;
this.timeOfStart = timeOfStart; this.timeOfStart = timeOfStart;
this.timeOfEnd = timeOfEnd;
} }
public long convertToStaticQueueOffset(long physicalQueueOffset) { public long computeStaticQueueOffset(long physicalQueueOffset) {
return logicOffset + (physicalQueueOffset - startOffset); return logicOffset + (physicalQueueOffset - startOffset);
} }
public long convertToPhysicalQueueOffset(long staticQueueOffset) { public long computePhysicalQueueOffset(long staticQueueOffset) {
return (staticQueueOffset - logicOffset) + startOffset; return (staticQueueOffset - logicOffset) + startOffset;
} }
public long convertToMaxStaticQueueOffset() { public long computeMaxStaticQueueOffset() {
if (endOffset >= startOffset) { if (endOffset >= startOffset) {
return logicOffset + endOffset - startOffset; return logicOffset + endOffset - startOffset;
} else { } else {
return logicOffset; return logicOffset;
} }
} }
public boolean isShouldDeleted() { public boolean checkIfShouldDeleted() {
return endOffset == startOffset; return endOffset == startOffset;
} }
public boolean isEndOffsetDecided() { public boolean checkIfEndOffsetDecided() {
//if the endOffset == startOffset, then the item should be deleted //if the endOffset == startOffset, then the item should be deleted
return endOffset > startOffset; return endOffset > startOffset;
} }
public long convertOffsetDelta() { public long computeOffsetDelta() {
return logicOffset - startOffset; return logicOffset - startOffset;
} }
...@@ -51,20 +54,6 @@ public class LogicQueueMappingItem { ...@@ -51,20 +54,6 @@ public class LogicQueueMappingItem {
return gen; return gen;
} }
public void setGen(int gen) {
this.gen = gen;
}
public long getTimeOfStart() {
return timeOfStart;
}
public void setTimeOfStart(long timeOfStart) {
this.timeOfStart = timeOfStart;
}
public int getQueueId() { public int getQueueId() {
return queueId; return queueId;
} }
...@@ -85,8 +74,11 @@ public class LogicQueueMappingItem { ...@@ -85,8 +74,11 @@ public class LogicQueueMappingItem {
return endOffset; return endOffset;
} }
public long getTimeOfStart() {
return timeOfStart;
}
public void setEndOffset(long endOffset) { public long getTimeOfEnd() {
this.endOffset = endOffset; return timeOfEnd;
} }
} }
...@@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentMap; ...@@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentMap;
public class TopicQueueMappingDetail extends TopicQueueMappingInfo { public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
// the mapping info in current broker, do not register to nameserver // the mapping info in current broker, do not register to nameserver
ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>(); private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
public TopicQueueMappingDetail(String topic, int totalQueues, String bname) { public TopicQueueMappingDetail(String topic, int totalQueues, String bname) {
super(topic, totalQueues, bname); super(topic, totalQueues, bname);
...@@ -47,16 +47,6 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { ...@@ -47,16 +47,6 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
this.prevIdMap = buildIdMap(LEVEL_1); this.prevIdMap = buildIdMap(LEVEL_1);
} }
public ConcurrentMap<Integer, Integer> revert(ConcurrentMap<Integer, Integer> original) {
if (original == null || original.isEmpty()) {
return new ConcurrentHashMap<Integer, Integer>();
}
ConcurrentMap<Integer, Integer> tmpIdMap = new ConcurrentHashMap<Integer, Integer>();
for (Map.Entry<Integer, Integer> entry: tmpIdMap.entrySet()) {
tmpIdMap.put(entry.getValue(), entry.getKey());
}
return tmpIdMap;
}
public ConcurrentMap<Integer, Integer> buildIdMap(int level) { public ConcurrentMap<Integer, Integer> buildIdMap(int level) {
//level 0 means current leader in this broker //level 0 means current leader in this broker
...@@ -92,24 +82,24 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { ...@@ -92,24 +82,24 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
} }
public long convertToLogicOffset(Integer globalId, long physicalLogicOffset) { public long computeStaticQueueOffset(Integer globalId, long physicalLogicOffset) {
List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId); List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
if (mappingItems == null if (mappingItems == null
|| mappingItems.isEmpty()) { || mappingItems.isEmpty()) {
return -1; return -1;
} }
if (bname.equals(mappingItems.get(mappingItems.size() - 1).getBname())) { if (bname.equals(mappingItems.get(mappingItems.size() - 1).getBname())) {
return mappingItems.get(mappingItems.size() - 1).convertToStaticQueueOffset(physicalLogicOffset); return mappingItems.get(mappingItems.size() - 1).computeStaticQueueOffset(physicalLogicOffset);
} }
//Consider the "switch" process, reduce the error //Consider the "switch" process, reduce the error
if (mappingItems.size() >= 2 if (mappingItems.size() >= 2
&& bname.equals(mappingItems.get(mappingItems.size() - 2).getBname())) { && bname.equals(mappingItems.get(mappingItems.size() - 2).getBname())) {
return mappingItems.get(mappingItems.size() - 2).convertToStaticQueueOffset(physicalLogicOffset); return mappingItems.get(mappingItems.size() - 2).computeStaticQueueOffset(physicalLogicOffset);
} }
return -1; return -1;
} }
public LogicQueueMappingItem getLogicQueueMappingItem(Integer globalId, long logicOffset) { public LogicQueueMappingItem findLogicQueueMappingItem(Integer globalId, long logicOffset) {
List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId); List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
if (mappingItems == null if (mappingItems == null
|| mappingItems.isEmpty()) { || mappingItems.isEmpty()) {
...@@ -124,21 +114,21 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { ...@@ -124,21 +114,21 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
} }
//if not found, maybe out of range, return the first one //if not found, maybe out of range, return the first one
for (int i = 0; i < mappingItems.size(); i++) { for (int i = 0; i < mappingItems.size(); i++) {
if (!mappingItems.get(i).isShouldDeleted()) { if (!mappingItems.get(i).checkIfShouldDeleted()) {
return mappingItems.get(i); return mappingItems.get(i);
} }
} }
return null; return null;
} }
public long getMaxOffsetFromMapping(Integer globalId) { public long computeMaxOffsetFromMapping(Integer globalId) {
List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId); List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
if (mappingItems == null if (mappingItems == null
|| mappingItems.isEmpty()) { || mappingItems.isEmpty()) {
return -1; return -1;
} }
LogicQueueMappingItem item = mappingItems.get(mappingItems.size() - 1); LogicQueueMappingItem item = mappingItems.get(mappingItems.size() - 1);
return item.convertToMaxStaticQueueOffset(); return item.computeMaxStaticQueueOffset();
} }
...@@ -150,20 +140,7 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { ...@@ -150,20 +140,7 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
return topicQueueMappingInfo; return topicQueueMappingInfo;
} }
public ConcurrentMap<Integer, ImmutableList<LogicQueueMappingItem>> getHostedQueues() {
public int getTotalQueues() { return hostedQueues;
return totalQueues;
}
public void setTotalQueues(int totalQueues) {
this.totalQueues = totalQueues;
}
public String getBname() {
return bname;
}
public String getTopic() {
return topic;
} }
} }
...@@ -29,9 +29,9 @@ public class TopicQueueMappingInfo extends RemotingSerializable { ...@@ -29,9 +29,9 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
int totalQueues; int totalQueues;
String bname; //identify the hosted broker name String bname; //identify the hosted broker name
//register to broker to construct the route //register to broker to construct the route
ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>(); transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>();
//register to broker to help detect remapping failure //register to broker to help detect remapping failure
ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> prevIdMap = new ConcurrentHashMap<Integer, Integer>(); transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
public TopicQueueMappingInfo(String topic, int totalQueues, String bname) { public TopicQueueMappingInfo(String topic, int totalQueues, String bname) {
this.topic = topic; this.topic = topic;
......
package org.apache.rocketmq.common;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableList;
import org.junit.Assert;
import org.junit.Test;
import java.util.Map;
public class TopicQueueMappingTest {
@Test
public void testJsonSerialize() {
LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(1, 2, "broker01", 33333333333333333L, 44444444444444444L, 555555555555555555L, 6666666666666666L, 77777777777777777L);
String mappingItemJson = JSON.toJSONString(mappingItem) ;
System.out.println(mappingItemJson);
Map<String, Object> mappingItemMap = JSON.parseObject(mappingItemJson, Map.class);
Assert.assertEquals(8, mappingItemMap.size());
Assert.assertEquals(mappingItemMap.get("bname"), mappingItem.getBname());
Assert.assertEquals(mappingItemMap.get("gen"), mappingItem.getGen());
Assert.assertEquals(mappingItemMap.get("logicOffset"), mappingItem.getLogicOffset());
Assert.assertEquals(mappingItemMap.get("queueId"), mappingItem.getQueueId());
Assert.assertEquals(mappingItemMap.get("startOffset"), mappingItem.getStartOffset());
Assert.assertEquals(mappingItemMap.get("endOffset"), mappingItem.getEndOffset());
Assert.assertEquals(mappingItemMap.get("timeOfStart"), mappingItem.getTimeOfStart());
Assert.assertEquals(mappingItemMap.get("timeOfEnd"), mappingItem.getTimeOfEnd());
TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail("test", 1, "broker01");
mappingDetail.putMappingInfo(0, ImmutableList.of(mappingItem));
String mappingDetailJson = JSON.toJSONString(mappingDetail);
Map mappingDetailMap = JSON.parseObject(mappingDetailJson);
Assert.assertFalse(mappingDetailMap.containsKey("prevIdMap"));
Assert.assertFalse(mappingDetailMap.containsKey("currIdMap"));
Assert.assertEquals(4, mappingDetailMap.size());
Assert.assertEquals(1, ((JSONObject) mappingDetailMap.get("hostedQueues")).size());
Assert.assertEquals(1, ((JSONArray)((JSONObject) mappingDetailMap.get("hostedQueues")).get("0")).size());
}
}
...@@ -23,6 +23,9 @@ import java.util.Arrays; ...@@ -23,6 +23,9 @@ import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.ImmutableList;
import org.junit.Test; import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册