diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 16bf677cc7d4f45506f588babd450812d47c466b..33d3025a3010b040f408eebdbf8a2024dcaa8d7f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -709,6 +709,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements offset = mappingItem.computeStaticQueueOffsetUpToEnd(offset); + final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class); final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader(); responseHeader.setOffset(offset); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java index 930f1712d58608da59dcd7ec10a78e843f593f47..aad9454f4f7d50bd236f415682ff5986cefa1d4d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java @@ -20,8 +20,14 @@ import io.netty.channel.ChannelHandlerContext; import java.util.List; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.rpc.RpcRequest; +import org.apache.rocketmq.common.rpc.RpcResponse; +import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader; +import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; @@ -39,6 +45,8 @@ import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse; + public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); @@ -111,6 +119,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen (UpdateConsumerOffsetRequestHeader) request .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class); TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader); + RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext); if (rewriteResult != null) { return rewriteResult; @@ -122,6 +131,76 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen return response; } + + public RemotingCommand rewriteRequestForStaticTopic(QueryConsumerOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext) { + try { + if (mappingContext.getMappingDetail() == null) { + return null; + } + TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); + if (!mappingContext.isLeader()) { + return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), mappingDetail.getBname())); + } + if (mappingContext.checkIfAsPhysical()) { + //let it go + requestHeader.setQueueId(mappingContext.getLeaderItem().getQueueId()); + return null; + } + //double read check + List<LogicQueueMappingItem> itemList = mappingContext.getMappingItemList(); + //by default, it is -1 + long offset = -1; + //double read, first from leader, then from second leader + for (int i = 1; i <= 2; i++) { + if (itemList.size() - i < 0) { + break; + } + LogicQueueMappingItem mappingItem = itemList.get(itemList.size() - i); + if (mappingItem.getBname().equals(mappingDetail.getBname())) { + offset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), mappingItem.getQueueId()); + if (offset >= 0) { + break; + } else { + //not found + continue; + } + } else { + //maybe we need to reconstruct an object + requestHeader.setBname(mappingItem.getBname()); + requestHeader.setQueueId(mappingItem.getQueueId()); + requestHeader.setPhysical(true); + RpcRequest rpcRequest = new RpcRequest(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader, null); + RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); + if (rpcResponse.getException() != null) { + throw rpcResponse.getException(); + } + if (rpcResponse.getCode() == ResponseCode.SUCCESS) { + offset = ((QueryConsumerOffsetResponseHeader) rpcResponse.getHeader()).getOffset(); + } else if (rpcResponse.getCode() == ResponseCode.PULL_NOT_FOUND){ + continue; + } else { + //this should not happen + throw new RuntimeException("Unknown response code " + rpcResponse.getCode()); + } + } + } + final RemotingCommand response = RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class); + final QueryConsumerOffsetResponseHeader responseHeader = (QueryConsumerOffsetResponseHeader) response.readCustomHeader(); + if (offset >= 0) { + responseHeader.setOffset(offset); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + } else { + response.setCode(ResponseCode.QUERY_NOT_FOUND); + response.setRemark("Not found, maybe this group consumer boot first"); + } + return response; + } catch (Throwable t) { + t.printStackTrace(); + return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); + } + } + private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = @@ -132,8 +211,9 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen (QueryConsumerOffsetRequestHeader) request .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class); + TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader); - RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext); + RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext); if (rewriteResult != null) { return rewriteResult; } @@ -152,8 +232,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen requestHeader.getQueueId()); if (minOffset <= 0 && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset( - requestHeader.getTopic(), requestHeader.getQueueId(), 0) - && mappingContext.checkIfAsPhysical()) { + requestHeader.getTopic(), requestHeader.getQueueId(), 0)) { responseHeader.setOffset(0L); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java index 1dd9cbf158131489f8415ede845b21994c87d270..9be37176a22fa75e24fe2378b6d517d5ce492fcf 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java @@ -196,9 +196,8 @@ public class TopicQueueMappingManager extends ConfigManager { return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, null, null); } - List<LogicQueueMappingItem> mappingItemList = null; + List<LogicQueueMappingItem> mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId); LogicQueueMappingItem leaderItem = null; - mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId); if (mappingItemList != null && mappingItemList.size() > 0) { leaderItem = mappingItemList.get(mappingItemList.size() - 1); diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java index 73648560e03297d03d72c7e1b48145b9a1f2f7b1..91d12a04c0473f011e94e9d6b6e3f81c5d3cf629 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java @@ -25,11 +25,13 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.exception.OffsetNotFoundException; import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; @@ -94,7 +96,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { return brokerOffset; } // No offset in broker - catch (MQBrokerException e) { + catch (OffsetNotFoundException e) { return -1; } //Other exceptions @@ -108,7 +110,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { } } - return -1; + return -3; } @Override diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/OffsetNotFoundException.java b/client/src/main/java/org/apache/rocketmq/client/exception/OffsetNotFoundException.java new file mode 100644 index 0000000000000000000000000000000000000000..c3d275f9af9012277df7c8df3c644c2de2fc66aa --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/exception/OffsetNotFoundException.java @@ -0,0 +1,15 @@ +package org.apache.rocketmq.client.exception; + +public class OffsetNotFoundException extends MQBrokerException { + + public OffsetNotFoundException() { + } + + public OffsetNotFoundException(int responseCode, String errorMessage) { + super(responseCode, errorMessage); + } + + public OffsetNotFoundException(int responseCode, String errorMessage, String brokerAddr) { + super(responseCode, errorMessage, brokerAddr); + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 4feb2251363bd7ee5302d62570ccdf3ce12415cf..7538866cd4962283f50062433b476f18f834f04d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -30,6 +30,7 @@ import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQRedirectException; +import org.apache.rocketmq.client.exception.OffsetNotFoundException; import org.apache.rocketmq.client.hook.SendMessageContext; import org.apache.rocketmq.client.impl.consumer.PullResultExt; import org.apache.rocketmq.client.impl.factory.MQClientInstance; @@ -1232,9 +1233,11 @@ public class MQClientAPIImpl { case ResponseCode.SUCCESS: { QueryConsumerOffsetResponseHeader responseHeader = (QueryConsumerOffsetResponseHeader) response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class); - return responseHeader.getOffset(); } + case ResponseCode.PULL_NOT_FOUND:{ + throw new OffsetNotFoundException(response.getCode(), response.getRemark(), addr); + } default: break; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index 09d1521654830b73e019c0a910e1f7a361fdf492..d28a0469864edd5e4c805725fd557f4d7306558f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -29,6 +29,7 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.ConsumeInitMode; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java index 4e00bc33a550c7a0694e60443900392f4a5172ba..713bbf940e50a9e4418381c815a21952ccb719d5 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java @@ -9,6 +9,7 @@ import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader; import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; +import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RemotingClient; @@ -75,6 +76,9 @@ public class RpcClientImpl implements RpcClient { case RequestCode.GET_EARLIEST_MSG_STORETIME: rpcResponsePromise = handleGetEarliestMsgStoretime(addr, request, timeoutMs); break; + case RequestCode.QUERY_CONSUMER_OFFSET: + rpcResponsePromise = handleQueryConsumerOffset(addr, request, timeoutMs); + break; default: throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode()); } @@ -176,6 +180,31 @@ public class RpcClientImpl implements RpcClient { return rpcResponsePromise; } + + + public Promise<RpcResponse> handleQueryConsumerOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception { + final Promise<RpcResponse> rpcResponsePromise = createResponseFuture(); + + RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest); + RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis); + assert responseCommand != null; + switch (responseCommand.getCode()) { + case ResponseCode.SUCCESS: { + QueryConsumerOffsetResponseHeader responseHeader = + (QueryConsumerOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class); + rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody())); + break; + } + case ResponseCode.QUERY_NOT_FOUND: { + rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), null, null)); + } + default:{ + rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"))); + } + } + return rpcResponsePromise; + } + public Promise<RpcResponse> handleGetMinOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception { final Promise<RpcResponse> rpcResponsePromise = createResponseFuture(); diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java index abe1eeee399161f9cf6ee624a1b549b7dd1d0c46..c1cc60be99c47fce62cdc3f2e222bbd628cd3242 100644 --- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java @@ -28,6 +28,7 @@ import org.junit.Test; import sun.jvm.hotspot.runtime.aarch64.AARCH64CurrentFrameGuess; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -144,30 +145,123 @@ public class StaticTopicIT extends BaseConf { assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), consumer.getListener().getAllMsgBody())) .containsExactlyElementsIn(producer.getAllMsgBody()); + Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg()); + Assert.assertEquals(queueNum, messagesByQueue.size()); + for (int i = 0; i < queueNum; i++) { + List<MessageExt> messageExts = messagesByQueue.get(i); + Assert.assertEquals(msgEachQueue, messageExts.size()); + for (int j = 0; j < msgEachQueue; j++) { + Assert.assertEquals(j, messageExts.get(j).getQueueOffset()); + } + } + } + + + private Map<Integer, List<MessageExt>> computeMessageByQueue(Collection<Object> msgs) { Map<Integer, List<MessageExt>> messagesByQueue = new HashMap<>(); - for (Object object : consumer.getListener().getAllOriginMsg()) { + for (Object object : msgs) { MessageExt messageExt = (MessageExt) object; if (!messagesByQueue.containsKey(messageExt.getQueueId())) { messagesByQueue.put(messageExt.getQueueId(), new ArrayList<>()); } messagesByQueue.get(messageExt.getQueueId()).add(messageExt); } - Assert.assertEquals(queueNum, messagesByQueue.size()); - for (int i = 0; i < queueNum; i++) { - List<MessageExt> messageExts = messagesByQueue.get(i); - Assert.assertEquals(msgEachQueue, messageExts.size()); - Collections.sort(messageExts, new Comparator<MessageExt>() { + for (List<MessageExt> msgEachQueue: messagesByQueue.values()) { + Collections.sort(msgEachQueue, new Comparator<MessageExt>() { @Override public int compare(MessageExt o1, MessageExt o2) { return (int) (o1.getQueueOffset() - o2.getQueueOffset()); } }); - for (int j = 0; j < msgEachQueue; j++) { - Assert.assertEquals(j, messageExts.get(j).getQueueOffset()); - } } + return messagesByQueue; } + @Test + public void testDoubleReadCheck() throws Exception { + String topic = "static" + MQRandomUtils.getRandomTopic(); + String group = initConsumerGroup(); + RMQNormalProducer producer = getProducer(nsAddr, topic); + + RMQNormalConsumer consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener()); + + //System.out.printf("Group:%s\n", consumer.getConsumerGroup()); + //System.out.printf("Topic:%s\n", topic); + + int queueNum = 10; + int msgEachQueue = 100; + //create static topic + { + Set<String> targetBrokers = new HashSet<>(); + targetBrokers.add(broker1Name); + createStaticTopic(topic, queueNum, targetBrokers); + } + //produce the messages + { + List<MessageQueue> messageQueueList = producer.getMessageQueue(); + for(MessageQueue messageQueue: messageQueueList) { + producer.send(msgEachQueue, messageQueue); + } + Assert.assertEquals(0, producer.getSendErrorMsg().size()); + Assert.assertEquals(msgEachQueue * queueNum, producer.getAllMsgBody().size()); + } + + consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListener().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + producer.shutdown(); + consumer.shutdown(); + + //remapping the static topic + { + Set<String> targetBrokers = new HashSet<>(); + targetBrokers.add(broker2Name); + remappingStaticTopic(topic, targetBrokers); + + } + //make the metadata + Thread.sleep(500); + //System.out.printf("Group:%s\n", consumer.getConsumerGroup()); + + { + producer = getProducer(nsAddr, topic); + //just refresh the metadata + defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); + List<MessageQueue> messageQueueList = producer.getMessageQueue(); + for(MessageQueue messageQueue: messageQueueList) { + producer.send(msgEachQueue, messageQueue); + Assert.assertEquals(broker2Name, clientMetadata.getBrokerNameFromMessageQueue(messageQueue)); + } + Assert.assertEquals(0, producer.getSendErrorMsg().size()); + Assert.assertEquals(msgEachQueue * queueNum, producer.getAllMsgBody().size()); + for(MessageQueue messageQueue: messageQueueList) { + Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue)); + Assert.assertEquals(msgEachQueue + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, defaultMQAdminExt.maxOffset(messageQueue)); + } + //leave the time to build the cq + Thread.sleep(100); + } + { + consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener()); + consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 6000); + //System.out.printf("Consume %d\n", consumer.getListener().getAllMsgBody().size()); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListener().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + + Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg()); + + Assert.assertEquals(queueNum, messagesByQueue.size()); + for (int i = 0; i < queueNum; i++) { + List<MessageExt> messageExts = messagesByQueue.get(i); + Assert.assertEquals(msgEachQueue, messageExts.size()); + for (int j = 0; j < msgEachQueue; j++) { + Assert.assertEquals(j + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, messageExts.get(j).getQueueOffset()); + } + } + } + } @Test public void testRemappingProduceConsumeStaticTopic() throws Exception { @@ -175,6 +269,7 @@ public class StaticTopicIT extends BaseConf { RMQNormalProducer producer = getProducer(nsAddr, topic); RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); + int queueNum = 10; int msgEachQueue = 100; //create static topic @@ -254,24 +349,11 @@ public class StaticTopicIT extends BaseConf { assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), consumer.getListener().getAllMsgBody())) .containsExactlyElementsIn(producer.getAllMsgBody()); - Map<Integer, List<MessageExt>> messagesByQueue = new HashMap<>(); - for (Object object : consumer.getListener().getAllOriginMsg()) { - MessageExt messageExt = (MessageExt) object; - if (!messagesByQueue.containsKey(messageExt.getQueueId())) { - messagesByQueue.put(messageExt.getQueueId(), new ArrayList<>()); - } - messagesByQueue.get(messageExt.getQueueId()).add(messageExt); - } + Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg()); Assert.assertEquals(queueNum, messagesByQueue.size()); for (int i = 0; i < queueNum; i++) { List<MessageExt> messageExts = messagesByQueue.get(i); Assert.assertEquals(msgEachQueue * 2, messageExts.size()); - Collections.sort(messageExts, new Comparator<MessageExt>() { - @Override - public int compare(MessageExt o1, MessageExt o2) { - return (int) (o1.getQueueOffset() - o2.getQueueOffset()); - } - }); for (int j = 0; j < msgEachQueue; j++) { Assert.assertEquals(j, messageExts.get(j).getQueueOffset()); }