提交 9d8f4c2c 编写于 作者: H hill007299 提交者: xxxxx

[RIP-19] Pop Consuming (client)

上级 d3d02077
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
public interface AckCallback {
void onSuccess(final AckResult ackResult);
void onException(final Throwable e);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
public class AckResult {
private AckStatus status;
private String extraInfo;
private long popTime;
public void setPopTime(long popTime) {
this.popTime = popTime;
}
public long getPopTime() {
return popTime;
}
public AckStatus getStatus() {
return status;
}
public void setStatus(AckStatus status) {
this.status = status;
}
public void setExtraInfo(String extraInfo) {
this.extraInfo = extraInfo;
}
public String getExtraInfo() {
return extraInfo;
}
@Override
public String toString() {
return "AckResult [AckStatus=" + status + ",extraInfo=" + extraInfo + "]";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
public enum AckStatus {
/**
* ack success
*/
OK,
/**
* msg not exist
*/
NO_EXIST,
}
......@@ -179,6 +179,12 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
private int pullThresholdForQueue = 1000;
/**
* Flow control threshold on queue level, means max num of messages waiting to ack.
* in contrast with pull threshold, once a message is popped, it's considered the beginning of consumption.
*/
private int popThresholdForQueue = 96;
/**
* Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default,
* Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
......@@ -254,6 +260,16 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
private long consumeTimeout = 15;
/**
* Maximum amount of invisible time in millisecond of a message, rang is [5000, 300000]
*/
private long popInvisibleTime = 60000;
/**
* Batch pop size. range is [1, 32]
*/
private int popBatchNums = 32;
/**
* Maximum time to await message consuming when shutdown consumer, 0 indicates no await.
*/
......@@ -264,6 +280,9 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
private TraceDispatcher traceDispatcher = null;
// force to use client rebalance
private boolean clientRebalance = false;
/**
* Default constructor.
*/
......@@ -598,6 +617,14 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.pullThresholdForQueue = pullThresholdForQueue;
}
public int getPopThresholdForQueue() {
return popThresholdForQueue;
}
public void setPopThresholdForQueue(int popThresholdForQueue) {
this.popThresholdForQueue = popThresholdForQueue;
}
public int getPullThresholdForTopic() {
return pullThresholdForTopic;
}
......@@ -891,6 +918,14 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.consumeTimeout = consumeTimeout;
}
public long getPopInvisibleTime() {
return popInvisibleTime;
}
public void setPopInvisibleTime(long popInvisibleTime) {
this.popInvisibleTime = popInvisibleTime;
}
public long getAwaitTerminationMillisWhenShutdown() {
return awaitTerminationMillisWhenShutdown;
}
......@@ -902,4 +937,20 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
public TraceDispatcher getTraceDispatcher() {
return traceDispatcher;
}
public int getPopBatchNums() {
return popBatchNums;
}
public void setPopBatchNums(int popBatchNums) {
this.popBatchNums = popBatchNums;
}
public boolean isClientRebalance() {
return clientRebalance;
}
public void setClientRebalance(boolean clientRebalance) {
this.clientRebalance = clientRebalance;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
/**
* Async message pop interface
*/
public interface PopCallback {
void onSuccess(final PopResult popResult);
void onException(final Throwable e);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
import java.util.List;
import org.apache.rocketmq.common.message.MessageExt;
public class PopResult {
private List<MessageExt> msgFoundList;
private PopStatus popStatus;
private long popTime;
private long invisibleTime;
private long restNum;
public PopResult(PopStatus popStatus, List<MessageExt> msgFoundList) {
this.popStatus = popStatus;
this.msgFoundList = msgFoundList;
}
public long getPopTime() {
return popTime;
}
public void setPopTime(long popTime) {
this.popTime = popTime;
}
public long getRestNum() {
return restNum;
}
public void setRestNum(long restNum) {
this.restNum = restNum;
}
public long getInvisibleTime() {
return invisibleTime;
}
public void setInvisibleTime(long invisibleTime) {
this.invisibleTime = invisibleTime;
}
public void setPopStatus(PopStatus popStatus) {
this.popStatus = popStatus;
}
public PopStatus getPopStatus() {
return popStatus;
}
public List<MessageExt> getMsgFoundList() {
return msgFoundList;
}
public void setMsgFoundList(List<MessageExt> msgFoundList) {
this.msgFoundList = msgFoundList;
}
@Override
public String toString() {
return "PopResult [popStatus=" + popStatus + ",msgFoundList="
+ (msgFoundList == null ? 0 : msgFoundList.size()) + ",restNum=" + restNum + "]";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
public enum PopStatus {
/**
* Founded
*/
FOUND,
/**
* No new message can be pull after polling time out
* delete after next realease
*/
NO_NEW_MSG,
/**
* polling pool is full, do not try again immediately.
*/
POLLING_FULL,
/**
* polling time out but no message find
*/
POLLING_NOT_FOUND
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
public abstract class BaseInvokeCallback implements InvokeCallback {
private final MQClientAPIImpl mqClientAPI;
public BaseInvokeCallback(MQClientAPIImpl mqClientAPI) {
this.mqClientAPI = mqClientAPI;
}
@Override
public void operationComplete(final ResponseFuture responseFuture) {
onComplete(responseFuture);
}
public abstract void onComplete(final ResponseFuture responseFuture);
}
......@@ -29,7 +29,6 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
......@@ -237,6 +236,12 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
}
}
@Override
public void submitPopConsumeRequest(final List<MessageExt> msgs,
final PopProcessQueue processQueue,
final MessageQueue messageQueue) {
throw new UnsupportedOperationException();
}
private void cleanExpireMsg() {
Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
......@@ -386,6 +391,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
defaultMQPushConsumerImpl.tryResetPopRetryTopic(msgs, consumerGroup);
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
ConsumeMessageContext consumeMessageContext = null;
......
......@@ -26,7 +26,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
......@@ -39,17 +38,17 @@ import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.body.CMResult;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
......@@ -205,6 +204,13 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
}
}
@Override
public void submitPopConsumeRequest(final List<MessageExt> msgs,
final PopProcessQueue processQueue,
final MessageQueue messageQueue) {
throw new UnsupportedOperationException();
}
public synchronized void lockMQPeriodically() {
if (!this.stopped) {
this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import io.netty.util.internal.ConcurrentSet;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.body.CMResult;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ConsumeMessagePopOrderlyService implements ConsumeMessageService {
private static final InternalLogger log = ClientLogger.getLog();
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
private final DefaultMQPushConsumer defaultMQPushConsumer;
private final MessageListenerOrderly messageListener;
private final BlockingQueue<Runnable> consumeRequestQueue;
private final ConcurrentSet<ConsumeRequest> consumeRequestSet = new ConcurrentSet<ConsumeRequest>();
private final ThreadPoolExecutor consumeExecutor;
private final String consumerGroup;
private final MessageQueueLock messageQueueLock = new MessageQueueLock();
private final MessageQueueLock consumeRequestLock = new MessageQueueLock();
private final ScheduledExecutorService scheduledExecutorService;
private volatile boolean stopped = false;
public ConsumeMessagePopOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
MessageListenerOrderly messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener;
this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
}
@Override
public void start() {
if (MessageModel.CLUSTERING.equals(ConsumeMessagePopOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
ConsumeMessagePopOrderlyService.this.lockMQPeriodically();
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
}
@Override
public void shutdown(long awaitTerminateMillis) {
this.stopped = true;
this.scheduledExecutorService.shutdown();
ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS);
if (MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
this.unlockAllMessageQueues();
}
}
public synchronized void unlockAllMessageQueues() {
this.defaultMQPushConsumerImpl.getRebalanceImpl().unlockAll(false);
}
@Override
public void updateCorePoolSize(int corePoolSize) {
if (corePoolSize > 0
&& corePoolSize <= Short.MAX_VALUE
&& corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
this.consumeExecutor.setCorePoolSize(corePoolSize);
}
}
@Override
public void incCorePoolSize() {
}
@Override
public void decCorePoolSize() {
}
@Override
public int getCorePoolSize() {
return this.consumeExecutor.getCorePoolSize();
}
@Override
public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String brokerName) {
ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult();
result.setOrder(true);
List<MessageExt> msgs = new ArrayList<MessageExt>();
msgs.add(msg);
MessageQueue mq = new MessageQueue();
mq.setBrokerName(brokerName);
mq.setTopic(msg.getTopic());
mq.setQueueId(msg.getQueueId());
ConsumeOrderlyContext context = new ConsumeOrderlyContext(mq);
this.defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, this.consumerGroup);
final long beginTime = System.currentTimeMillis();
log.info("consumeMessageDirectly receive new message: {}", msg);
try {
ConsumeOrderlyStatus status = this.messageListener.consumeMessage(msgs, context);
if (status != null) {
switch (status) {
case COMMIT:
result.setConsumeResult(CMResult.CR_COMMIT);
break;
case ROLLBACK:
result.setConsumeResult(CMResult.CR_ROLLBACK);
break;
case SUCCESS:
result.setConsumeResult(CMResult.CR_SUCCESS);
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
result.setConsumeResult(CMResult.CR_LATER);
break;
default:
break;
}
} else {
result.setConsumeResult(CMResult.CR_RETURN_NULL);
}
} catch (Throwable e) {
result.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
result.setRemark(RemotingHelper.exceptionSimpleDesc(e));
log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessagePopOrderlyService.this.consumerGroup,
msgs,
mq), e);
}
result.setAutoCommit(context.isAutoCommit());
result.setSpentTimeMills(System.currentTimeMillis() - beginTime);
log.info("consumeMessageDirectly Result: {}", result);
return result;
}
@Override
public void submitConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue,
MessageQueue messageQueue, boolean dispathToConsume) {
throw new UnsupportedOperationException();
}
@Override
public void submitPopConsumeRequest(final List<MessageExt> msgs,
final PopProcessQueue processQueue,
final MessageQueue messageQueue) {
ConsumeRequest req = new ConsumeRequest(processQueue, messageQueue);
submitConsumeRequest(req, false);
}
public synchronized void lockMQPeriodically() {
if (!this.stopped) {
this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
}
}
private void removeConsumeRequest(final ConsumeRequest consumeRequest) {
consumeRequestSet.remove(consumeRequest);
}
private void submitConsumeRequest(final ConsumeRequest consumeRequest, boolean force) {
Object lock = consumeRequestLock.fetchLockObject(consumeRequest.getMessageQueue(), consumeRequest.shardingKeyIndex);
synchronized (lock) {
boolean isNewReq = consumeRequestSet.add(consumeRequest);
if (force || isNewReq) {
try {
consumeExecutor.submit(consumeRequest);
} catch (Exception e) {
log.error("error submit consume request: {}, mq: {}, shardingKeyIndex: {}",
e.toString(), consumeRequest.getMessageQueue(), consumeRequest.getShardingKeyIndex());
}
}
}
}
private void submitConsumeRequestLater(final ConsumeRequest consumeRequest, final long suspendTimeMillis) {
long timeMillis = suspendTimeMillis;
if (timeMillis == -1) {
timeMillis = this.defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis();
}
if (timeMillis < 10) {
timeMillis = 10;
} else if (timeMillis > 30000) {
timeMillis = 30000;
}
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
submitConsumeRequest(consumeRequest, true);
}
}, timeMillis, TimeUnit.MILLISECONDS);
}
public boolean processConsumeResult(
final List<MessageExt> msgs,
final ConsumeOrderlyStatus status,
final ConsumeOrderlyContext context,
final ConsumeRequest consumeRequest
) {
return true;
}
public ConsumerStatsManager getConsumerStatsManager() {
return this.defaultMQPushConsumerImpl.getConsumerStatsManager();
}
private int getMaxReconsumeTimes() {
// default reconsume times: Integer.MAX_VALUE
if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
return Integer.MAX_VALUE;
} else {
return this.defaultMQPushConsumer.getMaxReconsumeTimes();
}
}
private boolean checkReconsumeTimes(List<MessageExt> msgs) {
boolean suspend = false;
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {
MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes()));
if (!sendMessageBack(msg)) {
suspend = true;
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
}
} else {
suspend = true;
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
}
}
}
return suspend;
}
public boolean sendMessageBack(final MessageExt msg) {
try {
// max reconsume times exceeded then send to dead letter queue.
Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
newMsg.setFlag(msg.getFlag());
MessageAccessor.setProperties(newMsg, msg.getProperties());
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes()));
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
return true;
} catch (Exception e) {
log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
}
return false;
}
public void resetNamespace(final List<MessageExt> msgs) {
for (MessageExt msg : msgs) {
if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
}
}
}
class ConsumeRequest implements Runnable {
private final PopProcessQueue processQueue;
private final MessageQueue messageQueue;
private int shardingKeyIndex = 0;
public ConsumeRequest(PopProcessQueue processQueue, MessageQueue messageQueue) {
this.processQueue = processQueue;
this.messageQueue = messageQueue;
this.shardingKeyIndex = 0;
}
public ConsumeRequest(PopProcessQueue processQueue, MessageQueue messageQueue, int shardingKeyIndex) {
this.processQueue = processQueue;
this.messageQueue = messageQueue;
this.shardingKeyIndex = shardingKeyIndex;
}
public PopProcessQueue getProcessQueue() {
return processQueue;
}
public MessageQueue getMessageQueue() {
return messageQueue;
}
public int getShardingKeyIndex() {
return shardingKeyIndex;
}
@Override
public void run() {
if (this.processQueue.isDropped()) {
log.warn("run, message queue not be able to consume, because it's dropped. {}", this.messageQueue);
ConsumeMessagePopOrderlyService.this.removeConsumeRequest(this);
return;
}
// lock on sharding key index
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue, shardingKeyIndex);
}
@Override
public int hashCode() {
int hash = shardingKeyIndex;
if (processQueue != null) {
hash += processQueue.hashCode() * 31;
}
if (messageQueue != null) {
hash += messageQueue.hashCode() * 31;
}
return hash;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
ConsumeRequest other = (ConsumeRequest) obj;
if (shardingKeyIndex != other.shardingKeyIndex) {
return false;
}
if (processQueue != other.processQueue) {
return false;
}
if (messageQueue == other.messageQueue) {
return true;
}
if (messageQueue != null && messageQueue.equals(other.messageQueue)) {
return true;
}
return false;
}
}
}
......@@ -41,4 +41,9 @@ public interface ConsumeMessageService {
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume);
void submitPopConsumeRequest(
final List<MessageExt> msgs,
final PopProcessQueue processQueue,
final MessageQueue messageQueue);
}
......@@ -142,6 +142,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private final MessageQueueLock messageQueueLock = new MessageQueueLock();
// only for test purpose, will be modified by reflection in unit test.
@SuppressWarnings("FieldMayBeFinal") private static boolean doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged = false;
public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
this.defaultLitePullConsumer = defaultLitePullConsumer;
this.rpcHook = rpcHook;
......@@ -417,6 +420,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
if (doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged) {
return;
}
Map<String, SubscriptionData> subTable = rebalanceImpl.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
......
......@@ -24,19 +24,32 @@ import org.apache.rocketmq.common.message.MessageQueue;
* Message lock,strictly ensure the single queue only one thread at a time consuming
*/
public class MessageQueueLock {
private ConcurrentMap<MessageQueue, Object> mqLockTable =
new ConcurrentHashMap<MessageQueue, Object>();
private ConcurrentMap<MessageQueue, ConcurrentMap<Integer, Object>> mqLockTable =
new ConcurrentHashMap<MessageQueue, ConcurrentMap<Integer, Object>>(32);
public Object fetchLockObject(final MessageQueue mq) {
Object objLock = this.mqLockTable.get(mq);
if (null == objLock) {
objLock = new Object();
Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
return fetchLockObject(mq, -1);
}
public Object fetchLockObject(final MessageQueue mq, final int shardingKeyIndex) {
ConcurrentMap<Integer, Object> objMap = this.mqLockTable.get(mq);
if (null == objMap) {
objMap = new ConcurrentHashMap<Integer, Object>(32);
ConcurrentMap<Integer, Object> prevObjMap = this.mqLockTable.putIfAbsent(mq, objMap);
if (prevObjMap != null) {
objMap = prevObjMap;
}
}
Object lock = objMap.get(shardingKeyIndex);
if (null == lock) {
lock = new Object();
Object prevLock = objMap.putIfAbsent(shardingKeyIndex, lock);
if (prevLock != null) {
objLock = prevLock;
lock = prevLock;
}
}
return objLock;
return lock;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import org.apache.rocketmq.common.message.MessageRequestMode;
public interface MessageRequest {
MessageRequestMode getMessageRequestMode();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.protocol.body.PopProcessQueueInfo;
/**
* Queue consumption snapshot
*/
public class PopProcessQueue {
private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
private long lastPopTimestamp;
private AtomicInteger waitAckCounter = new AtomicInteger(0);
private volatile boolean dropped = false;
public long getLastPopTimestamp() {
return lastPopTimestamp;
}
public void setLastPopTimestamp(long lastPopTimestamp) {
this.lastPopTimestamp = lastPopTimestamp;
}
public void incFoundMsg(int count) {
this.waitAckCounter.getAndAdd(count);
}
/**
* @return the value before decrement.
*/
public int ack() {
return this.waitAckCounter.getAndDecrement();
}
public void decFoundMsg(int count) {
this.waitAckCounter.addAndGet(count);
}
public int getWaiAckMsgCount() {
return this.waitAckCounter.get();
}
public boolean isDropped() {
return dropped;
}
public void setDropped(boolean dropped) {
this.dropped = dropped;
}
public void fillPopProcessQueueInfo(final PopProcessQueueInfo info) {
info.setWaitAckCount(getWaiAckMsgCount());
info.setDroped(isDropped());
info.setLastPopTimestamp(getLastPopTimestamp());
}
public boolean isPullExpired() {
return (System.currentTimeMillis() - this.lastPopTimestamp) > PULL_MAX_IDLE_TIME;
}
@Override
public String toString() {
return "PopProcessQueue[waitAckCounter:" + this.waitAckCounter.get()
+ ", lastPopTimestamp:" + getLastPopTimestamp()
+ ", drop:" + dropped + "]";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import org.apache.rocketmq.common.constant.ConsumeInitMode;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageRequestMode;
public class PopRequest implements MessageRequest {
private String topic;
private String consumerGroup;
private MessageQueue messageQueue;
private PopProcessQueue popProcessQueue;
private boolean lockedFirst = false;
private int initMode = ConsumeInitMode.MAX;
public boolean isLockedFirst() {
return lockedFirst;
}
public void setLockedFirst(boolean lockedFirst) {
this.lockedFirst = lockedFirst;
}
public String getConsumerGroup() {
return consumerGroup;
}
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
public MessageQueue getMessageQueue() {
return messageQueue;
}
public void setMessageQueue(MessageQueue messageQueue) {
this.messageQueue = messageQueue;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public PopProcessQueue getPopProcessQueue() {
return popProcessQueue;
}
public void setPopProcessQueue(PopProcessQueue popProcessQueue) {
this.popProcessQueue = popProcessQueue;
}
public int getInitMode() {
return initMode;
}
public void setInitMode(int initMode) {
this.initMode = initMode;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((topic == null) ? 0 : topic.hashCode());
result = prime * result + ((consumerGroup == null) ? 0 : consumerGroup.hashCode());
result = prime * result + ((messageQueue == null) ? 0 : messageQueue.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
PopRequest other = (PopRequest) obj;
if (topic == null) {
if (other.topic != null)
return false;
} else if (!topic.equals(other.topic)) {
return false;
}
if (consumerGroup == null) {
if (other.consumerGroup != null)
return false;
} else if (!consumerGroup.equals(other.consumerGroup))
return false;
if (messageQueue == null) {
if (other.messageQueue != null)
return false;
} else if (!messageQueue.equals(other.messageQueue)) {
return false;
}
return true;
}
@Override
public String toString() {
return "PopRequest [topic=" + topic + ", consumerGroup=" + consumerGroup + ", messageQueue=" + messageQueue + "]";
}
@Override
public MessageRequestMode getMessageRequestMode() {
return MessageRequestMode.POP;
}
}
......@@ -23,6 +23,7 @@ import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.consumer.PopCallback;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
......@@ -37,16 +38,17 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class PullAPIWrapper {
......@@ -269,4 +271,55 @@ public class PullAPIWrapper {
public void setDefaultBrokerId(long defaultBrokerId) {
this.defaultBrokerId = defaultBrokerId;
}
/**
*
* @param mq
* @param invisibleTime
* @param maxNums
* @param consumerGroup
* @param timeout
* @param popCallback
* @param poll
* @param initMode
// * @param expressionType
// * @param expression
* @param order
* @throws MQClientException
* @throws RemotingException
* @throws InterruptedException
*/
public void popAsync(MessageQueue mq, long invisibleTime, int maxNums, String consumerGroup,
long timeout, PopCallback popCallback, boolean poll, int initMode, boolean order, String expressionType, String expression)
throws MQClientException, RemotingException, InterruptedException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
}
if (findBrokerResult != null) {
PopMessageRequestHeader requestHeader = new PopMessageRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setInvisibleTime(invisibleTime);
requestHeader.setInitMode(initMode);
requestHeader.setExpType(expressionType);
requestHeader.setExp(expression);
requestHeader.setOrder(order);
//give 1000 ms for server response
if (poll) {
requestHeader.setPollTime(timeout);
requestHeader.setBornTime(System.currentTimeMillis());
// timeout + 10s, fix the too earlier timeout of client when long polling.
timeout += 10 * 1000;
}
String brokerAddr = findBrokerResult.getBrokerAddr();
this.mQClientFactory.getMQClientAPIImpl().popMessageAsync(mq.getBrokerName(), brokerAddr, requestHeader, timeout, popCallback);
return;
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}
......@@ -24,12 +24,14 @@ import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger;
public class PullMessageService extends ServiceThread {
private final InternalLogger log = ClientLogger.getLog();
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
private final LinkedBlockingQueue<MessageRequest> messageRequestQueue = new LinkedBlockingQueue<MessageRequest>();
private final MQClientInstance mQClientFactory;
private final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactory() {
......@@ -58,7 +60,28 @@ public class PullMessageService extends ServiceThread {
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
this.messageRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
public void executePopPullRequestLater(final PopRequest pullRequest, final long timeDelay) {
if (!isStopped()) {
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
PullMessageService.this.executePopPullRequestImmediately(pullRequest);
}
}, timeDelay, TimeUnit.MILLISECONDS);
} else {
log.warn("PullMessageServiceScheduledThread has shutdown");
}
}
public void executePopPullRequestImmediately(final PopRequest pullRequest) {
try {
this.messageRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
......@@ -86,14 +109,28 @@ public class PullMessageService extends ServiceThread {
}
}
private void popMessage(final PopRequest popRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(popRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.popMessage(popRequest);
} else {
log.warn("No matched consumer for the PopRequest {}, drop it", popRequest);
}
}
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
MessageRequest messageRequest = this.messageRequestQueue.take();
if (messageRequest.getMessageRequestMode() == MessageRequestMode.POP) {
this.popMessage((PopRequest)messageRequest);
} else {
this.pullMessage((PullRequest)messageRequest);
}
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
......
......@@ -17,8 +17,9 @@
package org.apache.rocketmq.client.impl.consumer;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageRequestMode;
public class PullRequest {
public class PullRequest implements MessageRequest {
private String consumerGroup;
private MessageQueue messageQueue;
private ProcessQueue processQueue;
......@@ -101,4 +102,9 @@ public class PullRequest {
public void setProcessQueue(ProcessQueue processQueue) {
this.processQueue = processQueue;
}
@Override
public MessageRequestMode getMessageRequestMode() {
return MessageRequestMode.PULL;
}
}
......@@ -139,7 +139,30 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
}
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
public int getConsumeInitMode() {
throw new UnsupportedOperationException("no initMode for Pull");
}
@Override
public void dispatchPullRequest(final List<PullRequest> pullRequestList, final long delay) {
}
@Override
public void dispatchPopPullRequest(List<PopRequest> pullRequestList, long delay) {
}
@Override
public ProcessQueue createProcessQueue() {
return new ProcessQueue();
}
@Override
public PopProcessQueue createPopProcessQueue() {
return null;
}
public ProcessQueue createProcessQueue(String topicName) {
return createProcessQueue();
}
}
......@@ -74,6 +74,30 @@ public class RebalancePullImpl extends RebalanceImpl {
}
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
public int getConsumeInitMode() {
throw new UnsupportedOperationException("no initMode for Pull");
}
@Override
public void dispatchPullRequest(final List<PullRequest> pullRequestList, final long delay) {
}
@Override
public void dispatchPopPullRequest(final List<PopRequest> pullRequestList, final long delay) {
}
@Override
public ProcessQueue createProcessQueue() {
return new ProcessQueue();
}
@Override
public PopProcessQueue createPopProcessQueue() {
return null;
}
public ProcessQueue createProcessQueue(String topicName) {
return createProcessQueue();
}
}
......@@ -26,6 +26,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.ConsumeInitMode;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
......@@ -36,6 +37,7 @@ public class RebalancePushImpl extends RebalanceImpl {
private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills", "20000"));
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
public RebalancePushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
this(null, null, null, null, defaultMQPushConsumerImpl);
}
......@@ -110,6 +112,16 @@ public class RebalancePushImpl extends RebalanceImpl {
return true;
}
@Override
public boolean clientRebalance(String topic) {
// POPTODO order pop consume not implement yet
return defaultMQPushConsumerImpl.getDefaultMQPushConsumer().isClientRebalance() || defaultMQPushConsumerImpl.isConsumeOrderly();
}
public boolean removeUnnecessaryPopMessageQueue(final MessageQueue mq, final PopProcessQueue pq) {
return true;
}
private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {
if (pq.hasTempMessage()) {
......@@ -212,10 +224,48 @@ public class RebalancePushImpl extends RebalanceImpl {
}
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
public int getConsumeInitMode() {
final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
if (ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET == consumeFromWhere) {
return ConsumeInitMode.MIN;
} else {
return ConsumeInitMode.MAX;
}
}
@Override
public void dispatchPullRequest(final List<PullRequest> pullRequestList, final long delay) {
for (PullRequest pullRequest : pullRequestList) {
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
if (delay <= 0) {
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
} else {
this.defaultMQPushConsumerImpl.executePullRequestLater(pullRequest, delay);
}
}
}
@Override
public void dispatchPopPullRequest(final List<PopRequest> pullRequestList, final long delay) {
for (PopRequest pullRequest : pullRequestList) {
if (delay <= 0) {
this.defaultMQPushConsumerImpl.executePopPullRequestImmediately(pullRequest);
} else {
this.defaultMQPushConsumerImpl.executePopPullRequestLater(pullRequest, delay);
}
}
}
@Override
public ProcessQueue createProcessQueue() {
return new ProcessQueue();
}
@Override public ProcessQueue createProcessQueue(String topicName) {
return createProcessQueue();
}
@Override
public PopProcessQueue createPopProcessQueue() {
return new PopProcessQueue();
}
}
......@@ -35,7 +35,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.admin.MQAdminExtInner;
......@@ -64,26 +63,29 @@ import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageQueueAssignment;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class MQClientInstance {
private final static long LOCK_TIMEOUT_MILLIS = 3000;
private final InternalLogger log = ClientLogger.getLog();
......@@ -964,6 +966,19 @@ public class MQClientInstance {
this.adminExtTable.remove(group);
}
public void rebalanceLater(long delayMillis) {
if (delayMillis <= 0) {
this.rebalanceService.wakeup();
} else {
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
MQClientInstance.this.rebalanceService.wakeup();
}
}, delayMillis, TimeUnit.MILLISECONDS);
}
}
public void rebalanceImmediately() {
this.rebalanceService.wakeup();
}
......@@ -1091,6 +1106,22 @@ public class MQClientInstance {
return null;
}
public Set<MessageQueueAssignment> queryAssignment(final String topic, final String consumerGroup, final String strategyName, final MessageModel messageModel, int timeout)
throws RemotingException, InterruptedException, MQBrokerException {
String brokerAddr = this.findBrokerAddrByTopic(topic);
if (null == brokerAddr) {
this.updateTopicRouteInfoFromNameServer(topic);
brokerAddr = this.findBrokerAddrByTopic(topic);
}
if (null != brokerAddr) {
return this.mQClientAPIImpl.queryAssignment(brokerAddr, topic, consumerGroup, clientId, strategyName,
messageModel, timeout);
}
return null;
}
public String findBrokerAddrByTopic(final String topic) {
TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
if (topicRouteData != null) {
......
......@@ -20,8 +20,10 @@ package org.apache.rocketmq.client.consumer;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
......@@ -52,11 +54,8 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
......@@ -70,9 +69,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest(DefaultLitePullConsumerImpl.class)
@PowerMockIgnore("javax.management.*")
@RunWith(MockitoJUnitRunner.class)
public class DefaultLitePullConsumerTest {
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
......@@ -94,13 +91,16 @@ public class DefaultLitePullConsumerTest {
@Before
public void init() throws Exception {
PowerMockito.suppress(PowerMockito.method(DefaultLitePullConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged"));
Field field = MQClientInstance.class.getDeclaredField("rebalanceService");
field.setAccessible(true);
RebalanceService rebalanceService = (RebalanceService) field.get(mQClientFactory);
field = RebalanceService.class.getDeclaredField("waitInterval");
field.setAccessible(true);
field.set(rebalanceService, 100);
field = DefaultLitePullConsumerImpl.class.getDeclaredField("doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged");
field.setAccessible(true);
field.set(null, true);
}
@Test
......@@ -183,6 +183,7 @@ public class DefaultLitePullConsumerTest {
when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(500L);
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.assign(Collections.singletonList(messageQueue));
litePullConsumer.pause(Collections.singletonList(messageQueue));
long offset = litePullConsumer.committed(messageQueue);
litePullConsumer.seek(messageQueue, offset);
Field field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue");
......@@ -199,6 +200,7 @@ public class DefaultLitePullConsumerTest {
when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(500L);
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.assign(Collections.singletonList(messageQueue));
litePullConsumer.pause(Collections.singletonList(messageQueue));
litePullConsumer.seekToBegin(messageQueue);
Field field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue");
field.setAccessible(true);
......@@ -214,6 +216,7 @@ public class DefaultLitePullConsumerTest {
when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(500L);
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.assign(Collections.singletonList(messageQueue));
litePullConsumer.pause(Collections.singletonList(messageQueue));
litePullConsumer.seekToEnd(messageQueue);
Field field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue");
field.setAccessible(true);
......@@ -229,6 +232,7 @@ public class DefaultLitePullConsumerTest {
when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(100L);
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.assign(Collections.singletonList(messageQueue));
litePullConsumer.pause(Collections.singletonList(messageQueue));
try {
litePullConsumer.seek(messageQueue, -1);
failBecauseExceptionWasNotThrown(MQClientException.class);
......@@ -517,9 +521,6 @@ public class DefaultLitePullConsumerTest {
public void testConsumerAfterShutdown() throws Exception {
DefaultLitePullConsumer defaultLitePullConsumer = createSubscribeLitePullConsumer();
DefaultLitePullConsumer mockConsumer = spy(defaultLitePullConsumer);
when(mockConsumer.poll(anyLong())).thenReturn(new ArrayList<>());
new AsyncConsumer().executeAsync(defaultLitePullConsumer);
Thread.sleep(100);
......
......@@ -24,8 +24,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
......@@ -60,11 +60,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
......@@ -77,20 +74,20 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest(DefaultMQPushConsumerImpl.class)
@PowerMockIgnore("javax.management.*")
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQPushConsumerTest {
private String consumerGroup;
private String topic = "FooBar";
private String brokerName = "BrokerA";
private MQClientInstance mQClientFactory;
private final byte[] msgBody = Long.toString(System.currentTimeMillis()).getBytes();
@Mock
private MQClientAPIImpl mQClientAPIImpl;
private PullAPIWrapper pullAPIWrapper;
private RebalancePushImpl rebalancePushImpl;
private DefaultMQPushConsumer pushConsumer;
private AtomicLong queueOffset = new AtomicLong(1024);;
@Before
public void init() throws Exception {
......@@ -108,12 +105,15 @@ public class DefaultMQPushConsumerTest {
});
DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged"));
rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
field.set(pushConsumerImpl, rebalancePushImpl);
field = DefaultMQPushConsumerImpl.class.getDeclaredField("doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged");
field.setAccessible(true);
field.set(null, true);
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
......@@ -143,8 +143,9 @@ public class DefaultMQPushConsumerTest {
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123");
messageClientExt.setBody(new byte[] {'a'});
messageClientExt.setQueueOffset(queueOffset.getAndIncrement());
messageClientExt.setMsgId("1024");
messageClientExt.setBody(msgBody);
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
......@@ -190,10 +191,10 @@ public class DefaultMQPushConsumerTest {
pullMessageService.executePullRequestImmediately(createPullRequest());
countDownLatch.await();
assertThat(messageExts[0].getTopic()).isEqualTo(topic);
assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
assertThat(messageExts[0].getBody()).isEqualTo(msgBody);
}
@Test
@Test(timeout = 20000)
public void testPullMessage_SuccessWithOrderlyService() throws Exception {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final MessageExt[] messageExts = new MessageExt[1];
......@@ -213,9 +214,9 @@ public class DefaultMQPushConsumerTest {
PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
pullMessageService.executePullRequestLater(createPullRequest(), 100);
countDownLatch.await(10, TimeUnit.SECONDS);
countDownLatch.await();
assertThat(messageExts[0].getTopic()).isEqualTo(topic);
assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
assertThat(messageExts[0].getBody()).isEqualTo(msgBody);
}
@Test
......@@ -259,7 +260,7 @@ public class DefaultMQPushConsumerTest {
}
}
@Test
@Test(timeout = 20000)
public void testGracefulShutdown() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
pushConsumer.setAwaitTerminationMillisWhenShutdown(2000);
......@@ -268,6 +269,7 @@ public class DefaultMQPushConsumerTest {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
assertThat(msgs.get(0).getBody()).isEqualTo(msgBody);
countDownLatch.countDown();
try {
Thread.sleep(1000);
......@@ -302,7 +304,7 @@ public class DefaultMQPushConsumerTest {
private PullRequest createPullRequest() {
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(1024);
pullRequest.setNextOffset(queueOffset.get());
MessageQueue messageQueue = new MessageQueue();
messageQueue.setBrokerName(brokerName);
......
......@@ -18,21 +18,66 @@
package org.apache.rocketmq.client.impl.consumer;
import java.util.List;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.hook.FilterMessageContext;
import org.apache.rocketmq.client.hook.FilterMessageHook;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQPushConsumerImplTest {
@Mock
private DefaultMQPushConsumer defaultMQPushConsumer;
@Rule
public ExpectedException thrown = ExpectedException.none();
@Before
public void setUp() throws Exception {
when(defaultMQPushConsumer.getConsumerGroup()).thenReturn("test_group");
when(defaultMQPushConsumer.getMessageModel()).thenReturn(MessageModel.CLUSTERING);
when(defaultMQPushConsumer.getConsumeFromWhere()).thenReturn(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
when(defaultMQPushConsumer.getConsumeTimestamp()).thenReturn(UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30)));
when(defaultMQPushConsumer.getAllocateMessageQueueStrategy()).thenReturn(new AllocateMessageQueueAveragely());
when(defaultMQPushConsumer.getConsumeThreadMin()).thenReturn(20);
when(defaultMQPushConsumer.getConsumeThreadMax()).thenReturn(30);
when(defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()).thenReturn(2000);
when(defaultMQPushConsumer.getPullThresholdForQueue()).thenReturn(1000);
when(defaultMQPushConsumer.getPullThresholdForTopic()).thenReturn(-1);
when(defaultMQPushConsumer.getPullThresholdSizeForQueue()).thenReturn(100);
when(defaultMQPushConsumer.getPullThresholdSizeForTopic()).thenReturn(-1);
when(defaultMQPushConsumer.getConsumeMessageBatchMaxSize()).thenReturn(1);
when(defaultMQPushConsumer.getPullBatchSize()).thenReturn(32);
when(defaultMQPushConsumer.getPopInvisibleTime()).thenReturn(60000L);
when(defaultMQPushConsumer.getPopBatchNums()).thenReturn(32);
when(defaultMQPushConsumer.getClientIP()).thenReturn("127.0.0.1");
when(defaultMQPushConsumer.getInstanceName()).thenReturn("test_instance");
when(defaultMQPushConsumer.buildMQClientId()).thenCallRealMethod();
ClientConfig clientConfig = new ClientConfig();
when(defaultMQPushConsumer.cloneClientConfig()).thenReturn(clientConfig);
when(defaultMQPushConsumer.getConsumeTimeout()).thenReturn(15L);
}
@Test
public void checkConfigTest() throws MQClientException {
......@@ -58,4 +103,51 @@ public class DefaultMQPushConsumerImplTest {
DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(consumer, null);
defaultMQPushConsumerImpl.start();
}
@Test
public void testHook() throws Exception {
DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(defaultMQPushConsumer, null);
defaultMQPushConsumerImpl.registerConsumeMessageHook(new ConsumeMessageHook() {
@Override public String hookName() {
return "consumerHook";
}
@Override public void consumeMessageBefore(ConsumeMessageContext context) {
assertThat(context).isNotNull();
}
@Override public void consumeMessageAfter(ConsumeMessageContext context) {
assertThat(context).isNotNull();
}
});
defaultMQPushConsumerImpl.registerFilterMessageHook(new FilterMessageHook() {
@Override public String hookName() {
return "filterHook";
}
@Override public void filterMessage(FilterMessageContext context) {
assertThat(context).isNotNull();
}
});
defaultMQPushConsumerImpl.executeHookBefore(new ConsumeMessageContext());
defaultMQPushConsumerImpl.executeHookAfter(new ConsumeMessageContext());
}
@Test
public void testPush() throws Exception {
when(defaultMQPushConsumer.getMessageListener()).thenReturn(new MessageListenerConcurrently() {
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
assertThat(msgs).size().isGreaterThan(0);
assertThat(context).isNotNull();
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(defaultMQPushConsumer, null);
try {
defaultMQPushConsumerImpl.start();
} finally {
defaultMQPushConsumerImpl.shutdown();
}
}
}
......@@ -22,23 +22,28 @@ import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageQueueAssignment;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
......@@ -51,6 +56,7 @@ public class RebalancePushImplTest {
private OffsetStore offsetStore;
private String consumerGroup = "CID_RebalancePushImplTest";
private String topic = "TopicA";
private final String brokerName = "BrokerA";
@Test
public void testMessageQueueChanged_CountThreshold() {
......@@ -88,16 +94,15 @@ public class RebalancePushImplTest {
rebalancePush.subscriptionInner.putIfAbsent(topic, new SubscriptionData());
try {
when(mqClientInstance.queryAssignment(anyString(), anyString(), anyString(), any(MessageModel.class), anyInt())).thenThrow(new RemotingTimeoutException("unsupported"));
} catch (RemotingException ignored) {
} catch (InterruptedException ignored) {
} catch (MQBrokerException ignored) {
}
when(mqClientInstance.findConsumerIdList(anyString(), anyString())).thenReturn(Collections.singletonList(consumerGroup));
when(mqClientInstance.getClientId()).thenReturn(consumerGroup);
when(defaultMQPushConsumer.getOffsetStore()).thenReturn(offsetStore);
doAnswer(new Answer() {
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable {
return null;
}
}).when(defaultMQPushConsumer).executePullRequestImmediately(any(PullRequest.class));
}
@Test
......@@ -134,8 +139,8 @@ public class RebalancePushImplTest {
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(1024);
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForQueue(1024);
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
allocateResultSet.add(new MessageQueue(topic, "BrokerA", 0));
allocateResultSet.add(new MessageQueue(topic, "BrokerA", 1));
allocateResultSet.add(new MessageQueue(topic, brokerName, 0));
allocateResultSet.add(new MessageQueue(topic, brokerName, 1));
doRebalanceForcibly(rebalancePush, allocateResultSet);
defaultMQPushConsumer.setConsumeMessageService(new ConsumeMessageConcurrentlyService(defaultMQPushConsumer, null));
......@@ -160,4 +165,22 @@ public class RebalancePushImplTest {
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForTopic")).isEqualTo("1024");
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForTopic")).isEqualTo("1024");
}
@Test
public void testDoRebalancePull() throws Exception {
RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup, MessageModel.CLUSTERING,
new AllocateMessageQueueAveragely(), mqClientInstance, defaultMQPushConsumer);
rebalancePush.getSubscriptionInner().putIfAbsent(topic, new SubscriptionData());
rebalancePush.subscriptionInner.putIfAbsent(topic, new SubscriptionData());
when(mqClientInstance.getClientId()).thenReturn(consumerGroup);
when(defaultMQPushConsumer.getOffsetStore()).thenReturn(offsetStore);
doNothing().when(defaultMQPushConsumer).executePullRequestLater(any(PullRequest.class), anyLong());
MessageQueueAssignment queueAssignment = new MessageQueueAssignment();
queueAssignment.setMode(MessageRequestMode.PULL);
queueAssignment.setMessageQueue(new MessageQueue(topic, brokerName, 0));
when(mqClientInstance.queryAssignment(anyString(), anyString(), anyString(), any(MessageModel.class), anyInt())).thenReturn(Collections.singleton(queueAssignment));
assertThat(rebalancePush.doRebalance(false)).isTrue();
}
}
\ No newline at end of file
......@@ -180,4 +180,5 @@ public class MQClientInstanceTest {
flag = mqClientInstance.registerAdminExt(group, mock(MQAdminExtInner.class));
assertThat(flag).isTrue();
}
}
\ No newline at end of file
......@@ -48,7 +48,6 @@ import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.junit.After;
import org.junit.Before;
......@@ -56,9 +55,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
......@@ -260,7 +257,7 @@ public class DefaultMQProducerTest {
}
};
List<Message> msgs = new ArrayList<>();
List<Message> msgs = new ArrayList<Message>();
for (int i = 0; i < 5; i++) {
Message message = new Message();
message.setTopic("test");
......
......@@ -53,13 +53,11 @@ import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
......@@ -73,7 +71,6 @@ import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
......@@ -89,7 +86,6 @@ import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest(DefaultMQPushConsumerImpl.class)
@PowerMockIgnore("javax.management.*")
public class DefaultMQConsumerWithTraceTest {
private String consumerGroup;
private String consumerGroupNormal;
......
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
powermock.global-ignore=javax.management.*
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册