From 48db31b4815af2d2a3c07c1944a69f572e4425d7 Mon Sep 17 00:00:00 2001 From: dongeforever Date: Wed, 8 Dec 2021 17:27:55 +0800 Subject: [PATCH] Finish the test for topicStats --- .../processor/AdminBrokerProcessor.java | 97 ++++- .../processor/ConsumerManageProcessor.java | 12 +- .../processor/PullMessageProcessor.java | 16 +- .../topic/TopicQueueMappingManager.java | 34 +- .../GetTopicStatsInfoRequestHeader.java | 5 +- .../rocketmq/common/rpc/RequestBuilder.java | 16 +- .../rocketmq/common/rpc/RpcClientImpl.java | 1 + .../rocketmq/common/rpc/RpcRequestHeader.java | 32 +- .../common/rpc/TopicQueueRequestHeader.java | 14 +- .../common/rpc/TopicRequestHeader.java | 32 ++ .../test/statictopic/StaticTopicIT.java | 405 +++++------------- 11 files changed, 301 insertions(+), 363 deletions(-) create mode 100644 common/src/main/java/org/apache/rocketmq/common/rpc/TopicRequestHeader.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index cca96f17..fb4a711c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -30,6 +30,7 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterData; import org.apache.rocketmq.broker.filter.ExpressionMessageFilter; import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; import org.apache.rocketmq.common.AclConfig; +import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.PlainAccessConfig; @@ -104,6 +105,7 @@ import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHead import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader; import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.common.rpc.RpcClient; import org.apache.rocketmq.common.rpc.RpcClientUtils; import org.apache.rocketmq.common.rpc.RpcException; import org.apache.rocketmq.common.rpc.RpcRequest; @@ -159,9 +161,13 @@ import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorRe public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; + private final RpcClient rpcClient; + private final BrokerConfig brokerConfig; public AdminBrokerProcessor(final BrokerController brokerController) { this.brokerController = brokerController; + this.brokerConfig = brokerController.getBrokerConfig(); + this.rpcClient = brokerController.getBrokerOuterAPI().getRpcClient(); } @Override @@ -650,7 +656,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements break; } } else { - requestHeader.setPhysical(true); + requestHeader.setLo(false); requestHeader.setTimestamp(timestamp); requestHeader.setQueueId(item.getQueueId()); requestHeader.setBname(item.getBname()); @@ -720,7 +726,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements assert maxItem != null; assert maxItem.getLogicOffset() >= 0; requestHeader.setBname(maxItem.getBname()); - requestHeader.setPhysical(true); + requestHeader.setLo(false); requestHeader.setQueueId(mappingItem.getQueueId()); long maxPhysicalOffset = Long.MAX_VALUE; @@ -770,7 +776,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements return response; } - private CompletableFuture handleGetMinOffsetForStaticTopic(GetMinOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext) { + private CompletableFuture handleGetMinOffsetForStaticTopic(RpcRequest request, TopicQueueMappingContext mappingContext) { if (mappingContext.getMappingDetail() == null) { return null; } @@ -778,14 +784,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements if (!mappingContext.isLeader()) { //this may not return CompletableFuture.completedFuture(new RpcResponse(new RpcException(ResponseCode.NOT_LEADER_FOR_QUEUE, - String.format("%s-%d is not leader in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())))); + String.format("%s-%d is not leader in broker %s, request code %d", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname(), request.getCode())))); }; - + GetMinOffsetRequestHeader requestHeader = (GetMinOffsetRequestHeader) request.getHeader(); LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true); assert mappingItem != null; try { requestHeader.setBname(mappingItem.getBname()); - requestHeader.setPhysical(true); + requestHeader.setLo(false); requestHeader.setQueueId(mappingItem.getQueueId()); long physicalOffset; //run in local @@ -815,7 +821,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements assert request.getCode() == RequestCode.GET_MIN_OFFSET; GetMinOffsetRequestHeader requestHeader = (GetMinOffsetRequestHeader) request.getHeader(); TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false); - CompletableFuture rewriteResult = handleGetMinOffsetForStaticTopic(requestHeader, mappingContext); + CompletableFuture rewriteResult = handleGetMinOffsetForStaticTopic(request, mappingContext); if (rewriteResult != null) { return rewriteResult; } @@ -851,7 +857,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements assert mappingItem != null; try { requestHeader.setBname(mappingItem.getBname()); - requestHeader.setPhysical(true); + requestHeader.setLo(false); RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader, null); //TODO check if it is in current broker RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); @@ -1006,6 +1012,70 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements return response; } + private RpcResponse handleGetTopicStatsInfoForStaticTopic(RpcRequest request, TopicQueueMappingContext mappingContext) { + try { + assert request.getCode() == RequestCode.GET_TOPIC_STATS_INFO; + if (mappingContext.getMappingDetail() == null) { + return null; + } + final GetTopicStatsInfoRequestHeader requestHeader = (GetTopicStatsInfoRequestHeader) request.getHeader(); + String topic = requestHeader.getTopic(); + TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); + Map qidItemMap = new HashMap<>(); + Set brokers = new HashSet<>(); + mappingDetail.getHostedQueues().forEach((qid, items) -> { + if (TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) { + LogicQueueMappingItem[] itemPair = new LogicQueueMappingItem[2]; + itemPair[0] = TopicQueueMappingUtils.findLogicQueueMappingItem(items, 0, true); + itemPair[1] = TopicQueueMappingUtils.findLogicQueueMappingItem(items, Long.MAX_VALUE, true); + assert itemPair[0] != null && itemPair[1] != null; + qidItemMap.put(qid, itemPair); + brokers.add(itemPair[0].getBname()); + brokers.add(itemPair[1].getBname()); + } + }); + Map statsTable = new HashMap<>(); + for (String broker: brokers) { + GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader(); + header.setTopic(topic); + header.setBname(broker); + header.setLo(false); + RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null); + RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get(); + if (rpcResponse.getException() != null) { + throw rpcResponse.getException(); + } + statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody()); + } + TopicStatsTable topicStatsTable = new TopicStatsTable(); + qidItemMap.forEach( (qid, itemPair) -> { + LogicQueueMappingItem minItem = itemPair[0]; + LogicQueueMappingItem maxItem = itemPair[1]; + TopicOffset minTopicOffset = statsTable.get(minItem.getBname()).getOffsetTable().get(new MessageQueue(topic, minItem.getBname(), minItem.getQueueId())); + TopicOffset maxTopicOffset = statsTable.get(maxItem.getBname()).getOffsetTable().get(new MessageQueue(topic, maxItem.getBname(), maxItem.getQueueId())); + + assert minTopicOffset != null && maxTopicOffset != null; + + long min = minItem.computeStaticQueueOffsetLoosely(minTopicOffset.getMinOffset()); + if (min < 0) + min = 0; + long max = maxItem.computeStaticQueueOffsetStrictly(maxTopicOffset.getMaxOffset()); + if (max < 0) + max = 0; + long timestamp = maxTopicOffset.getLastUpdateTimestamp(); + + TopicOffset topicOffset = new TopicOffset(); + topicOffset.setMinOffset(min); + topicOffset.setMaxOffset(max); + topicOffset.setLastUpdateTimestamp(timestamp); + topicStatsTable.getOffsetTable().put(new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, qid), topicOffset); + }); + return new RpcResponse(ResponseCode.SUCCESS, null, topicStatsTable); + } catch (Throwable t) { + return new RpcResponse(new RpcException(ResponseCode.SYSTEM_ERROR, t.getMessage(), t)); + } + } + private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); @@ -1019,8 +1089,17 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements response.setRemark("topic[" + topic + "] not exist"); return response; } - TopicStatsTable topicStatsTable = new TopicStatsTable(); + TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false); + RpcResponse rpcResponse = handleGetTopicStatsInfoForStaticTopic(new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, requestHeader, null), mappingContext); + if (rpcResponse != null) { + if (rpcResponse.getException() != null) { + return RpcClientUtils.createCommandForRpcResponse(rpcResponse); + } else { + topicStatsTable.getOffsetTable().putAll(((TopicStatsTable)rpcResponse.getBody()).getOffsetTable()); + } + } + for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) { MessageQueue mq = new MessageQueue(); mq.setTopic(topic); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java index 04433964..66abe62b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java @@ -153,11 +153,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen //by default, it is -1 long offset = -1; //double read, first from leader, then from second leader - for (int i = 1; i <= 2; i++) { - if (itemList.size() - i < 0) { - break; - } - LogicQueueMappingItem mappingItem = itemList.get(itemList.size() - i); + for (int i = itemList.size() - 1; i >= 0; i--) { + LogicQueueMappingItem mappingItem = itemList.get(i); if (mappingItem.getBname().equals(mappingDetail.getBname())) { offset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), mappingItem.getQueueId()); if (offset >= 0) { @@ -170,7 +167,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen //maybe we need to reconstruct an object requestHeader.setBname(mappingItem.getBname()); requestHeader.setQueueId(mappingItem.getQueueId()); - requestHeader.setPhysical(true); + requestHeader.setLo(false); requestHeader.setSetZeroIfNotFound(false); RpcRequest rpcRequest = new RpcRequest(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader, null); RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); @@ -179,7 +176,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen } if (rpcResponse.getCode() == ResponseCode.SUCCESS) { offset = ((QueryConsumerOffsetResponseHeader) rpcResponse.getHeader()).getOffset(); - } else if (rpcResponse.getCode() == ResponseCode.PULL_NOT_FOUND){ + break; + } else if (rpcResponse.getCode() == ResponseCode.QUERY_NOT_FOUND){ continue; } else { //this should not happen diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 9b8135e7..0f1ca23c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -134,18 +134,14 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements && requestHeader.getMaxMsgNums() != null) { requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums())); } - int sysFlag = requestHeader.getSysFlag(); - if (!mappingContext.isLeader()) { - sysFlag = PullSysFlag.clearCommitOffsetFlag(sysFlag); - requestHeader.setSysFlag(sysFlag); - } if (mappingDetail.getBname().equals(bname)) { //just let it go, do the local pull process return null; } - requestHeader.setPhysical(true); + int sysFlag = requestHeader.getSysFlag(); + requestHeader.setLo(false); requestHeader.setBname(bname); sysFlag = PullSysFlag.clearSuspendFlag(sysFlag); sysFlag = PullSysFlag.clearCommitOffsetFlag(sysFlag); @@ -189,11 +185,13 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements long minOffset = responseHeader.getMinOffset(); long maxOffset = responseHeader.getMaxOffset(); int responseCode = code; + //consider the following situations // 1. read from slave, currently not supported // 2. the middle queue is truncated because of deleting commitlog if (code != ResponseCode.SUCCESS) { //note the currentItem maybe both the leader and the earliest + boolean isRevised = false; if (leaderItem.getGen() == currentItem.getGen()) { //read the leader if (requestOffset > maxOffset) { @@ -228,6 +226,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements //just move to another item LogicQueueMappingItem nextItem = TopicQueueMappingUtils.findNext(mappingContext.getMappingItemList(), currentItem, true); if (nextItem != null) { + isRevised = true; currentItem = nextItem; nextBeginOffset = currentItem.getStartOffset(); minOffset = currentItem.getStartOffset(); @@ -244,7 +243,8 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements } //read from the middle item, ignore the PULL_OFFSET_MOVED - if (leaderItem.getGen() != currentItem.getGen() + if (!isRevised + && leaderItem.getGen() != currentItem.getGen() && earlistItem.getGen() != currentItem.getGen()) { if (requestOffset < minOffset) { nextBeginOffset = minOffset; @@ -289,7 +289,6 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements return null; } } catch (Throwable t) { - t.printStackTrace(); return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); } } @@ -443,6 +442,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements return response; } + MessageFilter messageFilter; if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) { messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData, diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java index e76d25e3..69650536 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java @@ -30,6 +30,7 @@ import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader; import org.apache.rocketmq.common.rpc.RpcRequest; import org.apache.rocketmq.common.rpc.RpcResponse; import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader; +import org.apache.rocketmq.common.rpc.TopicRequestHeader; import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext; @@ -190,28 +191,37 @@ public class TopicQueueMappingManager extends ConfigManager { return dataVersion; } - public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader) { + public TopicQueueMappingContext buildTopicQueueMappingContext(TopicRequestHeader requestHeader) { return buildTopicQueueMappingContext(requestHeader, false); } //Do not return a null context - public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader, boolean selectOneWhenMiss) { - if (requestHeader.getPhysical() != null - && Boolean.TRUE.equals(requestHeader.getPhysical())) { - return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null); + public TopicQueueMappingContext buildTopicQueueMappingContext(TopicRequestHeader requestHeader, boolean selectOneWhenMiss) { + //should disable logic queue explicitly, otherwise the old client may cause dirty data to newly created static topic + if (requestHeader.getLo() != null + && Boolean.FALSE.equals(requestHeader.getLo())) { + return new TopicQueueMappingContext(requestHeader.getTopic(), null, null, null, null); } - TopicQueueMappingDetail mappingDetail = getTopicQueueMapping(requestHeader.getTopic()); + String topic = requestHeader.getTopic(); + Integer globalId = null; + if (requestHeader instanceof TopicQueueRequestHeader) { + globalId = ((TopicQueueRequestHeader) requestHeader).getQueueId(); + } + + TopicQueueMappingDetail mappingDetail = getTopicQueueMapping(topic); if (mappingDetail == null) { //it is not static topic - return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null); + return new TopicQueueMappingContext(topic, null, null, null, null); } - assert mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName()); + if (globalId == null) { + return new TopicQueueMappingContext(topic, null, mappingDetail, null, null); + } + //If not find mappingItem, it encounters some errors - Integer globalId = requestHeader.getQueueId(); if (globalId < 0 && !selectOneWhenMiss) { - return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, null, null); + return new TopicQueueMappingContext(topic, globalId, mappingDetail, null, null); } if (globalId < 0) { @@ -224,7 +234,7 @@ public class TopicQueueMappingManager extends ConfigManager { } } if (globalId < 0) { - return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, null, null); + return new TopicQueueMappingContext(topic, globalId, mappingDetail, null, null); } List mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId); @@ -233,7 +243,7 @@ public class TopicQueueMappingManager extends ConfigManager { && mappingItemList.size() > 0) { leaderItem = mappingItemList.get(mappingItemList.size() - 1); } - return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, mappingItemList, leaderItem); + return new TopicQueueMappingContext(topic, globalId, mappingDetail, mappingItemList, leaderItem); } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java index 8e921b29..f98e150b 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java @@ -17,12 +17,11 @@ package org.apache.rocketmq.common.protocol.header; -import org.apache.rocketmq.common.rpc.RpcRequestHeader; -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.common.rpc.TopicRequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class GetTopicStatsInfoRequestHeader extends RpcRequestHeader { +public class GetTopicStatsInfoRequestHeader extends TopicRequestHeader { @CFNotNull private String topic; diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java index 4b5c62b8..f9478e4a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java @@ -25,7 +25,7 @@ public class RequestBuilder { } try { RpcRequestHeader requestHeader = (RpcRequestHeader) requestHeaderClass.newInstance(); - requestHeader.setOneway(oneway); + requestHeader.setOway(oneway); requestHeader.setBname(destBrokerName); return requestHeader; } catch (Throwable t) { @@ -37,26 +37,26 @@ public class RequestBuilder { return buildTopicQueueRequestHeader(requestCode, null, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), null); } - public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, MessageQueue mq, Boolean physical) { - return buildTopicQueueRequestHeader(requestCode, null, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), physical); + public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, MessageQueue mq, Boolean logic) { + return buildTopicQueueRequestHeader(requestCode, null, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), logic); } - public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, Boolean oneway, MessageQueue mq, Boolean physical) { - return buildTopicQueueRequestHeader(requestCode, oneway, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), physical); + public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, Boolean oneway, MessageQueue mq, Boolean logic) { + return buildTopicQueueRequestHeader(requestCode, oneway, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), logic); } - public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, Boolean oneway, String destBrokerName, String topic, int queueId, Boolean physical) { + public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, Boolean oneway, String destBrokerName, String topic, int queueId, Boolean logic) { Class requestHeaderClass = requestCodeMap.get(requestCode); if (requestHeaderClass == null) { throw new UnsupportedOperationException("unknown " + requestCode); } try { TopicQueueRequestHeader requestHeader = (TopicQueueRequestHeader) requestHeaderClass.newInstance(); - requestHeader.setOneway(oneway); + requestHeader.setOway(oneway); requestHeader.setBname(destBrokerName); requestHeader.setTopic(topic); requestHeader.setQueueId(queueId); - requestHeader.setPhysical(physical); + requestHeader.setLo(logic); return requestHeader; } catch (Throwable t) { throw new RuntimeException(t); diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java index eaa22be7..97879d19 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java @@ -214,6 +214,7 @@ public class RpcClientImpl implements RpcClient { } case ResponseCode.QUERY_NOT_FOUND: { rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), null, null)); + break; } default:{ rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"))); diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java index 577865ec..593df815 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java @@ -20,13 +20,13 @@ import org.apache.rocketmq.remoting.CommandCustomHeader; public abstract class RpcRequestHeader implements CommandCustomHeader { //the namespace name - protected String namespace; + protected String ns; //if the data has been namespaced - protected Boolean namespaced; + protected Boolean nsd; //the abstract remote addr name, usually the physical broker name protected String bname; - - protected Boolean oneway; + //oneway + protected Boolean oway; public String getBname() { return bname; @@ -36,27 +36,27 @@ public abstract class RpcRequestHeader implements CommandCustomHeader { this.bname = bname; } - public Boolean getOneway() { - return oneway; + public Boolean getOway() { + return oway; } - public void setOneway(Boolean oneway) { - this.oneway = oneway; + public void setOway(Boolean oway) { + this.oway = oway; } - public String getNamespace() { - return namespace; + public String getNs() { + return ns; } - public void setNamespace(String namespace) { - this.namespace = namespace; + public void setNs(String ns) { + this.ns = ns; } - public Boolean getNamespaced() { - return namespaced; + public Boolean getNsd() { + return nsd; } - public void setNamespaced(Boolean namespaced) { - this.namespaced = namespaced; + public void setNsd(Boolean nsd) { + this.nsd = nsd; } } diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java index 5d0a151b..660f046e 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java @@ -16,20 +16,8 @@ */ package org.apache.rocketmq.common.rpc; -public abstract class TopicQueueRequestHeader extends RpcRequestHeader { - //Physical or Logical - protected Boolean physical; +public abstract class TopicQueueRequestHeader extends TopicRequestHeader { - public Boolean getPhysical() { - return physical; - } - - public void setPhysical(Boolean physical) { - this.physical = physical; - } - - public abstract String getTopic(); - public abstract void setTopic(String topic); public abstract Integer getQueueId(); public abstract void setQueueId(Integer queueId); diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/TopicRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicRequestHeader.java new file mode 100644 index 00000000..a70cded6 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicRequestHeader.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.rpc; + +public abstract class TopicRequestHeader extends RpcRequestHeader { + //logical + protected Boolean lo; + + public abstract String getTopic(); + public abstract void setTopic(String topic); + + public Boolean getLo() { + return lo; + } + public void setLo(Boolean lo) { + this.lo = lo; + } +} diff --git a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java index fdefb06c..e89dc3a5 100644 --- a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java @@ -1,9 +1,12 @@ package org.apache.rocketmq.test.statictopic; -import com.alibaba.fastjson.JSON; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import org.apache.log4j.Logger; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.admin.ConsumeStats; +import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.rpc.ClientMetadata; @@ -86,71 +89,35 @@ public class StaticTopicIT extends BaseConf { } - - @Test - public void testCreateProduceConsumeStaticTopic() throws Exception { - String topic = "static" + MQRandomUtils.getRandomTopic(); - RMQNormalProducer producer = getProducer(nsAddr, topic); - RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); - - int queueNum = 10; - int msgEachQueue = 100; - //create static topic - Map localBrokerConfigMap = MQAdminTestUtils.createStaticTopic(topic, queueNum, getBrokers(), defaultMQAdminExt); - //check the static topic config - { - Map remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt); - Assert.assertEquals(brokerNum, remoteBrokerConfigMap.size()); - for (Map.Entry entry: remoteBrokerConfigMap.entrySet()) { - String broker = entry.getKey(); - TopicConfigAndQueueMapping configMapping = entry.getValue(); - TopicConfigAndQueueMapping localConfigMapping = localBrokerConfigMap.get(broker); - Assert.assertNotNull(localConfigMapping); - Assert.assertEquals(configMapping, localConfigMapping); - } - TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap); - Map globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true); - Assert.assertEquals(queueNum, globalIdMap.size()); - } - //check the route data + private void sendMessagesAndCheck(RMQNormalProducer producer, Set targetBrokers, String topic, int queueNum, int msgEachQueue, int gen) throws Exception { + ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt); List messageQueueList = producer.getMessageQueue(); Assert.assertEquals(queueNum, messageQueueList.size()); - producer.setDebug(true); for (int i = 0; i < queueNum; i++) { MessageQueue messageQueue = messageQueueList.get(i); Assert.assertEquals(topic, messageQueue.getTopic()); - Assert.assertEquals(i, messageQueue.getQueueId()); Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName()); + Assert.assertEquals(i, messageQueue.getQueueId()); + String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue); + Assert.assertTrue(targetBrokers.contains(destBrokerName)); } - //send and consume the msg for(MessageQueue messageQueue: messageQueueList) { producer.send(msgEachQueue, messageQueue); } + Assert.assertEquals(0, producer.getSendErrorMsg().size()); //leave the time to build the cq - Thread.sleep(500); + Thread.sleep(100); for(MessageQueue messageQueue: messageQueueList) { Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue)); - Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue)); + Assert.assertEquals(msgEachQueue + gen * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, defaultMQAdminExt.maxOffset(messageQueue)); } - Assert.assertEquals(msgEachQueue * queueNum, producer.getAllOriginMsg().size()); - Assert.assertEquals(0, producer.getSendErrorMsg().size()); - - consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000); - assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), - consumer.getListener().getAllMsgBody())) - .containsExactlyElementsIn(producer.getAllMsgBody()); - Map> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg()); - Assert.assertEquals(queueNum, messagesByQueue.size()); - for (int i = 0; i < queueNum; i++) { - List messageExts = messagesByQueue.get(i); - Assert.assertEquals(msgEachQueue, messageExts.size()); - for (int j = 0; j < msgEachQueue; j++) { - Assert.assertEquals(j, messageExts.get(j).getQueueOffset()); - } + TopicStatsTable topicStatsTable = defaultMQAdminExt.examineTopicStats(topic); + for(MessageQueue messageQueue: messageQueueList) { + Assert.assertEquals(0, topicStatsTable.getOffsetTable().get(messageQueue).getMinOffset()); + Assert.assertEquals(msgEachQueue + gen * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, topicStatsTable.getOffsetTable().get(messageQueue).getMaxOffset()); } } - private Map> computeMessageByQueue(Collection msgs) { Map> messagesByQueue = new HashMap<>(); for (Object object : msgs) { @@ -171,217 +138,133 @@ public class StaticTopicIT extends BaseConf { return messagesByQueue; } - @Test - public void testDoubleReadCheckConsumerOffset() throws Exception { - String topic = "static" + MQRandomUtils.getRandomTopic(); - String group = initConsumerGroup(); - RMQNormalProducer producer = getProducer(nsAddr, topic); + private void consumeMessagesAndCheck(RMQNormalProducer producer, RMQNormalConsumer consumer, String topic, int queueNum, int msgEachQueue, int startGen, int genNum) { + consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 30000); + /*System.out.println("produce:" + producer.getAllMsgBody().size()); + System.out.println("consume:" + consumer.getListener().getAllMsgBody().size());*/ - RMQNormalConsumer consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener()); - - //System.out.printf("Group:%s\n", consumer.getConsumerGroup()); - //System.out.printf("Topic:%s\n", topic); - - int queueNum = 10; - int msgEachQueue = 100; - //create static topic - { - Set targetBrokers = new HashSet<>(); - targetBrokers.add(broker1Name); - MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt); - } - //produce the messages - { - List messageQueueList = producer.getMessageQueue(); - for(MessageQueue messageQueue: messageQueueList) { - producer.send(msgEachQueue, messageQueue); - } - Assert.assertEquals(0, producer.getSendErrorMsg().size()); - Assert.assertEquals(msgEachQueue * queueNum, producer.getAllMsgBody().size()); - } - - consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000); assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), consumer.getListener().getAllMsgBody())) .containsExactlyElementsIn(producer.getAllMsgBody()); - producer.shutdown(); - consumer.shutdown(); + Map> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg()); + Assert.assertEquals(queueNum, messagesByQueue.size()); + for (int i = 0; i < queueNum; i++) { + List messageExts = messagesByQueue.get(i); + /*for (MessageExt messageExt:messageExts) { + System.out.printf("%d %d\n", messageExt.getQueueId(), messageExt.getQueueOffset()); + }*/ + int totalEachQueue = msgEachQueue * genNum; + Assert.assertEquals(totalEachQueue, messageExts.size()); + for (int j = 0; j < totalEachQueue; j++) { + MessageExt messageExt = messageExts.get(j); + int currGen = startGen + j / msgEachQueue; + Assert.assertEquals(topic, messageExt.getTopic()); + Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageExt.getBrokerName()); + Assert.assertEquals(i, messageExt.getQueueId()); + Assert.assertEquals((j % msgEachQueue) + currGen * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, messageExt.getQueueOffset()); + } + } + } - //remapping the static topic - { - Set targetBrokers = new HashSet<>(); - targetBrokers.add(broker2Name); - MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt); - } - //make the metadata - Thread.sleep(500); - //System.out.printf("Group:%s\n", consumer.getConsumerGroup()); + @Test + public void testCreateProduceConsumeStaticTopic() throws Exception { + String topic = "static" + MQRandomUtils.getRandomTopic(); + RMQNormalProducer producer = getProducer(nsAddr, topic); + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); + int queueNum = 10; + int msgEachQueue = 100; + //create static topic + Map localBrokerConfigMap = MQAdminTestUtils.createStaticTopic(topic, queueNum, getBrokers(), defaultMQAdminExt); + //check the static topic config { - producer = getProducer(nsAddr, topic); - ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt); - //just refresh the metadata - List messageQueueList = producer.getMessageQueue(); - for(MessageQueue messageQueue: messageQueueList) { - producer.send(msgEachQueue, messageQueue); - Assert.assertEquals(broker2Name, clientMetadata.getBrokerNameFromMessageQueue(messageQueue)); - } - Assert.assertEquals(0, producer.getSendErrorMsg().size()); - Assert.assertEquals(msgEachQueue * queueNum, producer.getAllMsgBody().size()); - for(MessageQueue messageQueue: messageQueueList) { - Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue)); - Assert.assertEquals(msgEachQueue + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, defaultMQAdminExt.maxOffset(messageQueue)); - } - //leave the time to build the cq - Thread.sleep(100); - } - { - consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener()); - consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 6000); - //System.out.printf("Consume %d\n", consumer.getListener().getAllMsgBody().size()); - assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), - consumer.getListener().getAllMsgBody())) - .containsExactlyElementsIn(producer.getAllMsgBody()); - - Map> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg()); - - Assert.assertEquals(queueNum, messagesByQueue.size()); - for (int i = 0; i < queueNum; i++) { - List messageExts = messagesByQueue.get(i); - Assert.assertEquals(msgEachQueue, messageExts.size()); - for (int j = 0; j < msgEachQueue; j++) { - Assert.assertEquals(j + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, messageExts.get(j).getQueueOffset()); - } + Map remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt); + Assert.assertEquals(brokerNum, remoteBrokerConfigMap.size()); + for (Map.Entry entry: remoteBrokerConfigMap.entrySet()) { + String broker = entry.getKey(); + TopicConfigAndQueueMapping configMapping = entry.getValue(); + TopicConfigAndQueueMapping localConfigMapping = localBrokerConfigMap.get(broker); + Assert.assertNotNull(localConfigMapping); + Assert.assertEquals(configMapping, localConfigMapping); } + TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap); + Map globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true); + Assert.assertEquals(queueNum, globalIdMap.size()); } + //send and check + sendMessagesAndCheck(producer, getBrokers(), topic, queueNum, msgEachQueue, 0); + //consume and check + consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 1); } + @Test public void testRemappingProduceConsumeStaticTopic() throws Exception { String topic = "static" + MQRandomUtils.getRandomTopic(); RMQNormalProducer producer = getProducer(nsAddr, topic); RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); - - int queueNum = 10; + int queueNum = 1; int msgEachQueue = 100; - //create static topic + //create send consume { - Set targetBrokers = new HashSet<>(); - targetBrokers.add(broker1Name); + Set targetBrokers = ImmutableSet.of(broker1Name); MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt); + sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0); + consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 1); } - //System.out.printf("%s %s\n", broker1Name, clientMetadata.findMasterBrokerAddr(broker1Name)); - //System.out.printf("%s %s\n", broker2Name, clientMetadata.findMasterBrokerAddr(broker2Name)); - - //produce the messages - { - List messageQueueList = producer.getMessageQueue(); - for (int i = 0; i < queueNum; i++) { - MessageQueue messageQueue = messageQueueList.get(i); - Assert.assertEquals(i, messageQueue.getQueueId()); - Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName()); - } - for(MessageQueue messageQueue: messageQueueList) { - producer.send(msgEachQueue, messageQueue); - } - Assert.assertEquals(0, producer.getSendErrorMsg().size()); - //leave the time to build the cq - Thread.sleep(100); - for(MessageQueue messageQueue: messageQueueList) { - //Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue)); - Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue)); - } - } - - consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000); - assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), - consumer.getListener().getAllMsgBody())) - .containsExactlyElementsIn(producer.getAllMsgBody()); - + System.out.println("============================================================="); //remapping the static topic { - Set targetBrokers = new HashSet<>(); - targetBrokers.add(broker2Name); + Set targetBrokers = ImmutableSet.of(broker2Name); MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt); Map remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt); - TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap); Map globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true); Assert.assertEquals(queueNum, globalIdMap.size()); for (TopicQueueMappingOne mappingOne: globalIdMap.values()) { Assert.assertEquals(broker2Name, mappingOne.getBname()); + Assert.assertEquals(TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, mappingOne.getItems().get(mappingOne.getItems().size() - 1).getLogicOffset()); } - } - //leave the time to refresh the metadata - Thread.sleep(500); - producer.setDebug(true); - { - ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt); - List messageQueueList = producer.getMessageQueue(); - for (int i = 0; i < queueNum; i++) { - MessageQueue messageQueue = messageQueueList.get(i); - Assert.assertEquals(i, messageQueue.getQueueId()); - Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName()); - String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue); - Assert.assertEquals(destBrokerName, broker2Name); - } - - for(MessageQueue messageQueue: messageQueueList) { - producer.send(msgEachQueue, messageQueue); - } - Assert.assertEquals(0, producer.getSendErrorMsg().size()); - //leave the time to build the cq - Thread.sleep(100); - for(MessageQueue messageQueue: messageQueueList) { - Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue)); - Assert.assertEquals(msgEachQueue + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, defaultMQAdminExt.maxOffset(messageQueue)); - } - } - { - consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 30000); - assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), - consumer.getListener().getAllMsgBody())) - .containsExactlyElementsIn(producer.getAllMsgBody()); - Map> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg()); - Assert.assertEquals(queueNum, messagesByQueue.size()); - for (int i = 0; i < queueNum; i++) { - List messageExts = messagesByQueue.get(i); - Assert.assertEquals(msgEachQueue * 2, messageExts.size()); - for (int j = 0; j < msgEachQueue; j++) { - Assert.assertEquals(j, messageExts.get(j).getQueueOffset()); - } - for (int j = msgEachQueue; j < msgEachQueue * 2; j++) { - Assert.assertEquals(j + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE - msgEachQueue, messageExts.get(j).getQueueOffset()); - } - } + Thread.sleep(500); + sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 1); + consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 2); } } - public void sendMessagesAndCheck(RMQNormalProducer producer, String broker, String topic, int queueNum, int msgEachQueue, long baseOffset) throws Exception { - ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt); - List messageQueueList = producer.getMessageQueue(); - Assert.assertEquals(queueNum, messageQueueList.size()); - for (int i = 0; i < queueNum; i++) { - MessageQueue messageQueue = messageQueueList.get(i); - Assert.assertEquals(i, messageQueue.getQueueId()); - Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName()); - String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue); - Assert.assertEquals(destBrokerName, broker); - } + @Test + public void testDoubleReadCheckConsumerOffset() throws Exception { + String topic = "static" + MQRandomUtils.getRandomTopic(); + String group = initConsumerGroup(); + RMQNormalProducer producer = getProducer(nsAddr, topic); + RMQNormalConsumer consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener()); - for(MessageQueue messageQueue: messageQueueList) { - producer.send(msgEachQueue, messageQueue); + int queueNum = 10; + int msgEachQueue = 100; + //create static topic + { + Set targetBrokers = ImmutableSet.of(broker1Name); + MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt); + sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0); + consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 1); } - Assert.assertEquals(0, producer.getSendErrorMsg().size()); - //leave the time to build the cq - Thread.sleep(100); - for(MessageQueue messageQueue: messageQueueList) { - Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue)); - Assert.assertEquals(msgEachQueue + baseOffset, defaultMQAdminExt.maxOffset(messageQueue)); + producer.shutdown(); + consumer.shutdown(); + //use a new producer + producer = getProducer(nsAddr, topic); + + List brokers = ImmutableList.of(broker2Name, broker3Name, broker1Name); + for (int i = 0; i < brokers.size(); i++) { + Set targetBrokers = ImmutableSet.of(brokers.get(i)); + MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt); + //make the metadata + Thread.sleep(500); + sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, i + 1); } + consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener()); + consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 1, brokers.size()); } @@ -393,32 +276,29 @@ public class StaticTopicIT extends BaseConf { int msgEachQueue = 100; //create to broker1Name { - Set targetBrokers = new HashSet<>(); - targetBrokers.add(broker1Name); + Set targetBrokers = ImmutableSet.of(broker1Name); MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt); //leave the time to refresh the metadata Thread.sleep(500); - sendMessagesAndCheck(producer, broker1Name, topic, queueNum, msgEachQueue, 0L); + sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0); } //remapping to broker2Name { - Set targetBrokers = new HashSet<>(); - targetBrokers.add(broker2Name); + Set targetBrokers = ImmutableSet.of(broker2Name); MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt); //leave the time to refresh the metadata Thread.sleep(500); - sendMessagesAndCheck(producer, broker2Name, topic, queueNum, msgEachQueue, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE); + sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 1); } //remapping to broker3Name { - Set targetBrokers = new HashSet<>(); - targetBrokers.add(broker3Name); + Set targetBrokers = ImmutableSet.of(broker3Name); MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt); //leave the time to refresh the metadata Thread.sleep(500); - sendMessagesAndCheck(producer, broker3Name, topic, queueNum, msgEachQueue, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE * 2); + sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 2); } // 1 -> 2 -> 3, currently 1 should not has any mappings @@ -469,10 +349,7 @@ public class StaticTopicIT extends BaseConf { for (List items : config3.getMappingDetail().getHostedQueues().values()) { Assert.assertEquals(1, items.size()); } - } - - } @@ -482,42 +359,18 @@ public class StaticTopicIT extends BaseConf { RMQNormalProducer producer = getProducer(nsAddr, topic); int queueNum = 10; int msgEachQueue = 100; - //create static topic + //create and send { - Set targetBrokers = new HashSet<>(); - targetBrokers.add(broker1Name); + Set targetBrokers = ImmutableSet.of(broker1Name); MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt); - } - //System.out.printf("%s %s\n", broker1Name, clientMetadata.findMasterBrokerAddr(broker1Name)); - //System.out.printf("%s %s\n", broker2Name, clientMetadata.findMasterBrokerAddr(broker2Name)); - - //produce the messages - { - List messageQueueList = producer.getMessageQueue(); - for (int i = 0; i < queueNum; i++) { - MessageQueue messageQueue = messageQueueList.get(i); - Assert.assertEquals(i, messageQueue.getQueueId()); - Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName()); - } - for(MessageQueue messageQueue: messageQueueList) { - producer.send(msgEachQueue, messageQueue); - } - Assert.assertEquals(0, producer.getSendErrorMsg().size()); - //leave the time to build the cq - Thread.sleep(100); - for(MessageQueue messageQueue: messageQueueList) { - //Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue)); - Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue)); - } + sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0); } //remapping the static topic with -1 logic offset { - Set targetBrokers = new HashSet<>(); - targetBrokers.add(broker2Name); + Set targetBrokers = ImmutableSet.of(broker2Name); MQAdminTestUtils.remappingStaticTopicWithNegativeLogicOffset(topic, targetBrokers, defaultMQAdminExt); Map remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt); - TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap); Map globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true); Assert.assertEquals(queueNum, globalIdMap.size()); @@ -525,32 +378,10 @@ public class StaticTopicIT extends BaseConf { Assert.assertEquals(broker2Name, mappingOne.getBname()); Assert.assertEquals(-1, mappingOne.getItems().get(mappingOne.getItems().size() - 1).getLogicOffset()); } - } - //leave the time to refresh the metadata - Thread.sleep(500); - producer.setDebug(true); - { - ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt); - List messageQueueList = producer.getMessageQueue(); - for (int i = 0; i < queueNum; i++) { - MessageQueue messageQueue = messageQueueList.get(i); - Assert.assertEquals(i, messageQueue.getQueueId()); - String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue); - Assert.assertEquals(destBrokerName, broker2Name); - } - - for(MessageQueue messageQueue: messageQueueList) { - producer.send(msgEachQueue, messageQueue); - } - Assert.assertEquals(0, producer.getSendErrorMsg().size()); - Assert.assertEquals(queueNum * msgEachQueue * 2, producer.getAllOriginMsg().size()); - //leave the time to build the cq - Thread.sleep(100); - for(MessageQueue messageQueue: messageQueueList) { - Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue)); - //the max offset should still be msgEachQueue - Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue)); - } + //leave the time to refresh the metadata + Thread.sleep(500); + //here the gen should be 0 + sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0); } } -- GitLab