提交 fbe64ffb 编写于 作者: S ShannonDing

Polish the push consumer to support 4.x

上级 d7a2603c
......@@ -59,7 +59,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
* <strong>Thread Safety:</strong> After initialization, the instance can be regarded as thread-safe.
* </p>
*/
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
public class DefaultMQPushConsumer extends ClientConfig implements MQRealPushConsumer {
private final InternalLogger log = ClientLogger.getLog();
......@@ -223,7 +223,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Whether update subscription relationship when every pull
*/
private boolean postSubscriptionWhenPull = true;
private boolean postSubscriptionWhenPull = false;
/**
* Whether the unit of subscription group
......@@ -275,24 +275,6 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
/**
* Constructor specifying consumer group, RPC hook and message queue allocating algorithm.
*
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy Message queue allocating algorithm.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean realPushModel) {
this.consumerGroup = consumerGroup;
if (allocateMessageQueueStrategy == null) {
this.allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
} else {
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
}
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook, realPushModel);
}
/**
* Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and
* customized trace topic name.
......
......@@ -20,6 +20,7 @@ import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.MQPushConsumerInner;
/**
* Push consumer
......@@ -49,14 +50,13 @@ public interface MQPushConsumer extends MQConsumer {
* Subscribe some topic
*
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
* null or * expression,meaning subscribe
* all
* null or * expression,meaning subscribe all
*/
void subscribe(final String topic, final String subExpression) throws MQClientException;
/**
* This method will be removed in the version 5.0.0,because filterServer was removed,and method <code>subscribe(final String topic, final MessageSelector messageSelector)</code>
* is recommended.
* This method will be removed in the version 5.0.0,because filterServer was removed,and method
* <code>subscribe(final String topic, final MessageSelector messageSelector)</code> is recommended.
*
* Subscribe some topic
*
......@@ -70,8 +70,8 @@ public interface MQPushConsumer extends MQConsumer {
/**
* Subscribe some topic with selector.
* <p>
* This interface also has the ability of {@link #subscribe(String, String)},
* and, support other message selection, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}.
* This interface also has the ability of {@link #subscribe(String, String)}, and, support other message selection,
* such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}.
* </p>
* <p/>
* <p>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
import org.apache.rocketmq.client.impl.consumer.MQPushConsumerInner;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
/**
* Real Push consumer
*/
public interface MQRealPushConsumer extends MQPushConsumer {
MQPushConsumerInner getDefaultMQPushConsumerImpl();
long getConsumeTimeout();
int getPullThresholdForTopic();
int getPullThresholdForQueue();
void setPullThresholdForQueue(int pullThresholdForQueue);
int getPullThresholdSizeForTopic();
void setPullThresholdSizeForTopic(final int pullThresholdSizeForTopic);
int getPullThresholdSizeForQueue();
void setPullThresholdSizeForQueue(final int pullThresholdSizeForQueue);
ConsumeFromWhere getConsumeFromWhere();
String getConsumeTimestamp();
String getConsumerGroup();
int getConsumeThreadMin();
void setConsumeThreadMin(int consumeThreadMin);
int getConsumeThreadMax();
long getSuspendCurrentQueueTimeMillis();
int getMaxReconsumeTimes();
void setMaxReconsumeTimes(final int maxReconsumeTimes);
int getConsumeMessageBatchMaxSize();
MessageModel getMessageModel();
}
\ No newline at end of file
......@@ -30,6 +30,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MQRealPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
......@@ -50,8 +51,8 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
private static final InternalLogger log = ClientLogger.getLog();
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
private final DefaultMQPushConsumer defaultMQPushConsumer;
private final MQPushConsumerInner defaultMQPushConsumerImpl;
private final MQRealPushConsumer defaultMQPushConsumer;
private final MessageListenerConcurrently messageListener;
private final BlockingQueue<Runnable> consumeRequestQueue;
private final ThreadPoolExecutor consumeExecutor;
......@@ -60,7 +61,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
private final ScheduledExecutorService scheduledExecutorService;
private final ScheduledExecutorService cleanExpireMsgExecutors;
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
public ConsumeMessageConcurrentlyService(MQPushConsumerInner defaultMQPushConsumerImpl,
MessageListenerConcurrently messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener;
......
......@@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MQRealPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
......@@ -52,8 +53,8 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
private static final InternalLogger log = ClientLogger.getLog();
private final static long MAX_TIME_CONSUME_CONTINUOUSLY =
Long.parseLong(System.getProperty("rocketmq.client.maxTimeConsumeContinuously", "60000"));
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
private final DefaultMQPushConsumer defaultMQPushConsumer;
private final MQPushConsumerInner defaultMQPushConsumerImpl;
private final MQRealPushConsumer defaultMQPushConsumer;
private final MessageListenerOrderly messageListener;
private final BlockingQueue<Runnable> consumeRequestQueue;
private final ThreadPoolExecutor consumeExecutor;
......@@ -62,7 +63,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
private final ScheduledExecutorService scheduledExecutorService;
private volatile boolean stopped = false;
public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
public ConsumeMessageOrderlyService(MQPushConsumerInner defaultMQPushConsumerImpl,
MessageListenerOrderly messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener;
......
......@@ -26,10 +26,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
......@@ -76,9 +73,10 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public class DefaultMQPushConsumerImpl implements MQPushConsumerInner {
/**
* Delay some time when exception occur
*/
......@@ -95,8 +93,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
private final InternalLogger log = ClientLogger.getLog();
private final DefaultMQPushConsumer defaultMQPushConsumer;
//private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
private final RebalanceImpl rebalanceImpl;
private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
private final long consumerStartTimestamp = System.currentTimeMillis();
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
......@@ -111,26 +108,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private ConsumeMessageService consumeMessageService;
private long queueFlowControlTimes = 0;
private long queueMaxSpanFlowControlTimes = 0;
private boolean realPushModel = true;
private final ConcurrentHashMap<String, AtomicLong> localConsumerOffset = new ConcurrentHashMap<String, AtomicLong>();
private final ConcurrentHashMap<String, AtomicBoolean> pullStopped = new ConcurrentHashMap<String, AtomicBoolean>();
private final ConcurrentHashMap<String, ProcessQueue> processQueues = new ConcurrentHashMap<String, ProcessQueue>();
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
this(defaultMQPushConsumer, rpcHook, true);
}
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook,
boolean realPushModel) {
this.defaultMQPushConsumer = defaultMQPushConsumer;
this.rpcHook = rpcHook;
this.realPushModel = realPushModel;
if (realPushModel) {
log.info("Open Real Push Model for {}", defaultMQPushConsumer.getConsumerGroup());
rebalanceImpl = new RebalanceRealPushImpl(this);
} else {
rebalanceImpl = new RebalancePushImpl(this);
}
}
public void registerFilterMessageHook(final FilterMessageHook hook) {
......@@ -307,17 +288,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
//Update local offset according remote offset
String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(),
pullRequest.getMessageQueue().getBrokerName(),
pullRequest.getMessageQueue().getQueueId());
AtomicLong localOffset = localConsumerOffset.get(localOffsetKey);
if (localOffset == null) {
localConsumerOffset.putIfAbsent(localOffsetKey, new AtomicLong(-1));
}
localConsumerOffset.get(localOffsetKey).set(pullResult.getNextBeginOffset());
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
......@@ -475,15 +445,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(),
pullRequest.getMessageQueue().getBrokerName(),
pullRequest.getMessageQueue().getQueueId());
if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) {
//Stop pull request
log.info("Stop pull request, {}", localOffsetKey);
return;
}
this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
}
......@@ -500,15 +461,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(),
pullRequest.getMessageQueue().getBrokerName(),
pullRequest.getMessageQueue().getQueueId());
if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) {
//Stop pull request
log.info("Stop pull request, {}", localOffsetKey);
return;
}
this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}
......@@ -545,8 +497,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerName, snodeAddr, msg,
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(null, brokerAddr,msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
......@@ -675,10 +628,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
default:
break;
}
this.tryToFindSnodePublishInfo();
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllSnodeWithLock();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
}
......@@ -896,8 +849,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
this.mQClientFactory.createRetryTopic(topic, this.defaultMQPushConsumer.getConsumerGroup());
}
}
}
......@@ -1020,11 +971,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
@Override
public ConsumeType consumeType() {
if (realPushModel) {
return ConsumeType.CONSUME_PUSH;
} else {
return ConsumeType.CONSUME_PASSIVELY;
}
return ConsumeType.CONSUME_PASSIVELY;
}
@Override
......@@ -1193,90 +1140,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
private void tryToFindSnodePublishInfo() {
this.mQClientFactory.updateSnodeInfoFromNameServer();
}
public boolean processPushMessage(final MessageExt msg,
final String consumerGroup,
final String topic,
final String brokerName,
final int queueID,
final long offset) {
String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
AtomicLong localOffset = this.localConsumerOffset.get(localOffsetKey);
if (localOffset == null) {
log.info("Current Local offset have not set, initiallized to -1.");
this.localConsumerOffset.put(localOffsetKey, new AtomicLong(-1));
return false;
}
if (localOffset.get() + 1 < offset) {
//should start pull message process
log.debug("#####Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
return false;
} else {
//Stop pull request
log.debug("#####Process Push : Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
if (pullStop == null) {
this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey);
}
pullStop = this.pullStopped.get(localOffsetKey);
if (!pullStop.get()) {
pullStop.set(true);
log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...", localOffsetKey);
}
//update local offset
localOffset.set(offset);
//submit to process queue
List<MessageExt> messageExtList = new ArrayList<MessageExt>();
messageExtList.add(msg);
ProcessQueue processQueue = processQueues.get(localOffsetKey);
if (processQueue == null) {
processQueues.put(localOffsetKey, new ProcessQueue());
processQueue = processQueues.get(localOffsetKey);
}
processQueue.putMessage(messageExtList);
MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueID);
this.consumeMessageService.submitConsumeRequest(messageExtList, processQueue, messageQueue, true);
}
return true;
}
private String genLocalOffsetKey(String consumerGroup, String topic, String brokerName, int queueID) {
return consumerGroup + "@" + topic + "@" + brokerName + "@" + queueID;
}
public boolean resumePullRequest(String consumerGroup, String topic, String brokerName, int queueID) {
String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
ProcessQueue processQueue = processQueues.get(localOffsetKey);
if (processQueue != null) {
log.info("Clear local expire message for {} in processQueue.", localOffsetKey);
processQueue.cleanExpiredMsg(this.defaultMQPushConsumer);
}
AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
if (pullStop != null) {
if (pullStop.get()) {
pullStop.set(false);
log.info("Resume Pull Request of {} is set to TRUE, and then the pull request will start by rebalance again...", localOffsetKey);
}
}
return true;
}
public boolean pausePullRequest(String consumerGroup, String topic, String brokerName, int queueID) {
String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
if (pullStop == null) {
this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey);
return true;
}
if (!pullStop.get()) {
pullStop.set(true);
log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...", localOffsetKey);
}
return true;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import org.apache.rocketmq.client.consumer.MQRealPushConsumer;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;
/**
* Push Consumer inner interface
*/
public interface MQPushConsumerInner extends MQConsumerInner {
MQRealPushConsumer getDefaultMQPushConsumer();
OffsetStore getOffsetStore();
boolean isConsumeOrderly();
MQClientInstance getmQClientFactory();
boolean resumePullRequest(String consumerGroup, String topic, String brokerName, int queueID);
void executePullRequestImmediately(final PullRequest pullRequest);
RebalanceImpl getRebalanceImpl();
ConsumerStatsManager getConsumerStatsManager();
void sendMessageBack(MessageExt msg, int delayLevel,
final String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
boolean hasHook();
void registerConsumeMessageHook(final ConsumeMessageHook hook);
void executeHookBefore(final ConsumeMessageContext context);
void executeHookAfter(final ConsumeMessageContext context);
void pullMessage(final PullRequest pullRequest);
}
......@@ -27,6 +27,8 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.MQRealPushConsumer;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageAccessor;
......@@ -73,7 +75,7 @@ public class ProcessQueue {
/**
* @param pushConsumer
*/
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
public void cleanExpiredMsg(MQRealPushConsumer pushConsumer) {
if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
return;
}
......
......@@ -79,7 +79,12 @@ public class PullMessageService extends ServiceThread {
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
MQPushConsumerInner impl;
if (consumer instanceof DefaultMQRealPushConsumerImpl) {
impl = (DefaultMQRealPushConsumerImpl) consumer;
} else {
impl = (DefaultMQPushConsumerImpl) consumer;
}
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
......
......@@ -20,6 +20,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQClientException;
......@@ -34,15 +35,15 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
public class RebalancePushImpl extends RebalanceImpl {
private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills", "20000"));
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
private final MQPushConsumerInner defaultMQPushConsumerImpl;
public RebalancePushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
public RebalancePushImpl(MQPushConsumerInner defaultMQPushConsumerImpl) {
this(null, null, null, null, defaultMQPushConsumerImpl);
}
public RebalancePushImpl(String consumerGroup, MessageModel messageModel,
AllocateMessageQueueStrategy allocateMessageQueueStrategy,
MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
MQClientInstance mQClientFactory, MQPushConsumerInner defaultMQPushConsumerImpl) {
super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
}
......
......@@ -20,7 +20,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
public class RebalanceRealPushImpl extends RebalancePushImpl {
public RebalanceRealPushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
public RebalanceRealPushImpl(MQPushConsumerInner defaultMQPushConsumerImpl) {
super(defaultMQPushConsumerImpl);
}
......
......@@ -46,6 +46,7 @@ import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.DefaultMQRealPushConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.impl.consumer.PullMessageService;
......@@ -1387,7 +1388,7 @@ public class MQClientInstance {
consumerGroup, topic, brokerName, queueID, offset);
MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
if (null != mqConsumerInner) {
DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner;
DefaultMQRealPushConsumerImpl consumer = (DefaultMQRealPushConsumerImpl) mqConsumerInner;
consumer.processPushMessage(msg, consumerGroup, topic, brokerName, queueID, offset);
return true;
}
......
......@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.MQPushConsumerInner;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.log.ClientLogger;
......@@ -67,7 +68,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private volatile Thread shutDownHook;
private volatile boolean stopped = false;
private DefaultMQProducerImpl hostProducer;
private DefaultMQPushConsumerImpl hostConsumer;
private MQPushConsumerInner hostConsumer;
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private String dispatcherId = UUID.randomUUID().toString();
private String traceTopicName;
......@@ -117,11 +118,11 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
this.hostProducer = hostProducer;
}
public DefaultMQPushConsumerImpl getHostConsumer() {
public MQPushConsumerInner getHostConsumer() {
return hostConsumer;
}
public void setHostConsumer(DefaultMQPushConsumerImpl hostConsumer) {
public void setHostConsumer(MQPushConsumerInner hostConsumer) {
this.hostConsumer = hostConsumer;
}
......
......@@ -17,7 +17,7 @@
package org.apache.rocketmq.example.quickstart;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQRealPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
......@@ -26,7 +26,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
/**
* This example shows how to subscribe and consume messages using providing {@link DefaultMQPushConsumer}.
* This example shows how to subscribe and consume messages using providing {@link DefaultMQRealPushConsumer}.
*/
public class Consumer {
......@@ -35,7 +35,7 @@ public class Consumer {
/*
* Instantiate with specified consumer group name.
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("hello");
DefaultMQRealPushConsumer consumer = new DefaultMQRealPushConsumer("hello");
/*
* Specify name server addresses.
......@@ -61,7 +61,6 @@ public class Consumer {
/*
* Register callback to execute on arrival of messages fetched from brokers.
*/
// consumer.setNamesrvAddr("47.102.149.193:9876");
consumer.registerMessageListener(new MessageListenerConcurrently() {
......
......@@ -47,7 +47,6 @@ public class Producer {
/*
* Launch the instance.
*/
// producer.setNamesrvAddr("47.102.149.193:9876");
producer.start();
for (int i = 0; i < 10; i++) {
......@@ -76,7 +75,7 @@ public class Producer {
/*
* Shut down once the producer instance is not longer in use.
*/
Thread.sleep(30000L);
Thread.sleep(3000L);
producer.shutdown();
}
}
......@@ -17,7 +17,7 @@
package org.apache.rocketmq.example.simple;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQRealPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
......@@ -28,7 +28,7 @@ import org.apache.rocketmq.common.message.MessageExt;
public class PushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
DefaultMQRealPushConsumer consumer = new DefaultMQRealPushConsumer("CID_JODIE_1");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册