未验证 提交 26f8ae89 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #813 from ShannonDing/snode

Support real push consume message model
......@@ -99,17 +99,15 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* <ul>
* <li>
* <code>CONSUME_FROM_LAST_OFFSET</code>: consumer clients pick up where it stopped previously.
* If it were a newly booting up consumer client, according aging of the consumer group, there are two
* cases:
* If it were a newly booting up consumer client, according aging of the consumer group, there are two cases:
* <ol>
* <li>
* if the consumer group is created so recently that the earliest message being subscribed has yet
* expired, which means the consumer group represents a lately launched business, consuming will
* start from the very beginning;
* if the consumer group is created so recently that the earliest message being subscribed has yet expired, which
* means the consumer group represents a lately launched business, consuming will start from the very beginning;
* </li>
* <li>
* if the earliest message being subscribed has expired, consuming will start from the latest
* messages, meaning messages born prior to the booting timestamp would be ignored.
* if the earliest message being subscribed has expired, consuming will start from the latest messages, meaning
* messages born prior to the booting timestamp would be ignored.
* </li>
* </ol>
* </li>
......@@ -125,10 +123,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
/**
* Backtracking consumption time with second precision. Time format is
* 20131223171201<br>
* Implying Seventeen twelve and 01 seconds on December 23, 2013 year<br>
* Default backtracking consumption time Half an hour ago.
* Backtracking consumption time with second precision. Time format is 20131223171201<br> Implying Seventeen twelve
* and 01 seconds on December 23, 2013 year<br> Default backtracking consumption time Half an hour ago.
*/
private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
......@@ -173,8 +169,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
private int consumeConcurrentlyMaxSpan = 2000;
/**
* Flow control threshold on queue level, each message queue will cache at most 1000 messages by default,
* Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
* Flow control threshold on queue level, each message queue will cache at most 1000 messages by default, Consider
* the {@code pullBatchSize}, the instantaneous value may exceed the limit
*/
private int pullThresholdForQueue = 1000;
......@@ -190,8 +186,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Flow control threshold on topic level, default value is -1(Unlimited)
* <p>
* The value of {@code pullThresholdForQueue} will be overwrote and calculated based on
* {@code pullThresholdForTopic} if it is't unlimited
* The value of {@code pullThresholdForQueue} will be overwrote and calculated based on {@code
* pullThresholdForTopic} if it is't unlimited
* <p>
* For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer,
* then pullThresholdForQueue will be set to 100
......@@ -201,11 +197,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Limit the cached message size on topic level, default value is -1 MiB(Unlimited)
* <p>
* The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on
* {@code pullThresholdSizeForTopic} if it is't unlimited
* The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on {@code
* pullThresholdSizeForTopic} if it is't unlimited
* <p>
* For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are
* assigned to this consumer, then pullThresholdSizeForQueue will be set to 100 MiB
* For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are assigned to this
* consumer, then pullThresholdSizeForQueue will be set to 100 MiB
*/
private int pullThresholdSizeForTopic = -1;
......@@ -280,16 +276,37 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
}
/**
* Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and customized trace topic name.
* Constructor specifying consumer group, RPC hook and message queue allocating algorithm.
*
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy Message queue allocating algorithm.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean realPushModel) {
this.consumerGroup = consumerGroup;
if (allocateMessageQueueStrategy == null) {
this.allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
} else {
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
}
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook, realPushModel);
}
/**
* Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and
* customized trace topic name.
*
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
* trace topic name.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace,
final String customizedTraceTopic) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
......@@ -315,7 +332,6 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely());
}
/**
* Constructor specifying consumer group and enabled msg trace flag.
*
......@@ -331,9 +347,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*
* @param consumerGroup Consumer group.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
* trace topic name.
*/
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace,
final String customizedTraceTopic) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);
}
......@@ -632,8 +650,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* Subscribe a topic to consuming subscription.
*
* @param topic topic to subscribe.
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br>
* if null or * expression,meaning subscribe all
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
* null or * expression,meaning subscribe all
* @throws MQClientException if there is any client error.
*/
@Override
......
......@@ -24,6 +24,7 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.MQProducerInner;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.protocol.header.PushMessageHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
......@@ -48,7 +49,6 @@ import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class ClientRemotingProcessor implements RequestProcessor {
private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mqClientFactory;
......@@ -60,7 +60,7 @@ public class ClientRemotingProcessor implements RequestProcessor {
@Override
public RemotingCommand processRequest(RemotingChannel remotingChannel,
RemotingCommand request) throws RemotingCommandException {
NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl)remotingChannel;
NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel;
ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
switch (request.getCode()) {
case RequestCode.CHECK_TRANSACTION_STATE:
......@@ -77,6 +77,8 @@ public class ClientRemotingProcessor implements RequestProcessor {
case RequestCode.CONSUME_MESSAGE_DIRECTLY:
return this.consumeMessageDirectly(ctx, request);
case RequestCode.SNODE_PUSH_MESSAGE:
return this.processSnodePushMessage(ctx, request);
default:
break;
}
......@@ -210,4 +212,21 @@ public class ClientRemotingProcessor implements RequestProcessor {
return response;
}
private RemotingCommand processSnodePushMessage(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final PushMessageHeader requestHeader =
(PushMessageHeader) request
.decodeCommandCustomHeader(PushMessageHeader.class);
final MessageExt msg = MessageDecoder.decode(ByteBuffer.wrap(request.getBody()));
boolean result =
this.mqClientFactory.processSnodePushMessage(msg,
requestHeader.getConsumerGroup(),
requestHeader.getTopic(),
requestHeader.getQueueId(),
requestHeader.getQueueOffset());
return null;
}
}
......@@ -190,6 +190,8 @@ public class MQClientAPIImpl {
this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.SNODE_PUSH_MESSAGE, this.clientRemotingProcessor, null);
}
public List<String> getNameServerAddressList() {
......
......@@ -26,7 +26,10 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
......@@ -92,7 +95,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
private final InternalLogger log = ClientLogger.getLog();
private final DefaultMQPushConsumer defaultMQPushConsumer;
private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
//private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
private final RebalanceImpl rebalanceImpl;
private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
private final long consumerStartTimestamp = System.currentTimeMillis();
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
......@@ -107,10 +111,26 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private ConsumeMessageService consumeMessageService;
private long queueFlowControlTimes = 0;
private long queueMaxSpanFlowControlTimes = 0;
private boolean realPushModel = true;
private final ConcurrentHashMap<String, AtomicLong> localConsumerOffset = new ConcurrentHashMap<String, AtomicLong>();
private final ConcurrentHashMap<String, AtomicBoolean> pullStopped = new ConcurrentHashMap<String, AtomicBoolean>();
private final ConcurrentHashMap<String, ProcessQueue> processQueues = new ConcurrentHashMap<String, ProcessQueue>();
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
this(defaultMQPushConsumer, rpcHook, true);
}
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook,
boolean realPushModel) {
this.defaultMQPushConsumer = defaultMQPushConsumer;
this.rpcHook = rpcHook;
this.realPushModel = realPushModel;
if (realPushModel) {
log.info("Open Real Push Model for {}",defaultMQPushConsumer.getConsumerGroup());
rebalanceImpl = new RebalanceRealPushImpl(this);
} else {
rebalanceImpl = new RebalancePushImpl(this);
}
}
public void registerFilterMessageHook(final FilterMessageHook hook) {
......@@ -287,6 +307,16 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
//Update local offset according remote offset
String localOffsetKey = pullRequest.getConsumerGroup()
+ "@" + pullRequest.getMessageQueue().getTopic()
+ "@" + pullRequest.getMessageQueue().getQueueId();
AtomicLong localOffset = localConsumerOffset.get(localOffsetKey);
if (localOffset == null) {
localConsumerOffset.put(localOffsetKey, new AtomicLong(-1));
}
localConsumerOffset.get(localOffsetKey).set(pullResult.getNextBeginOffset());
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
......@@ -444,6 +474,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
String localOffsetKey = pullRequest.getConsumerGroup()
+ "@" + pullRequest.getMessageQueue().getTopic()
+ "@" + pullRequest.getMessageQueue().getQueueId();
if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) {
//Stop pull request
log.info("Stop pull request, {}", localOffsetKey);
return;
}
this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
}
......@@ -460,6 +498,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
String localOffsetKey = pullRequest.getConsumerGroup()
+ "@" + pullRequest.getMessageQueue().getTopic()
+ "@" + pullRequest.getMessageQueue().getQueueId();
if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) {
//Stop pull request
log.info("Stop pull request, {}", localOffsetKey);
return;
}
this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}
......@@ -971,7 +1017,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
@Override
public ConsumeType consumeType() {
return ConsumeType.CONSUME_PASSIVELY;
if (realPushModel) {
return ConsumeType.CONSUME_PUSH;
} else {
return ConsumeType.CONSUME_PASSIVELY;
}
}
@Override
......@@ -1143,4 +1193,50 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private void tryToFindSnodePublishInfo() {
this.mQClientFactory.updateSnodeInfoFromNameServer();
}
public boolean processPushMessage(final MessageExt msg,
final String consumerGroup,
final String topic,
final int queueID,
final long offset) {
String localOffsetKey = consumerGroup + "@" + topic + "@" + queueID;
AtomicLong localOffset = this.localConsumerOffset.get(localOffsetKey);
if (localOffset == null) {
log.info("Current Local offset have not set, initiallized to -1.");
this.localConsumerOffset.put(localOffsetKey, new AtomicLong(-1));
return false;
}
if (localOffset.get() < offset) {
//should start pull message process
log.debug("Current Local key:{} and offset:{} and push offset:{}",localOffsetKey, localOffset.get(), offset);
return false;
} else {
//Stop pull request
AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
if (pullStop == null) {
this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
log.info("Pull stop flag of {} is not set, initialize to TRUE",localOffsetKey);
}
pullStop = this.pullStopped.get(localOffsetKey);
if (!pullStop.get()) {
pullStop.set(true);
log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...",localOffsetKey);
}
//update local offset
localOffset.set(offset);
//submit to process queue
List<MessageExt> messageExtList = new ArrayList<MessageExt>();
messageExtList.add(msg);
ProcessQueue processQueue = processQueues.get(localOffsetKey);
if (processQueue == null){
processQueues.put(localOffsetKey,new ProcessQueue());
processQueue = processQueues.get(localOffsetKey);
}
processQueue.putMessage(messageExtList);
MessageQueue messageQueue = new MessageQueue(topic,"",queueID);
this.consumeMessageService.submitConsumeRequest(messageExtList,processQueue,messageQueue,true);
log.info(".......submitConsumeRequest:{},Offset:{}...",localOffsetKey,offset);
}
return true;
}
}
......@@ -137,6 +137,7 @@ public class RebalancePushImpl extends RebalanceImpl {
@Override
public ConsumeType consumeType() {
return ConsumeType.CONSUME_PASSIVELY;
//return ConsumeType.CONSUME_PUSH;
}
@Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
public class RebalanceRealPushImpl extends RebalancePushImpl {
public RebalanceRealPushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
super(defaultMQPushConsumerImpl);
}
@Override
public ConsumeType consumeType() {
return ConsumeType.CONSUME_PUSH;
}
}
......@@ -1376,4 +1376,18 @@ public class MQClientInstance {
public ClientConfig getNettyClientConfig() {
return nettyClientConfig;
}
public boolean processSnodePushMessage(final MessageExt msg,
final String consumerGroup,
final String topic,
final int queueID,
final long offset) {
MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
if (null != mqConsumerInner) {
DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner;
consumer.processPushMessage(msg,consumerGroup,topic,queueID,offset);
return true;
}
return false;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册