diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index d9684e71f28d70026b7d30d6613c5cb4a499bb0c..16bf677cc7d4f45506f588babd450812d47c466b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -109,6 +109,7 @@ import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; +import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; import org.apache.rocketmq.common.stats.StatsItem; import org.apache.rocketmq.common.stats.StatsSnapshot; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; @@ -621,14 +622,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements if (mappingContext.getMappingDetail() == null) { return null; } - if (requestHeader.getPhysical() != null && requestHeader.getPhysical()) { - return null; - } TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); List mappingItems = mappingContext.getMappingItemList(); - if (mappingItems == null - || mappingItems.isEmpty()) { + if (!mappingContext.isLeader()) { return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); } //TODO should make sure the timestampOfOffset is equal or bigger than the searched timestamp @@ -702,14 +699,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements if (mappingContext.getMappingDetail() == null) { return null; } - if (requestHeader.getPhysical() != null && requestHeader.getPhysical()) { - return null; - } TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); - LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); - if (mappingItem == null - || !mappingDetail.getBname().equals(mappingItem.getBname())) { + LogicQueueMappingItem mappingItem = mappingContext.getLeaderItem(); + if (!mappingContext.isLeader()) { return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); } long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(mappingContext.getTopic(), mappingItem.getQueueId()); @@ -750,16 +743,15 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements if (mappingContext.getMappingDetail() == null) { return null; } - if (requestHeader.getPhysical() != null && requestHeader.getPhysical()) { - return null; - } TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); - LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); - if (mappingItem == null) { + if (!mappingContext.isLeader()) { //this may not return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); }; + + LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true); + assert mappingItem != null; try { requestHeader.setBname(mappingItem.getBname()); requestHeader.setPhysical(true); @@ -786,7 +778,6 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements response.setRemark(null); return response; } catch (Throwable t) { - t.printStackTrace(); log.error("rewriteRequestForStaticTopic failed", t); return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); } @@ -800,7 +791,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class); - TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false, 0L); + TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false); RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext); if (rewriteResult != null) { return rewriteResult; @@ -818,19 +809,18 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements if (mappingContext.getMappingDetail() == null) { return null; } - if (requestHeader.getPhysical() != null && requestHeader.getPhysical()) { - return null; - } + TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); - LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); - if (mappingItem == null) { + if (!mappingContext.isLeader()) { return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); }; + LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true); + assert mappingItem != null; try { requestHeader.setBname(mappingItem.getBname()); requestHeader.setPhysical(true); RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null); - //TODO check if it is leader + //TODO check if it is in current broker RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); if (rpcResponse.getException() != null) { throw rpcResponse.getException(); @@ -855,7 +845,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements final GetEarliestMsgStoretimeRequestHeader requestHeader = (GetEarliestMsgStoretimeRequestHeader) request.decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class); - TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false, 0L); + TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false); RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext); if (rewriteResult != null) { return rewriteResult; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index b98295c9f37f165f651bc9446560a6d7b23ee6a3..aa6879b5afbf767659bad71804fc98d221c0e201 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -54,6 +54,7 @@ import org.apache.rocketmq.common.rpc.RpcResponse; import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; +import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.sysflag.PullSysFlag; @@ -68,7 +69,6 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.RequestTask; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.GetMessageResult; -import org.apache.rocketmq.store.GetMessageStatus; import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageFilter; import org.apache.rocketmq.store.PutMessageResult; @@ -79,7 +79,6 @@ import java.nio.ByteBuffer; import java.util.List; import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse; -import static org.apache.rocketmq.remoting.protocol.RemotingCommand.decode; public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); @@ -111,15 +110,14 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); String topic = mappingContext.getTopic(); Integer globalId = mappingContext.getGlobalId(); - Long globalOffset = mappingContext.getGlobalOffset(); - - LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); - if (mappingItem == null) { + // if the leader? consider the order consumer, which will lock the mq + if (!mappingContext.isLeader()) { return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d cannot find mapping item in request process of current broker %s", topic, globalId, mappingDetail.getBname())); } + Long globalOffset = requestHeader.getQueueOffset(); - //TODO Check if the leader? consider the order consumer, which will lock the mq - // + LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), globalOffset, true); + mappingContext.setCurrentItem(mappingItem); if (globalOffset < mappingItem.getLogicOffset()) { //handleOffsetMoved @@ -174,68 +172,122 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements private RemotingCommand rewriteResponseForStaticTopic(PullMessageRequestHeader requestHeader, PullMessageResponseHeader responseHeader, TopicQueueMappingContext mappingContext, final int code) { try { - if (mappingContext == null) { + if (mappingContext.getMappingDetail() == null) { return null; } TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); - LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); + LogicQueueMappingItem leaderItem = mappingContext.getLeaderItem(); + + LogicQueueMappingItem currentItem = mappingContext.getCurrentItem(); + + LogicQueueMappingItem earlistItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true); + + assert currentItem.getLogicOffset() >= 0; long requestOffset = requestHeader.getQueueOffset(); long nextBeginOffset = responseHeader.getNextBeginOffset(); long minOffset = responseHeader.getMinOffset(); long maxOffset = responseHeader.getMaxOffset(); int responseCode = code; - if (responseCode != ResponseCode.SUCCESS - && responseCode != ResponseCode.PULL_RETRY_IMMEDIATELY) { - if (mappingContext.isLeader()) { + //consider the following situations + // 1. read from slave, currently not supported + // 2. the middle queue is truncated because of deleting commitlog + if (code != ResponseCode.SUCCESS) { + //note the currentItem maybe both the leader and the earliest + if (leaderItem.getGen() == currentItem.getGen()) { + //read the leader + if (requestOffset > maxOffset) { + //actually, we need do nothing, but keep the code structure here + if (code == ResponseCode.PULL_OFFSET_MOVED) { + responseCode = ResponseCode.PULL_OFFSET_MOVED; + } else { + //maybe current broker is the slave + responseCode = code; + } + } else if (requestOffset < minOffset) { + nextBeginOffset = minOffset; + responseCode = ResponseCode.PULL_RETRY_IMMEDIATELY; + } else { + responseCode = code; + } + } + //note the currentItem maybe both the leader and the earliest + if (earlistItem.getGen() == currentItem.getGen()) { + //read the earliest one if (requestOffset < minOffset) { + if (code == ResponseCode.PULL_OFFSET_MOVED) { + responseCode = ResponseCode.PULL_OFFSET_MOVED; + } else { + //maybe read from slave, but we still set it to moved + responseCode = ResponseCode.PULL_OFFSET_MOVED; + } nextBeginOffset = minOffset; - responseCode = ResponseCode.PULL_NOT_FOUND; - } else if (requestOffset > maxOffset) { - responseCode = ResponseCode.PULL_OFFSET_MOVED; - } else if (requestOffset == maxOffset) { - responseCode = ResponseCode.PULL_NOT_FOUND; + } else if (requestOffset >= maxOffset) { + //just move to another item + LogicQueueMappingItem nextItem = TopicQueueMappingUtils.findNext(mappingContext.getMappingItemList(), currentItem, true); + if (nextItem != null) { + currentItem = nextItem; + nextBeginOffset = currentItem.getStartOffset(); + minOffset = currentItem.getStartOffset(); + maxOffset = minOffset; + responseCode = ResponseCode.PULL_RETRY_IMMEDIATELY; + } else { + //maybe the next one's logic offset is -1 + responseCode = ResponseCode.PULL_NOT_FOUND; + } } else { //let it go + responseCode = code; } - } else { + } + + //read from the middle item, ignore the PULL_OFFSET_MOVED + if (leaderItem.getGen() != currentItem.getGen() + && earlistItem.getGen() != currentItem.getGen()) { if (requestOffset < minOffset) { nextBeginOffset = minOffset; - responseCode = ResponseCode.PULL_NOT_FOUND; + responseCode = ResponseCode.PULL_RETRY_IMMEDIATELY; } else if (requestOffset >= maxOffset) { - responseCode = ResponseCode.PULL_NOT_FOUND; //just move to another item - mappingItem = mappingContext.findNext(); - assert mappingItem != null; - nextBeginOffset = mappingItem.getStartOffset(); - minOffset = mappingItem.getStartOffset(); - maxOffset = minOffset; + LogicQueueMappingItem nextItem = TopicQueueMappingUtils.findNext(mappingContext.getMappingItemList(), currentItem, true); + if (nextItem != null) { + currentItem = nextItem; + nextBeginOffset = currentItem.getStartOffset(); + minOffset = currentItem.getStartOffset(); + maxOffset = minOffset; + responseCode = ResponseCode.PULL_RETRY_IMMEDIATELY; + } else { + //maybe the next one's logic offset is -1 + responseCode = ResponseCode.PULL_NOT_FOUND; + } + } else { + responseCode = code; } } } //handle nextBeginOffset //the next begin offset should no more than the end offset - if (mappingItem.checkIfEndOffsetDecided() - && nextBeginOffset >= mappingItem.getEndOffset()) { - nextBeginOffset = mappingItem.getEndOffset(); + if (currentItem.checkIfEndOffsetDecided() + && nextBeginOffset >= currentItem.getEndOffset()) { + nextBeginOffset = currentItem.getEndOffset(); } - responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffsetUpToEnd(nextBeginOffset)); + responseHeader.setNextBeginOffset(currentItem.computeStaticQueueOffsetUpToEnd(nextBeginOffset)); //handle min offset - responseHeader.setMinOffset(mappingItem.computeStaticQueueOffsetUpToEnd(Math.max(mappingItem.getStartOffset(), minOffset))); + responseHeader.setMinOffset(currentItem.computeStaticQueueOffsetUpToEnd(Math.max(currentItem.getStartOffset(), minOffset))); //handle max offset - responseHeader.setMaxOffset(Math.max(mappingItem.computeStaticQueueOffsetUpToEnd(maxOffset), + responseHeader.setMaxOffset(Math.max(currentItem.computeStaticQueueOffsetUpToEnd(maxOffset), TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail, mappingContext.getGlobalId()))); //set the offsetDelta - responseHeader.setOffsetDelta(mappingItem.computeOffsetDelta()); + responseHeader.setOffsetDelta(currentItem.computeOffsetDelta()); - if (code != ResponseCode.SUCCESS - && code != ResponseCode.PULL_RETRY_IMMEDIATELY) { + if (code != ResponseCode.SUCCESS) { return RemotingCommand.createResponseCommandWithHeader(responseCode, responseHeader); } else { return null; } } catch (Throwable t) { + t.printStackTrace(); return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); } } @@ -292,7 +344,8 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements return response; } - TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false, requestHeader.getQueueOffset()); + + TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false); { RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index d254b8a7f1297800b22b827b2b73d736fce26182..03a5dbaf5cfb1266de0775448af2f3c451f185e4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -101,7 +101,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement if (requestHeader == null) { return CompletableFuture.completedFuture(null); } - TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, true, Long.MAX_VALUE); + TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, true); RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext); if (rewriteResult != null) { return CompletableFuture.completedFuture(rewriteResult); @@ -130,7 +130,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); - LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); + LogicQueueMappingItem mappingItem = mappingContext.getLeaderItem(); if (mappingItem == null) { return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java index 5fa3a31c933b198d37b8c17c8a546b4de3b1daf1..1dd9cbf158131489f8415ede845b21994c87d270 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.broker.topic; import com.alibaba.fastjson.JSON; -import com.google.common.collect.ImmutableList; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; @@ -164,24 +163,24 @@ public class TopicQueueMappingManager extends ConfigManager { } public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader) { - return buildTopicQueueMappingContext(requestHeader, false, Long.MAX_VALUE); + return buildTopicQueueMappingContext(requestHeader, false); } //Do not return a null context - public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader, boolean selectOneWhenMiss, Long globalOffset) { + public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader, boolean selectOneWhenMiss) { if (requestHeader.getPhysical() != null && Boolean.TRUE.equals(requestHeader.getPhysical())) { - return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null, null); + return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null); } TopicQueueMappingDetail mappingDetail = getTopicQueueMapping(requestHeader.getTopic()); if (mappingDetail == null) { //it is not static topic - return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null, null); + return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null); } //If not find mappingItem, it encounters some errors Integer globalId = requestHeader.getQueueId(); if (globalId < 0 && !selectOneWhenMiss) { - return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, globalOffset, mappingDetail, null, null); + return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, null, null); } if (globalId < 0) { @@ -194,24 +193,17 @@ public class TopicQueueMappingManager extends ConfigManager { } } if (globalId < 0) { - return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, globalOffset, mappingDetail, null, null); + return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, null, null); } List mappingItemList = null; - LogicQueueMappingItem mappingItem = null; - - if (globalOffset == null - || Long.MAX_VALUE == globalOffset) { - mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId); - if (mappingItemList != null + LogicQueueMappingItem leaderItem = null; + mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId); + if (mappingItemList != null && mappingItemList.size() > 0) { - mappingItem = mappingItemList.get(mappingItemList.size() - 1); - } - } else { - mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId); - mappingItem = TopicQueueMappingDetail.findLogicQueueMappingItem(mappingItemList, globalOffset); + leaderItem = mappingItemList.get(mappingItemList.size() - 1); } - return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, globalOffset, mappingDetail, mappingItemList, mappingItem); + return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, mappingItemList, leaderItem); } @@ -221,11 +213,10 @@ public class TopicQueueMappingManager extends ConfigManager { return null; } TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); - LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); - if (mappingItem == null - || !mappingDetail.getBname().equals(mappingItem.getBname())) { + if (!mappingContext.isLeader()) { return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), mappingDetail.getBname())); } + LogicQueueMappingItem mappingItem = mappingContext.getLeaderItem(); requestHeader.setQueueId(mappingItem.getQueueId()); return null; } catch (Throwable t) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java index da9c8b30edafbe2e9e0dd50a48fac8d33a900651..273add4f2154405e4f441273fc4f2e79bd75fd38 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java @@ -23,6 +23,8 @@ import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; + +import com.alibaba.fastjson.JSON; import org.apache.rocketmq.client.consumer.PopCallback; import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader; import org.apache.rocketmq.client.consumer.PullCallback; @@ -170,6 +172,7 @@ public class PullAPIWrapper { this.recalculatePullFromWhichNode(mq), false); } + if (findBrokerResult != null) { { // check version @@ -203,6 +206,7 @@ public class PullAPIWrapper { brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr); } + PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( brokerAddr, requestHeader, diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java index 959207e16dd76f28fab145c53e6a97150593d6ca..9f79e9d12d8a19d668e8ba6d17ba6711573e96be 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java @@ -32,6 +32,10 @@ public class LogicQueueMappingItem extends RemotingSerializable { } public long computeStaticQueueOffsetUpToEnd(long physicalQueueOffset) { + //consider the newly mapped item + if (logicOffset < 0) { + return -1; + } if (physicalQueueOffset < startOffset) { return logicOffset; } @@ -43,6 +47,9 @@ public class LogicQueueMappingItem extends RemotingSerializable { } public long computeStaticQueueOffset(long physicalQueueOffset) { + if (logicOffset < 0) { + return logicOffset; + } if (physicalQueueOffset < startOffset) { return logicOffset; } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java index 4a788ab11e6a2d07aa42adc401df5db1fb6e5ec7..f705c43932d9856ffb40227538c8cdeb256bf0e9 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java @@ -23,18 +23,18 @@ import java.util.List; public class TopicQueueMappingContext { private String topic; private Integer globalId; - private Long globalOffset; private TopicQueueMappingDetail mappingDetail; private List mappingItemList; - private LogicQueueMappingItem mappingItem; + private LogicQueueMappingItem leaderItem; - public TopicQueueMappingContext(String topic, Integer globalId, Long globalOffset, TopicQueueMappingDetail mappingDetail, List mappingItemList, LogicQueueMappingItem mappingItem) { + private LogicQueueMappingItem currentItem; + + public TopicQueueMappingContext(String topic, Integer globalId, TopicQueueMappingDetail mappingDetail, List mappingItemList, LogicQueueMappingItem leaderItem) { this.topic = topic; this.globalId = globalId; - this.globalOffset = globalOffset; this.mappingDetail = mappingDetail; this.mappingItemList = mappingItemList; - this.mappingItem = mappingItem; + this.leaderItem = leaderItem; } @@ -45,33 +45,7 @@ public class TopicQueueMappingContext { } public boolean isLeader() { - if (mappingDetail == null - || mappingItemList == null - || mappingItemList.isEmpty()) { - return false; - } - LogicQueueMappingItem mappingItem = mappingItemList.get(mappingItemList.size() - 1); - return mappingItem.getBname().equals(mappingDetail.getBname()); - } - - public LogicQueueMappingItem findNext() { - if (mappingDetail == null - || mappingItem == null - || mappingItemList == null - || mappingItemList.isEmpty()) { - return null; - } - for (int i = 0; i < mappingItemList.size(); i++) { - LogicQueueMappingItem item = mappingItemList.get(i); - if (item.getGen() == mappingItem.getGen()) { - if (i < mappingItemList.size() - 1) { - return mappingItemList.get(i + 1); - } else { - return null; - } - } - } - return null; + return leaderItem != null && leaderItem.getBname().equals(mappingDetail.getBname()); } @@ -91,13 +65,6 @@ public class TopicQueueMappingContext { this.globalId = globalId; } - public Long getGlobalOffset() { - return globalOffset; - } - - public void setGlobalOffset(Long globalOffset) { - this.globalOffset = globalOffset; - } public TopicQueueMappingDetail getMappingDetail() { return mappingDetail; @@ -115,13 +82,23 @@ public class TopicQueueMappingContext { this.mappingItemList = mappingItemList; } - public LogicQueueMappingItem getMappingItem() { - return mappingItem; + public LogicQueueMappingItem getLeaderItem() { + return leaderItem; + } + + public void setLeaderItem(LogicQueueMappingItem leaderItem) { + this.leaderItem = leaderItem; } - public void setMappingItem(LogicQueueMappingItem mappingItem) { - this.mappingItem = mappingItem; + public LogicQueueMappingItem getCurrentItem() { + return currentItem; } + public void setCurrentItem(LogicQueueMappingItem currentItem) { + this.currentItem = currentItem; + } + public void setMappingItemList(List mappingItemList) { + this.mappingItemList = mappingItemList; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java index 06595720a52336c7cdb67e2df0975c93ddea00e8..1749b8ed7cb97e55524b0c31ce8f0909808fd805 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java @@ -77,28 +77,6 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { } - public static LogicQueueMappingItem findLogicQueueMappingItem(List mappingItems, long logicOffset) { - if (mappingItems == null - || mappingItems.isEmpty()) { - return null; - } - //Could use bi-search to polish performance - for (int i = mappingItems.size() - 1; i >= 0; i--) { - LogicQueueMappingItem item = mappingItems.get(i); - if (item.getLogicOffset() >= 0 - && logicOffset >= item.getLogicOffset()) { - return item; - } - } - //if not found, maybe out of range, return the first one - for (int i = 0; i < mappingItems.size(); i++) { - if (!mappingItems.get(i).checkIfShouldDeleted()) { - return mappingItems.get(i); - } - } - return null; - } - public static long computeMaxOffsetFromMapping(TopicQueueMappingDetail mappingDetail, Integer globalId) { List mappingItems = getMappingInfo(mappingDetail, globalId); if (mappingItems == null diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java index 5527974e1c4e340303bac5ab950e10537c5e1b7b..975a5ba003cd017e2463cc9cb8d2fb1a069a8331 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java @@ -618,4 +618,58 @@ public class TopicQueueMappingUtils { return new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, newEpoch, brokerConfigMap, brokersToMapIn, brokersToMapOut); } + public static LogicQueueMappingItem findLogicQueueMappingItem(List mappingItems, long logicOffset, boolean ignoreNegative) { + if (mappingItems == null + || mappingItems.isEmpty()) { + return null; + } + //Could use bi-search to polish performance + for (int i = mappingItems.size() - 1; i >= 0; i--) { + LogicQueueMappingItem item = mappingItems.get(i); + if (ignoreNegative && item.getLogicOffset() < 0) { + continue; + } + if (logicOffset >= item.getLogicOffset()) { + return item; + } + } + //if not found, maybe out of range, return the first one + for (int i = 0; i < mappingItems.size(); i++) { + LogicQueueMappingItem item = mappingItems.get(i); + if (ignoreNegative && item.getLogicOffset() < 0) { + continue; + } + if (!item.checkIfShouldDeleted()) { + return mappingItems.get(i); + } + } + return null; + } + + public static LogicQueueMappingItem findNext(List items, LogicQueueMappingItem currentItem, boolean ignoreNegative) { + if (items == null + || currentItem == null) { + return null; + } + for (int i = 0; i < items.size(); i++) { + LogicQueueMappingItem item = items.get(i); + if (ignoreNegative && item.getLogicOffset() < 0) { + continue; + } + if (item.getGen() == currentItem.getGen()) { + if (i < items.size() - 1) { + item = items.get(i + 1); + if (ignoreNegative && item.getLogicOffset() < 0) { + return null; + } else { + return item; + } + } else { + return null; + } + } + } + return null; + } + } diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java index 03a92e867454d51f3d19969a9bb6877ceb8aba8e..abe1eeee399161f9cf6ee624a1b549b7dd1d0c46 100644 --- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java @@ -48,6 +48,7 @@ public class StaticTopicIT extends BaseConf { @Before public void setUp() throws Exception { + System.setProperty("rocketmq.client.rebalance.waitInterval", "500"); defaultMQAdminExt = getAdmin(nsAddr); waitBrokerRegistered(nsAddr, clusterName); clientMetadata = new ClientMetadata(); @@ -173,7 +174,6 @@ public class StaticTopicIT extends BaseConf { String topic = "static" + MQRandomUtils.getRandomTopic(); RMQNormalProducer producer = getProducer(nsAddr, topic); RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); - producer.getProducer().setPollNameServerInterval(100); int queueNum = 10; int msgEachQueue = 100; @@ -183,6 +183,9 @@ public class StaticTopicIT extends BaseConf { targetBrokers.add(broker1Name); createStaticTopic(topic, queueNum, targetBrokers); } + //System.out.printf("%s %s\n", broker1Name, clientMetadata.findMasterBrokerAddr(broker1Name)); + //System.out.printf("%s %s\n", broker2Name, clientMetadata.findMasterBrokerAddr(broker2Name)); + //produce the messages { List messageQueueList = producer.getMessageQueue(); @@ -203,6 +206,11 @@ public class StaticTopicIT extends BaseConf { } } + consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListener().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + //remapping the static topic { Set targetBrokers = new HashSet<>(); @@ -242,8 +250,7 @@ public class StaticTopicIT extends BaseConf { } } { - consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000); - System.out.println("Consume: " + consumer.getListener().getAllMsgBody().size()); + consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 30000); assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), consumer.getListener().getAllMsgBody())) .containsExactlyElementsIn(producer.getAllMsgBody()); @@ -258,7 +265,7 @@ public class StaticTopicIT extends BaseConf { Assert.assertEquals(queueNum, messagesByQueue.size()); for (int i = 0; i < queueNum; i++) { List messageExts = messagesByQueue.get(i); - Assert.assertEquals(msgEachQueue, messageExts.size()); + Assert.assertEquals(msgEachQueue * 2, messageExts.size()); Collections.sort(messageExts, new Comparator() { @Override public int compare(MessageExt o1, MessageExt o2) { @@ -268,12 +275,16 @@ public class StaticTopicIT extends BaseConf { for (int j = 0; j < msgEachQueue; j++) { Assert.assertEquals(j, messageExts.get(j).getQueueOffset()); } + for (int j = msgEachQueue; j < msgEachQueue * 2; j++) { + Assert.assertEquals(j + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE - msgEachQueue, messageExts.get(j).getQueueOffset()); + } } } } @After public void tearDown() { + System.setProperty("rocketmq.client.rebalance.waitInterval", "20000"); super.shutdown(); }