From 1f897336097e34bb02546dfdd80c814382adb87a Mon Sep 17 00:00:00 2001 From: dongeforever Date: Tue, 9 Nov 2021 20:20:27 +0800 Subject: [PATCH] Polish compile errrors --- .../consumer/DefaultMQPushConsumer.java | 2 +- .../rocketmq/client/impl/MQClientAPIImpl.java | 5 - .../client/impl/consumer/PullAPIWrapper.java | 350 ++---------------- ...DefaultMQPullConsumerLogicalQueueTest.java | 248 ------------- .../DefaultMQProducerLogicalQueueTest.java | 311 ---------------- .../producer/DefaultMQProducerTest.java | 28 +- .../DefaultMQProducerWithOpenTracingTest.java | 14 +- .../trace/DefaultMQProducerWithTraceTest.java | 26 +- ...nsactionMQProducerWithOpenTracingTest.java | 16 +- .../TransactionMQProducerWithTraceTest.java | 40 +- .../protocol/route/TopicRouteDataNameSrv.java | 64 ---- .../common/RegisterBrokerBodyTest.java | 3 +- .../DefaultRequestProcessorTest.java | 17 +- 13 files changed, 78 insertions(+), 1046 deletions(-) delete mode 100644 client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerLogicalQueueTest.java delete mode 100644 client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerLogicalQueueTest.java delete mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteDataNameSrv.java diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index cf8cbb0d..8a6340b9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -691,7 +691,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume public void sendMessageBack(MessageExt msg, int delayLevel) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { msg.setTopic(withNamespace(msg.getTopic())); - this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null); + this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, (String) null); } /** diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index c07f9fab..67fd9372 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.client.impl; -import com.google.common.base.Function; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -180,9 +179,7 @@ import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData; import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo; import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState; import org.apache.rocketmq.common.protocol.route.TopicRouteData; -import org.apache.rocketmq.common.protocol.route.TopicRouteDataNameSrv; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; -import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.InvokeCallback; @@ -201,8 +198,6 @@ import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; -import static com.google.common.base.Optional.fromNullable; - public class MQClientAPIImpl { private final static InternalLogger log = ClientLogger.getLog(); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java index 6d966a6f..8cd4bab0 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java @@ -16,15 +16,20 @@ */ package org.apache.rocketmq.client.impl.consumer; -import com.alibaba.fastjson.JSON; -import com.google.common.base.Objects; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.client.consumer.PopCallback; +import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader; import org.apache.rocketmq.client.consumer.PullCallback; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.exception.MQRedirectException; import org.apache.rocketmq.client.hook.FilterMessageContext; import org.apache.rocketmq.client.hook.FilterMessageHook; import org.apache.rocketmq.client.impl.CommunicationMode; @@ -34,35 +39,18 @@ import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.filter.ExpressionType; +import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader; -import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; -import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData; -import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo; -import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.sysflag.PullSysFlag; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.exception.RemotingException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Random; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import static com.google.common.base.Optional.fromNullable; - public class PullAPIWrapper { private final InternalLogger log = ClientLogger.getLog(); private final MQClientInstance mQClientFactory; @@ -83,36 +71,13 @@ public class PullAPIWrapper { public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult, final SubscriptionData subscriptionData) { - final PullResultExt pullResultExt = (PullResultExt) pullResult; - - LogicalQueueRouteData queueRouteData = null; - PullResultWithLogicalQueues pullResultWithLogicalQueues = null; - if (pullResultExt instanceof PullResultWithLogicalQueues) { - pullResultWithLogicalQueues = (PullResultWithLogicalQueues) pullResultExt; - queueRouteData = pullResultWithLogicalQueues.getQueueRouteData(); - } - - if (queueRouteData != null) { - pullResultWithLogicalQueues.setOrigPullResultExt(new PullResultExt(pullResultExt.getPullStatus(), - queueRouteData.toLogicalQueueOffset(pullResultExt.getNextBeginOffset()), - queueRouteData.toLogicalQueueOffset(pullResultExt.getMinOffset()), - // although this maxOffset may not belong to this queue route, but the actual value must be a larger one, and since maxOffset here is not an accurate value, we just do it to make things simple. - queueRouteData.toLogicalQueueOffset(pullResultExt.getMaxOffset()), - pullResultExt.getMsgFoundList(), - pullResultExt.getSuggestWhichBrokerId(), - pullResultExt.getMessageBinary())); - } + PullResultExt pullResultExt = (PullResultExt) pullResult; this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId()); if (PullStatus.FOUND == pullResult.getPullStatus()) { ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary()); List msgList = MessageDecoder.decodes(byteBuffer); - if (queueRouteData != null) { - // prevent pulled data is out of current queue route, this happens when some commit log data is cleaned in the broker but still pull from it. - msgList = queueRouteData.filterMessages(msgList); - } - List msgListFilterAgain = msgList; if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) { msgListFilterAgain = new ArrayList(msgList.size()); @@ -142,10 +107,6 @@ public class PullAPIWrapper { MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET, Long.toString(pullResult.getMaxOffset())); msg.setBrokerName(mq.getBrokerName()); - msg.setQueueId(mq.getQueueId()); - if (queueRouteData != null) { - msg.setQueueOffset(queueRouteData.toLogicalQueueOffset(msg.getQueueOffset())); - } } pullResultExt.setMsgFoundList(msgListFilterAgain); @@ -153,7 +114,7 @@ public class PullAPIWrapper { pullResultExt.setMessageBinary(null); - return pullResultExt; + return pullResult; } public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) { @@ -182,67 +143,24 @@ public class PullAPIWrapper { } public PullResult pullKernelImpl( - MessageQueue mq, + final MessageQueue mq, final String subExpression, final String expressionType, final long subVersion, - long offset, + final long offset, final int maxNums, final int sysFlag, - long commitOffset, + final long commitOffset, final long brokerSuspendMaxTimeMillis, final long timeoutMillis, final CommunicationMode communicationMode, - PullCallback pullCallback + final PullCallback pullCallback ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(mq.getBrokerName())) { - LogicalQueueContext logicalQueueContext = new LogicalQueueContext(mq, subExpression, expressionType, subVersion, offset, maxNums, sysFlag, commitOffset, brokerSuspendMaxTimeMillis, timeoutMillis, communicationMode, pullCallback); - while (true) { - try { - MessageQueue messageQueue = logicalQueueContext.getModifiedMessageQueue(); - if (messageQueue == null) { - if (pullCallback != null) { - pullCallback.onSuccess(logicalQueueContext.getPullResult()); - return null; - } else { - return logicalQueueContext.getPullResult(); - } - } - PullResult pullResult = this.pullKernelImplWithoutRetry(messageQueue, subExpression, expressionType, subVersion, logicalQueueContext.getModifiedOffset(), maxNums, sysFlag, logicalQueueContext.getModifiedCommitOffset(), brokerSuspendMaxTimeMillis, timeoutMillis, communicationMode, logicalQueueContext.wrapPullCallback()); - return logicalQueueContext.wrapPullResult(pullResult); - } catch (MQRedirectException e) { - if (!logicalQueueContext.shouldRetry(e)) { - throw new MQBrokerException(ResponseCode.SYSTEM_ERROR, "redirect"); - } - } - } - } else { - return this.pullKernelImplWithoutRetry(mq, subExpression, expressionType, subVersion, offset, maxNums, sysFlag, commitOffset, brokerSuspendMaxTimeMillis, timeoutMillis, communicationMode, pullCallback); - } - } - - public PullResult pullKernelImplWithoutRetry( - MessageQueue mq, - final String subExpression, - final String expressionType, - final long subVersion, - long offset, - final int maxNums, - final int sysFlag, - long commitOffset, - final long brokerSuspendMaxTimeMillis, - final long timeoutMillis, - final CommunicationMode communicationMode, - PullCallback pullCallback - ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - String topic = mq.getTopic(); - int queueId = mq.getQueueId(); - FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), this.recalculatePullFromWhichNode(mq), false); if (null == findBrokerResult) { - this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); + this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), this.recalculatePullFromWhichNode(mq), false); @@ -265,8 +183,8 @@ public class PullAPIWrapper { PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); requestHeader.setConsumerGroup(this.consumerGroup); - requestHeader.setTopic(topic); - requestHeader.setQueueId(queueId); + requestHeader.setTopic(mq.getTopic()); + requestHeader.setQueueId(mq.getQueueId()); requestHeader.setQueueOffset(offset); requestHeader.setMaxMsgNums(maxNums); requestHeader.setSysFlag(sysFlagInner); @@ -278,15 +196,17 @@ public class PullAPIWrapper { String brokerAddr = findBrokerResult.getBrokerAddr(); if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { - brokerAddr = computePullFromWhichFilterServer(topic, brokerAddr); + brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr); } - return this.mQClientFactory.getMQClientAPIImpl().pullMessage( + PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( brokerAddr, requestHeader, timeoutMillis, communicationMode, pullCallback); + + return pullResult; } throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); @@ -403,230 +323,4 @@ public class PullAPIWrapper { throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); } - private class LogicalQueueContext implements PullCallback { - private final MessageQueue mq; - private final String subExpression; - private final String expressionType; - private final long subVersion; - private final long offset; - private final int maxNums; - private final int sysFlag; - private final long commitOffset; - private final long brokerSuspendMaxTimeMillis; - private final long timeoutMillis; - private final CommunicationMode communicationMode; - private final PullCallback pullCallback; - - private volatile LogicalQueuesInfo logicalQueuesInfo; - private volatile LogicalQueueRouteData logicalQueueRouteData; - private volatile boolean isMaxReadableQueueRoute; - - private volatile PullResultExt pullResult = null; - - private final AtomicInteger retry = new AtomicInteger(); - - public LogicalQueueContext(MessageQueue mq, String subExpression, String expressionType, long subVersion, - long offset, int maxNums, int sysFlag, long commitOffset, long brokerSuspendMaxTimeMillis, - long timeoutMillis, CommunicationMode communicationMode, - PullCallback pullCallback) { - this.mq = mq; - this.subExpression = subExpression; - this.expressionType = expressionType; - this.subVersion = subVersion; - this.offset = offset; - this.maxNums = maxNums; - this.sysFlag = sysFlag; - this.commitOffset = commitOffset; - this.brokerSuspendMaxTimeMillis = brokerSuspendMaxTimeMillis; - this.timeoutMillis = timeoutMillis; - this.communicationMode = communicationMode; - this.pullCallback = pullCallback; - - this.buildLogicalQueuesInfo(); - } - - private boolean notUsingLogicalQueue() { - return !Objects.equal(mq.getBrokerName(), MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME) || this.logicalQueuesInfo == null; - } - - private void buildLogicalQueuesInfo() { - TopicRouteData topicRouteData = PullAPIWrapper.this.mQClientFactory.queryTopicRouteData(mq.getTopic()); - if (topicRouteData != null) { - //TODO - //this.logicalQueuesInfo = topicRouteData.getLogicalQueuesInfo(); - } - } - - @Override public void onSuccess(PullResult pullResult) { - this.pullCallback.onSuccess(this.wrapPullResult(pullResult)); - } - - @Override public void onException(Throwable t) { - if (!this.shouldRetry(t)) { - this.pullCallback.onException(t); - return; - } - MessageQueue messageQueue = this.getModifiedMessageQueue(); - if (messageQueue == null) { - this.pullCallback.onSuccess(this.getPullResult()); - return; - } - try { - PullAPIWrapper.this.pullKernelImplWithoutRetry(messageQueue, subExpression, expressionType, subVersion, this.getModifiedOffset(), maxNums, sysFlag, this.getModifiedCommitOffset(), brokerSuspendMaxTimeMillis, timeoutMillis, communicationMode, this); - } catch (Exception e) { - this.pullCallback.onException(e); - } - } - - public MessageQueue getModifiedMessageQueue() { - if (this.notUsingLogicalQueue()) { - return this.mq; - } - this.logicalQueuesInfo.readLock().lock(); - try { - List queueRouteDataList = fromNullable(this.logicalQueuesInfo.get(this.mq.getQueueId())).or(Collections.emptyList()); - LogicalQueueRouteData searchKey = new LogicalQueueRouteData(); - searchKey.setState(MessageQueueRouteState.Normal); - searchKey.setLogicalQueueDelta(offset); - // it's sorted after getTopicRouteInfoFromNameServer - int startIdx = Collections.binarySearch(queueRouteDataList, searchKey); - if (startIdx < 0) { - startIdx = -startIdx - 1; - // lower entry - startIdx -= 1; - } - this.logicalQueueRouteData = null; - this.pullResult = null; - LogicalQueueRouteData lastReadableLogicalQueueRouteData = null; // first item which delta > offset - LogicalQueueRouteData minReadableLogicalQueueRouteData = null; - LogicalQueueRouteData maxReadableLogicalQueueRouteData = null; - for (int i = 0, size = queueRouteDataList.size(); i < size; i++) { - LogicalQueueRouteData queueRouteData = queueRouteDataList.get(i); - if (!queueRouteData.isReadable()) { - continue; - } - maxReadableLogicalQueueRouteData = queueRouteData; - if (minReadableLogicalQueueRouteData == null) { - minReadableLogicalQueueRouteData = queueRouteData; - if (i < startIdx) { - // must consider following `i++` operation when invoke `continue`, so decrease first - i = startIdx - 1; - continue; - } - } - if (queueRouteData.getLogicalQueueDelta() > offset) { - if (this.logicalQueueRouteData != null) { - if (this.logicalQueueRouteData.toLogicalQueueOffset(this.logicalQueueRouteData.getOffsetMax()) <= offset) { - this.logicalQueueRouteData = queueRouteData; - } - break; - } else { - if (lastReadableLogicalQueueRouteData == null) { - lastReadableLogicalQueueRouteData = queueRouteData; - } - } - } else { - this.logicalQueueRouteData = queueRouteData; - } - } - if (this.logicalQueueRouteData == null) { - if (lastReadableLogicalQueueRouteData != null) { - this.pullResult = new PullResultExt(PullStatus.OFFSET_ILLEGAL, lastReadableLogicalQueueRouteData.getLogicalQueueDelta(), minReadableLogicalQueueRouteData.getLogicalQueueDelta(), maxReadableLogicalQueueRouteData.getLogicalQueueDelta(), null, 0, null); - return null; - } else { - if (maxReadableLogicalQueueRouteData != null) { - this.logicalQueueRouteData = maxReadableLogicalQueueRouteData; - } else { - if (!queueRouteDataList.isEmpty()) { - this.logicalQueueRouteData = queueRouteDataList.get(queueRouteDataList.size() - 1); - } else { - pullResult = new PullResultExt(PullStatus.NO_NEW_MSG, 0, 0, 0, null, 0, null); - return null; - } - } - } - } - this.isMaxReadableQueueRoute = this.logicalQueueRouteData.isSameTo(maxReadableLogicalQueueRouteData); - return this.logicalQueueRouteData.getMessageQueue(); - } finally { - this.logicalQueuesInfo.readLock().unlock(); - } - } - - public PullResultExt getPullResult() { - return pullResult; - } - - public PullCallback wrapPullCallback() { - if (this.notUsingLogicalQueue()) { - return this.pullCallback; - } - if (!CommunicationMode.ASYNC.equals(this.communicationMode)) { - return this.pullCallback; - } - return this; - } - - public long getModifiedOffset() { - return this.logicalQueueRouteData.toMessageQueueOffset(this.offset); - } - - public long getModifiedCommitOffset() { - // TODO should this be modified too? If offset is not in current broker's range, how do we handle it? - return this.commitOffset; - } - - public void incrRetry() { - this.retry.incrementAndGet(); - } - - public boolean shouldRetry(Throwable t) { - this.incrRetry(); - if (this.retry.get() >= 3) { - return false; - } - if (t instanceof MQRedirectException) { - MQRedirectException e = (MQRedirectException) t; - this.processResponseBody(e.getBody()); - return true; - } - return false; - } - - public PullResult wrapPullResult(PullResult pullResult) { - if (pullResult == null) { - return null; - } - if (this.logicalQueueRouteData == null) { - return pullResult; - } - if (!this.isMaxReadableQueueRoute && PullStatus.NO_MATCHED_MSG.equals(pullResult.getPullStatus())) { - PullStatus status = PullStatus.OFFSET_ILLEGAL; - if (pullResult instanceof PullResultExt) { - PullResultExt pullResultExt = (PullResultExt) pullResult; - pullResult = new PullResultExt(status, pullResultExt.getNextBeginOffset(), pullResultExt.getMinOffset(), pullResultExt.getMaxOffset(), pullResultExt.getMsgFoundList(), pullResultExt.getSuggestWhichBrokerId(), pullResultExt.getMessageBinary()); - } else { - pullResult = new PullResult(status, pullResult.getNextBeginOffset(), pullResult.getMinOffset(), pullResult.getMaxOffset(), pullResult.getMsgFoundList()); - } - } - // method PullAPIWrapper#processPullResult will modify queueOffset/nextBeginOffset/minOffset/maxOffset - return new PullResultWithLogicalQueues(pullResult, this.logicalQueueRouteData); - } - - public void processResponseBody(byte[] responseBody) { - log.info("LogicalQueueContext.processResponseBody got redirect {}: {}", this.logicalQueueRouteData, responseBody != null ? new String(responseBody, MessageDecoder.CHARSET_UTF8) : null); - if (responseBody != null) { - try { - List queueRouteDataList = JSON.parseObject(responseBody, MixAll.TYPE_LIST_LOGICAL_QUEUE_ROUTE_DATA); - this.logicalQueuesInfo.updateLogicalQueueRouteDataList(this.mq.getQueueId(), queueRouteDataList); - return; - } catch (Exception e) { - log.warn("LogicalQueueContext.processResponseBody {} update exception, fallback to updateTopicRouteInfoFromNameServer", this.logicalQueueRouteData, e); - } - } - //TODO - //PullAPIWrapper.this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic(), false, null, Collections.singleton(this.mq.getQueueId())); - this.buildLogicalQueuesInfo(); - } - } } diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerLogicalQueueTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerLogicalQueueTest.java deleted file mode 100644 index 15ec5642..00000000 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerLogicalQueueTest.java +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.client.consumer; - -import com.alibaba.fastjson.JSON; -import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.SettableFuture; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.reflect.FieldUtils; -import org.apache.rocketmq.client.ClientConfig; -import org.apache.rocketmq.client.exception.MQRedirectException; -import org.apache.rocketmq.client.impl.CommunicationMode; -import org.apache.rocketmq.client.impl.FindBrokerResult; -import org.apache.rocketmq.client.impl.MQClientAPIImpl; -import org.apache.rocketmq.client.impl.MQClientManager; -import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper; -import org.apache.rocketmq.client.impl.consumer.PullResultExt; -import org.apache.rocketmq.client.impl.factory.MQClientInstance; -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; -import org.apache.rocketmq.common.protocol.route.BrokerData; -import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData; -import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo; -import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState; -import org.apache.rocketmq.common.protocol.route.QueueData; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; -import org.assertj.core.util.Lists; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentMatchers; -import org.mockito.Mock; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.junit.MockitoJUnitRunner; -import org.mockito.stubbing.Answer; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.isNull; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - -@RunWith(MockitoJUnitRunner.class) -public class DefaultMQPullConsumerLogicalQueueTest { - private MQClientInstance mQClientFactory; - @Mock - private MQClientAPIImpl mQClientAPIImpl; - private DefaultMQPullConsumer pullConsumer; - private String topic; - private static final String cluster = "DefaultCluster"; - private static final String broker1Name = "BrokerA"; - private static final String broker1Addr = "127.0.0.2:10911"; - private static final String broker2Name = "BrokerB"; - private static final String broker2Addr = "127.0.0.3:10911"; - - @Before - public void init() throws Exception { - topic = "FooBar" + System.nanoTime(); - - mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig())); - - FieldUtils.writeField(mQClientFactory, "mQClientAPIImpl", mQClientAPIImpl, true); - - pullConsumer = new DefaultMQPullConsumer("FooBarGroup" + System.nanoTime()); - pullConsumer.setNamesrvAddr("127.0.0.1:9876"); - pullConsumer.start(); - - PullAPIWrapper pullAPIWrapper = pullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper(); - FieldUtils.writeDeclaredField(pullAPIWrapper, "mQClientFactory", mQClientFactory, true); - - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.>any())).thenReturn(createTopicRouteData()); - - doReturn(new FindBrokerResult(broker1Addr, false)).when(mQClientFactory).findBrokerAddressInSubscribe(eq(broker1Name), anyLong(), anyBoolean()); - doReturn(new FindBrokerResult(broker2Addr, false)).when(mQClientFactory).findBrokerAddressInSubscribe(eq(broker2Name), anyLong(), anyBoolean()); - } - - @After - public void terminate() { - pullConsumer.shutdown(); - } - - @Test - public void testStart_OffsetShouldNotNUllAfterStart() { - Assert.assertNotNull(pullConsumer.getOffsetStore()); - } - - @Test - public void testPullMessage_Success() throws Exception { - doAnswer(new Answer() { - @Override public PullResultExt answer(InvocationOnMock mock) throws Throwable { - PullMessageRequestHeader requestHeader = mock.getArgument(1); - return DefaultMQPullConsumerLogicalQueueTest.this.createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(new MessageExt())); - } - }).when(mQClientAPIImpl).pullMessage(eq(broker1Addr), any(PullMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC), (PullCallback) isNull()); - - MessageQueue messageQueue = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0); - PullResult pullResult = pullConsumer.pull(messageQueue, "*", 1024, 3); - assertThat(pullResult).isNotNull(); - assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND); - assertThat(pullResult.getNextBeginOffset()).isEqualTo(1024 + 1); - assertThat(pullResult.getMinOffset()).isEqualTo(123); - assertThat(pullResult.getMaxOffset()).isEqualTo(2048); - assertThat(pullResult.getMsgFoundList()).isEqualTo(Collections.emptyList()); - } - - @Test - public void testPullMessage_NotFound() throws Exception { - doAnswer(new Answer() { - @Override public PullResult answer(InvocationOnMock mock) throws Throwable { - PullMessageRequestHeader requestHeader = mock.getArgument(1); - return DefaultMQPullConsumerLogicalQueueTest.this.createPullResult(requestHeader, PullStatus.NO_NEW_MSG, new ArrayList()); - } - }).when(mQClientAPIImpl).pullMessage(eq(broker1Addr), any(PullMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC), (PullCallback) isNull()); - - MessageQueue messageQueue = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0); - PullResult pullResult = pullConsumer.pull(messageQueue, "*", 1024, 3); - assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.NO_NEW_MSG); - } - - @Test - public void testPullMessageAsync_Success() throws Exception { - doAnswer(new Answer() { - @Override public PullResult answer(InvocationOnMock mock) throws Throwable { - PullMessageRequestHeader requestHeader = mock.getArgument(1); - PullResult pullResult = DefaultMQPullConsumerLogicalQueueTest.this.createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(new MessageExt())); - - PullCallback pullCallback = mock.getArgument(4); - pullCallback.onSuccess(pullResult); - return null; - } - }).when(mQClientAPIImpl).pullMessage(eq(broker1Addr), any(PullMessageRequestHeader.class), anyLong(), eq(CommunicationMode.ASYNC), any(PullCallback.class)); - - final SettableFuture future = SettableFuture.create(); - MessageQueue messageQueue = new MessageQueue(topic, broker1Name, 0); - pullConsumer.pull(messageQueue, "*", 1024, 3, new PullCallback() { - @Override - public void onSuccess(PullResult pullResult) { - future.set(pullResult); - } - - @Override - public void onException(Throwable e) { - future.setException(e); - } - }); - PullResult pullResult = future.get(3, TimeUnit.SECONDS); - assertThat(pullResult).isNotNull(); - assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND); - assertThat(pullResult.getNextBeginOffset()).isEqualTo(1024 + 1); - assertThat(pullResult.getMinOffset()).isEqualTo(123); - assertThat(pullResult.getMaxOffset()).isEqualTo(2048); - assertThat(pullResult.getMsgFoundList()).isEqualTo(Collections.emptyList()); - } - - @Test - public void testPullMessageSync_Redirect() throws Exception { - doAnswer(new Answer() { - @Override public PullResult answer(InvocationOnMock mock) throws Throwable { - throw new MQRedirectException(JSON.toJSONBytes(ImmutableList.of( - new LogicalQueueRouteData(0, 0, new MessageQueue(topic, broker1Name, 0), MessageQueueRouteState.Expired, 0, 0, 0, 0, broker1Addr), - new LogicalQueueRouteData(0, 10, new MessageQueue(topic, broker2Name, 0), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker2Addr) - ))); - } - }).when(mQClientAPIImpl).pullMessage(eq(broker1Addr), any(PullMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC), (PullCallback) isNull()); - doAnswer(new Answer() { - @Override public PullResult answer(InvocationOnMock mock) throws Throwable { - PullMessageRequestHeader requestHeader = mock.getArgument(1); - return DefaultMQPullConsumerLogicalQueueTest.this.createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(new MessageExt())); - } - }).when(mQClientAPIImpl).pullMessage(eq(broker2Addr), any(PullMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC), (PullCallback) isNull()); - - MessageQueue messageQueue = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0); - PullResult pullResult = pullConsumer.pull(messageQueue, "*", 1024, 3); - assertThat(pullResult).isNotNull(); - assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND); - assertThat(pullResult.getNextBeginOffset()).isEqualTo(1024 + 1); - assertThat(pullResult.getMinOffset()).isEqualTo(123 + 10); - assertThat(pullResult.getMaxOffset()).isEqualTo(2048 + 10); - assertThat(pullResult.getMsgFoundList()).isEqualTo(Collections.emptyList()); - } - - private TopicRouteData createTopicRouteData() { - TopicRouteData topicRouteData = new TopicRouteData(); - - topicRouteData.setFilterServerTable(new HashMap>()); - topicRouteData.setBrokerDatas(ImmutableList.of( - new BrokerData(cluster, broker1Name, new HashMap(Collections.singletonMap(MixAll.MASTER_ID, broker1Addr))), - new BrokerData(cluster, broker2Name, new HashMap(Collections.singletonMap(MixAll.MASTER_ID, broker2Addr))) - )); - - List queueDataList = new ArrayList(); - QueueData queueData; - queueData = new QueueData(); - queueData.setBrokerName(broker1Name); - queueData.setPerm(6); - queueData.setReadQueueNums(3); - queueData.setWriteQueueNums(4); - queueData.setTopicSysFlag(0); - queueDataList.add(queueData); - queueData = new QueueData(); - queueData.setBrokerName(broker2Name); - queueData.setPerm(6); - queueData.setReadQueueNums(3); - queueData.setWriteQueueNums(4); - queueData.setTopicSysFlag(0); - queueDataList.add(queueData); - topicRouteData.setQueueDatas(queueDataList); - - LogicalQueuesInfo info = new LogicalQueuesInfo(); - info.put(0, Lists.newArrayList(new LogicalQueueRouteData(0, 0, new MessageQueue(topic, broker1Name, 0), MessageQueueRouteState.Normal, 0, 0, 0, 0, broker1Addr))); - topicRouteData.setLogicalQueuesInfo(info); - return topicRouteData; - } - - private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus, - List messageExtList) throws Exception { - return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, new byte[] {}); - } -} \ No newline at end of file diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerLogicalQueueTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerLogicalQueueTest.java deleted file mode 100644 index 12d5cbaa..00000000 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerLogicalQueueTest.java +++ /dev/null @@ -1,311 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.client.producer; - -import com.alibaba.fastjson.JSON; -import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.SettableFuture; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.commons.lang3.reflect.FieldUtils; -import org.apache.rocketmq.client.ClientConfig; -import org.apache.rocketmq.client.exception.MQBrokerException; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.exception.MQRedirectException; -import org.apache.rocketmq.client.hook.SendMessageContext; -import org.apache.rocketmq.client.impl.CommunicationMode; -import org.apache.rocketmq.client.impl.MQClientAPIImpl; -import org.apache.rocketmq.client.impl.MQClientManager; -import org.apache.rocketmq.client.impl.factory.MQClientInstance; -import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; -import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; -import org.apache.rocketmq.common.protocol.route.BrokerData; -import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData; -import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo; -import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState; -import org.apache.rocketmq.common.protocol.route.QueueData; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; -import org.apache.rocketmq.remoting.exception.RemotingConnectException; -import org.assertj.core.api.ThrowableAssert; -import org.assertj.core.util.Lists; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentMatchers; -import org.mockito.Mock; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.junit.MockitoJUnitRunner; -import org.mockito.stubbing.Answer; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.isNull; -import static org.mockito.ArgumentMatchers.nullable; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - -@RunWith(MockitoJUnitRunner.class) -public class DefaultMQProducerLogicalQueueTest { - private MQClientInstance mQClientFactory; - @Mock - private MQClientAPIImpl mQClientAPIImpl; - - private DefaultMQProducer producer; - private Message message; - private String topic; - - private MessageQueue messageQueue; - - private static final String cluster = "DefaultCluster"; - private static final String broker1Name = "broker1"; - private static final String broker2Name = "broker2"; - private static final String broker1Addr = "127.0.0.2:10911"; - private static final String broker2Addr = "127.0.0.3:10911"; - - @Before - public void init() throws Exception { - topic = "Foobar" + System.nanoTime(); - messageQueue = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0); - - ConcurrentMap factoryTable = (ConcurrentMap) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true); - for (MQClientInstance instance : factoryTable.values()) { - instance.shutdown(); - } - factoryTable.clear(); - - mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig())); - factoryTable.put(new ClientConfig().buildMQClientId(), mQClientFactory); - - String producerGroupTemp = "FooBar_PID" + System.nanoTime(); - producer = new DefaultMQProducer(producerGroupTemp); - producer.setNamesrvAddr("127.0.0.1:9876"); - producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE); - message = new Message(topic, new byte[] {'a'}); - - mQClientFactory.registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl()); - - producer.start(); - - FieldUtils.writeDeclaredField(producer.getDefaultMQProducerImpl(), "mQClientFactory", mQClientFactory, true); - FieldUtils.writeField(mQClientFactory, "mQClientAPIImpl", mQClientAPIImpl, true); - - when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), - nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod(); - when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), - (SendCallback) isNull(), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) - .thenReturn(createSendResult(SendStatus.SEND_OK)); - when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), - any(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) - .thenAnswer(new Answer() { - @Override public SendResult answer(InvocationOnMock invocation) throws Throwable { - SendCallback sendCallback = invocation.getArgument(6); - sendCallback.onSuccess(DefaultMQProducerLogicalQueueTest.this.createSendResult(SendStatus.SEND_OK)); - return null; - } - }); - } - - @After - public void terminate() { - producer.shutdown(); - } - - @Test - public void testSendMessageSync_Success() throws Exception { - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.>any())).thenReturn(createTopicRoute()); - SendResult sendResult = producer.send(message, messageQueue); - - assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); - assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); - assertThat(sendResult.getQueueOffset()).isEqualTo(456L); - } - - @Test - public void testSendMessageSync_Redirect() throws Exception { - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.>any())).thenReturn(createTopicRoute()); - - when(mQClientAPIImpl.sendMessage(eq(broker1Addr), eq(broker1Name), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC), - (SendCallback) isNull(), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) - .thenThrow(new MQRedirectException(null)); - - assertThatThrownBy(new ThrowableAssert.ThrowingCallable() { - @Override public void call() throws Throwable { - producer.send(message, messageQueue); - } - }).isInstanceOf(MQBrokerException.class).hasMessageContaining("redirect"); - - when(mQClientAPIImpl.sendMessage(eq(broker1Addr), eq(broker1Name), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC), - (SendCallback) isNull(), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) - .thenThrow(new MQRedirectException(JSON.toJSONBytes(ImmutableList.of( - new LogicalQueueRouteData(0, 0, new MessageQueue(topic, broker1Name, 0), MessageQueueRouteState.Expired, 0, 0, 0, 0, broker1Addr), - new LogicalQueueRouteData(0, 10, new MessageQueue(topic, broker2Name, 0), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker2Addr))))); - when(mQClientAPIImpl.sendMessage(eq(broker2Addr), eq(broker2Name), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC), - (SendCallback) isNull(), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) - .thenReturn(createSendResult(SendStatus.SEND_OK)); - - SendResult sendResult = producer.send(message, messageQueue); - assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); - assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); - assertThat(sendResult.getQueueOffset()).isEqualTo(466L); - } - - @Test - public void testSendMessageSync_RemotingException() throws Exception { - TopicRouteData topicRouteData = createTopicRoute(); - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.>any())).thenReturn(topicRouteData); - - when(mQClientAPIImpl.sendMessage(eq(broker1Addr), eq(broker1Name), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC), - (SendCallback) isNull(), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) - .thenThrow(new RemotingConnectException(broker1Addr)); - SendResult returnSendResult = createSendResult(SendStatus.SEND_OK); - when(mQClientAPIImpl.sendMessage(eq(broker2Addr), eq(broker2Name), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), eq(CommunicationMode.SYNC), - (SendCallback) isNull(), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) - .thenReturn(returnSendResult); - - assertThatThrownBy(new ThrowableAssert.ThrowingCallable() { - @Override public void call() throws Throwable { - producer.send(message, messageQueue); - } - }).isInstanceOf(RemotingConnectException.class).hasMessageContaining(broker1Addr); - - topicRouteData.getLogicalQueuesInfo().get(0).add(new LogicalQueueRouteData(0, -1, new MessageQueue(topic, broker2Name, 1), MessageQueueRouteState.WriteOnly, 0, -1, -1, -1, broker2Addr)); - - SendResult sendResult = producer.send(message, messageQueue); - assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); - assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); - assertThat(sendResult.getQueueOffset()).isEqualTo(-1L); - } - - @Test - public void testSendMessageAsync_Success() throws Exception { - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.>any())).thenReturn(createTopicRoute()); - - final SettableFuture future = SettableFuture.create(); - producer.send(message, messageQueue, new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - future.set(sendResult); - } - - @Override - public void onException(Throwable e) { - future.setException(e); - } - }); - - SendResult sendResult = future.get(3, TimeUnit.SECONDS); - assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); - assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); - assertThat(sendResult.getQueueOffset()).isEqualTo(456L); - } - - @Test - public void testSendMessageAsync() throws Exception { - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.>any())).thenReturn(createTopicRoute()); - - final AtomicReference> future = new AtomicReference>(); - SendCallback sendCallback = new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - future.get().set(sendResult); - } - - @Override - public void onException(Throwable e) { - future.get().setException(e); - } - }; - - Message message = new Message(); - message.setTopic("test"); - message.setBody("hello world".getBytes()); - future.set(SettableFuture.create()); - producer.send(new Message(), messageQueue, sendCallback); - assertThatThrownBy(new ThrowableAssert.ThrowingCallable() { - @Override public void call() throws Throwable { - future.get().get(3, TimeUnit.SECONDS); - } - }).hasCauseInstanceOf(MQClientException.class).hasMessageContaining("The specified topic is blank"); - - //this message is send success - message.setTopic(topic); - future.set(SettableFuture.create()); - producer.send(message, messageQueue, sendCallback, 1000); - future.get().get(3, TimeUnit.SECONDS); - } - - public TopicRouteData createTopicRoute() { - TopicRouteData topicRouteData = new TopicRouteData(); - - topicRouteData.setFilterServerTable(new HashMap>()); - topicRouteData.setBrokerDatas(ImmutableList.of( - new BrokerData(cluster, broker1Name, new HashMap(Collections.singletonMap(MixAll.MASTER_ID, broker1Addr))), - new BrokerData(cluster, broker2Name, new HashMap(Collections.singletonMap(MixAll.MASTER_ID, broker2Addr))) - )); - - List queueDataList = new ArrayList(); - QueueData queueData; - queueData = new QueueData(); - queueData.setBrokerName(broker1Name); - queueData.setPerm(6); - queueData.setReadQueueNums(3); - queueData.setWriteQueueNums(4); - queueData.setTopicSysFlag(0); - queueDataList.add(queueData); - queueData = new QueueData(); - queueData.setBrokerName(broker2Name); - queueData.setPerm(6); - queueData.setReadQueueNums(3); - queueData.setWriteQueueNums(4); - queueData.setTopicSysFlag(0); - queueDataList.add(queueData); - topicRouteData.setQueueDatas(queueDataList); - - LogicalQueuesInfo info = new LogicalQueuesInfo(); - info.put(0, Lists.newArrayList(new LogicalQueueRouteData(0, 0, new MessageQueue(topic, broker1Name, 0), MessageQueueRouteState.Normal, 0, 0, 0, 0, broker1Addr))); - topicRouteData.setLogicalQueuesInfo(info); - return topicRouteData; - } - - private SendResult createSendResult(SendStatus sendStatus) { - SendResult sendResult = new SendResult(); - sendResult.setMsgId("123"); - sendResult.setOffsetMsgId("123"); - sendResult.setQueueOffset(456); - sendResult.setSendStatus(sendStatus); - sendResult.setRegionId("HZ"); - sendResult.setMessageQueue(new MessageQueue(topic, broker1Name, 0)); - return sendResult; - } -} diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index a8906b33..5f29fe11 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -22,7 +22,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -49,20 +48,21 @@ import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.netty.NettyRemotingClient; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.ArgumentMatchers; import org.mockito.Mock; import org.mockito.Spy; +import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; @@ -153,7 +153,7 @@ public class DefaultMQProducerTest { @Test public void testSendMessageSync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.>any())).thenReturn(createTopicRoute()); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); SendResult sendResult = producer.send(message); assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); @@ -163,7 +163,7 @@ public class DefaultMQProducerTest { @Test public void testSendMessageSync_WithBodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.>any())).thenReturn(createTopicRoute()); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); SendResult sendResult = producer.send(bigMessage); assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); @@ -174,7 +174,7 @@ public class DefaultMQProducerTest { @Test public void testSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { final CountDownLatch countDownLatch = new CountDownLatch(1); - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.>any())).thenReturn(createTopicRoute()); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); producer.send(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { @@ -197,7 +197,7 @@ public class DefaultMQProducerTest { final AtomicInteger cc = new AtomicInteger(0); final CountDownLatch countDownLatch = new CountDownLatch(6); - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.>any())).thenReturn(createTopicRoute()); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); SendCallback sendCallback = new SendCallback() { @Override public void onSuccess(SendResult sendResult) { @@ -239,7 +239,7 @@ public class DefaultMQProducerTest { final AtomicInteger cc = new AtomicInteger(0); final CountDownLatch countDownLatch = new CountDownLatch(4); - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.>any())).thenReturn(createTopicRoute()); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); SendCallback sendCallback = new SendCallback() { @Override public void onSuccess(SendResult sendResult) { @@ -260,7 +260,7 @@ public class DefaultMQProducerTest { } }; - List msgs = new ArrayList(); + List msgs = new ArrayList<>(); for (int i = 0; i < 5; i++) { Message message = new Message(); message.setTopic("test"); @@ -281,7 +281,7 @@ public class DefaultMQProducerTest { @Test public void testSendMessageAsync_BodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { final CountDownLatch countDownLatch = new CountDownLatch(1); - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.>any())).thenReturn(createTopicRoute()); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); producer.send(bigMessage, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { @@ -300,7 +300,7 @@ public class DefaultMQProducerTest { @Test public void testSendMessageSync_SuccessWithHook() throws Throwable { - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.>any())).thenReturn(createTopicRoute()); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); final Throwable[] assertionErrors = new Throwable[1]; final CountDownLatch countDownLatch = new CountDownLatch(2); producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageHook() { @@ -368,7 +368,7 @@ public class DefaultMQProducerTest { @Test public void testRequestMessage() throws RemotingException, RequestTimeoutException, MQClientException, InterruptedException, MQBrokerException { - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.>any())).thenReturn(createTopicRoute()); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); final AtomicBoolean finish = new AtomicBoolean(false); new Thread(new Runnable() { @Override public void run() { @@ -394,13 +394,13 @@ public class DefaultMQProducerTest { @Test(expected = RequestTimeoutException.class) public void testRequestMessage_RequestTimeoutException() throws RemotingException, RequestTimeoutException, MQClientException, InterruptedException, MQBrokerException { - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.>any())).thenReturn(createTopicRoute()); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); Message result = producer.request(message, 3 * 1000L); } @Test public void testAsyncRequest_OnSuccess() throws Exception { - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.>any())).thenReturn(createTopicRoute()); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); final CountDownLatch countDownLatch = new CountDownLatch(1); RequestCallback requestCallback = new RequestCallback() { @Override public void onSuccess(Message message) { diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java index 0a1f6850..5d64a93f 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java @@ -20,11 +20,6 @@ package org.apache.rocketmq.client.trace; import io.opentracing.mock.MockSpan; import io.opentracing.mock.MockTracer; import io.opentracing.tag.Tags; -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Set; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -52,14 +47,17 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.ArgumentMatchers; import org.mockito.Mock; import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; @@ -115,7 +113,7 @@ public class DefaultMQProducerWithOpenTracingTest { @Test public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, producer.getDefaultMQProducerImpl()); - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.>any())).thenReturn(createTopicRoute()); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); producer.send(message); assertThat(tracer.finishedSpans().size()).isEqualTo(1); MockSpan span = tracer.finishedSpans().get(0); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java index 64f63c59..234e32e6 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java @@ -17,13 +17,6 @@ package org.apache.rocketmq.client.trace; -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -50,17 +43,18 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.ArgumentMatchers; import org.mockito.Mock; import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.nullable; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) @@ -128,7 +122,7 @@ public class DefaultMQProducerWithTraceTest { @Test public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl()); - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.>any())).thenReturn(createTopicRoute()); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); final CountDownLatch countDownLatch = new CountDownLatch(1); try { producer.send(message); @@ -140,7 +134,7 @@ public class DefaultMQProducerWithTraceTest { @Test public void testSendMessageSync_WithTrace_NoBrokerSet_Exception() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.>any())).thenReturn(createTopicRoute()); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); final CountDownLatch countDownLatch = new CountDownLatch(1); try { producer.send(message); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java index aca62544..dd6d1083 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java @@ -20,12 +20,6 @@ package org.apache.rocketmq.client.trace; import io.opentracing.mock.MockSpan; import io.opentracing.mock.MockTracer; import io.opentracing.tag.Tags; -import java.lang.reflect.Field; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Set; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -59,14 +53,18 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.ArgumentMatchers; import org.mockito.Mock; import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; @@ -133,7 +131,7 @@ public class TransactionMQProducerWithOpenTracingTest { @Test public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, producer.getDefaultMQProducerImpl()); - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.>any())).thenReturn(createTopicRoute()); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); producer.sendMessageInTransaction(message, null); assertThat(tracer.finishedSpans().size()).isEqualTo(2); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java index b3a44143..f838817b 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java @@ -17,13 +17,6 @@ package org.apache.rocketmq.client.trace; -import java.lang.reflect.Field; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -57,20 +50,19 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.ArgumentMatchers; import org.mockito.Mock; import org.mockito.Spy; -import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; -import org.mockito.stubbing.Answer; + +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.when; @@ -135,7 +127,7 @@ public class TransactionMQProducerWithTraceTest { Field fieldHooks = DefaultMQProducerImpl.class.getDeclaredField("endTransactionHookList"); fieldHooks.setAccessible(true); - Listhooks = new ArrayList(); + Listhooks = new ArrayList<>(); hooks.add(endTransactionHook); fieldHooks.set(producer.getDefaultMQProducerImpl(), hooks); @@ -150,14 +142,12 @@ public class TransactionMQProducerWithTraceTest { @Test public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl()); - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), ArgumentMatchers.>any())).thenReturn(createTopicRoute()); - final AtomicReference context = new AtomicReference(); - doAnswer(new Answer() { - @Override public Object answer(InvocationOnMock mock) throws Throwable { - context.set(mock.getArgument(0)); - return null; - } - }).when(endTransactionHook).endTransaction(ArgumentMatchers.any()); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); + AtomicReference context = new AtomicReference<>(); + doAnswer(mock -> { + context.set(mock.getArgument(0)); + return null; + }).when(endTransactionHook).endTransaction(any()); producer.sendMessageInTransaction(message, null); EndTransactionContext ctx = context.get(); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteDataNameSrv.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteDataNameSrv.java deleted file mode 100644 index e9fb84e9..00000000 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteDataNameSrv.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.common.protocol.route; - -import com.google.common.base.Objects; - -public class TopicRouteDataNameSrv extends TopicRouteData { - private LogicalQueuesInfoUnordered logicalQueuesInfoUnordered; - - public TopicRouteDataNameSrv() { - } - - public LogicalQueuesInfoUnordered getLogicalQueuesInfoUnordered() { - return logicalQueuesInfoUnordered; - } - - public void setLogicalQueuesInfoUnordered( - LogicalQueuesInfoUnordered logicalQueuesInfoUnordered) { - this.logicalQueuesInfoUnordered = logicalQueuesInfoUnordered; - } - - @Override public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - if (!super.equals(o)) - return false; - TopicRouteDataNameSrv srv = (TopicRouteDataNameSrv) o; - return Objects.equal(logicalQueuesInfoUnordered, srv.logicalQueuesInfoUnordered); - } - - @Override public int hashCode() { - return Objects.hashCode(super.hashCode(), logicalQueuesInfoUnordered); - } - - @Override public String toString() { - return "TopicRouteDataNameSrv{" + - "logicalQueuesInfoUnordered=" + logicalQueuesInfoUnordered + - "} " + super.toString(); - } - - public TopicRouteData toTopicRouteData() { - TopicRouteData topicRouteData = new TopicRouteData(this); - if (this.logicalQueuesInfoUnordered != null) { - topicRouteData.setLogicalQueuesInfo(this.logicalQueuesInfoUnordered.toLogicalQueuesInfoOrdered()); - } - return topicRouteData; - } -} diff --git a/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java b/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java index 87a0fc00..428a9289 100644 --- a/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody; +import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import static org.junit.Assert.assertEquals; import org.junit.Test; @@ -29,7 +30,7 @@ public class RegisterBrokerBodyTest { @Test public void test_encode_decode() throws IOException { RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody(); - TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); + TopicConfigAndMappingSerializeWrapper topicConfigSerializeWrapper = new TopicConfigAndMappingSerializeWrapper(); registerBrokerBody.setTopicConfigSerializeWrapper(topicConfigSerializeWrapper); ConcurrentMap topicConfigTable = new ConcurrentHashMap(); diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java index 38bff74b..e80dec7e 100644 --- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java +++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java @@ -16,42 +16,27 @@ */ package org.apache.rocketmq.namesrv.processor; -import com.alibaba.fastjson.JSON; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.apache.rocketmq.common.DataVersion; -import org.apache.rocketmq.common.MQVersion; -import org.apache.rocketmq.common.MixAll; + import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.namesrv.NamesrvConfig; import org.apache.rocketmq.common.namesrv.RegisterBrokerResult; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader; -import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader; import org.apache.rocketmq.common.protocol.route.BrokerData; -import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData; -import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo; -import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfoUnordered; -import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState; -import org.apache.rocketmq.common.protocol.route.TopicRouteDataNameSrv; -import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.namesrv.NamesrvController; import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager; -- GitLab