提交 41d5adc0 编写于 作者: D duhenglucky

Merge branch 'snode' of github.com:apache/rocketmq into snode

...@@ -59,7 +59,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException; ...@@ -59,7 +59,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
* <strong>Thread Safety:</strong> After initialization, the instance can be regarded as thread-safe. * <strong>Thread Safety:</strong> After initialization, the instance can be regarded as thread-safe.
* </p> * </p>
*/ */
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer { public class DefaultMQPushConsumer extends ClientConfig implements MQRealPushConsumer {
private final InternalLogger log = ClientLogger.getLog(); private final InternalLogger log = ClientLogger.getLog();
...@@ -223,7 +223,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -223,7 +223,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/** /**
* Whether update subscription relationship when every pull * Whether update subscription relationship when every pull
*/ */
private boolean postSubscriptionWhenPull = true; private boolean postSubscriptionWhenPull = false;
/** /**
* Whether the unit of subscription group * Whether the unit of subscription group
...@@ -275,24 +275,6 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -275,24 +275,6 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
} }
/**
* Constructor specifying consumer group, RPC hook and message queue allocating algorithm.
*
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy Message queue allocating algorithm.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean realPushModel) {
this.consumerGroup = consumerGroup;
if (allocateMessageQueueStrategy == null) {
this.allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
} else {
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
}
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook, realPushModel);
}
/** /**
* Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and * Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and
* customized trace topic name. * customized trace topic name.
......
...@@ -20,6 +20,7 @@ import org.apache.rocketmq.client.consumer.listener.MessageListener; ...@@ -20,6 +20,7 @@ import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.MQPushConsumerInner;
/** /**
* Push consumer * Push consumer
...@@ -49,14 +50,13 @@ public interface MQPushConsumer extends MQConsumer { ...@@ -49,14 +50,13 @@ public interface MQPushConsumer extends MQConsumer {
* Subscribe some topic * Subscribe some topic
* *
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
* null or * expression,meaning subscribe * null or * expression,meaning subscribe all
* all
*/ */
void subscribe(final String topic, final String subExpression) throws MQClientException; void subscribe(final String topic, final String subExpression) throws MQClientException;
/** /**
* This method will be removed in the version 5.0.0,because filterServer was removed,and method <code>subscribe(final String topic, final MessageSelector messageSelector)</code> * This method will be removed in the version 5.0.0,because filterServer was removed,and method
* is recommended. * <code>subscribe(final String topic, final MessageSelector messageSelector)</code> is recommended.
* *
* Subscribe some topic * Subscribe some topic
* *
...@@ -70,8 +70,8 @@ public interface MQPushConsumer extends MQConsumer { ...@@ -70,8 +70,8 @@ public interface MQPushConsumer extends MQConsumer {
/** /**
* Subscribe some topic with selector. * Subscribe some topic with selector.
* <p> * <p>
* This interface also has the ability of {@link #subscribe(String, String)}, * This interface also has the ability of {@link #subscribe(String, String)}, and, support other message selection,
* and, support other message selection, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}. * such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}.
* </p> * </p>
* <p/> * <p/>
* <p> * <p>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
import org.apache.rocketmq.client.impl.consumer.MQPushConsumerInner;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
/**
* Real Push consumer
*/
public interface MQRealPushConsumer extends MQPushConsumer {
MQPushConsumerInner getDefaultMQPushConsumerImpl();
long getConsumeTimeout();
int getPullThresholdForTopic();
int getPullThresholdForQueue();
void setPullThresholdForQueue(int pullThresholdForQueue);
int getPullThresholdSizeForTopic();
void setPullThresholdSizeForTopic(final int pullThresholdSizeForTopic);
int getPullThresholdSizeForQueue();
void setPullThresholdSizeForQueue(final int pullThresholdSizeForQueue);
ConsumeFromWhere getConsumeFromWhere();
String getConsumeTimestamp();
String getConsumerGroup();
int getConsumeThreadMin();
void setConsumeThreadMin(int consumeThreadMin);
int getConsumeThreadMax();
long getSuspendCurrentQueueTimeMillis();
int getMaxReconsumeTimes();
void setMaxReconsumeTimes(final int maxReconsumeTimes);
int getConsumeMessageBatchMaxSize();
MessageModel getMessageModel();
}
\ No newline at end of file
...@@ -30,6 +30,7 @@ import java.util.concurrent.ScheduledExecutorService; ...@@ -30,6 +30,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MQRealPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType; import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
...@@ -50,8 +51,8 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; ...@@ -50,8 +51,8 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService { public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
private static final InternalLogger log = ClientLogger.getLog(); private static final InternalLogger log = ClientLogger.getLog();
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; private final MQPushConsumerInner defaultMQPushConsumerImpl;
private final DefaultMQPushConsumer defaultMQPushConsumer; private final MQRealPushConsumer defaultMQPushConsumer;
private final MessageListenerConcurrently messageListener; private final MessageListenerConcurrently messageListener;
private final BlockingQueue<Runnable> consumeRequestQueue; private final BlockingQueue<Runnable> consumeRequestQueue;
private final ThreadPoolExecutor consumeExecutor; private final ThreadPoolExecutor consumeExecutor;
...@@ -60,7 +61,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService ...@@ -60,7 +61,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
private final ScheduledExecutorService scheduledExecutorService; private final ScheduledExecutorService scheduledExecutorService;
private final ScheduledExecutorService cleanExpireMsgExecutors; private final ScheduledExecutorService cleanExpireMsgExecutors;
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, public ConsumeMessageConcurrentlyService(MQPushConsumerInner defaultMQPushConsumerImpl,
MessageListenerConcurrently messageListener) { MessageListenerConcurrently messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener; this.messageListener = messageListener;
......
...@@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledExecutorService; ...@@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MQRealPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType; import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
...@@ -52,8 +53,8 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { ...@@ -52,8 +53,8 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
private static final InternalLogger log = ClientLogger.getLog(); private static final InternalLogger log = ClientLogger.getLog();
private final static long MAX_TIME_CONSUME_CONTINUOUSLY = private final static long MAX_TIME_CONSUME_CONTINUOUSLY =
Long.parseLong(System.getProperty("rocketmq.client.maxTimeConsumeContinuously", "60000")); Long.parseLong(System.getProperty("rocketmq.client.maxTimeConsumeContinuously", "60000"));
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; private final MQPushConsumerInner defaultMQPushConsumerImpl;
private final DefaultMQPushConsumer defaultMQPushConsumer; private final MQRealPushConsumer defaultMQPushConsumer;
private final MessageListenerOrderly messageListener; private final MessageListenerOrderly messageListener;
private final BlockingQueue<Runnable> consumeRequestQueue; private final BlockingQueue<Runnable> consumeRequestQueue;
private final ThreadPoolExecutor consumeExecutor; private final ThreadPoolExecutor consumeExecutor;
...@@ -62,7 +63,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { ...@@ -62,7 +63,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
private final ScheduledExecutorService scheduledExecutorService; private final ScheduledExecutorService scheduledExecutorService;
private volatile boolean stopped = false; private volatile boolean stopped = false;
public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, public ConsumeMessageOrderlyService(MQPushConsumerInner defaultMQPushConsumerImpl,
MessageListenerOrderly messageListener) { MessageListenerOrderly messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener; this.messageListener = messageListener;
......
...@@ -26,10 +26,7 @@ import java.util.Map; ...@@ -26,10 +26,7 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
...@@ -76,9 +73,10 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData; ...@@ -76,9 +73,10 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.PullSysFlag; import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
public class DefaultMQPushConsumerImpl implements MQConsumerInner { public class DefaultMQPushConsumerImpl implements MQPushConsumerInner {
/** /**
* Delay some time when exception occur * Delay some time when exception occur
*/ */
...@@ -95,8 +93,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -95,8 +93,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30; private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
private final InternalLogger log = ClientLogger.getLog(); private final InternalLogger log = ClientLogger.getLog();
private final DefaultMQPushConsumer defaultMQPushConsumer; private final DefaultMQPushConsumer defaultMQPushConsumer;
//private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this); private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
private final RebalanceImpl rebalanceImpl;
private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>(); private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
private final long consumerStartTimestamp = System.currentTimeMillis(); private final long consumerStartTimestamp = System.currentTimeMillis();
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>(); private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
...@@ -111,26 +108,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -111,26 +108,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private ConsumeMessageService consumeMessageService; private ConsumeMessageService consumeMessageService;
private long queueFlowControlTimes = 0; private long queueFlowControlTimes = 0;
private long queueMaxSpanFlowControlTimes = 0; private long queueMaxSpanFlowControlTimes = 0;
private boolean realPushModel = true;
private final ConcurrentHashMap<String, AtomicLong> localConsumerOffset = new ConcurrentHashMap<String, AtomicLong>();
private final ConcurrentHashMap<String, AtomicBoolean> pullStopped = new ConcurrentHashMap<String, AtomicBoolean>();
private final ConcurrentHashMap<String, ProcessQueue> processQueues = new ConcurrentHashMap<String, ProcessQueue>();
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) { public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
this(defaultMQPushConsumer, rpcHook, true);
}
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook,
boolean realPushModel) {
this.defaultMQPushConsumer = defaultMQPushConsumer; this.defaultMQPushConsumer = defaultMQPushConsumer;
this.rpcHook = rpcHook; this.rpcHook = rpcHook;
this.realPushModel = realPushModel;
if (realPushModel) {
log.info("Open Real Push Model for {}", defaultMQPushConsumer.getConsumerGroup());
rebalanceImpl = new RebalanceRealPushImpl(this);
} else {
rebalanceImpl = new RebalancePushImpl(this);
}
} }
public void registerFilterMessageHook(final FilterMessageHook hook) { public void registerFilterMessageHook(final FilterMessageHook hook) {
...@@ -307,17 +288,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -307,17 +288,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
@Override @Override
public void onSuccess(PullResult pullResult) { public void onSuccess(PullResult pullResult) {
if (pullResult != null) { if (pullResult != null) {
//Update local offset according remote offset
String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(),
pullRequest.getMessageQueue().getBrokerName(),
pullRequest.getMessageQueue().getQueueId());
AtomicLong localOffset = localConsumerOffset.get(localOffsetKey);
if (localOffset == null) {
localConsumerOffset.putIfAbsent(localOffsetKey, new AtomicLong(-1));
}
localConsumerOffset.get(localOffsetKey).set(pullResult.getNextBeginOffset());
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData); subscriptionData);
...@@ -475,15 +445,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -475,15 +445,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
} }
private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) { private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(),
pullRequest.getMessageQueue().getBrokerName(),
pullRequest.getMessageQueue().getQueueId());
if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) {
//Stop pull request
log.info("Stop pull request, {}", localOffsetKey);
return;
}
this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay); this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
} }
...@@ -500,15 +461,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -500,15 +461,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
} }
public void executePullRequestImmediately(final PullRequest pullRequest) { public void executePullRequestImmediately(final PullRequest pullRequest) {
String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(),
pullRequest.getMessageQueue().getBrokerName(),
pullRequest.getMessageQueue().getQueueId());
if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) {
//Stop pull request
log.info("Stop pull request, {}", localOffsetKey);
return;
}
this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest); this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
} }
...@@ -545,8 +497,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -545,8 +497,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException { throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try { try {
String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerName, snodeAddr, msg, : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(null, brokerAddr,msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes()); this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) { } catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e); log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
...@@ -675,10 +628,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -675,10 +628,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
default: default:
break; break;
} }
this.tryToFindSnodePublishInfo();
this.updateTopicSubscribeInfoWhenSubscriptionChanged(); this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker(); this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllSnodeWithLock(); this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately(); this.mQClientFactory.rebalanceImmediately();
} }
...@@ -896,8 +849,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -896,8 +849,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey(); final String topic = entry.getKey();
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
this.mQClientFactory.createRetryTopic(topic, this.defaultMQPushConsumer.getConsumerGroup());
} }
} }
} }
...@@ -1020,11 +971,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -1020,11 +971,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
@Override @Override
public ConsumeType consumeType() { public ConsumeType consumeType() {
if (realPushModel) { return ConsumeType.CONSUME_PASSIVELY;
return ConsumeType.CONSUME_PUSH;
} else {
return ConsumeType.CONSUME_PASSIVELY;
}
} }
@Override @Override
...@@ -1193,90 +1140,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -1193,90 +1140,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
} }
private void tryToFindSnodePublishInfo() {
this.mQClientFactory.updateSnodeInfoFromNameServer();
}
public boolean processPushMessage(final MessageExt msg,
final String consumerGroup,
final String topic,
final String brokerName,
final int queueID,
final long offset) {
String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
AtomicLong localOffset = this.localConsumerOffset.get(localOffsetKey);
if (localOffset == null) {
log.info("Current Local offset have not set, initiallized to -1.");
this.localConsumerOffset.put(localOffsetKey, new AtomicLong(-1));
return false;
}
if (localOffset.get() + 1 < offset) {
//should start pull message process
log.debug("#####Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
return false;
} else {
//Stop pull request
log.debug("#####Process Push : Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
if (pullStop == null) {
this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey);
}
pullStop = this.pullStopped.get(localOffsetKey);
if (!pullStop.get()) {
pullStop.set(true);
log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...", localOffsetKey);
}
//update local offset
localOffset.set(offset);
//submit to process queue
List<MessageExt> messageExtList = new ArrayList<MessageExt>();
messageExtList.add(msg);
ProcessQueue processQueue = processQueues.get(localOffsetKey);
if (processQueue == null) {
processQueues.put(localOffsetKey, new ProcessQueue());
processQueue = processQueues.get(localOffsetKey);
}
processQueue.putMessage(messageExtList);
MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueID);
this.consumeMessageService.submitConsumeRequest(messageExtList, processQueue, messageQueue, true);
}
return true;
}
private String genLocalOffsetKey(String consumerGroup, String topic, String brokerName, int queueID) {
return consumerGroup + "@" + topic + "@" + brokerName + "@" + queueID;
}
public boolean resumePullRequest(String consumerGroup, String topic, String brokerName, int queueID) { public boolean resumePullRequest(String consumerGroup, String topic, String brokerName, int queueID) {
String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
ProcessQueue processQueue = processQueues.get(localOffsetKey);
if (processQueue != null) {
log.info("Clear local expire message for {} in processQueue.", localOffsetKey);
processQueue.cleanExpiredMsg(this.defaultMQPushConsumer);
}
AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
if (pullStop != null) {
if (pullStop.get()) {
pullStop.set(false);
log.info("Resume Pull Request of {} is set to TRUE, and then the pull request will start by rebalance again...", localOffsetKey);
}
}
return true;
}
public boolean pausePullRequest(String consumerGroup, String topic, String brokerName, int queueID) {
String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
if (pullStop == null) {
this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey);
return true;
}
if (!pullStop.get()) {
pullStop.set(true);
log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...", localOffsetKey);
}
return true; return true;
} }
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import org.apache.rocketmq.client.consumer.MQRealPushConsumer;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;
/**
* Push Consumer inner interface
*/
public interface MQPushConsumerInner extends MQConsumerInner {
MQRealPushConsumer getDefaultMQPushConsumer();
OffsetStore getOffsetStore();
boolean isConsumeOrderly();
MQClientInstance getmQClientFactory();
boolean resumePullRequest(String consumerGroup, String topic, String brokerName, int queueID);
void executePullRequestImmediately(final PullRequest pullRequest);
RebalanceImpl getRebalanceImpl();
ConsumerStatsManager getConsumerStatsManager();
void sendMessageBack(MessageExt msg, int delayLevel,
final String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
boolean hasHook();
void registerConsumeMessageHook(final ConsumeMessageHook hook);
void executeHookBefore(final ConsumeMessageContext context);
void executeHookAfter(final ConsumeMessageContext context);
void pullMessage(final PullRequest pullRequest);
}
...@@ -27,6 +27,8 @@ import java.util.concurrent.locks.ReadWriteLock; ...@@ -27,6 +27,8 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.MQRealPushConsumer;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageAccessor;
...@@ -73,7 +75,7 @@ public class ProcessQueue { ...@@ -73,7 +75,7 @@ public class ProcessQueue {
/** /**
* @param pushConsumer * @param pushConsumer
*/ */
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) { public void cleanExpiredMsg(MQRealPushConsumer pushConsumer) {
if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) { if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
return; return;
} }
......
...@@ -79,7 +79,12 @@ public class PullMessageService extends ServiceThread { ...@@ -79,7 +79,12 @@ public class PullMessageService extends ServiceThread {
private void pullMessage(final PullRequest pullRequest) { private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) { if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; MQPushConsumerInner impl;
if (consumer instanceof DefaultMQRealPushConsumerImpl) {
impl = (DefaultMQRealPushConsumerImpl) consumer;
} else {
impl = (DefaultMQPushConsumerImpl) consumer;
}
impl.pullMessage(pullRequest); impl.pullMessage(pullRequest);
} else { } else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
......
...@@ -20,6 +20,7 @@ import java.util.List; ...@@ -20,6 +20,7 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType; import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
...@@ -34,15 +35,15 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; ...@@ -34,15 +35,15 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
public class RebalancePushImpl extends RebalanceImpl { public class RebalancePushImpl extends RebalanceImpl {
private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills", "20000")); private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills", "20000"));
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; private final MQPushConsumerInner defaultMQPushConsumerImpl;
public RebalancePushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) { public RebalancePushImpl(MQPushConsumerInner defaultMQPushConsumerImpl) {
this(null, null, null, null, defaultMQPushConsumerImpl); this(null, null, null, null, defaultMQPushConsumerImpl);
} }
public RebalancePushImpl(String consumerGroup, MessageModel messageModel, public RebalancePushImpl(String consumerGroup, MessageModel messageModel,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, AllocateMessageQueueStrategy allocateMessageQueueStrategy,
MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) { MQClientInstance mQClientFactory, MQPushConsumerInner defaultMQPushConsumerImpl) {
super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory); super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
} }
......
...@@ -20,7 +20,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; ...@@ -20,7 +20,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
public class RebalanceRealPushImpl extends RebalancePushImpl { public class RebalanceRealPushImpl extends RebalancePushImpl {
public RebalanceRealPushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) { public RebalanceRealPushImpl(MQPushConsumerInner defaultMQPushConsumerImpl) {
super(defaultMQPushConsumerImpl); super(defaultMQPushConsumerImpl);
} }
......
...@@ -46,6 +46,7 @@ import org.apache.rocketmq.client.impl.MQClientAPIImpl; ...@@ -46,6 +46,7 @@ import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl; import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.DefaultMQRealPushConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.MQConsumerInner; import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue; import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.impl.consumer.PullMessageService; import org.apache.rocketmq.client.impl.consumer.PullMessageService;
...@@ -1387,7 +1388,7 @@ public class MQClientInstance { ...@@ -1387,7 +1388,7 @@ public class MQClientInstance {
consumerGroup, topic, brokerName, queueID, offset); consumerGroup, topic, brokerName, queueID, offset);
MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup); MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
if (null != mqConsumerInner) { if (null != mqConsumerInner) {
DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner; DefaultMQRealPushConsumerImpl consumer = (DefaultMQRealPushConsumerImpl) mqConsumerInner;
consumer.processPushMessage(msg, consumerGroup, topic, brokerName, queueID, offset); consumer.processPushMessage(msg, consumerGroup, topic, brokerName, queueID, offset);
return true; return true;
} }
......
...@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.trace; ...@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.MQPushConsumerInner;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
...@@ -67,7 +68,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -67,7 +68,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private volatile Thread shutDownHook; private volatile Thread shutDownHook;
private volatile boolean stopped = false; private volatile boolean stopped = false;
private DefaultMQProducerImpl hostProducer; private DefaultMQProducerImpl hostProducer;
private DefaultMQPushConsumerImpl hostConsumer; private MQPushConsumerInner hostConsumer;
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private String dispatcherId = UUID.randomUUID().toString(); private String dispatcherId = UUID.randomUUID().toString();
private String traceTopicName; private String traceTopicName;
...@@ -117,11 +118,11 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -117,11 +118,11 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
this.hostProducer = hostProducer; this.hostProducer = hostProducer;
} }
public DefaultMQPushConsumerImpl getHostConsumer() { public MQPushConsumerInner getHostConsumer() {
return hostConsumer; return hostConsumer;
} }
public void setHostConsumer(DefaultMQPushConsumerImpl hostConsumer) { public void setHostConsumer(MQPushConsumerInner hostConsumer) {
this.hostConsumer = hostConsumer; this.hostConsumer = hostConsumer;
} }
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
package org.apache.rocketmq.example.quickstart; package org.apache.rocketmq.example.quickstart;
import java.util.List; import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQRealPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
...@@ -26,7 +26,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere; ...@@ -26,7 +26,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
/** /**
* This example shows how to subscribe and consume messages using providing {@link DefaultMQPushConsumer}. * This example shows how to subscribe and consume messages using providing {@link DefaultMQRealPushConsumer}.
*/ */
public class Consumer { public class Consumer {
...@@ -35,7 +35,7 @@ public class Consumer { ...@@ -35,7 +35,7 @@ public class Consumer {
/* /*
* Instantiate with specified consumer group name. * Instantiate with specified consumer group name.
*/ */
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("hello"); DefaultMQRealPushConsumer consumer = new DefaultMQRealPushConsumer("hello");
/* /*
* Specify name server addresses. * Specify name server addresses.
...@@ -61,7 +61,6 @@ public class Consumer { ...@@ -61,7 +61,6 @@ public class Consumer {
/* /*
* Register callback to execute on arrival of messages fetched from brokers. * Register callback to execute on arrival of messages fetched from brokers.
*/ */
// consumer.setNamesrvAddr("47.102.149.193:9876");
consumer.registerMessageListener(new MessageListenerConcurrently() { consumer.registerMessageListener(new MessageListenerConcurrently() {
......
...@@ -47,7 +47,6 @@ public class Producer { ...@@ -47,7 +47,6 @@ public class Producer {
/* /*
* Launch the instance. * Launch the instance.
*/ */
// producer.setNamesrvAddr("47.102.149.193:9876");
producer.start(); producer.start();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
...@@ -76,7 +75,7 @@ public class Producer { ...@@ -76,7 +75,7 @@ public class Producer {
/* /*
* Shut down once the producer instance is not longer in use. * Shut down once the producer instance is not longer in use.
*/ */
Thread.sleep(30000L); Thread.sleep(3000L);
producer.shutdown(); producer.shutdown();
} }
} }
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
package org.apache.rocketmq.example.simple; package org.apache.rocketmq.example.simple;
import java.util.List; import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQRealPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
...@@ -28,7 +28,7 @@ import org.apache.rocketmq.common.message.MessageExt; ...@@ -28,7 +28,7 @@ import org.apache.rocketmq.common.message.MessageExt;
public class PushConsumer { public class PushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException { public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); DefaultMQRealPushConsumer consumer = new DefaultMQRealPushConsumer("CID_JODIE_1");
consumer.subscribe("TopicTest", "*"); consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800 //wrong time format 2017_0422_221800
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册