diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index ab62415d7f543e3adaaabf5bf4f90abf083dc533..e1c2e30a93bda0a87adb320825668ba59a7504e7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -767,14 +767,21 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements try { requestHeader.setBname(mappingItem.getBname()); requestHeader.setPhysical(true); - //TODO check if it is leader - RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null); - RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); - if (rpcResponse.getException() != null) { - throw rpcResponse.getException(); + requestHeader.setQueueId(mappingItem.getQueueId()); + long physicalOffset; + //run in local + if (mappingItem.getBname().equals(mappingDetail.getBname())) { + physicalOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(mappingDetail.getTopic(), mappingItem.getQueueId()); + } else { + RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null); + RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); + if (rpcResponse.getException() != null) { + throw rpcResponse.getException(); + } + GetMinOffsetResponseHeader offsetResponseHeader = (GetMinOffsetResponseHeader) rpcResponse.getHeader(); + physicalOffset = offsetResponseHeader.getOffset(); } - GetMinOffsetResponseHeader offsetResponseHeader = (GetMinOffsetResponseHeader) rpcResponse.getHeader(); - long offset = mappingItem.computeStaticQueueOffsetUpToEnd(offsetResponseHeader.getOffset()); + long offset = mappingItem.computeStaticQueueOffsetUpToEnd(physicalOffset); final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class); final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader(); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 52a2d9c7ccdccf390dd449a36115f5b103891633..bf2ca28af1f6da56d4d3fa5eb07584f304318308 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -725,6 +725,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName); if (null == brokerAddr) { tryToFindTopicPublishInfo(mq.getTopic()); + brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq); brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName); } diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java index 6bbec6cb948e2510bd321a7b15a9953c07f9fb96..018623d8199d8709443c4628346c9bedcff13589 100644 --- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java +++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java @@ -132,6 +132,12 @@ public class RMQNormalProducer extends AbstractMQProducer { } } + public void send(int num, MessageQueue mq) { + for (int i = 0; i < num; i++) { + sendMQ((Message) getMessageByTag(null), mq); + } + } + public ResultWrapper sendMQ(Message msg, MessageQueue mq) { org.apache.rocketmq.client.producer.SendResult metaqResult = null; try { @@ -145,6 +151,9 @@ public class RMQNormalProducer extends AbstractMQProducer { sendResult.setSendResult(metaqResult.getSendStatus().equals(SendStatus.SEND_OK)); sendResult.setBrokerIp(metaqResult.getMessageQueue().getBrokerName()); msgBodys.addData(new String(msg.getBody())); + if (originMsgs.getAllData().contains(msg)) { + System.out.println("Hash collision"); + } originMsgs.addData(msg); originMsgIndex.put(new String(msg.getBody()), metaqResult); } catch (Exception e) { diff --git a/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQProducer.java b/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQProducer.java index df6abfc662b3ad25bff885d76ce6c91b879cdc8b..1e0a19a2b3164ab73aef6a41063195fd55bd6792 100644 --- a/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQProducer.java +++ b/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQProducer.java @@ -34,6 +34,7 @@ public abstract class AbstractMQProducer extends MQCollector implements MQProduc protected String producerInstanceName = null; protected boolean isDebug = false; + public AbstractMQProducer(String topic) { super(); producerGroupName = RandomUtil.getStringByUUID(); diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java index 2d59f31d4143dac523a2a2f0145983dac28ef9e2..e573180b6022c74bb9910ffc7a769583ee795b4a 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java @@ -32,6 +32,7 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.client.consumer.MQPullConsumer; import org.apache.rocketmq.client.consumer.MQPushConsumer; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MQProducer; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.common.MQVersion; @@ -129,6 +130,7 @@ public class BaseConf { return mqAdminExt; } + public static RMQNormalProducer getProducer(String nsAddr, String topic) { return getProducer(nsAddr, topic, false); } diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java index a4fa864b476549c76cb5e678df1cfdf89068b194..7c3f0e5ae475b2313509cf6082c03f5de76e2255 100644 --- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java @@ -1,12 +1,15 @@ package org.apache.rocketmq.test.smoke; import org.apache.log4j.Logger; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.rpc.ClientMetadata; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne; import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; import org.apache.rocketmq.test.util.MQRandomUtils; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.junit.After; @@ -17,6 +20,7 @@ import org.junit.Test; import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -61,6 +65,7 @@ public class StaticTopicIT extends BaseConf { @Test public void testCreateAndRemappingStaticTopic() throws Exception { String topic = "static" + MQRandomUtils.getRandomTopic(); + RMQNormalProducer producer = getProducer(nsAddr, topic); int queueNum = 10; Map localBrokerConfigMap = createStaticTopic(topic, queueNum, getBrokers()); { @@ -77,6 +82,26 @@ public class StaticTopicIT extends BaseConf { Map globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true); Assert.assertEquals(queueNum, globalIdMap.size()); } + List messageQueueList = producer.getMessageQueue(); + Assert.assertEquals(queueNum, messageQueueList.size()); + producer.setDebug(true); + for (int i = 0; i < queueNum; i++) { + MessageQueue messageQueue = messageQueueList.get(i); + Assert.assertEquals(topic, messageQueue.getTopic()); + Assert.assertEquals(i, messageQueue.getQueueId()); + Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName()); + } + for(MessageQueue messageQueue: messageQueueList) { + producer.send(100, messageQueue); + } + //leave the time to build the cq + Thread.sleep(500); + for(MessageQueue messageQueue: messageQueueList) { + Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue)); + Assert.assertEquals(100, defaultMQAdminExt.maxOffset(messageQueue)); + } + Assert.assertEquals(100 * queueNum, producer.getAllOriginMsg().size()); + Assert.assertEquals(0, producer.getSendErrorMsg().size()); /*{ Set targetBrokers = Collections.singleton(broker1Name); Map brokerConfigMapFromRemote = defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java index d6a680a27825268269f044c8ef45774b9b827823..a32dc8d08141aa2dd59abb4b13867f8d295af554 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java @@ -215,6 +215,8 @@ public class UpdateStaticTopicSubCommand implements SubCommand { String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false); System.out.println("The old mapping data is written to file " + oldMappingDataFile); } + //add the existed brokers to target brokers + targetBrokers.addAll(brokerConfigMap.keySet()); //calculate the new data TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, new HashSet<>(), brokerConfigMap);