提交 48db31b4 编写于 作者: D dongeforever

Finish the test for topicStats

上级 8b747f97
...@@ -30,6 +30,7 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterData; ...@@ -30,6 +30,7 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter; import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
...@@ -104,6 +105,7 @@ import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHead ...@@ -104,6 +105,7 @@ import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHead
import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader; import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader;
import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader; import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.rpc.RpcClient;
import org.apache.rocketmq.common.rpc.RpcClientUtils; import org.apache.rocketmq.common.rpc.RpcClientUtils;
import org.apache.rocketmq.common.rpc.RpcException; import org.apache.rocketmq.common.rpc.RpcException;
import org.apache.rocketmq.common.rpc.RpcRequest; import org.apache.rocketmq.common.rpc.RpcRequest;
...@@ -159,9 +161,13 @@ import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorRe ...@@ -159,9 +161,13 @@ import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorRe
public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController; private final BrokerController brokerController;
private final RpcClient rpcClient;
private final BrokerConfig brokerConfig;
public AdminBrokerProcessor(final BrokerController brokerController) { public AdminBrokerProcessor(final BrokerController brokerController) {
this.brokerController = brokerController; this.brokerController = brokerController;
this.brokerConfig = brokerController.getBrokerConfig();
this.rpcClient = brokerController.getBrokerOuterAPI().getRpcClient();
} }
@Override @Override
...@@ -650,7 +656,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -650,7 +656,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
break; break;
} }
} else { } else {
requestHeader.setPhysical(true); requestHeader.setLo(false);
requestHeader.setTimestamp(timestamp); requestHeader.setTimestamp(timestamp);
requestHeader.setQueueId(item.getQueueId()); requestHeader.setQueueId(item.getQueueId());
requestHeader.setBname(item.getBname()); requestHeader.setBname(item.getBname());
...@@ -720,7 +726,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -720,7 +726,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
assert maxItem != null; assert maxItem != null;
assert maxItem.getLogicOffset() >= 0; assert maxItem.getLogicOffset() >= 0;
requestHeader.setBname(maxItem.getBname()); requestHeader.setBname(maxItem.getBname());
requestHeader.setPhysical(true); requestHeader.setLo(false);
requestHeader.setQueueId(mappingItem.getQueueId()); requestHeader.setQueueId(mappingItem.getQueueId());
long maxPhysicalOffset = Long.MAX_VALUE; long maxPhysicalOffset = Long.MAX_VALUE;
...@@ -770,7 +776,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -770,7 +776,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return response; return response;
} }
private CompletableFuture<RpcResponse> handleGetMinOffsetForStaticTopic(GetMinOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext) { private CompletableFuture<RpcResponse> handleGetMinOffsetForStaticTopic(RpcRequest request, TopicQueueMappingContext mappingContext) {
if (mappingContext.getMappingDetail() == null) { if (mappingContext.getMappingDetail() == null) {
return null; return null;
} }
...@@ -778,14 +784,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -778,14 +784,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
if (!mappingContext.isLeader()) { if (!mappingContext.isLeader()) {
//this may not //this may not
return CompletableFuture.completedFuture(new RpcResponse(new RpcException(ResponseCode.NOT_LEADER_FOR_QUEUE, return CompletableFuture.completedFuture(new RpcResponse(new RpcException(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d is not leader in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())))); String.format("%s-%d is not leader in broker %s, request code %d", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname(), request.getCode()))));
}; };
GetMinOffsetRequestHeader requestHeader = (GetMinOffsetRequestHeader) request.getHeader();
LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true); LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true);
assert mappingItem != null; assert mappingItem != null;
try { try {
requestHeader.setBname(mappingItem.getBname()); requestHeader.setBname(mappingItem.getBname());
requestHeader.setPhysical(true); requestHeader.setLo(false);
requestHeader.setQueueId(mappingItem.getQueueId()); requestHeader.setQueueId(mappingItem.getQueueId());
long physicalOffset; long physicalOffset;
//run in local //run in local
...@@ -815,7 +821,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -815,7 +821,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
assert request.getCode() == RequestCode.GET_MIN_OFFSET; assert request.getCode() == RequestCode.GET_MIN_OFFSET;
GetMinOffsetRequestHeader requestHeader = (GetMinOffsetRequestHeader) request.getHeader(); GetMinOffsetRequestHeader requestHeader = (GetMinOffsetRequestHeader) request.getHeader();
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false); TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);
CompletableFuture<RpcResponse> rewriteResult = handleGetMinOffsetForStaticTopic(requestHeader, mappingContext); CompletableFuture<RpcResponse> rewriteResult = handleGetMinOffsetForStaticTopic(request, mappingContext);
if (rewriteResult != null) { if (rewriteResult != null) {
return rewriteResult; return rewriteResult;
} }
...@@ -851,7 +857,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -851,7 +857,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
assert mappingItem != null; assert mappingItem != null;
try { try {
requestHeader.setBname(mappingItem.getBname()); requestHeader.setBname(mappingItem.getBname());
requestHeader.setPhysical(true); requestHeader.setLo(false);
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader, null); RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader, null);
//TODO check if it is in current broker //TODO check if it is in current broker
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
...@@ -1006,6 +1012,70 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -1006,6 +1012,70 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return response; return response;
} }
private RpcResponse handleGetTopicStatsInfoForStaticTopic(RpcRequest request, TopicQueueMappingContext mappingContext) {
try {
assert request.getCode() == RequestCode.GET_TOPIC_STATS_INFO;
if (mappingContext.getMappingDetail() == null) {
return null;
}
final GetTopicStatsInfoRequestHeader requestHeader = (GetTopicStatsInfoRequestHeader) request.getHeader();
String topic = requestHeader.getTopic();
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
Map<Integer, LogicQueueMappingItem[]> qidItemMap = new HashMap<>();
Set<String> brokers = new HashSet<>();
mappingDetail.getHostedQueues().forEach((qid, items) -> {
if (TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) {
LogicQueueMappingItem[] itemPair = new LogicQueueMappingItem[2];
itemPair[0] = TopicQueueMappingUtils.findLogicQueueMappingItem(items, 0, true);
itemPair[1] = TopicQueueMappingUtils.findLogicQueueMappingItem(items, Long.MAX_VALUE, true);
assert itemPair[0] != null && itemPair[1] != null;
qidItemMap.put(qid, itemPair);
brokers.add(itemPair[0].getBname());
brokers.add(itemPair[1].getBname());
}
});
Map<String, TopicStatsTable> statsTable = new HashMap<>();
for (String broker: brokers) {
GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
header.setTopic(topic);
header.setBname(broker);
header.setLo(false);
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
}
statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
}
TopicStatsTable topicStatsTable = new TopicStatsTable();
qidItemMap.forEach( (qid, itemPair) -> {
LogicQueueMappingItem minItem = itemPair[0];
LogicQueueMappingItem maxItem = itemPair[1];
TopicOffset minTopicOffset = statsTable.get(minItem.getBname()).getOffsetTable().get(new MessageQueue(topic, minItem.getBname(), minItem.getQueueId()));
TopicOffset maxTopicOffset = statsTable.get(maxItem.getBname()).getOffsetTable().get(new MessageQueue(topic, maxItem.getBname(), maxItem.getQueueId()));
assert minTopicOffset != null && maxTopicOffset != null;
long min = minItem.computeStaticQueueOffsetLoosely(minTopicOffset.getMinOffset());
if (min < 0)
min = 0;
long max = maxItem.computeStaticQueueOffsetStrictly(maxTopicOffset.getMaxOffset());
if (max < 0)
max = 0;
long timestamp = maxTopicOffset.getLastUpdateTimestamp();
TopicOffset topicOffset = new TopicOffset();
topicOffset.setMinOffset(min);
topicOffset.setMaxOffset(max);
topicOffset.setLastUpdateTimestamp(timestamp);
topicStatsTable.getOffsetTable().put(new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, qid), topicOffset);
});
return new RpcResponse(ResponseCode.SUCCESS, null, topicStatsTable);
} catch (Throwable t) {
return new RpcResponse(new RpcException(ResponseCode.SYSTEM_ERROR, t.getMessage(), t));
}
}
private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx, private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException { RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null); final RemotingCommand response = RemotingCommand.createResponseCommand(null);
...@@ -1019,8 +1089,17 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -1019,8 +1089,17 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
response.setRemark("topic[" + topic + "] not exist"); response.setRemark("topic[" + topic + "] not exist");
return response; return response;
} }
TopicStatsTable topicStatsTable = new TopicStatsTable(); TopicStatsTable topicStatsTable = new TopicStatsTable();
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);
RpcResponse rpcResponse = handleGetTopicStatsInfoForStaticTopic(new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, requestHeader, null), mappingContext);
if (rpcResponse != null) {
if (rpcResponse.getException() != null) {
return RpcClientUtils.createCommandForRpcResponse(rpcResponse);
} else {
topicStatsTable.getOffsetTable().putAll(((TopicStatsTable)rpcResponse.getBody()).getOffsetTable());
}
}
for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) { for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue(); MessageQueue mq = new MessageQueue();
mq.setTopic(topic); mq.setTopic(topic);
......
...@@ -153,11 +153,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen ...@@ -153,11 +153,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
//by default, it is -1 //by default, it is -1
long offset = -1; long offset = -1;
//double read, first from leader, then from second leader //double read, first from leader, then from second leader
for (int i = 1; i <= 2; i++) { for (int i = itemList.size() - 1; i >= 0; i--) {
if (itemList.size() - i < 0) { LogicQueueMappingItem mappingItem = itemList.get(i);
break;
}
LogicQueueMappingItem mappingItem = itemList.get(itemList.size() - i);
if (mappingItem.getBname().equals(mappingDetail.getBname())) { if (mappingItem.getBname().equals(mappingDetail.getBname())) {
offset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), mappingItem.getQueueId()); offset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), mappingItem.getQueueId());
if (offset >= 0) { if (offset >= 0) {
...@@ -170,7 +167,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen ...@@ -170,7 +167,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
//maybe we need to reconstruct an object //maybe we need to reconstruct an object
requestHeader.setBname(mappingItem.getBname()); requestHeader.setBname(mappingItem.getBname());
requestHeader.setQueueId(mappingItem.getQueueId()); requestHeader.setQueueId(mappingItem.getQueueId());
requestHeader.setPhysical(true); requestHeader.setLo(false);
requestHeader.setSetZeroIfNotFound(false); requestHeader.setSetZeroIfNotFound(false);
RpcRequest rpcRequest = new RpcRequest(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader, null); RpcRequest rpcRequest = new RpcRequest(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
...@@ -179,7 +176,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen ...@@ -179,7 +176,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
} }
if (rpcResponse.getCode() == ResponseCode.SUCCESS) { if (rpcResponse.getCode() == ResponseCode.SUCCESS) {
offset = ((QueryConsumerOffsetResponseHeader) rpcResponse.getHeader()).getOffset(); offset = ((QueryConsumerOffsetResponseHeader) rpcResponse.getHeader()).getOffset();
} else if (rpcResponse.getCode() == ResponseCode.PULL_NOT_FOUND){ break;
} else if (rpcResponse.getCode() == ResponseCode.QUERY_NOT_FOUND){
continue; continue;
} else { } else {
//this should not happen //this should not happen
......
...@@ -134,18 +134,14 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -134,18 +134,14 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
&& requestHeader.getMaxMsgNums() != null) { && requestHeader.getMaxMsgNums() != null) {
requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums())); requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums()));
} }
int sysFlag = requestHeader.getSysFlag();
if (!mappingContext.isLeader()) {
sysFlag = PullSysFlag.clearCommitOffsetFlag(sysFlag);
requestHeader.setSysFlag(sysFlag);
}
if (mappingDetail.getBname().equals(bname)) { if (mappingDetail.getBname().equals(bname)) {
//just let it go, do the local pull process //just let it go, do the local pull process
return null; return null;
} }
requestHeader.setPhysical(true); int sysFlag = requestHeader.getSysFlag();
requestHeader.setLo(false);
requestHeader.setBname(bname); requestHeader.setBname(bname);
sysFlag = PullSysFlag.clearSuspendFlag(sysFlag); sysFlag = PullSysFlag.clearSuspendFlag(sysFlag);
sysFlag = PullSysFlag.clearCommitOffsetFlag(sysFlag); sysFlag = PullSysFlag.clearCommitOffsetFlag(sysFlag);
...@@ -189,11 +185,13 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -189,11 +185,13 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
long minOffset = responseHeader.getMinOffset(); long minOffset = responseHeader.getMinOffset();
long maxOffset = responseHeader.getMaxOffset(); long maxOffset = responseHeader.getMaxOffset();
int responseCode = code; int responseCode = code;
//consider the following situations //consider the following situations
// 1. read from slave, currently not supported // 1. read from slave, currently not supported
// 2. the middle queue is truncated because of deleting commitlog // 2. the middle queue is truncated because of deleting commitlog
if (code != ResponseCode.SUCCESS) { if (code != ResponseCode.SUCCESS) {
//note the currentItem maybe both the leader and the earliest //note the currentItem maybe both the leader and the earliest
boolean isRevised = false;
if (leaderItem.getGen() == currentItem.getGen()) { if (leaderItem.getGen() == currentItem.getGen()) {
//read the leader //read the leader
if (requestOffset > maxOffset) { if (requestOffset > maxOffset) {
...@@ -228,6 +226,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -228,6 +226,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
//just move to another item //just move to another item
LogicQueueMappingItem nextItem = TopicQueueMappingUtils.findNext(mappingContext.getMappingItemList(), currentItem, true); LogicQueueMappingItem nextItem = TopicQueueMappingUtils.findNext(mappingContext.getMappingItemList(), currentItem, true);
if (nextItem != null) { if (nextItem != null) {
isRevised = true;
currentItem = nextItem; currentItem = nextItem;
nextBeginOffset = currentItem.getStartOffset(); nextBeginOffset = currentItem.getStartOffset();
minOffset = currentItem.getStartOffset(); minOffset = currentItem.getStartOffset();
...@@ -244,7 +243,8 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -244,7 +243,8 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
} }
//read from the middle item, ignore the PULL_OFFSET_MOVED //read from the middle item, ignore the PULL_OFFSET_MOVED
if (leaderItem.getGen() != currentItem.getGen() if (!isRevised
&& leaderItem.getGen() != currentItem.getGen()
&& earlistItem.getGen() != currentItem.getGen()) { && earlistItem.getGen() != currentItem.getGen()) {
if (requestOffset < minOffset) { if (requestOffset < minOffset) {
nextBeginOffset = minOffset; nextBeginOffset = minOffset;
...@@ -289,7 +289,6 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -289,7 +289,6 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
return null; return null;
} }
} catch (Throwable t) { } catch (Throwable t) {
t.printStackTrace();
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
} }
} }
...@@ -443,6 +442,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -443,6 +442,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
return response; return response;
} }
MessageFilter messageFilter; MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) { if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData, messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
......
...@@ -30,6 +30,7 @@ import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader; ...@@ -30,6 +30,7 @@ import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
import org.apache.rocketmq.common.rpc.RpcRequest; import org.apache.rocketmq.common.rpc.RpcRequest;
import org.apache.rocketmq.common.rpc.RpcResponse; import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader; import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.common.rpc.TopicRequestHeader;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext; import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
...@@ -190,28 +191,37 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -190,28 +191,37 @@ public class TopicQueueMappingManager extends ConfigManager {
return dataVersion; return dataVersion;
} }
public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader) { public TopicQueueMappingContext buildTopicQueueMappingContext(TopicRequestHeader requestHeader) {
return buildTopicQueueMappingContext(requestHeader, false); return buildTopicQueueMappingContext(requestHeader, false);
} }
//Do not return a null context //Do not return a null context
public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader, boolean selectOneWhenMiss) { public TopicQueueMappingContext buildTopicQueueMappingContext(TopicRequestHeader requestHeader, boolean selectOneWhenMiss) {
if (requestHeader.getPhysical() != null //should disable logic queue explicitly, otherwise the old client may cause dirty data to newly created static topic
&& Boolean.TRUE.equals(requestHeader.getPhysical())) { if (requestHeader.getLo() != null
return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null); && Boolean.FALSE.equals(requestHeader.getLo())) {
return new TopicQueueMappingContext(requestHeader.getTopic(), null, null, null, null);
} }
TopicQueueMappingDetail mappingDetail = getTopicQueueMapping(requestHeader.getTopic()); String topic = requestHeader.getTopic();
Integer globalId = null;
if (requestHeader instanceof TopicQueueRequestHeader) {
globalId = ((TopicQueueRequestHeader) requestHeader).getQueueId();
}
TopicQueueMappingDetail mappingDetail = getTopicQueueMapping(topic);
if (mappingDetail == null) { if (mappingDetail == null) {
//it is not static topic //it is not static topic
return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null); return new TopicQueueMappingContext(topic, null, null, null, null);
} }
assert mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName()); assert mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName());
if (globalId == null) {
return new TopicQueueMappingContext(topic, null, mappingDetail, null, null);
}
//If not find mappingItem, it encounters some errors //If not find mappingItem, it encounters some errors
Integer globalId = requestHeader.getQueueId();
if (globalId < 0 && !selectOneWhenMiss) { if (globalId < 0 && !selectOneWhenMiss) {
return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, null, null); return new TopicQueueMappingContext(topic, globalId, mappingDetail, null, null);
} }
if (globalId < 0) { if (globalId < 0) {
...@@ -224,7 +234,7 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -224,7 +234,7 @@ public class TopicQueueMappingManager extends ConfigManager {
} }
} }
if (globalId < 0) { if (globalId < 0) {
return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, null, null); return new TopicQueueMappingContext(topic, globalId, mappingDetail, null, null);
} }
List<LogicQueueMappingItem> mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId); List<LogicQueueMappingItem> mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId);
...@@ -233,7 +243,7 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -233,7 +243,7 @@ public class TopicQueueMappingManager extends ConfigManager {
&& mappingItemList.size() > 0) { && mappingItemList.size() > 0) {
leaderItem = mappingItemList.get(mappingItemList.size() - 1); leaderItem = mappingItemList.get(mappingItemList.size() - 1);
} }
return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, mappingItemList, leaderItem); return new TopicQueueMappingContext(topic, globalId, mappingDetail, mappingItemList, leaderItem);
} }
......
...@@ -17,12 +17,11 @@ ...@@ -17,12 +17,11 @@
package org.apache.rocketmq.common.protocol.header; package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.common.rpc.RpcRequestHeader; import org.apache.rocketmq.common.rpc.TopicRequestHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetTopicStatsInfoRequestHeader extends RpcRequestHeader { public class GetTopicStatsInfoRequestHeader extends TopicRequestHeader {
@CFNotNull @CFNotNull
private String topic; private String topic;
......
...@@ -25,7 +25,7 @@ public class RequestBuilder { ...@@ -25,7 +25,7 @@ public class RequestBuilder {
} }
try { try {
RpcRequestHeader requestHeader = (RpcRequestHeader) requestHeaderClass.newInstance(); RpcRequestHeader requestHeader = (RpcRequestHeader) requestHeaderClass.newInstance();
requestHeader.setOneway(oneway); requestHeader.setOway(oneway);
requestHeader.setBname(destBrokerName); requestHeader.setBname(destBrokerName);
return requestHeader; return requestHeader;
} catch (Throwable t) { } catch (Throwable t) {
...@@ -37,26 +37,26 @@ public class RequestBuilder { ...@@ -37,26 +37,26 @@ public class RequestBuilder {
return buildTopicQueueRequestHeader(requestCode, null, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), null); return buildTopicQueueRequestHeader(requestCode, null, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), null);
} }
public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, MessageQueue mq, Boolean physical) { public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, MessageQueue mq, Boolean logic) {
return buildTopicQueueRequestHeader(requestCode, null, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), physical); return buildTopicQueueRequestHeader(requestCode, null, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), logic);
} }
public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, Boolean oneway, MessageQueue mq, Boolean physical) { public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, Boolean oneway, MessageQueue mq, Boolean logic) {
return buildTopicQueueRequestHeader(requestCode, oneway, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), physical); return buildTopicQueueRequestHeader(requestCode, oneway, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), logic);
} }
public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, Boolean oneway, String destBrokerName, String topic, int queueId, Boolean physical) { public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, Boolean oneway, String destBrokerName, String topic, int queueId, Boolean logic) {
Class requestHeaderClass = requestCodeMap.get(requestCode); Class requestHeaderClass = requestCodeMap.get(requestCode);
if (requestHeaderClass == null) { if (requestHeaderClass == null) {
throw new UnsupportedOperationException("unknown " + requestCode); throw new UnsupportedOperationException("unknown " + requestCode);
} }
try { try {
TopicQueueRequestHeader requestHeader = (TopicQueueRequestHeader) requestHeaderClass.newInstance(); TopicQueueRequestHeader requestHeader = (TopicQueueRequestHeader) requestHeaderClass.newInstance();
requestHeader.setOneway(oneway); requestHeader.setOway(oneway);
requestHeader.setBname(destBrokerName); requestHeader.setBname(destBrokerName);
requestHeader.setTopic(topic); requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId); requestHeader.setQueueId(queueId);
requestHeader.setPhysical(physical); requestHeader.setLo(logic);
return requestHeader; return requestHeader;
} catch (Throwable t) { } catch (Throwable t) {
throw new RuntimeException(t); throw new RuntimeException(t);
......
...@@ -214,6 +214,7 @@ public class RpcClientImpl implements RpcClient { ...@@ -214,6 +214,7 @@ public class RpcClientImpl implements RpcClient {
} }
case ResponseCode.QUERY_NOT_FOUND: { case ResponseCode.QUERY_NOT_FOUND: {
rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), null, null)); rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), null, null));
break;
} }
default:{ default:{
rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"))); rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
......
...@@ -20,13 +20,13 @@ import org.apache.rocketmq.remoting.CommandCustomHeader; ...@@ -20,13 +20,13 @@ import org.apache.rocketmq.remoting.CommandCustomHeader;
public abstract class RpcRequestHeader implements CommandCustomHeader { public abstract class RpcRequestHeader implements CommandCustomHeader {
//the namespace name //the namespace name
protected String namespace; protected String ns;
//if the data has been namespaced //if the data has been namespaced
protected Boolean namespaced; protected Boolean nsd;
//the abstract remote addr name, usually the physical broker name //the abstract remote addr name, usually the physical broker name
protected String bname; protected String bname;
//oneway
protected Boolean oneway; protected Boolean oway;
public String getBname() { public String getBname() {
return bname; return bname;
...@@ -36,27 +36,27 @@ public abstract class RpcRequestHeader implements CommandCustomHeader { ...@@ -36,27 +36,27 @@ public abstract class RpcRequestHeader implements CommandCustomHeader {
this.bname = bname; this.bname = bname;
} }
public Boolean getOneway() { public Boolean getOway() {
return oneway; return oway;
} }
public void setOneway(Boolean oneway) { public void setOway(Boolean oway) {
this.oneway = oneway; this.oway = oway;
} }
public String getNamespace() { public String getNs() {
return namespace; return ns;
} }
public void setNamespace(String namespace) { public void setNs(String ns) {
this.namespace = namespace; this.ns = ns;
} }
public Boolean getNamespaced() { public Boolean getNsd() {
return namespaced; return nsd;
} }
public void setNamespaced(Boolean namespaced) { public void setNsd(Boolean nsd) {
this.namespaced = namespaced; this.nsd = nsd;
} }
} }
...@@ -16,20 +16,8 @@ ...@@ -16,20 +16,8 @@
*/ */
package org.apache.rocketmq.common.rpc; package org.apache.rocketmq.common.rpc;
public abstract class TopicQueueRequestHeader extends RpcRequestHeader { public abstract class TopicQueueRequestHeader extends TopicRequestHeader {
//Physical or Logical
protected Boolean physical;
public Boolean getPhysical() {
return physical;
}
public void setPhysical(Boolean physical) {
this.physical = physical;
}
public abstract String getTopic();
public abstract void setTopic(String topic);
public abstract Integer getQueueId(); public abstract Integer getQueueId();
public abstract void setQueueId(Integer queueId); public abstract void setQueueId(Integer queueId);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.rpc;
public abstract class TopicRequestHeader extends RpcRequestHeader {
//logical
protected Boolean lo;
public abstract String getTopic();
public abstract void setTopic(String topic);
public Boolean getLo() {
return lo;
}
public void setLo(Boolean lo) {
this.lo = lo;
}
}
package org.apache.rocketmq.test.statictopic; package org.apache.rocketmq.test.statictopic;
import com.alibaba.fastjson.JSON; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.rpc.ClientMetadata; import org.apache.rocketmq.common.rpc.ClientMetadata;
...@@ -86,71 +89,35 @@ public class StaticTopicIT extends BaseConf { ...@@ -86,71 +89,35 @@ public class StaticTopicIT extends BaseConf {
} }
private void sendMessagesAndCheck(RMQNormalProducer producer, Set<String> targetBrokers, String topic, int queueNum, int msgEachQueue, int gen) throws Exception {
@Test ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
public void testCreateProduceConsumeStaticTopic() throws Exception {
String topic = "static" + MQRandomUtils.getRandomTopic();
RMQNormalProducer producer = getProducer(nsAddr, topic);
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
int queueNum = 10;
int msgEachQueue = 100;
//create static topic
Map<String, TopicConfigAndQueueMapping> localBrokerConfigMap = MQAdminTestUtils.createStaticTopic(topic, queueNum, getBrokers(), defaultMQAdminExt);
//check the static topic config
{
Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
Assert.assertEquals(brokerNum, remoteBrokerConfigMap.size());
for (Map.Entry<String, TopicConfigAndQueueMapping> entry: remoteBrokerConfigMap.entrySet()) {
String broker = entry.getKey();
TopicConfigAndQueueMapping configMapping = entry.getValue();
TopicConfigAndQueueMapping localConfigMapping = localBrokerConfigMap.get(broker);
Assert.assertNotNull(localConfigMapping);
Assert.assertEquals(configMapping, localConfigMapping);
}
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
Assert.assertEquals(queueNum, globalIdMap.size());
}
//check the route data
List<MessageQueue> messageQueueList = producer.getMessageQueue(); List<MessageQueue> messageQueueList = producer.getMessageQueue();
Assert.assertEquals(queueNum, messageQueueList.size()); Assert.assertEquals(queueNum, messageQueueList.size());
producer.setDebug(true);
for (int i = 0; i < queueNum; i++) { for (int i = 0; i < queueNum; i++) {
MessageQueue messageQueue = messageQueueList.get(i); MessageQueue messageQueue = messageQueueList.get(i);
Assert.assertEquals(topic, messageQueue.getTopic()); Assert.assertEquals(topic, messageQueue.getTopic());
Assert.assertEquals(i, messageQueue.getQueueId());
Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName()); Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
Assert.assertEquals(i, messageQueue.getQueueId());
String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue);
Assert.assertTrue(targetBrokers.contains(destBrokerName));
} }
//send and consume the msg
for(MessageQueue messageQueue: messageQueueList) { for(MessageQueue messageQueue: messageQueueList) {
producer.send(msgEachQueue, messageQueue); producer.send(msgEachQueue, messageQueue);
} }
Assert.assertEquals(0, producer.getSendErrorMsg().size());
//leave the time to build the cq //leave the time to build the cq
Thread.sleep(500); Thread.sleep(100);
for(MessageQueue messageQueue: messageQueueList) { for(MessageQueue messageQueue: messageQueueList) {
Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue)); Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue)); Assert.assertEquals(msgEachQueue + gen * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, defaultMQAdminExt.maxOffset(messageQueue));
} }
Assert.assertEquals(msgEachQueue * queueNum, producer.getAllOriginMsg().size()); TopicStatsTable topicStatsTable = defaultMQAdminExt.examineTopicStats(topic);
Assert.assertEquals(0, producer.getSendErrorMsg().size()); for(MessageQueue messageQueue: messageQueueList) {
Assert.assertEquals(0, topicStatsTable.getOffsetTable().get(messageQueue).getMinOffset());
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000); Assert.assertEquals(msgEachQueue + gen * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, topicStatsTable.getOffsetTable().get(messageQueue).getMaxOffset());
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody());
Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg());
Assert.assertEquals(queueNum, messagesByQueue.size());
for (int i = 0; i < queueNum; i++) {
List<MessageExt> messageExts = messagesByQueue.get(i);
Assert.assertEquals(msgEachQueue, messageExts.size());
for (int j = 0; j < msgEachQueue; j++) {
Assert.assertEquals(j, messageExts.get(j).getQueueOffset());
}
} }
} }
private Map<Integer, List<MessageExt>> computeMessageByQueue(Collection<Object> msgs) { private Map<Integer, List<MessageExt>> computeMessageByQueue(Collection<Object> msgs) {
Map<Integer, List<MessageExt>> messagesByQueue = new HashMap<>(); Map<Integer, List<MessageExt>> messagesByQueue = new HashMap<>();
for (Object object : msgs) { for (Object object : msgs) {
...@@ -171,217 +138,133 @@ public class StaticTopicIT extends BaseConf { ...@@ -171,217 +138,133 @@ public class StaticTopicIT extends BaseConf {
return messagesByQueue; return messagesByQueue;
} }
@Test private void consumeMessagesAndCheck(RMQNormalProducer producer, RMQNormalConsumer consumer, String topic, int queueNum, int msgEachQueue, int startGen, int genNum) {
public void testDoubleReadCheckConsumerOffset() throws Exception { consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 30000);
String topic = "static" + MQRandomUtils.getRandomTopic(); /*System.out.println("produce:" + producer.getAllMsgBody().size());
String group = initConsumerGroup(); System.out.println("consume:" + consumer.getListener().getAllMsgBody().size());*/
RMQNormalProducer producer = getProducer(nsAddr, topic);
RMQNormalConsumer consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
//System.out.printf("Group:%s\n", consumer.getConsumerGroup());
//System.out.printf("Topic:%s\n", topic);
int queueNum = 10;
int msgEachQueue = 100;
//create static topic
{
Set<String> targetBrokers = new HashSet<>();
targetBrokers.add(broker1Name);
MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
}
//produce the messages
{
List<MessageQueue> messageQueueList = producer.getMessageQueue();
for(MessageQueue messageQueue: messageQueueList) {
producer.send(msgEachQueue, messageQueue);
}
Assert.assertEquals(0, producer.getSendErrorMsg().size());
Assert.assertEquals(msgEachQueue * queueNum, producer.getAllMsgBody().size());
}
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListener().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
producer.shutdown(); Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg());
consumer.shutdown(); Assert.assertEquals(queueNum, messagesByQueue.size());
for (int i = 0; i < queueNum; i++) {
List<MessageExt> messageExts = messagesByQueue.get(i);
/*for (MessageExt messageExt:messageExts) {
System.out.printf("%d %d\n", messageExt.getQueueId(), messageExt.getQueueOffset());
}*/
int totalEachQueue = msgEachQueue * genNum;
Assert.assertEquals(totalEachQueue, messageExts.size());
for (int j = 0; j < totalEachQueue; j++) {
MessageExt messageExt = messageExts.get(j);
int currGen = startGen + j / msgEachQueue;
Assert.assertEquals(topic, messageExt.getTopic());
Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageExt.getBrokerName());
Assert.assertEquals(i, messageExt.getQueueId());
Assert.assertEquals((j % msgEachQueue) + currGen * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, messageExt.getQueueOffset());
}
}
}
//remapping the static topic
{
Set<String> targetBrokers = new HashSet<>();
targetBrokers.add(broker2Name);
MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
} @Test
//make the metadata public void testCreateProduceConsumeStaticTopic() throws Exception {
Thread.sleep(500); String topic = "static" + MQRandomUtils.getRandomTopic();
//System.out.printf("Group:%s\n", consumer.getConsumerGroup()); RMQNormalProducer producer = getProducer(nsAddr, topic);
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
int queueNum = 10;
int msgEachQueue = 100;
//create static topic
Map<String, TopicConfigAndQueueMapping> localBrokerConfigMap = MQAdminTestUtils.createStaticTopic(topic, queueNum, getBrokers(), defaultMQAdminExt);
//check the static topic config
{ {
producer = getProducer(nsAddr, topic); Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt); Assert.assertEquals(brokerNum, remoteBrokerConfigMap.size());
//just refresh the metadata for (Map.Entry<String, TopicConfigAndQueueMapping> entry: remoteBrokerConfigMap.entrySet()) {
List<MessageQueue> messageQueueList = producer.getMessageQueue(); String broker = entry.getKey();
for(MessageQueue messageQueue: messageQueueList) { TopicConfigAndQueueMapping configMapping = entry.getValue();
producer.send(msgEachQueue, messageQueue); TopicConfigAndQueueMapping localConfigMapping = localBrokerConfigMap.get(broker);
Assert.assertEquals(broker2Name, clientMetadata.getBrokerNameFromMessageQueue(messageQueue)); Assert.assertNotNull(localConfigMapping);
} Assert.assertEquals(configMapping, localConfigMapping);
Assert.assertEquals(0, producer.getSendErrorMsg().size());
Assert.assertEquals(msgEachQueue * queueNum, producer.getAllMsgBody().size());
for(MessageQueue messageQueue: messageQueueList) {
Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
Assert.assertEquals(msgEachQueue + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, defaultMQAdminExt.maxOffset(messageQueue));
}
//leave the time to build the cq
Thread.sleep(100);
}
{
consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 6000);
//System.out.printf("Consume %d\n", consumer.getListener().getAllMsgBody().size());
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody());
Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg());
Assert.assertEquals(queueNum, messagesByQueue.size());
for (int i = 0; i < queueNum; i++) {
List<MessageExt> messageExts = messagesByQueue.get(i);
Assert.assertEquals(msgEachQueue, messageExts.size());
for (int j = 0; j < msgEachQueue; j++) {
Assert.assertEquals(j + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, messageExts.get(j).getQueueOffset());
}
} }
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
Assert.assertEquals(queueNum, globalIdMap.size());
} }
//send and check
sendMessagesAndCheck(producer, getBrokers(), topic, queueNum, msgEachQueue, 0);
//consume and check
consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 1);
} }
@Test @Test
public void testRemappingProduceConsumeStaticTopic() throws Exception { public void testRemappingProduceConsumeStaticTopic() throws Exception {
String topic = "static" + MQRandomUtils.getRandomTopic(); String topic = "static" + MQRandomUtils.getRandomTopic();
RMQNormalProducer producer = getProducer(nsAddr, topic); RMQNormalProducer producer = getProducer(nsAddr, topic);
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
int queueNum = 1;
int queueNum = 10;
int msgEachQueue = 100; int msgEachQueue = 100;
//create static topic //create send consume
{ {
Set<String> targetBrokers = new HashSet<>(); Set<String> targetBrokers = ImmutableSet.of(broker1Name);
targetBrokers.add(broker1Name);
MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt); MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 1);
} }
//System.out.printf("%s %s\n", broker1Name, clientMetadata.findMasterBrokerAddr(broker1Name)); System.out.println("=============================================================");
//System.out.printf("%s %s\n", broker2Name, clientMetadata.findMasterBrokerAddr(broker2Name));
//produce the messages
{
List<MessageQueue> messageQueueList = producer.getMessageQueue();
for (int i = 0; i < queueNum; i++) {
MessageQueue messageQueue = messageQueueList.get(i);
Assert.assertEquals(i, messageQueue.getQueueId());
Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
}
for(MessageQueue messageQueue: messageQueueList) {
producer.send(msgEachQueue, messageQueue);
}
Assert.assertEquals(0, producer.getSendErrorMsg().size());
//leave the time to build the cq
Thread.sleep(100);
for(MessageQueue messageQueue: messageQueueList) {
//Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue));
}
}
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody());
//remapping the static topic //remapping the static topic
{ {
Set<String> targetBrokers = new HashSet<>(); Set<String> targetBrokers = ImmutableSet.of(broker2Name);
targetBrokers.add(broker2Name);
MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt); MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt); Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap); TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true); Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
Assert.assertEquals(queueNum, globalIdMap.size()); Assert.assertEquals(queueNum, globalIdMap.size());
for (TopicQueueMappingOne mappingOne: globalIdMap.values()) { for (TopicQueueMappingOne mappingOne: globalIdMap.values()) {
Assert.assertEquals(broker2Name, mappingOne.getBname()); Assert.assertEquals(broker2Name, mappingOne.getBname());
Assert.assertEquals(TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, mappingOne.getItems().get(mappingOne.getItems().size() - 1).getLogicOffset());
} }
} Thread.sleep(500);
//leave the time to refresh the metadata sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 1);
Thread.sleep(500); consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 2);
producer.setDebug(true);
{
ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
List<MessageQueue> messageQueueList = producer.getMessageQueue();
for (int i = 0; i < queueNum; i++) {
MessageQueue messageQueue = messageQueueList.get(i);
Assert.assertEquals(i, messageQueue.getQueueId());
Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue);
Assert.assertEquals(destBrokerName, broker2Name);
}
for(MessageQueue messageQueue: messageQueueList) {
producer.send(msgEachQueue, messageQueue);
}
Assert.assertEquals(0, producer.getSendErrorMsg().size());
//leave the time to build the cq
Thread.sleep(100);
for(MessageQueue messageQueue: messageQueueList) {
Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
Assert.assertEquals(msgEachQueue + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, defaultMQAdminExt.maxOffset(messageQueue));
}
}
{
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 30000);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody());
Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg());
Assert.assertEquals(queueNum, messagesByQueue.size());
for (int i = 0; i < queueNum; i++) {
List<MessageExt> messageExts = messagesByQueue.get(i);
Assert.assertEquals(msgEachQueue * 2, messageExts.size());
for (int j = 0; j < msgEachQueue; j++) {
Assert.assertEquals(j, messageExts.get(j).getQueueOffset());
}
for (int j = msgEachQueue; j < msgEachQueue * 2; j++) {
Assert.assertEquals(j + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE - msgEachQueue, messageExts.get(j).getQueueOffset());
}
}
} }
} }
public void sendMessagesAndCheck(RMQNormalProducer producer, String broker, String topic, int queueNum, int msgEachQueue, long baseOffset) throws Exception { @Test
ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt); public void testDoubleReadCheckConsumerOffset() throws Exception {
List<MessageQueue> messageQueueList = producer.getMessageQueue(); String topic = "static" + MQRandomUtils.getRandomTopic();
Assert.assertEquals(queueNum, messageQueueList.size()); String group = initConsumerGroup();
for (int i = 0; i < queueNum; i++) { RMQNormalProducer producer = getProducer(nsAddr, topic);
MessageQueue messageQueue = messageQueueList.get(i); RMQNormalConsumer consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
Assert.assertEquals(i, messageQueue.getQueueId());
Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue);
Assert.assertEquals(destBrokerName, broker);
}
for(MessageQueue messageQueue: messageQueueList) { int queueNum = 10;
producer.send(msgEachQueue, messageQueue); int msgEachQueue = 100;
//create static topic
{
Set<String> targetBrokers = ImmutableSet.of(broker1Name);
MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 1);
} }
Assert.assertEquals(0, producer.getSendErrorMsg().size()); producer.shutdown();
//leave the time to build the cq consumer.shutdown();
Thread.sleep(100); //use a new producer
for(MessageQueue messageQueue: messageQueueList) { producer = getProducer(nsAddr, topic);
Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
Assert.assertEquals(msgEachQueue + baseOffset, defaultMQAdminExt.maxOffset(messageQueue)); List<String> brokers = ImmutableList.of(broker2Name, broker3Name, broker1Name);
for (int i = 0; i < brokers.size(); i++) {
Set<String> targetBrokers = ImmutableSet.of(brokers.get(i));
MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
//make the metadata
Thread.sleep(500);
sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, i + 1);
} }
consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 1, brokers.size());
} }
...@@ -393,32 +276,29 @@ public class StaticTopicIT extends BaseConf { ...@@ -393,32 +276,29 @@ public class StaticTopicIT extends BaseConf {
int msgEachQueue = 100; int msgEachQueue = 100;
//create to broker1Name //create to broker1Name
{ {
Set<String> targetBrokers = new HashSet<>(); Set<String> targetBrokers = ImmutableSet.of(broker1Name);
targetBrokers.add(broker1Name);
MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt); MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
//leave the time to refresh the metadata //leave the time to refresh the metadata
Thread.sleep(500); Thread.sleep(500);
sendMessagesAndCheck(producer, broker1Name, topic, queueNum, msgEachQueue, 0L); sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
} }
//remapping to broker2Name //remapping to broker2Name
{ {
Set<String> targetBrokers = new HashSet<>(); Set<String> targetBrokers = ImmutableSet.of(broker2Name);
targetBrokers.add(broker2Name);
MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt); MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
//leave the time to refresh the metadata //leave the time to refresh the metadata
Thread.sleep(500); Thread.sleep(500);
sendMessagesAndCheck(producer, broker2Name, topic, queueNum, msgEachQueue, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE); sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 1);
} }
//remapping to broker3Name //remapping to broker3Name
{ {
Set<String> targetBrokers = new HashSet<>(); Set<String> targetBrokers = ImmutableSet.of(broker3Name);
targetBrokers.add(broker3Name);
MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt); MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
//leave the time to refresh the metadata //leave the time to refresh the metadata
Thread.sleep(500); Thread.sleep(500);
sendMessagesAndCheck(producer, broker3Name, topic, queueNum, msgEachQueue, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE * 2); sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 2);
} }
// 1 -> 2 -> 3, currently 1 should not has any mappings // 1 -> 2 -> 3, currently 1 should not has any mappings
...@@ -469,10 +349,7 @@ public class StaticTopicIT extends BaseConf { ...@@ -469,10 +349,7 @@ public class StaticTopicIT extends BaseConf {
for (List<LogicQueueMappingItem> items : config3.getMappingDetail().getHostedQueues().values()) { for (List<LogicQueueMappingItem> items : config3.getMappingDetail().getHostedQueues().values()) {
Assert.assertEquals(1, items.size()); Assert.assertEquals(1, items.size());
} }
} }
} }
...@@ -482,42 +359,18 @@ public class StaticTopicIT extends BaseConf { ...@@ -482,42 +359,18 @@ public class StaticTopicIT extends BaseConf {
RMQNormalProducer producer = getProducer(nsAddr, topic); RMQNormalProducer producer = getProducer(nsAddr, topic);
int queueNum = 10; int queueNum = 10;
int msgEachQueue = 100; int msgEachQueue = 100;
//create static topic //create and send
{ {
Set<String> targetBrokers = new HashSet<>(); Set<String> targetBrokers = ImmutableSet.of(broker1Name);
targetBrokers.add(broker1Name);
MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt); MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
} sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
//System.out.printf("%s %s\n", broker1Name, clientMetadata.findMasterBrokerAddr(broker1Name));
//System.out.printf("%s %s\n", broker2Name, clientMetadata.findMasterBrokerAddr(broker2Name));
//produce the messages
{
List<MessageQueue> messageQueueList = producer.getMessageQueue();
for (int i = 0; i < queueNum; i++) {
MessageQueue messageQueue = messageQueueList.get(i);
Assert.assertEquals(i, messageQueue.getQueueId());
Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
}
for(MessageQueue messageQueue: messageQueueList) {
producer.send(msgEachQueue, messageQueue);
}
Assert.assertEquals(0, producer.getSendErrorMsg().size());
//leave the time to build the cq
Thread.sleep(100);
for(MessageQueue messageQueue: messageQueueList) {
//Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue));
}
} }
//remapping the static topic with -1 logic offset //remapping the static topic with -1 logic offset
{ {
Set<String> targetBrokers = new HashSet<>(); Set<String> targetBrokers = ImmutableSet.of(broker2Name);
targetBrokers.add(broker2Name);
MQAdminTestUtils.remappingStaticTopicWithNegativeLogicOffset(topic, targetBrokers, defaultMQAdminExt); MQAdminTestUtils.remappingStaticTopicWithNegativeLogicOffset(topic, targetBrokers, defaultMQAdminExt);
Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt); Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap); TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true); Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
Assert.assertEquals(queueNum, globalIdMap.size()); Assert.assertEquals(queueNum, globalIdMap.size());
...@@ -525,32 +378,10 @@ public class StaticTopicIT extends BaseConf { ...@@ -525,32 +378,10 @@ public class StaticTopicIT extends BaseConf {
Assert.assertEquals(broker2Name, mappingOne.getBname()); Assert.assertEquals(broker2Name, mappingOne.getBname());
Assert.assertEquals(-1, mappingOne.getItems().get(mappingOne.getItems().size() - 1).getLogicOffset()); Assert.assertEquals(-1, mappingOne.getItems().get(mappingOne.getItems().size() - 1).getLogicOffset());
} }
} //leave the time to refresh the metadata
//leave the time to refresh the metadata Thread.sleep(500);
Thread.sleep(500); //here the gen should be 0
producer.setDebug(true); sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
{
ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
List<MessageQueue> messageQueueList = producer.getMessageQueue();
for (int i = 0; i < queueNum; i++) {
MessageQueue messageQueue = messageQueueList.get(i);
Assert.assertEquals(i, messageQueue.getQueueId());
String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue);
Assert.assertEquals(destBrokerName, broker2Name);
}
for(MessageQueue messageQueue: messageQueueList) {
producer.send(msgEachQueue, messageQueue);
}
Assert.assertEquals(0, producer.getSendErrorMsg().size());
Assert.assertEquals(queueNum * msgEachQueue * 2, producer.getAllOriginMsg().size());
//leave the time to build the cq
Thread.sleep(100);
for(MessageQueue messageQueue: messageQueueList) {
Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
//the max offset should still be msgEachQueue
Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue));
}
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册