提交 c434ff43 编写于 作者: Q qqeasonchen

Merge branch 'develop' of https://github.com/apache/rocketmq into rocketmq-dev-rpc

......@@ -16,7 +16,9 @@
*/
package org.apache.rocketmq.client;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.UtilAll;
......@@ -95,7 +97,6 @@ public class ClientConfig {
}
}
public String withNamespace(String resource) {
return NamespaceUtil.wrapNamespace(this.getNamespace(), resource);
}
......@@ -124,9 +125,21 @@ public class ClientConfig {
if (StringUtils.isEmpty(this.getNamespace())) {
return queue;
}
return new MessageQueue(withNamespace(queue.getTopic()), queue.getBrokerName(), queue.getQueueId());
}
public Collection<MessageQueue> queuesWithNamespace(Collection<MessageQueue> queues) {
if (StringUtils.isEmpty(this.getNamespace())) {
return queues;
}
Iterator<MessageQueue> iter = queues.iterator();
while (iter.hasNext()) {
MessageQueue queue = iter.next();
queue.setTopic(withNamespace(queue.getTopic()));
}
return queues;
}
public void resetClientConfig(final ClientConfig cc) {
this.namesrvAddr = cc.namesrvAddr;
this.clientIP = cc.clientIP;
......@@ -170,6 +183,7 @@ public class ClientConfig {
/**
* Domain name mode access way does not support the delimiter(;), and only one domain name can be set.
*
* @param namesrvAddr name server address
*/
public void setNamesrvAddr(String namesrvAddr) {
......
......@@ -53,14 +53,17 @@ public class Validators {
if (UtilAll.isBlank(group)) {
throw new MQClientException("the specified group is blank", null);
}
if (group.length() > CHARACTER_MAX_LENGTH) {
throw new MQClientException("the specified group is longer than group max length 255.", null);
}
if (!regularExpressionMatcher(group, PATTERN)) {
throw new MQClientException(String.format(
"the specified group[%s] contains illegal characters, allowing only %s", group,
VALID_PATTERN_STR), null);
}
if (group.length() > CHARACTER_MAX_LENGTH) {
throw new MQClientException("the specified group is longer than group max length 255.", null);
}
}
/**
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
import java.util.Collection;
import java.util.List;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;
/**
* Consumers belonging to the same consumer group share a group id. The consumers in a group then
* divides the topic as fairly amongst themselves as possible by establishing that each queue is only
* consumed by a single consumer from the group. If all consumers are from the same group, it functions
* as a traditional message queue. Each message would be consumed by one consumer of the group only.
* When multiple consumer groups exist, the flow of the data consumption model aligns with the traditional
* publish-subscribe model. The messages are broadcast to all consumer groups.
*/
private String consumerGroup;
/**
* Long polling mode, the Consumer connection max suspend time, it is not recommended to modify
*/
private long brokerSuspendMaxTimeMillis = 1000 * 20;
/**
* Long polling mode, the Consumer connection timeout(must greater than brokerSuspendMaxTimeMillis), it is not
* recommended to modify
*/
private long consumerTimeoutMillisWhenSuspend = 1000 * 30;
/**
* The socket timeout in milliseconds
*/
private long consumerPullTimeoutMillis = 1000 * 10;
/**
* Consumption pattern,default is clustering
*/
private MessageModel messageModel = MessageModel.CLUSTERING;
/**
* Message queue listener
*/
private MessageQueueListener messageQueueListener;
/**
* Offset Storage
*/
private OffsetStore offsetStore;
/**
* Queue allocation algorithm
*/
private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
/**
* Whether the unit of subscription group
*/
private boolean unitMode = false;
/**
* The flag for auto commit offset
*/
private boolean autoCommit = true;
/**
* Pull thread number
*/
private int pullThreadNums = 20;
/**
* Maximum commit offset interval time in milliseconds.
*/
private long autoCommitIntervalMillis = 5 * 1000;
/**
* Maximum number of messages pulled each time.
*/
private int pullBatchSize = 10;
/**
* Flow control threshold for consume request, each consumer will cache at most 10000 consume requests by default.
* Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
*/
private long pullThresholdForAll = 10000;
/**
* Consume max span offset.
*/
private int consumeMaxSpan = 2000;
/**
* Flow control threshold on queue level, each message queue will cache at most 1000 messages by default, Consider
* the {@code pullBatchSize}, the instantaneous value may exceed the limit
*/
private int pullThresholdForQueue = 1000;
/**
* Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default,
* Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
*
* <p>
* The size of a message only measured by message body, so it's not accurate
*/
private int pullThresholdSizeForQueue = 100;
/**
* The poll timeout in milliseconds
*/
private long pollTimeoutMillis = 1000 * 5;
/**
* Interval time in in milliseconds for checking changes in topic metadata.
*/
private long topicMetadataCheckIntervalMillis = 30 * 1000;
/**
* Default constructor.
*/
public DefaultLitePullConsumer() {
this(null, MixAll.DEFAULT_CONSUMER_GROUP, null);
}
/**
* Constructor specifying consumer group.
*
* @param consumerGroup Consumer group.
*/
public DefaultLitePullConsumer(final String consumerGroup) {
this(null, consumerGroup, null);
}
/**
* Constructor specifying RPC hook.
*
* @param rpcHook RPC hook to execute before each remoting command.
*/
public DefaultLitePullConsumer(RPCHook rpcHook) {
this(null, MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);
}
/**
* Constructor specifying consumer group, RPC hook
*
* @param consumerGroup Consumer group.
* @param rpcHook RPC hook to execute before each remoting command.
*/
public DefaultLitePullConsumer(final String consumerGroup, RPCHook rpcHook) {
this(null, consumerGroup, rpcHook);
}
/**
* Constructor specifying namespace, consumer group and RPC hook.
*
* @param consumerGroup Consumer group.
* @param rpcHook RPC hook to execute before each remoting command.
*/
public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.consumerGroup = consumerGroup;
defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
}
@Override
public void start() throws MQClientException {
this.defaultLitePullConsumerImpl.start();
}
@Override
public void shutdown() {
this.defaultLitePullConsumerImpl.shutdown();
}
@Override
public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression);
}
@Override
public void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), messageSelector);
}
@Override
public void unsubscribe(String topic) {
this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic));
}
@Override
public void assign(Collection<MessageQueue> messageQueues) {
defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues));
}
@Override
public List<MessageExt> poll() {
return defaultLitePullConsumerImpl.poll(this.getPollTimeoutMillis());
}
@Override
public List<MessageExt> poll(long timeout) {
return defaultLitePullConsumerImpl.poll(timeout);
}
@Override
public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
this.defaultLitePullConsumerImpl.seek(queueWithNamespace(messageQueue), offset);
}
@Override
public void pause(Collection<MessageQueue> messageQueues) {
this.defaultLitePullConsumerImpl.pause(queuesWithNamespace(messageQueues));
}
@Override
public void resume(Collection<MessageQueue> messageQueues) {
this.defaultLitePullConsumerImpl.resume(queuesWithNamespace(messageQueues));
}
@Override
public Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException {
return this.defaultLitePullConsumerImpl.fetchMessageQueues(withNamespace(topic));
}
@Override
public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException {
return this.defaultLitePullConsumerImpl.searchOffset(queueWithNamespace(messageQueue), timestamp);
}
@Override
public void registerTopicMessageQueueChangeListener(String topic,
TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException {
this.defaultLitePullConsumerImpl.registerTopicMessageQueueChangeListener(withNamespace(topic), topicMessageQueueChangeListener);
}
@Override
public void commitSync() {
this.defaultLitePullConsumerImpl.commitSync();
}
@Override
public Long committed(MessageQueue messageQueue) throws MQClientException {
return this.defaultLitePullConsumerImpl.committed(messageQueue);
}
@Override
public boolean isAutoCommit() {
return autoCommit;
}
@Override
public void setAutoCommit(boolean autoCommit) {
this.autoCommit = autoCommit;
}
public int getPullThreadNums() {
return pullThreadNums;
}
public void setPullThreadNums(int pullThreadNums) {
this.pullThreadNums = pullThreadNums;
}
public long getAutoCommitIntervalMillis() {
return autoCommitIntervalMillis;
}
public void setAutoCommitIntervalMillis(long autoCommitIntervalMillis) {
this.autoCommitIntervalMillis = autoCommitIntervalMillis;
}
public int getPullBatchSize() {
return pullBatchSize;
}
public void setPullBatchSize(int pullBatchSize) {
this.pullBatchSize = pullBatchSize;
}
public long getPullThresholdForAll() {
return pullThresholdForAll;
}
public void setPullThresholdForAll(long pullThresholdForAll) {
this.pullThresholdForAll = pullThresholdForAll;
}
public int getConsumeMaxSpan() {
return consumeMaxSpan;
}
public void setConsumeMaxSpan(int consumeMaxSpan) {
this.consumeMaxSpan = consumeMaxSpan;
}
public int getPullThresholdForQueue() {
return pullThresholdForQueue;
}
public void setPullThresholdForQueue(int pullThresholdForQueue) {
this.pullThresholdForQueue = pullThresholdForQueue;
}
public int getPullThresholdSizeForQueue() {
return pullThresholdSizeForQueue;
}
public void setPullThresholdSizeForQueue(int pullThresholdSizeForQueue) {
this.pullThresholdSizeForQueue = pullThresholdSizeForQueue;
}
public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
return allocateMessageQueueStrategy;
}
public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
}
public long getBrokerSuspendMaxTimeMillis() {
return brokerSuspendMaxTimeMillis;
}
public long getPollTimeoutMillis() {
return pollTimeoutMillis;
}
public void setPollTimeoutMillis(long pollTimeoutMillis) {
this.pollTimeoutMillis = pollTimeoutMillis;
}
public OffsetStore getOffsetStore() {
return offsetStore;
}
public void setOffsetStore(OffsetStore offsetStore) {
this.offsetStore = offsetStore;
}
public boolean isUnitMode() {
return unitMode;
}
public void setUnitMode(boolean isUnitMode) {
this.unitMode = isUnitMode;
}
public MessageModel getMessageModel() {
return messageModel;
}
public void setMessageModel(MessageModel messageModel) {
this.messageModel = messageModel;
}
public String getConsumerGroup() {
return consumerGroup;
}
public MessageQueueListener getMessageQueueListener() {
return messageQueueListener;
}
public void setMessageQueueListener(MessageQueueListener messageQueueListener) {
this.messageQueueListener = messageQueueListener;
}
public long getConsumerPullTimeoutMillis() {
return consumerPullTimeoutMillis;
}
public void setConsumerPullTimeoutMillis(long consumerPullTimeoutMillis) {
this.consumerPullTimeoutMillis = consumerPullTimeoutMillis;
}
public long getConsumerTimeoutMillisWhenSuspend() {
return consumerTimeoutMillisWhenSuspend;
}
public void setConsumerTimeoutMillisWhenSuspend(long consumerTimeoutMillisWhenSuspend) {
this.consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend;
}
public long getTopicMetadataCheckIntervalMillis() {
return topicMetadataCheckIntervalMillis;
}
public void setTopicMetadataCheckIntervalMillis(long topicMetadataCheckIntervalMillis) {
this.topicMetadataCheckIntervalMillis = topicMetadataCheckIntervalMillis;
}
}
......@@ -38,9 +38,13 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
/**
* Default pulling consumer
* Default pulling consumer.
* This class will be removed in 2022, and a better implementation {@link DefaultLitePullConsumer} is recommend to use
* in the scenario of actively pulling messages.
*/
@Deprecated
public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer {
protected final transient DefaultMQPullConsumerImpl defaultMQPullConsumerImpl;
/**
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
import java.util.Collection;
import java.util.List;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
public interface LitePullConsumer {
/**
* Start the consumer
*/
void start() throws MQClientException;
/**
* Shutdown the consumer
*/
void shutdown();
/**
* Subscribe some topic with subExpression
*
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
* null or * expression,meaning subscribe all
* @throws MQClientException if there is any client error.
*/
void subscribe(final String topic, final String subExpression) throws MQClientException;
/**
* Subscribe some topic with selector.
*
* @param selector message selector({@link MessageSelector}), can be null.
* @throws MQClientException if there is any client error.
*/
void subscribe(final String topic, final MessageSelector selector) throws MQClientException;
/**
* Unsubscribe consumption some topic
*
* @param topic Message topic that needs to be unsubscribe.
*/
void unsubscribe(final String topic);
/**
* Manually assign a list of message queues to this consumer. This interface does not allow for incremental
* assignment and will replace the previous assignment (if there is one).
*
* @param messageQueues Message queues that needs to be assigned.
*/
void assign(Collection<MessageQueue> messageQueues);
/**
* Fetch data for the topics or partitions specified using assign API
*
* @return list of message, can be null.
*/
List<MessageExt> poll();
/**
* Fetch data for the topics or partitions specified using assign API
*
* @param timeout The amount time, in milliseconds, spent waiting in poll if data is not available. Must not be
* negative
* @return list of message, can be null.
*/
List<MessageExt> poll(long timeout);
/**
* Overrides the fetch offsets that the consumer will use on the next poll. If this API is invoked for the same
* message queue more than once, the latest offset will be used on the next poll(). Note that you may lose data if
* this API is arbitrarily used in the middle of consumption.
*
* @param messageQueue
* @param offset
*/
void seek(MessageQueue messageQueue, long offset) throws MQClientException;
/**
* Suspend pulling from the requested message queues.
*
* Because of the implementation of pre-pull, fetch data in {@link #poll()} will not stop immediately until the
* messages of the requested message queues drain.
*
* Note that this method does not affect message queue subscription. In particular, it does not cause a group
* rebalance.
*
* @param messageQueues Message queues that needs to be paused.
*/
void pause(Collection<MessageQueue> messageQueues);
/**
* Resume specified message queues which have been paused with {@link #pause(Collection)}.
*
* @param messageQueues Message queues that needs to be resumed.
*/
void resume(Collection<MessageQueue> messageQueues);
/**
* Whether to enable auto-commit consume offset.
*
* @return true if enable auto-commit, false if disable auto-commit.
*/
boolean isAutoCommit();
/**
* Set whether to enable auto-commit consume offset.
*
* @param autoCommit Whether to enable auto-commit.
*/
void setAutoCommit(boolean autoCommit);
/**
* Get metadata about the message queues for a given topic.
*
* @param topic The topic that need to get metadata.
* @return collection of message queues
* @throws MQClientException if there is any client error.
*/
Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException;
/**
* Look up the offsets for the given message queue by timestamp. The returned offset for each message queue is the
* earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding message
* queue.
*
* @param messageQueue Message queues that needs to get offset by timestamp.
* @param timestamp
* @return offset
* @throws MQClientException if there is any client error.
*/
Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException;
/**
* Manually commit consume offset.
*/
void commitSync();
/**
* Get the last committed offset for the given message queue.
*
* @param messageQueue
* @return offset, if offset equals -1 means no offset in broker.
* @throws MQClientException if there is any client error.
*/
Long committed(MessageQueue messageQueue) throws MQClientException;
/**
* Register a callback for sensing topic metadata changes.
*
* @param topic The topic that need to monitor.
* @param topicMessageQueueChangeListener Callback when topic metadata changes, refer {@link
* TopicMessageQueueChangeListener}
* @throws MQClientException if there is any client error.
*/
void registerTopicMessageQueueChangeListener(String topic,
TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException;
}
......@@ -169,4 +169,5 @@ public interface MQPullConsumer extends MQConsumer {
*/
void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
}
......@@ -32,7 +32,9 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
/**
* Schedule service for pull consumer
* Schedule service for pull consumer.
* This Consumer will be removed in 2022, and a better implementation {@link
* DefaultLitePullConsumer} is recommend to use in the scenario of actively pulling messages.
*/
public class MQPullConsumerScheduleService {
private final InternalLogger log = ClientLogger.getLog();
......@@ -157,7 +159,7 @@ public class MQPullConsumerScheduleService {
}
}
class PullTaskImpl implements Runnable {
public class PullTaskImpl implements Runnable {
private final MessageQueue messageQueue;
private volatile boolean cancelled = false;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
import java.util.Set;
import org.apache.rocketmq.common.message.MessageQueue;
public interface TopicMessageQueueChangeListener {
/**
* This method will be invoked in the condition of queue numbers changed, These scenarios occur when the topic is
* expanded or shrunk.
*
* @param messageQueues
*/
void onChanged(String topic, Set<MessageQueue> messageQueues);
}
......@@ -117,25 +117,24 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
return;
final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
if (!mqs.isEmpty()) {
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
MessageQueue mq = entry.getKey();
AtomicLong offset = entry.getValue();
if (offset != null) {
if (mqs.contains(mq)) {
try {
this.updateConsumeOffsetToBroker(mq, offset.get());
log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
this.groupName,
this.mQClientFactory.getClientId(),
mq,
offset.get());
} catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
}
} else {
unusedMQ.add(mq);
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
MessageQueue mq = entry.getKey();
AtomicLong offset = entry.getValue();
if (offset != null) {
if (mqs.contains(mq)) {
try {
this.updateConsumeOffsetToBroker(mq, offset.get());
log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
this.groupName,
this.mQClientFactory.getClientId(),
mq,
offset.get());
} catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
}
} else {
unusedMQ.add(mq);
}
}
}
......@@ -187,8 +186,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
}
/**
* Update the Consumer Offset in one way, once the Master is off, updated to Slave,
* here need to be optimized.
* Update the Consumer Offset in one way, once the Master is off, updated to Slave, here need to be optimized.
*/
private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
......@@ -196,15 +194,13 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
}
/**
* Update the Consumer Offset synchronously, once the Master is off, updated to Slave,
* here need to be optimized.
* Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized.
*/
@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
......
......@@ -40,11 +40,11 @@ public class MQClientManager {
return instance;
}
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig) {
return getAndCreateMQClientInstance(clientConfig, null);
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig) {
return getOrCreateMQClientInstance(clientConfig, null);
}
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.message.MessageQueue;
public class AssignedMessageQueue {
private final ConcurrentHashMap<MessageQueue, MessageQueueState> assignedMessageQueueState;
private RebalanceImpl rebalanceImpl;
public AssignedMessageQueue() {
assignedMessageQueueState = new ConcurrentHashMap<MessageQueue, MessageQueueState>();
}
public void setRebalanceImpl(RebalanceImpl rebalanceImpl) {
this.rebalanceImpl = rebalanceImpl;
}
public Set<MessageQueue> messageQueues() {
return assignedMessageQueueState.keySet();
}
public boolean isPaused(MessageQueue messageQueue) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueState != null) {
return messageQueueState.isPaused();
}
return true;
}
public void pause(Collection<MessageQueue> messageQueues) {
for (MessageQueue messageQueue : messageQueues) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (assignedMessageQueueState.get(messageQueue) != null) {
messageQueueState.setPaused(true);
}
}
}
public void resume(Collection<MessageQueue> messageQueueCollection) {
for (MessageQueue messageQueue : messageQueueCollection) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (assignedMessageQueueState.get(messageQueue) != null) {
messageQueueState.setPaused(false);
}
}
}
public ProcessQueue getProcessQueue(MessageQueue messageQueue) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueState != null) {
return messageQueueState.getProcessQueue();
}
return null;
}
public long getPullOffset(MessageQueue messageQueue) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueState != null) {
return messageQueueState.getPullOffset();
}
return -1;
}
public void updatePullOffset(MessageQueue messageQueue, long offset) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueState != null) {
messageQueueState.setPullOffset(offset);
}
}
public long getConusmerOffset(MessageQueue messageQueue) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueState != null) {
return messageQueueState.getConsumeOffset();
}
return -1;
}
public void updateConsumeOffset(MessageQueue messageQueue, long offset) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueState != null) {
messageQueueState.setConsumeOffset(offset);
}
}
public void setSeekOffset(MessageQueue messageQueue, long offset) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueState != null) {
messageQueueState.setSeekOffset(offset);
}
}
public long getSeekOffset(MessageQueue messageQueue) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueState != null) {
return messageQueueState.getSeekOffset();
}
return -1;
}
public void updateAssignedMessageQueue(String topic, Collection<MessageQueue> assigned) {
synchronized (this.assignedMessageQueueState) {
Iterator<Map.Entry<MessageQueue, MessageQueueState>> it = this.assignedMessageQueueState.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<MessageQueue, MessageQueueState> next = it.next();
if (next.getKey().getTopic().equals(topic)) {
if (!assigned.contains(next.getKey())) {
next.getValue().getProcessQueue().setDropped(true);
it.remove();
}
}
}
addAssignedMessageQueue(assigned);
}
}
public void updateAssignedMessageQueue(Collection<MessageQueue> assigned) {
synchronized (this.assignedMessageQueueState) {
Iterator<Map.Entry<MessageQueue, MessageQueueState>> it = this.assignedMessageQueueState.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<MessageQueue, MessageQueueState> next = it.next();
if (!assigned.contains(next.getKey())) {
next.getValue().getProcessQueue().setDropped(true);
it.remove();
}
}
addAssignedMessageQueue(assigned);
}
}
private void addAssignedMessageQueue(Collection<MessageQueue> assigned) {
for (MessageQueue messageQueue : assigned) {
if (!this.assignedMessageQueueState.containsKey(messageQueue)) {
MessageQueueState messageQueueState;
if (rebalanceImpl != null && rebalanceImpl.getProcessQueueTable().get(messageQueue) != null) {
messageQueueState = new MessageQueueState(messageQueue, rebalanceImpl.getProcessQueueTable().get(messageQueue));
} else {
ProcessQueue processQueue = new ProcessQueue();
messageQueueState = new MessageQueueState(messageQueue, processQueue);
}
this.assignedMessageQueueState.put(messageQueue, messageQueueState);
}
}
}
public void removeAssignedMessageQueue(String topic) {
synchronized (this.assignedMessageQueueState) {
Iterator<Map.Entry<MessageQueue, MessageQueueState>> it = this.assignedMessageQueueState.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<MessageQueue, MessageQueueState> next = it.next();
if (next.getKey().getTopic().equals(topic)) {
it.remove();
}
}
}
}
private class MessageQueueState {
private MessageQueue messageQueue;
private ProcessQueue processQueue;
private volatile boolean paused = false;
private volatile long pullOffset = -1;
private volatile long consumeOffset = -1;
private volatile long seekOffset = -1;
private MessageQueueState(MessageQueue messageQueue, ProcessQueue processQueue) {
this.messageQueue = messageQueue;
this.processQueue = processQueue;
}
public MessageQueue getMessageQueue() {
return messageQueue;
}
public void setMessageQueue(MessageQueue messageQueue) {
this.messageQueue = messageQueue;
}
public boolean isPaused() {
return paused;
}
public void setPaused(boolean paused) {
this.paused = paused;
}
public long getPullOffset() {
return pullOffset;
}
public void setPullOffset(long pullOffset) {
this.pullOffset = pullOffset;
}
public ProcessQueue getProcessQueue() {
return processQueue;
}
public void setProcessQueue(ProcessQueue processQueue) {
this.processQueue = processQueue;
}
public long getConsumeOffset() {
return consumeOffset;
}
public void setConsumeOffset(long consumeOffset) {
this.consumeOffset = consumeOffset;
}
public long getSeekOffset() {
return seekOffset;
}
public void setSeekOffset(long seekOffset) {
this.seekOffset = seekOffset;
}
}
}
......@@ -69,6 +69,11 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
/**
* This class will be removed in 2022, and a better implementation {@link DefaultLitePullConsumerImpl} is recommend to use
* in the scenario of actively pulling messages.
*/
@Deprecated
public class DefaultMQPullConsumerImpl implements MQConsumerInner {
private final InternalLogger log = ClientLogger.getLog();
private final DefaultMQPullConsumer defaultMQPullConsumer;
......@@ -77,7 +82,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
private MQClientInstance mQClientFactory;
protected MQClientInstance mQClientFactory;
private PullAPIWrapper pullAPIWrapper;
private OffsetStore offsetStore;
private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);
......@@ -632,7 +637,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
this.defaultMQPullConsumer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
......
......@@ -581,7 +581,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
......
......@@ -26,6 +26,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
......@@ -431,4 +432,5 @@ public class ProcessQueue {
public void setLastConsumeTimestamp(long lastConsumeTimestamp) {
this.lastConsumeTimestamp = lastConsumeTimestamp;
}
}
......@@ -41,8 +41,10 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
/**
* Base class for rebalance algorithm
* This class will be removed in 2022, and a better implementation {@link RebalanceLitePullImpl} is recommend to use
* in the scenario of actively pulling messages.
*/
@Deprecated
public abstract class RebalanceImpl {
protected static final InternalLogger log = ClientLogger.getLog();
protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
import java.util.Set;
public class RebalanceLitePullImpl extends RebalanceImpl {
private final DefaultLitePullConsumerImpl litePullConsumerImpl;
public RebalanceLitePullImpl(DefaultLitePullConsumerImpl litePullConsumerImpl) {
this(null, null, null, null, litePullConsumerImpl);
}
public RebalanceLitePullImpl(String consumerGroup, MessageModel messageModel,
AllocateMessageQueueStrategy allocateMessageQueueStrategy,
MQClientInstance mQClientFactory, DefaultLitePullConsumerImpl litePullConsumerImpl) {
super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
this.litePullConsumerImpl = litePullConsumerImpl;
}
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();
if (messageQueueListener != null) {
try {
messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
} catch (Throwable e) {
log.error("messageQueueChanged exception", e);
}
}
}
@Override
public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
this.litePullConsumerImpl.getOffsetStore().persist(mq);
this.litePullConsumerImpl.getOffsetStore().removeOffset(mq);
return true;
}
@Override
public ConsumeType consumeType() {
return ConsumeType.CONSUME_ACTIVELY;
}
@Override
public void removeDirtyOffset(final MessageQueue mq) {
this.litePullConsumerImpl.getOffsetStore().removeOffset(mq);
}
@Override
public long computePullFromWhere(MessageQueue mq) {
return 0;
}
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
}
}
......@@ -246,10 +246,6 @@ public class MQClientInstance {
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
......@@ -366,7 +362,6 @@ public class MQClientInstance {
}
/**
*
* @param offsetTable
* @param namespace
* @return newOffsetTable
......@@ -385,6 +380,7 @@ public class MQClientInstance {
return newOffsetTable;
}
/**
* Remove offline broker
*/
......@@ -676,10 +672,13 @@ public class MQClientInstance {
} else {
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
}
} catch (Exception e) {
} catch (MQClientException e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
} catch (RemotingException e) {
log.error("updateTopicRouteInfoFromNameServer Exception", e);
throw new IllegalStateException(e);
} finally {
this.lockNamesrv.unlock();
}
......@@ -743,9 +742,10 @@ public class MQClientInstance {
return false;
}
/**
* This method will be removed in the version 5.0.0,because filterServer was removed,and method <code>subscribe(final String topic, final MessageSelector messageSelector)</code>
* is recommended.
* This method will be removed in the version 5.0.0,because filterServer was removed,and method
* <code>subscribe(final String topic, final MessageSelector messageSelector)</code> is recommended.
*/
@Deprecated
private void uploadFilterClassToAllFilterServer(final String consumerGroup, final String fullClassName,
......
......@@ -187,7 +187,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
this.defaultMQProducer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
......@@ -533,6 +533,15 @@ public class DefaultMQProducerImpl implements MQProducerInner {
this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}
private void validateNameServerSetting() throws MQClientException {
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty()) {
throw new MQClientException(
"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
}
}
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
......@@ -541,7 +550,6 @@ public class DefaultMQProducerImpl implements MQProducerInner {
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
......@@ -672,13 +680,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
throw mqClientException;
}
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty()) {
throw new MQClientException(
"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
}
validateNameServerSetting();
throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
......@@ -1125,6 +1129,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
}
validateNameServerSetting();
throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}
......
......@@ -54,7 +54,7 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQPullConsumerTest {
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientAPIImpl;
private DefaultMQPullConsumer pullConsumer;
......
......@@ -59,8 +59,10 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
......@@ -73,7 +75,8 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@RunWith(PowerMockRunner.class)
@PrepareForTest(DefaultMQPushConsumerImpl.class)
public class DefaultMQPushConsumerTest {
private String consumerGroup;
private String topic = "FooBar";
......@@ -102,10 +105,12 @@ public class DefaultMQPushConsumerTest {
});
DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged"));
rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
field.set(pushConsumerImpl, rebalancePushImpl);
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
......
......@@ -39,7 +39,7 @@ import static org.mockito.Mockito.mock;
@RunWith(MockitoJUnitRunner.class)
public class MQClientInstanceTest {
private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private String topic = "FooBar";
private String group = "FooBarGroup";
......
......@@ -70,7 +70,7 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQProducerTest {
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientAPIImpl;
@Mock
......
......@@ -70,8 +70,10 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
......@@ -83,7 +85,8 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@RunWith(PowerMockRunner.class)
@PrepareForTest(DefaultMQPushConsumerImpl.class)
public class DefaultMQConsumerWithTraceTest {
private String consumerGroup;
private String consumerGroupNormal;
......@@ -101,7 +104,6 @@ public class DefaultMQConsumerWithTraceTest {
private DefaultMQPushConsumer normalPushConsumer;
private DefaultMQPushConsumer customTraceTopicpushConsumer;
private AsyncTraceDispatcher asyncTraceDispatcher;
private MQClientInstance mQClientTraceFactory;
@Mock
......@@ -112,17 +114,16 @@ public class DefaultMQConsumerWithTraceTest {
@Before
public void init() throws Exception {
consumerGroup = "FooBarGroup" + System.currentTimeMillis();
pushConsumer = new DefaultMQPushConsumer(consumerGroup,true,"");
pushConsumer = new DefaultMQPushConsumer(consumerGroup, true, "");
consumerGroupNormal = "FooBarGroup" + System.currentTimeMillis();
normalPushConsumer = new DefaultMQPushConsumer(consumerGroupNormal,false,"");
customTraceTopicpushConsumer = new DefaultMQPushConsumer(consumerGroup,true,customerTraceTopic);
normalPushConsumer = new DefaultMQPushConsumer(consumerGroupNormal, false, "");
customTraceTopicpushConsumer = new DefaultMQPushConsumer(consumerGroup, true, customerTraceTopic);
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.setPullInterval(60 * 1000);
asyncTraceDispatcher = (AsyncTraceDispatcher)pushConsumer.getTraceDispatcher();
asyncTraceDispatcher = (AsyncTraceDispatcher) pushConsumer.getTraceDispatcher();
traceProducer = asyncTraceDispatcher.getTraceProducer();
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
......@@ -131,12 +132,14 @@ public class DefaultMQConsumerWithTraceTest {
}
});
PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged"));
DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
field.set(pushConsumerImpl, rebalancePushImpl);
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
......
......@@ -60,7 +60,7 @@ import static org.mockito.Mockito.when;
public class DefaultMQProducerWithTraceTest {
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientAPIImpl;
......@@ -87,7 +87,7 @@ public class DefaultMQProducerWithTraceTest {
producer.setNamesrvAddr("127.0.0.1:9876");
normalProducer.setNamesrvAddr("127.0.0.1:9877");
customTraceTopicproducer.setNamesrvAddr("127.0.0.1:9878");
message = new Message(topic, new byte[]{'a', 'b', 'c'});
message = new Message(topic, new byte[] {'a', 'b', 'c'});
asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher();
asyncTraceDispatcher.setTraceTopicName(customerTraceTopic);
asyncTraceDispatcher.getHostProducer();
......@@ -108,14 +108,13 @@ public class DefaultMQProducerWithTraceTest {
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenReturn(createSendResult(SendStatus.SEND_OK));
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenReturn(createSendResult(SendStatus.SEND_OK));
}
......
......@@ -19,7 +19,7 @@
### 3. 样例
- [样例(Example)](RocketMQ_Example.md) :介绍RocketMQ的常见用法,包括基本样例、顺序消息样例、延时消息样例、批量消息样例、过滤消息样例、事消息样例等。
- [样例(Example)](RocketMQ_Example.md) :介绍RocketMQ的常见用法,包括基本样例、顺序消息样例、延时消息样例、批量消息样例、过滤消息样例、事消息样例等。
### 4. 最佳实践
......
......@@ -26,7 +26,7 @@ RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer
Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。
## 9 生产者组(Producer Group)
同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
## 10 消费者组(Consumer Group)
同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。
......
......@@ -1033,30 +1033,6 @@ $ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker
<td height=39 class=xl67 width=87 style='height:29.0pt;width:65pt'>-s</td>
<td class=xl68 width=87 style='width:65pt'>是否执行jstack</td>
</tr>
<tr height=39 style='height:29.0pt'>
<td rowspan=5 height=181 class=xl69 width=87 style='border-bottom:1.0pt
height:135.0pt;border-top:none;width:65pt'>getConsumerStatus</td>
<td rowspan=5 class=xl72 width=87 style='border-bottom:1.0pt
border-top:none;width:65pt'>获取 Consumer 消费进度</td>
<td class=xl67 width=87 style='width:65pt'>-g</td>
<td class=xl68 width=87 style='width:65pt'>消费者所属组名</td>
</tr>
<tr height=23 style='height:17.0pt'>
<td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-t</td>
<td class=xl68 width=87 style='width:65pt'>查询主题</td>
</tr>
<tr height=39 style='height:29.0pt'>
<td height=39 class=xl67 width=87 style='height:29.0pt;width:65pt'>-i</td>
<td class=xl68 width=87 style='width:65pt'>Consumer 客户端 ip</td>
</tr>
<tr height=57 style='height:43.0pt'>
<td height=57 class=xl67 width=87 style='height:43.0pt;width:65pt'>-n</td>
<td class=xl68 width=87 style='width:65pt'>NameServer 服务地址,格式 ip:port</td>
</tr>
<tr height=23 style='height:17.0pt'>
<td height=23 class=xl67 width=87 style='height:17.0pt;width:65pt'>-h</td>
<td class=xl68 width=87 style='width:65pt'>打印帮助</td>
</tr>
<tr height=57 style='height:43.0pt'>
<td rowspan=13 height=761 class=xl69 width=87 style='border-bottom:1.0pt
height:569.0pt;border-top:none;width:65pt'>updateSubGroup</td>
......
......@@ -25,7 +25,7 @@ export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
```
- HTTP static server addressing(default)
After client started, it will access a http static server address, as: <http://jmenv.tbsite.net:8080/rocketmq/nsaddr>, this URL return the following contents:
After client started, it will access the http static server address, as: <http://jmenv.tbsite.net:8080/rocketmq/nsaddr>, this URL return the following contents:
```text
192.168.0.1:9876;192.168.0.2:9876
......
......@@ -50,4 +50,4 @@ public class PushConsumer {
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.simple;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
public class LitePullConsumerAssign {
public static volatile boolean running = true;
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
litePullConsumer.setAutoCommit(false);
litePullConsumer.start();
Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest");
List<MessageQueue> list = new ArrayList<>(mqSet);
List<MessageQueue> assignList = new ArrayList<>();
for (int i = 0; i < list.size() / 2; i++) {
assignList.add(list.get(i));
}
litePullConsumer.assign(assignList);
litePullConsumer.seek(assignList.get(0), 10);
try {
while (running) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s %n", messageExts);
litePullConsumer.commitSync();
}
} finally {
litePullConsumer.shutdown();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.simple;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;
public class LitePullConsumerSubscribe {
public static volatile boolean running = true;
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
litePullConsumer.subscribe("TopicTest", "*");
litePullConsumer.start();
try {
while (running) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s%n", messageExts);
}
} finally {
litePullConsumer.shutdown();
}
}
}
......@@ -52,7 +52,7 @@ import static org.mockito.Mockito.when;
public class ClusterTestRequestProcessorTest {
private ClusterTestRequestProcessor clusterTestProcessor;
private DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private MQClientAPIImpl mQClientAPIImpl;
private ChannelHandlerContext ctx;
......
......@@ -73,7 +73,7 @@ class LocalMessageCache implements ServiceLifecycle {
pullOffsetTable.putIfAbsent(remoteQueue,
rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false));
} catch (MQClientException e) {
log.error("A error occurred in fetch consume offset process.", e);
log.error("An error occurred in fetch consume offset process.", e);
}
}
return pullOffsetTable.get(remoteQueue);
......@@ -124,7 +124,7 @@ class LocalMessageCache implements ServiceLifecycle {
try {
rocketmqPullConsumer.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
} catch (MQClientException e) {
log.error("A error occurred in update consume offset process.", e);
log.error("An error occurred in update consume offset process.", e);
}
}
}
......@@ -135,7 +135,7 @@ class LocalMessageCache implements ServiceLifecycle {
try {
rocketmqPullConsumer.updateConsumeOffset(messageQueue, offset);
} catch (MQClientException e) {
log.error("A error occurred in update consume offset process.", e);
log.error("An error occurred in update consume offset process.", e);
}
}
......
......@@ -167,7 +167,7 @@ public class PullConsumerImpl implements PullConsumer {
}
localMessageCache.updatePullOffset(mq, pullResult.getNextBeginOffset());
} catch (Exception e) {
log.error("A error occurred in pull message process.", e);
log.error("An error occurred in pull message process.", e);
}
}
});
......
......@@ -106,6 +106,7 @@
<!-- Exclude all generated code -->
<sonar.jacoco.itReportPath>${project.basedir}/../test/target/jacoco-it.exec</sonar.jacoco.itReportPath>
<sonar.exclusions>file:**/generated-sources/**,**/test/**</sonar.exclusions>
<powermock.version>2.0.2</powermock.version>
</properties>
......@@ -458,6 +459,18 @@
<version>2.23.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>${powermock.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
<version>${powermock.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
......
......@@ -24,6 +24,6 @@ public class RemotingConnectException extends RemotingException {
}
public RemotingConnectException(String addr, Throwable cause) {
super("connect to <" + addr + "> failed", cause);
super("connect to " + addr + " failed", cause);
}
}
......@@ -358,8 +358,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
}
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
......@@ -393,7 +391,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
}
private Channel getAndCreateChannel(final String addr) throws InterruptedException {
private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException {
if (null == addr) {
return getAndCreateNameserverChannel();
}
......@@ -406,7 +404,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
return this.createChannel(addr);
}
private Channel getAndCreateNameserverChannel() throws InterruptedException {
private Channel getAndCreateNameserverChannel() throws RemotingConnectException, InterruptedException {
String addr = this.namesrvAddrChoosed.get();
if (addr != null) {
ChannelWrapper cw = this.channelTables.get(addr);
......@@ -440,9 +438,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
return channelNew;
}
}
throw new RemotingConnectException(addrList.toString());
}
} catch (Exception e) {
log.error("getAndCreateNameserverChannel: create name server channel exception", e);
} finally {
this.lockNamesrvChannel.unlock();
}
......@@ -456,8 +453,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
private Channel createChannel(final String addr) throws InterruptedException {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
cw.getChannel().close();
channelTables.remove(addr);
return cw.getChannel();
}
if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
......@@ -467,9 +463,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
if (cw != null) {
if (cw.isOK()) {
cw.getChannel().close();
this.channelTables.remove(addr);
createNewConnection = true;
return cw.getChannel();
} else if (!cw.getChannelFuture().isDone()) {
createNewConnection = false;
} else {
......@@ -587,7 +581,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
return channelEventListener;
}
@Override
public ExecutorService getCallbackExecutor() {
return callbackExecutor != null ? callbackExecutor : publicExecutor;
......
......@@ -373,7 +373,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
switch (tlsMode) {
case DISABLED:
ctx.close();
log.warn("Clients intend to establish a SSL connection while this server is running in SSL disabled mode");
log.warn("Clients intend to establish an SSL connection while this server is running in SSL disabled mode");
break;
case PERMISSIVE:
case ENFORCING:
......@@ -384,7 +384,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
log.info("Handlers prepended to channel pipeline to establish SSL connection");
} else {
ctx.close();
log.error("Trying to establish a SSL connection but sslContext is null");
log.error("Trying to establish an SSL connection but sslContext is null");
}
break;
......
......@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
public class ConsumeQueue {
......@@ -397,6 +398,10 @@ public class ConsumeQueue {
boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
if (result) {
if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||
this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
}
this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
return;
} else {
......
......@@ -116,7 +116,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
this.defaultMQAdminExt.changeInstanceNameToPID();
this.mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQAdminExt, rpcHook);
this.mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQAdminExt, rpcHook);
boolean registerOK = mqClientInstance.registerAdminExt(this.defaultMQAdminExt.getAdminExtGroup(), this);
if (!registerOK) {
......
......@@ -85,7 +85,7 @@ import static org.mockito.Mockito.when;
public class DefaultMQAdminExtTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
private static Properties properties = new Properties();
private static TopicList topicList = new TopicList();
......
......@@ -48,7 +48,7 @@ import static org.mockito.Mockito.when;
public class CommandUtilTest {
private DefaultMQAdminExt defaultMQAdminExt;
private DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private MQClientAPIImpl mQClientAPIImpl;
@Before
......
......@@ -39,7 +39,6 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyBoolean;
......@@ -54,7 +53,7 @@ public class BrokerConsumeStatsSubCommadTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -48,7 +48,7 @@ import static org.mockito.Mockito.when;
public class BrokerStatusSubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -46,7 +46,7 @@ import static org.mockito.Mockito.when;
public class CleanExpiredCQSubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -46,7 +46,7 @@ import static org.mockito.Mockito.when;
public class CleanUnusedTopicCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -48,7 +48,7 @@ import static org.mockito.Mockito.when;
public class GetBrokerConfigCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -41,7 +41,7 @@ import static org.mockito.Mockito.mock;
public class SendMsgStatusCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -44,7 +44,7 @@ import static org.mockito.Mockito.mock;
public class UpdateBrokerConfigSubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -54,7 +54,7 @@ import static org.mockito.Mockito.when;
public class ConsumerConnectionSubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -49,7 +49,7 @@ import static org.mockito.Mockito.when;
public class ProducerConnectionSubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -53,7 +53,7 @@ import static org.mockito.Mockito.when;
public class ConsumerProgressSubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -65,7 +65,7 @@ import static org.mockito.Mockito.when;
public class ConsumerStatusSubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -61,7 +61,7 @@ public class QueryMsgByUniqueKeySubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
private static MQAdminImpl mQAdminImpl;
......
......@@ -41,7 +41,6 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
......@@ -52,7 +51,7 @@ import static org.mockito.Mockito.when;
public class GetNamesrvConfigCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -39,7 +39,7 @@ import static org.mockito.Mockito.mock;
public class UpdateKvConfigCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -49,7 +49,7 @@ import static org.mockito.Mockito.when;
public class WipeWritePermSubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -47,7 +47,7 @@ import static org.mockito.Mockito.when;
public class GetConsumerStatusCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -53,7 +53,7 @@ import static org.mockito.Mockito.when;
public class ResetOffsetByTimeCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -69,7 +69,7 @@ import static org.mockito.Mockito.when;
public class MonitorServiceTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
private static MonitorConfig monitorConfig;
private static MonitorListener monitorListener;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册