diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 646c530a86df483feb79415ccf3eb4ac44c1d928..9230d9559b3907f0931fccc7dad49ff45d620556 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -492,11 +492,11 @@ public class BrokerController { this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { - BrokerController.this.refreshBrokerNameMapping(); + BrokerController.this.brokerOuterAPI.refreshMetadata(); } catch (Exception e) { - log.error("ScheduledTask examineBrokerClusterInfo exception", e); + log.error("ScheduledTask refresh metadata exception", e); } - }, 10, 10, TimeUnit.SECONDS); + }, 1, 5, TimeUnit.SECONDS); if (!messageStoreConfig.isEnableDLegerCommitLog()) { if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { @@ -624,13 +624,6 @@ public class BrokerController { } } - private void refreshBrokerNameMapping() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { - ClusterInfo brokerClusterInfo = this.brokerOuterAPI.getBrokerClusterInfo(); - brokerClusterInfo.getBrokerAddrTable().forEach((brokerName, data) -> { - String masterBrokerAddr = data.getBrokerAddrs().get(MixAll.MASTER_ID); - this.brokerName2AddrMap.put(brokerName, masterBrokerAddr); - }); - } public String getBrokerAddrByName(String brokerName) { return this.brokerName2AddrMap.get(brokerName); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 384eef0741867890491f891a0182a60957a57819..e9ef46eff0004bffb5a9a42f5e9104aa50b7bb58 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -24,6 +24,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import com.alibaba.fastjson.JSON; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor; import org.apache.rocketmq.client.exception.MQBrokerException; @@ -477,6 +478,12 @@ public class BrokerOuterAPI { } + public void refreshMetadata() throws Exception { + ClusterInfo brokerClusterInfo = getBrokerClusterInfo(); + clientMetadata.refreshClusterInfo(brokerClusterInfo); + } + + public ClientMetadata getClientMetadata() { return clientMetadata; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index a4e3fd41859fe2843440f8c58eef425ac9cf6adb..d9684e71f28d70026b7d30d6613c5cb4a499bb0c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -621,13 +621,16 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements if (mappingContext.getMappingDetail() == null) { return null; } + if (requestHeader.getPhysical() != null && requestHeader.getPhysical()) { + return null; + } + TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); - LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); - if (mappingItem == null - || !mappingDetail.getBname().equals(mappingItem.getBname())) { + List mappingItems = mappingContext.getMappingItemList(); + if (mappingItems == null + || mappingItems.isEmpty()) { return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); } - List mappingItems = mappingContext.getMappingItemList(); //TODO should make sure the timestampOfOffset is equal or bigger than the searched timestamp Long timestamp = requestHeader.getTimestamp(); long offset = -1; @@ -699,6 +702,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements if (mappingContext.getMappingDetail() == null) { return null; } + if (requestHeader.getPhysical() != null && requestHeader.getPhysical()) { + return null; + } + TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); if (mappingItem == null @@ -743,10 +750,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements if (mappingContext.getMappingDetail() == null) { return null; } + if (requestHeader.getPhysical() != null && requestHeader.getPhysical()) { + return null; + } + TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); - if (mappingItem == null - || !mappingDetail.getBname().equals(mappingItem.getBname())) { + if (mappingItem == null) { + //this may not return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); }; try { @@ -774,7 +785,9 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; - }catch (Throwable t) { + } catch (Throwable t) { + t.printStackTrace(); + log.error("rewriteRequestForStaticTopic failed", t); return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); } } @@ -786,6 +799,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements final GetMinOffsetRequestHeader requestHeader = (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class); + TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false, 0L); RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext); if (rewriteResult != null) { @@ -804,10 +818,12 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements if (mappingContext.getMappingDetail() == null) { return null; } + if (requestHeader.getPhysical() != null && requestHeader.getPhysical()) { + return null; + } TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); - if (mappingItem == null - || !mappingDetail.getBname().equals(mappingItem.getBname())) { + if (mappingItem == null) { return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); }; try { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 6388c06aec30623e652787c03cd34f49f2f90e3d..b98295c9f37f165f651bc9446560a6d7b23ee6a3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -68,6 +68,7 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.RequestTask; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.GetMessageResult; +import org.apache.rocketmq.store.GetMessageStatus; import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageFilter; import org.apache.rocketmq.store.PutMessageResult; @@ -78,6 +79,7 @@ import java.nio.ByteBuffer; import java.util.List; import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse; +import static org.apache.rocketmq.remoting.protocol.RemotingCommand.decode; public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); @@ -134,6 +136,11 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements && requestHeader.getMaxMsgNums() != null) { requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums())); } + int sysFlag = requestHeader.getSysFlag(); + if (!mappingContext.isLeader()) { + sysFlag = PullSysFlag.clearCommitOffsetFlag(sysFlag); + requestHeader.setSysFlag(sysFlag); + } if (mappingDetail.getBname().equals(bname)) { //just let it go, do the local pull process @@ -142,6 +149,9 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements requestHeader.setPhysical(true); requestHeader.setBname(bname); + sysFlag = PullSysFlag.clearSuspendFlag(sysFlag); + sysFlag = PullSysFlag.clearCommitOffsetFlag(sysFlag); + requestHeader.setSysFlag(sysFlag); RpcRequest rpcRequest = new RpcRequest(RequestCode.PULL_MESSAGE, requestHeader, null); RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); if (rpcResponse.getException() != null) { @@ -150,7 +160,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) rpcResponse.getHeader(); { - RemotingCommand rewriteResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext); + RemotingCommand rewriteResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext, rpcResponse.getCode()); if (rewriteResult != null) { return rewriteResult; } @@ -161,35 +171,73 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements } } - private RemotingCommand rewriteResponseForStaticTopic(PullMessageRequestHeader requestHeader, PullMessageResponseHeader responseHeader, TopicQueueMappingContext mappingContext) { + private RemotingCommand rewriteResponseForStaticTopic(PullMessageRequestHeader requestHeader, PullMessageResponseHeader responseHeader, + TopicQueueMappingContext mappingContext, final int code) { try { if (mappingContext == null) { return null; } TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); - //handle nextBeginOffset - { - long nextBeginOffset = responseHeader.getNextBeginOffset(); - assert nextBeginOffset >= requestHeader.getQueueOffset(); - //the next begin offset should no more than the end offset - if (mappingItem.checkIfEndOffsetDecided() - && nextBeginOffset >= mappingItem.getEndOffset()) { - nextBeginOffset = mappingItem.getEndOffset(); + + long requestOffset = requestHeader.getQueueOffset(); + long nextBeginOffset = responseHeader.getNextBeginOffset(); + long minOffset = responseHeader.getMinOffset(); + long maxOffset = responseHeader.getMaxOffset(); + int responseCode = code; + if (responseCode != ResponseCode.SUCCESS + && responseCode != ResponseCode.PULL_RETRY_IMMEDIATELY) { + if (mappingContext.isLeader()) { + if (requestOffset < minOffset) { + nextBeginOffset = minOffset; + responseCode = ResponseCode.PULL_NOT_FOUND; + } else if (requestOffset > maxOffset) { + responseCode = ResponseCode.PULL_OFFSET_MOVED; + } else if (requestOffset == maxOffset) { + responseCode = ResponseCode.PULL_NOT_FOUND; + } else { + //let it go + } + } else { + if (requestOffset < minOffset) { + nextBeginOffset = minOffset; + responseCode = ResponseCode.PULL_NOT_FOUND; + } else if (requestOffset >= maxOffset) { + responseCode = ResponseCode.PULL_NOT_FOUND; + //just move to another item + mappingItem = mappingContext.findNext(); + assert mappingItem != null; + nextBeginOffset = mappingItem.getStartOffset(); + minOffset = mappingItem.getStartOffset(); + maxOffset = minOffset; + } } - responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffsetUpToEnd(nextBeginOffset)); } + + //handle nextBeginOffset + //the next begin offset should no more than the end offset + if (mappingItem.checkIfEndOffsetDecided() + && nextBeginOffset >= mappingItem.getEndOffset()) { + nextBeginOffset = mappingItem.getEndOffset(); + } + responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffsetUpToEnd(nextBeginOffset)); //handle min offset - responseHeader.setMinOffset(mappingItem.computeStaticQueueOffsetUpToEnd(Math.max(mappingItem.getStartOffset(), responseHeader.getMinOffset()))); + responseHeader.setMinOffset(mappingItem.computeStaticQueueOffsetUpToEnd(Math.max(mappingItem.getStartOffset(), minOffset))); //handle max offset - responseHeader.setMaxOffset(Math.max(mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getMaxOffset()), + responseHeader.setMaxOffset(Math.max(mappingItem.computeStaticQueueOffsetUpToEnd(maxOffset), TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail, mappingContext.getGlobalId()))); //set the offsetDelta responseHeader.setOffsetDelta(mappingItem.computeOffsetDelta()); + + if (code != ResponseCode.SUCCESS + && code != ResponseCode.PULL_RETRY_IMMEDIATELY) { + return RemotingCommand.createResponseCommandWithHeader(responseCode, responseHeader); + } else { + return null; + } } catch (Throwable t) { return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); } - return null; } @@ -440,13 +488,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements break; } - //rewrite the response for the - { - RemotingCommand rewriteResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext); - if (rewriteResult != null) { - return rewriteResult; - } - } + if (this.hasConsumeMessageHook()) { @@ -491,6 +533,12 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements this.executeConsumeMessageHookBefore(context); } + //rewrite the response for the + RemotingCommand rewriteResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext, response.getCode()); + if (rewriteResult != null) { + response = rewriteResult; + } + switch (response.getCode()) { case ResponseCode.SUCCESS: diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java index 4b9d328b6d0244ab5ce313ebb412e3fc6de685ed..5fa3a31c933b198d37b8c17c8a546b4de3b1daf1 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java @@ -82,6 +82,7 @@ public class TopicQueueMappingManager extends ConfigManager { return; } if (force) { + //bakeup the old items oldDetail.getHostedQueues().forEach( (queueId, items) -> { newDetail.getHostedQueues().putIfAbsent(queueId, items); }); @@ -90,17 +91,21 @@ public class TopicQueueMappingManager extends ConfigManager { return; } //do more check - if (newDetail.getEpoch() <= oldDetail.getEpoch()) { + if (newDetail.getEpoch() < oldDetail.getEpoch()) { throw new RuntimeException(String.format("Can't accept data with small epoch %d < %d", newDetail.getEpoch(), oldDetail.getEpoch())); } + boolean epochEqual = newDetail.getEpoch() == oldDetail.getEpoch(); for (Integer globalId : oldDetail.getHostedQueues().keySet()) { List oldItems = oldDetail.getHostedQueues().get(globalId); List newItems = newDetail.getHostedQueues().get(globalId); if (newItems == null) { - //keep the old - newDetail.getHostedQueues().put(globalId, oldItems); + if (epochEqual) { + throw new RuntimeException("Cannot accept equal epoch with null data"); + } else { + newDetail.getHostedQueues().put(globalId, oldItems); + } } else { - TopicQueueMappingUtils.makeSureLogicQueueMappingItemImmutable(oldItems, newItems); + TopicQueueMappingUtils.makeSureLogicQueueMappingItemImmutable(oldItems, newItems, epochEqual); } } topicQueueMappingTable.put(newDetail.getTopic(), newDetail); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 594415557e4e4434e9230608e071efd3a8677e06..77add204a3b28d9a29bbed3511103c4acec09156 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -36,6 +36,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; + +import com.alibaba.fastjson.JSON; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.admin.MQAdminExtInner; diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java index 245cc3a3930baad4a1dbcdde7d10418d58bf99c3..4e00bc33a550c7a0694e60443900392f4a5172ba 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java @@ -1,5 +1,6 @@ package org.apache.rocketmq.common.rpc; +import com.alibaba.fastjson.JSON; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; import org.apache.rocketmq.common.message.MessageQueue; @@ -65,6 +66,15 @@ public class RpcClientImpl implements RpcClient { case RequestCode.PULL_MESSAGE: rpcResponsePromise = handlePullMessage(addr, request, timeoutMs); break; + case RequestCode.GET_MIN_OFFSET: + rpcResponsePromise = handleGetMinOffset(addr, request, timeoutMs); + break; + case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP: + rpcResponsePromise = handleSearchOffset(addr, request, timeoutMs); + break; + case RequestCode.GET_EARLIEST_MSG_STORETIME: + rpcResponsePromise = handleGetEarliestMsgStoretime(addr, request, timeoutMs); + break; default: throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode()); } @@ -146,8 +156,9 @@ public class RpcClientImpl implements RpcClient { return rpcResponsePromise; } - public RpcResponse handleSearchOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception { - String addr = getBrokerAddrByNameOrException(bname); + public Promise handleSearchOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception { + final Promise rpcResponsePromise = createResponseFuture(); + RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest); RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis); assert responseCommand != null; @@ -155,17 +166,18 @@ public class RpcClientImpl implements RpcClient { case ResponseCode.SUCCESS: { SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class); - return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()); + rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody())); + break; } default:{ - RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")); - return rpcResponse; + rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"))); } } + return rpcResponsePromise; } - public RpcResponse handleGetMinOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception { - String addr = getBrokerAddrByNameOrException(bname); + public Promise handleGetMinOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception { + final Promise rpcResponsePromise = createResponseFuture(); RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest); @@ -175,17 +187,18 @@ public class RpcClientImpl implements RpcClient { case ResponseCode.SUCCESS: { GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class); - return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()); + rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody())); + break; } default:{ - RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")); - return rpcResponse; + rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"))); } } + return rpcResponsePromise; } - public RpcResponse handleGetEarliestMsgStoretime(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception { - String addr = getBrokerAddrByNameOrException(bname); + public Promise handleGetEarliestMsgStoretime(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception { + final Promise rpcResponsePromise = createResponseFuture(); RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest); @@ -195,14 +208,14 @@ public class RpcClientImpl implements RpcClient { case ResponseCode.SUCCESS: { GetEarliestMsgStoretimeResponseHeader responseHeader = (GetEarliestMsgStoretimeResponseHeader) responseCommand.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class); - return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()); - + rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody())); + break; } default:{ - RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")); - return rpcResponse; + rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error"))); } } + return rpcResponsePromise; } } diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java index ebae12b519776f9100ead388a19eed5f1b0e0a0a..61dce641ddd1809b375fa054f3de011ebcc9b4f4 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java @@ -21,6 +21,9 @@ public class RpcClientUtils { } public static byte[] encodeBody(Object body) { + if (body == null) { + return null; + } if (body instanceof byte[]) { return (byte[])body; } else if (body instanceof RemotingSerializable) { diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java index d6d359d934e0560e8fabd9074fcb326ee32ab9fa..4a788ab11e6a2d07aa42adc401df5db1fb6e5ec7 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java @@ -35,6 +35,7 @@ public class TopicQueueMappingContext { this.mappingDetail = mappingDetail; this.mappingItemList = mappingItemList; this.mappingItem = mappingItem; + } public boolean checkIfAsPhysical() { @@ -43,6 +44,37 @@ public class TopicQueueMappingContext { || (mappingItemList.size() == 1 && mappingItemList.get(0).getLogicOffset() == 0); } + public boolean isLeader() { + if (mappingDetail == null + || mappingItemList == null + || mappingItemList.isEmpty()) { + return false; + } + LogicQueueMappingItem mappingItem = mappingItemList.get(mappingItemList.size() - 1); + return mappingItem.getBname().equals(mappingDetail.getBname()); + } + + public LogicQueueMappingItem findNext() { + if (mappingDetail == null + || mappingItem == null + || mappingItemList == null + || mappingItemList.isEmpty()) { + return null; + } + for (int i = 0; i < mappingItemList.size(); i++) { + LogicQueueMappingItem item = mappingItemList.get(i); + if (item.getGen() == mappingItem.getGen()) { + if (i < mappingItemList.size() - 1) { + return mappingItemList.get(i + 1); + } else { + return null; + } + } + } + return null; + } + + public String getTopic() { return topic; } @@ -90,4 +122,6 @@ public class TopicQueueMappingContext { public void setMappingItem(LogicQueueMappingItem mappingItem) { this.mappingItem = mappingItem; } + + } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java index 86a6cec11c52f44706bd1af7227ffacf77b5e05d..06595720a52336c7cdb67e2df0975c93ddea00e8 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java @@ -85,7 +85,8 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { //Could use bi-search to polish performance for (int i = mappingItems.size() - 1; i >= 0; i--) { LogicQueueMappingItem item = mappingItems.get(i); - if (logicOffset >= item.getLogicOffset()) { + if (item.getLogicOffset() >= 0 + && logicOffset >= item.getLogicOffset()) { return item; } } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java index ef565a0bed9a362edf9e7abc1c26d017dc8e25dc..5527974e1c4e340303bac5ab950e10537c5e1b7b 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java @@ -36,6 +36,8 @@ import java.util.Set; public class TopicQueueMappingUtils { + public static final int DEFAULT_BLOCK_SEQ_SIZE = 10000; + public static class MappingAllocator { Map brokerNumMap = new HashMap(); Map idToBroker = new HashMap(); @@ -191,7 +193,7 @@ public class TopicQueueMappingUtils { return new AbstractMap.SimpleEntry(maxEpoch, maxNum); } - public static void makeSureLogicQueueMappingItemImmutable(List oldItems, List newItems) { + public static void makeSureLogicQueueMappingItemImmutable(List oldItems, List newItems, boolean epochEqual) { if (oldItems == null || oldItems.isEmpty()) { return; } @@ -218,6 +220,16 @@ public class TopicQueueMappingUtils { inew++; } } + if (epochEqual) { + LogicQueueMappingItem oldLeader = oldItems.get(oldItems.size() - 1); + LogicQueueMappingItem newLeader = newItems.get(newItems.size() - 1); + if (newLeader.getGen() != oldLeader.getGen() + || !newLeader.getBname().equals(oldLeader.getBname()) + || newLeader.getQueueId() != oldLeader.getQueueId() + || newLeader.getStartOffset() != oldLeader.getStartOffset()) { + throw new RuntimeException("The new leader is different but epoch equal"); + } + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java b/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java index ce7558f2bcffd66b02b2b15b8a1e0a3306bce459..20b8ad2081a1c338a254f4d8c2a07a7f52ba6efd 100644 --- a/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java +++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java @@ -69,6 +69,10 @@ public class PullSysFlag { return (sysFlag & FLAG_SUSPEND) == FLAG_SUSPEND; } + public static int clearSuspendFlag(final int sysFlag) { + return sysFlag & (~FLAG_SUSPEND); + } + public static boolean hasSubscriptionFlag(final int sysFlag) { return (sysFlag & FLAG_SUBSCRIPTION) == FLAG_SUBSCRIPTION; } diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java index 9b9ab6b7722bd5733b60a02993fbce51d333e231..b4a8ddaf66d53e16ed079dfccb0cda2f4917f725 100644 --- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java @@ -4,9 +4,11 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.google.common.collect.ImmutableList; +import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.junit.Assert; import org.junit.Test; +import org.junit.experimental.theories.suppliers.TestedOn; import java.util.Map; @@ -53,4 +55,9 @@ public class TopicQueueMappingTest { Assert.assertEquals(mappingDetailJson, RemotingSerializable.toJson(mappingDetailFromJson, false)); } } + + @Test + public void test() { + + } } diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtilsTest.java similarity index 99% rename from common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java rename to common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtilsTest.java index bd4b13ca71ca7464472f1c85eaca5a357b5fe1d5..93cad48a3d85b79db4a253f208835c19a810d8bb 100644 --- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicMappingUtilsTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtilsTest.java @@ -13,7 +13,7 @@ import java.util.Map; import java.util.Random; import java.util.Set; -public class TopicMappingUtilsTest { +public class TopicQueueMappingUtilsTest { private Set buildTargetBrokers(int num) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java index f895b63b1df39099c47bd5fa170217c0c776f04a..0e32226f67082014c2160f3675ca4eac0874f2e2 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java @@ -28,8 +28,13 @@ import java.lang.annotation.Annotation; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; public class RemotingCommand { @@ -313,11 +318,17 @@ public class RemotingCommand { return objectHeader; } - private Field[] getClazzFields(Class classHeader) { + //make it able to test + Field[] getClazzFields(Class classHeader) { Field[] field = CLASS_HASH_MAP.get(classHeader); if (field == null) { - field = classHeader.getDeclaredFields(); + Set fieldList = new HashSet(); + for (Class className = classHeader; className != Object.class; className = className.getSuperclass()) { + Field[] fields = className.getDeclaredFields(); + fieldList.addAll(Arrays.asList(fields)); + } + field = fieldList.toArray(new Field[0]); synchronized (CLASS_HASH_MAP) { CLASS_HASH_MAP.put(classHeader, field); } diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java index 2bd41cec839d8365eb46da5e029f03310f74d225..f2f693571111eaffaa5c0acd06b9c54cc3ea292d 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java @@ -19,9 +19,14 @@ package org.apache.rocketmq.remoting.protocol; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.Set; + +import com.alibaba.fastjson.JSON; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.junit.Assert; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; @@ -198,6 +203,28 @@ public class RemotingCommandTest { Field value = FieldTestClass.class.getDeclaredField("value"); assertThat(method.invoke(remotingCommand, value)).isEqualTo(false); } + + @Test + public void testParentField() throws Exception { + SubExtFieldsHeader subExtFieldsHeader = new SubExtFieldsHeader(); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(1, subExtFieldsHeader); + Field[] fields = remotingCommand.getClazzFields(subExtFieldsHeader.getClass()); + Assert.assertEquals(7, fields.length); + Set names = new HashSet<>(); + names.add("stringValue"); + names.add("intValue"); + names.add("longValue"); + names.add("booleanValue"); + names.add("doubleValue"); + names.add("name"); + names.add("value"); + for (Field field : fields) { + Assert.assertTrue(names.contains(field.getName())); + } + remotingCommand.makeCustomHeaderToNet(); + SubExtFieldsHeader other = (SubExtFieldsHeader) remotingCommand.decodeCommandCustomHeader(subExtFieldsHeader.getClass()); + Assert.assertEquals(other, subExtFieldsHeader); + } } class FieldTestClass { @@ -246,4 +273,72 @@ class ExtFieldsHeader implements CommandCustomHeader { public double getDoubleValue() { return doubleValue; } -} \ No newline at end of file + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof ExtFieldsHeader)) return false; + + ExtFieldsHeader that = (ExtFieldsHeader) o; + + if (intValue != that.intValue) return false; + if (longValue != that.longValue) return false; + if (booleanValue != that.booleanValue) return false; + if (Double.compare(that.doubleValue, doubleValue) != 0) return false; + return stringValue != null ? stringValue.equals(that.stringValue) : that.stringValue == null; + } + + @Override + public int hashCode() { + int result; + long temp; + result = stringValue != null ? stringValue.hashCode() : 0; + result = 31 * result + intValue; + result = 31 * result + (int) (longValue ^ (longValue >>> 32)); + result = 31 * result + (booleanValue ? 1 : 0); + temp = Double.doubleToLongBits(doubleValue); + result = 31 * result + (int) (temp ^ (temp >>> 32)); + return result; + } +} + + +class SubExtFieldsHeader extends ExtFieldsHeader { + private String name = "12321"; + private int value = 111; + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getValue() { + return value; + } + + public void setValue(int value) { + this.value = value; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof SubExtFieldsHeader)) return false; + if (!super.equals(o)) return false; + + SubExtFieldsHeader that = (SubExtFieldsHeader) o; + + if (value != that.value) return false; + return name != null ? name.equals(that.name) : that.name == null; + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + (name != null ? name.hashCode() : 0); + result = 31 * result + value; + return result; + } +} diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java index ce739be591452f733f7f8def22898dd8a29a7834..71f9088875ee6941cfb09c28a21fd4f40398a20e 100644 --- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java +++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java @@ -49,6 +49,7 @@ public class RMQNormalConsumer extends AbstractMQConsumer { consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setInstanceName(RandomUtil.getStringByUUID()); consumer.setNamesrvAddr(nsAddr); + consumer.setPollNameServerInterval(100); try { consumer.subscribe(topic, subExpression); } catch (MQClientException e) { @@ -92,4 +93,8 @@ public class RMQNormalConsumer extends AbstractMQConsumer { create(); start(); } + + public DefaultMQPushConsumer getConsumer() { + return consumer; + } } diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java index 018623d8199d8709443c4628346c9bedcff13589..4f5d38e7779727f5554136198bca8041d95e7bb6 100644 --- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java +++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java @@ -73,6 +73,7 @@ public class RMQNormalProducer extends AbstractMQProducer { producer.setProducerGroup(getProducerGroupName()); producer.setInstanceName(getProducerInstanceName()); producer.setUseTLS(useTLS); + producer.setPollNameServerInterval(100); if (nsAddr != null) { producer.setNamesrvAddr(nsAddr); diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java index e573180b6022c74bb9910ffc7a769583ee795b4a..53a7ab3bcb3d840c26f3b15f8219e108c6d80074 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java @@ -100,7 +100,10 @@ public class BaseConf { List brokerDatas = mqAdminExt.examineTopicRouteInfo(clusterName).getBrokerDatas(); return brokerDatas.size() == brokerNum; }); - } catch (MQClientException e) { + for (BrokerController brokerController: brokerControllerList) { + brokerController.getBrokerOuterAPI().refreshMetadata(); + } + } catch (Exception e) { log.error("init failed, please check BaseConf"); } ForkJoinPool.commonPool().execute(mqAdminExt::shutdown); @@ -126,6 +129,7 @@ public class BaseConf { public static DefaultMQAdminExt getAdmin(String nsAddr) { final DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(500); mqAdminExt.setNamesrvAddr(nsAddr); + mqAdminExt.setPollNameServerInterval(100); mqClients.add(mqAdminExt); return mqAdminExt; } diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java index feb8730f6b6eb11c971f3287a77f422de48107a6..03a92e867454d51f3d19969a9bb6877ceb8aba8e 100644 --- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java @@ -2,6 +2,7 @@ package org.apache.rocketmq.test.smoke; import org.apache.log4j.Logger; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageClientExt; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; @@ -11,6 +12,7 @@ import org.apache.rocketmq.common.rpc.ClientMetadata; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne; import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; +import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper; import org.apache.rocketmq.test.base.BaseConf; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; @@ -23,6 +25,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.FixMethodOrder; import org.junit.Test; +import sun.jvm.hotspot.runtime.aarch64.AARCH64CurrentFrameGuess; import java.util.ArrayList; import java.util.Collections; @@ -61,7 +64,7 @@ public class StaticTopicIT extends BaseConf { Map brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); Assert.assertTrue(brokerConfigMap.isEmpty()); TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<>(), brokerConfigMap); - Assert.assertEquals(2, brokerConfigMap.size()); + Assert.assertEquals(targetBrokers.size(), brokerConfigMap.size()); //If some succeed, and others fail, it will cause inconsistent data for (Map.Entry entry : brokerConfigMap.entrySet()) { String broker = entry.getKey(); @@ -72,6 +75,22 @@ public class StaticTopicIT extends BaseConf { return brokerConfigMap; } + public void remappingStaticTopic(String topic, Set targetBrokers) throws Exception { + Map brokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); + Assert.assertFalse(brokerConfigMap.isEmpty()); + TopicRemappingDetailWrapper wrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, targetBrokers); + defaultMQAdminExt.remappingStaticTopic(clientMetadata, topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), brokerConfigMap, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, false); + } + + + + + @Test + public void testNonTargetBrokers() { + + } + + @Test public void testCreateProduceConsumeStaticTopic() throws Exception { String topic = "static" + MQRandomUtils.getRandomTopic(); @@ -120,7 +139,7 @@ public class StaticTopicIT extends BaseConf { Assert.assertEquals(msgEachQueue * queueNum, producer.getAllOriginMsg().size()); Assert.assertEquals(0, producer.getSendErrorMsg().size()); - consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000); assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), consumer.getListener().getAllMsgBody())) .containsExactlyElementsIn(producer.getAllMsgBody()); @@ -149,6 +168,110 @@ public class StaticTopicIT extends BaseConf { } + @Test + public void testRemappingProduceConsumeStaticTopic() throws Exception { + String topic = "static" + MQRandomUtils.getRandomTopic(); + RMQNormalProducer producer = getProducer(nsAddr, topic); + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); + producer.getProducer().setPollNameServerInterval(100); + + int queueNum = 10; + int msgEachQueue = 100; + //create static topic + { + Set targetBrokers = new HashSet<>(); + targetBrokers.add(broker1Name); + createStaticTopic(topic, queueNum, targetBrokers); + } + //produce the messages + { + List messageQueueList = producer.getMessageQueue(); + for (int i = 0; i < queueNum; i++) { + MessageQueue messageQueue = messageQueueList.get(i); + Assert.assertEquals(i, messageQueue.getQueueId()); + Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName()); + } + for(MessageQueue messageQueue: messageQueueList) { + producer.send(msgEachQueue, messageQueue); + } + Assert.assertEquals(0, producer.getSendErrorMsg().size()); + //leave the time to build the cq + Thread.sleep(100); + for(MessageQueue messageQueue: messageQueueList) { + //Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue)); + Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue)); + } + } + + //remapping the static topic + { + Set targetBrokers = new HashSet<>(); + targetBrokers.add(broker2Name); + remappingStaticTopic(topic, targetBrokers); + Map remoteBrokerConfigMap = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); + + TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap); + Map globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true); + Assert.assertEquals(queueNum, globalIdMap.size()); + for (TopicQueueMappingOne mappingOne: globalIdMap.values()) { + Assert.assertEquals(broker2Name, mappingOne.getBname()); + } + } + //leave the time to refresh the metadata + Thread.sleep(500); + producer.setDebug(true); + { + List messageQueueList = producer.getMessageQueue(); + for (int i = 0; i < queueNum; i++) { + MessageQueue messageQueue = messageQueueList.get(i); + Assert.assertEquals(i, messageQueue.getQueueId()); + Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName()); + String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue); + Assert.assertEquals(destBrokerName, broker2Name); + } + + for(MessageQueue messageQueue: messageQueueList) { + producer.send(msgEachQueue, messageQueue); + } + Assert.assertEquals(0, producer.getSendErrorMsg().size()); + //leave the time to build the cq + Thread.sleep(100); + for(MessageQueue messageQueue: messageQueueList) { + Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue)); + Assert.assertEquals(msgEachQueue + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, defaultMQAdminExt.maxOffset(messageQueue)); + } + } + { + consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000); + System.out.println("Consume: " + consumer.getListener().getAllMsgBody().size()); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListener().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + Map> messagesByQueue = new HashMap<>(); + for (Object object : consumer.getListener().getAllOriginMsg()) { + MessageExt messageExt = (MessageExt) object; + if (!messagesByQueue.containsKey(messageExt.getQueueId())) { + messagesByQueue.put(messageExt.getQueueId(), new ArrayList<>()); + } + messagesByQueue.get(messageExt.getQueueId()).add(messageExt); + } + Assert.assertEquals(queueNum, messagesByQueue.size()); + for (int i = 0; i < queueNum; i++) { + List messageExts = messagesByQueue.get(i); + Assert.assertEquals(msgEachQueue, messageExts.size()); + Collections.sort(messageExts, new Comparator() { + @Override + public int compare(MessageExt o1, MessageExt o2) { + return (int) (o1.getQueueOffset() - o2.getQueueOffset()); + } + }); + for (int j = 0; j < msgEachQueue; j++) { + Assert.assertEquals(j, messageExts.get(j).getQueueOffset()); + } + } + } + } + @After public void tearDown() { super.shutdown(); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 98694d2a383bc5cb7db1c3dfccdc75d4c069a492..07c4bf345d0a03f7c4e48accde1ad22d14d128be 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -1112,6 +1112,15 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker); createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force); } + //Step5: write the non-target brokers + for (String broker: brokerConfigMap.keySet()) { + if (brokersToMapIn.contains(broker) || brokersToMapOut.contains(broker)) { + continue; + } + String addr = clientMetadata.findMasterBrokerAddr(broker); + TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker); + createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force); + } } @Override