diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index f999eae983eda93ade34ce7d86962bd46354cff8..91e9f887b6dd1b510f5afdafb1ad478d9806e5cb 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -99,17 +99,15 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*
* -
*
CONSUME_FROM_LAST_OFFSET
: consumer clients pick up where it stopped previously.
- * If it were a newly booting up consumer client, according aging of the consumer group, there are two
- * cases:
+ * If it were a newly booting up consumer client, according aging of the consumer group, there are two cases:
*
* -
- * if the consumer group is created so recently that the earliest message being subscribed has yet
- * expired, which means the consumer group represents a lately launched business, consuming will
- * start from the very beginning;
+ * if the consumer group is created so recently that the earliest message being subscribed has yet expired, which
+ * means the consumer group represents a lately launched business, consuming will start from the very beginning;
*
* -
- * if the earliest message being subscribed has expired, consuming will start from the latest
- * messages, meaning messages born prior to the booting timestamp would be ignored.
+ * if the earliest message being subscribed has expired, consuming will start from the latest messages, meaning
+ * messages born prior to the booting timestamp would be ignored.
*
*
*
@@ -125,10 +123,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
/**
- * Backtracking consumption time with second precision. Time format is
- * 20131223171201
- * Implying Seventeen twelve and 01 seconds on December 23, 2013 year
- * Default backtracking consumption time Half an hour ago.
+ * Backtracking consumption time with second precision. Time format is 20131223171201
Implying Seventeen twelve
+ * and 01 seconds on December 23, 2013 year
Default backtracking consumption time Half an hour ago.
*/
private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
@@ -173,8 +169,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
private int consumeConcurrentlyMaxSpan = 2000;
/**
- * Flow control threshold on queue level, each message queue will cache at most 1000 messages by default,
- * Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
+ * Flow control threshold on queue level, each message queue will cache at most 1000 messages by default, Consider
+ * the {@code pullBatchSize}, the instantaneous value may exceed the limit
*/
private int pullThresholdForQueue = 1000;
@@ -190,8 +186,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Flow control threshold on topic level, default value is -1(Unlimited)
*
- * The value of {@code pullThresholdForQueue} will be overwrote and calculated based on
- * {@code pullThresholdForTopic} if it is't unlimited
+ * The value of {@code pullThresholdForQueue} will be overwrote and calculated based on {@code
+ * pullThresholdForTopic} if it is't unlimited
*
* For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer,
* then pullThresholdForQueue will be set to 100
@@ -201,11 +197,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Limit the cached message size on topic level, default value is -1 MiB(Unlimited)
*
- * The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on
- * {@code pullThresholdSizeForTopic} if it is't unlimited
+ * The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on {@code
+ * pullThresholdSizeForTopic} if it is't unlimited
*
- * For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are
- * assigned to this consumer, then pullThresholdSizeForQueue will be set to 100 MiB
+ * For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are assigned to this
+ * consumer, then pullThresholdSizeForQueue will be set to 100 MiB
*/
private int pullThresholdSizeForTopic = -1;
@@ -280,16 +276,37 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
}
/**
- * Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and customized trace topic name.
+ * Constructor specifying consumer group, RPC hook and message queue allocating algorithm.
+ *
+ * @param consumerGroup Consume queue.
+ * @param rpcHook RPC hook to execute before each remoting command.
+ * @param allocateMessageQueueStrategy Message queue allocating algorithm.
+ */
+ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
+ AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean realPushModel) {
+ this.consumerGroup = consumerGroup;
+ if (allocateMessageQueueStrategy == null) {
+ this.allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
+ } else {
+ this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
+ }
+ defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook, realPushModel);
+ }
+
+ /**
+ * Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and
+ * customized trace topic name.
*
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param enableMsgTrace Switch flag instance for message trace.
- * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
+ * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
+ * trace topic name.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
- AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
+ AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace,
+ final String customizedTraceTopic) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
@@ -315,7 +332,6 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely());
}
-
/**
* Constructor specifying consumer group and enabled msg trace flag.
*
@@ -331,9 +347,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*
* @param consumerGroup Consumer group.
* @param enableMsgTrace Switch flag instance for message trace.
- * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
+ * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
+ * trace topic name.
*/
- public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
+ public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace,
+ final String customizedTraceTopic) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);
}
@@ -632,8 +650,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* Subscribe a topic to consuming subscription.
*
* @param topic topic to subscribe.
- * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3"
- * if null or * expression,meaning subscribe all
+ * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3"
if
+ * null or * expression,meaning subscribe all
* @throws MQClientException if there is any client error.
*/
@Override
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
index 74a6e3341538976bb59123aaa65f7a65bac950da..c8c39199c74c96cca93c6d9026717f01f7d1e584 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
@@ -24,6 +24,7 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.MQProducerInner;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.protocol.header.PushMessageHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
@@ -48,7 +49,6 @@ import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-
public class ClientRemotingProcessor implements RequestProcessor {
private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mqClientFactory;
@@ -60,7 +60,7 @@ public class ClientRemotingProcessor implements RequestProcessor {
@Override
public RemotingCommand processRequest(RemotingChannel remotingChannel,
RemotingCommand request) throws RemotingCommandException {
- NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl)remotingChannel;
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel;
ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
switch (request.getCode()) {
case RequestCode.CHECK_TRANSACTION_STATE:
@@ -77,6 +77,8 @@ public class ClientRemotingProcessor implements RequestProcessor {
case RequestCode.CONSUME_MESSAGE_DIRECTLY:
return this.consumeMessageDirectly(ctx, request);
+ case RequestCode.SNODE_PUSH_MESSAGE:
+ return this.processSnodePushMessage(ctx, request);
default:
break;
}
@@ -210,4 +212,21 @@ public class ClientRemotingProcessor implements RequestProcessor {
return response;
}
+
+ private RemotingCommand processSnodePushMessage(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
+ final PushMessageHeader requestHeader =
+ (PushMessageHeader) request
+ .decodeCommandCustomHeader(PushMessageHeader.class);
+
+ final MessageExt msg = MessageDecoder.decode(ByteBuffer.wrap(request.getBody()));
+ boolean result =
+ this.mqClientFactory.processSnodePushMessage(msg,
+ requestHeader.getConsumerGroup(),
+ requestHeader.getTopic(),
+ requestHeader.getQueueId(),
+ requestHeader.getQueueOffset());
+
+ return null;
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index ab93573061c0f50241f3e38b97a6a48849e1c0fb..585d0026c28457dc42a7b417ce059cc48a2a1ce0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -190,6 +190,8 @@ public class MQClientAPIImpl {
this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
+
+ this.remotingClient.registerProcessor(RequestCode.SNODE_PUSH_MESSAGE, this.clientRemotingProcessor, null);
}
public List getNameServerAddressList() {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 9b421422f7e6cd61cce24594cfd26b9fa7f6bed8..392a2f08153bd4b36d68d98e2e10a6aa4811de4f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -26,7 +26,10 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
@@ -92,7 +95,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
private final InternalLogger log = ClientLogger.getLog();
private final DefaultMQPushConsumer defaultMQPushConsumer;
- private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
+ //private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
+ private final RebalanceImpl rebalanceImpl;
private final ArrayList filterMessageHookList = new ArrayList();
private final long consumerStartTimestamp = System.currentTimeMillis();
private final ArrayList consumeMessageHookList = new ArrayList();
@@ -107,10 +111,26 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private ConsumeMessageService consumeMessageService;
private long queueFlowControlTimes = 0;
private long queueMaxSpanFlowControlTimes = 0;
+ private boolean realPushModel = true;
+ private final ConcurrentHashMap localConsumerOffset = new ConcurrentHashMap();
+ private final ConcurrentHashMap pullStopped = new ConcurrentHashMap();
+ private final ConcurrentHashMap processQueues = new ConcurrentHashMap();
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
+ this(defaultMQPushConsumer, rpcHook, true);
+ }
+
+ public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook,
+ boolean realPushModel) {
this.defaultMQPushConsumer = defaultMQPushConsumer;
this.rpcHook = rpcHook;
+ this.realPushModel = realPushModel;
+ if (realPushModel) {
+ log.info("Open Real Push Model for {}",defaultMQPushConsumer.getConsumerGroup());
+ rebalanceImpl = new RebalanceRealPushImpl(this);
+ } else {
+ rebalanceImpl = new RebalancePushImpl(this);
+ }
}
public void registerFilterMessageHook(final FilterMessageHook hook) {
@@ -287,6 +307,16 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
+ //Update local offset according remote offset
+ String localOffsetKey = pullRequest.getConsumerGroup()
+ + "@" + pullRequest.getMessageQueue().getTopic()
+ + "@" + pullRequest.getMessageQueue().getQueueId();
+ AtomicLong localOffset = localConsumerOffset.get(localOffsetKey);
+ if (localOffset == null) {
+ localConsumerOffset.put(localOffsetKey, new AtomicLong(-1));
+ }
+ localConsumerOffset.get(localOffsetKey).set(pullResult.getNextBeginOffset());
+
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
@@ -444,6 +474,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
+ String localOffsetKey = pullRequest.getConsumerGroup()
+ + "@" + pullRequest.getMessageQueue().getTopic()
+ + "@" + pullRequest.getMessageQueue().getQueueId();
+ if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) {
+ //Stop pull request
+ log.info("Stop pull request, {}", localOffsetKey);
+ return;
+ }
this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
}
@@ -460,6 +498,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
+ String localOffsetKey = pullRequest.getConsumerGroup()
+ + "@" + pullRequest.getMessageQueue().getTopic()
+ + "@" + pullRequest.getMessageQueue().getQueueId();
+ if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) {
+ //Stop pull request
+ log.info("Stop pull request, {}", localOffsetKey);
+ return;
+ }
this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}
@@ -971,7 +1017,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
@Override
public ConsumeType consumeType() {
- return ConsumeType.CONSUME_PASSIVELY;
+ if (realPushModel) {
+ return ConsumeType.CONSUME_PUSH;
+ } else {
+ return ConsumeType.CONSUME_PASSIVELY;
+ }
}
@Override
@@ -1143,4 +1193,50 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private void tryToFindSnodePublishInfo() {
this.mQClientFactory.updateSnodeInfoFromNameServer();
}
+
+ public boolean processPushMessage(final MessageExt msg,
+ final String consumerGroup,
+ final String topic,
+ final int queueID,
+ final long offset) {
+ String localOffsetKey = consumerGroup + "@" + topic + "@" + queueID;
+ AtomicLong localOffset = this.localConsumerOffset.get(localOffsetKey);
+ if (localOffset == null) {
+ log.info("Current Local offset have not set, initiallized to -1.");
+ this.localConsumerOffset.put(localOffsetKey, new AtomicLong(-1));
+ return false;
+ }
+ if (localOffset.get() < offset) {
+ //should start pull message process
+ log.debug("Current Local key:{} and offset:{} and push offset:{}",localOffsetKey, localOffset.get(), offset);
+ return false;
+ } else {
+ //Stop pull request
+ AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
+ if (pullStop == null) {
+ this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
+ log.info("Pull stop flag of {} is not set, initialize to TRUE",localOffsetKey);
+ }
+ pullStop = this.pullStopped.get(localOffsetKey);
+ if (!pullStop.get()) {
+ pullStop.set(true);
+ log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...",localOffsetKey);
+ }
+ //update local offset
+ localOffset.set(offset);
+ //submit to process queue
+ List messageExtList = new ArrayList();
+ messageExtList.add(msg);
+ ProcessQueue processQueue = processQueues.get(localOffsetKey);
+ if (processQueue == null){
+ processQueues.put(localOffsetKey,new ProcessQueue());
+ processQueue = processQueues.get(localOffsetKey);
+ }
+ processQueue.putMessage(messageExtList);
+ MessageQueue messageQueue = new MessageQueue(topic,"",queueID);
+ this.consumeMessageService.submitConsumeRequest(messageExtList,processQueue,messageQueue,true);
+ log.info(".......submitConsumeRequest:{},Offset:{}...",localOffsetKey,offset);
+ }
+ return true;
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index 4f85cc0d12430db8b396c289ec8c2793eab1e291..a8bbaa557dc4e8186343392e53d1f619858f4239 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -137,6 +137,7 @@ public class RebalancePushImpl extends RebalanceImpl {
@Override
public ConsumeType consumeType() {
return ConsumeType.CONSUME_PASSIVELY;
+ //return ConsumeType.CONSUME_PUSH;
}
@Override
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceRealPushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceRealPushImpl.java
new file mode 100644
index 0000000000000000000000000000000000000000..8bd27019dd06804bee576622f08c4d521d8910be
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceRealPushImpl.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+
+public class RebalanceRealPushImpl extends RebalancePushImpl {
+
+ public RebalanceRealPushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
+ super(defaultMQPushConsumerImpl);
+ }
+
+ @Override
+ public ConsumeType consumeType() {
+ return ConsumeType.CONSUME_PUSH;
+ }
+
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 7fa1541c9a11ef131a96b3f8e08a08ef76ed3733..4a81c3b7d5a89ade677e51ae03826d3fc41c359c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -1376,4 +1376,18 @@ public class MQClientInstance {
public ClientConfig getNettyClientConfig() {
return nettyClientConfig;
}
+ public boolean processSnodePushMessage(final MessageExt msg,
+ final String consumerGroup,
+ final String topic,
+ final int queueID,
+ final long offset) {
+ MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
+ if (null != mqConsumerInner) {
+ DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner;
+ consumer.processPushMessage(msg,consumerGroup,topic,queueID,offset);
+ return true;
+ }
+
+ return false;
+ }
}