提交 b8edd0b5 编写于 作者: S ShannonDing

Submit Push Message to consumeservice

上级 d49da7b8
......@@ -99,17 +99,15 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* <ul>
* <li>
* <code>CONSUME_FROM_LAST_OFFSET</code>: consumer clients pick up where it stopped previously.
* If it were a newly booting up consumer client, according aging of the consumer group, there are two
* cases:
* If it were a newly booting up consumer client, according aging of the consumer group, there are two cases:
* <ol>
* <li>
* if the consumer group is created so recently that the earliest message being subscribed has yet
* expired, which means the consumer group represents a lately launched business, consuming will
* start from the very beginning;
* if the consumer group is created so recently that the earliest message being subscribed has yet expired, which
* means the consumer group represents a lately launched business, consuming will start from the very beginning;
* </li>
* <li>
* if the earliest message being subscribed has expired, consuming will start from the latest
* messages, meaning messages born prior to the booting timestamp would be ignored.
* if the earliest message being subscribed has expired, consuming will start from the latest messages, meaning
* messages born prior to the booting timestamp would be ignored.
* </li>
* </ol>
* </li>
......@@ -125,10 +123,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
/**
* Backtracking consumption time with second precision. Time format is
* 20131223171201<br>
* Implying Seventeen twelve and 01 seconds on December 23, 2013 year<br>
* Default backtracking consumption time Half an hour ago.
* Backtracking consumption time with second precision. Time format is 20131223171201<br> Implying Seventeen twelve
* and 01 seconds on December 23, 2013 year<br> Default backtracking consumption time Half an hour ago.
*/
private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
......@@ -173,8 +169,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
private int consumeConcurrentlyMaxSpan = 2000;
/**
* Flow control threshold on queue level, each message queue will cache at most 1000 messages by default,
* Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
* Flow control threshold on queue level, each message queue will cache at most 1000 messages by default, Consider
* the {@code pullBatchSize}, the instantaneous value may exceed the limit
*/
private int pullThresholdForQueue = 1000;
......@@ -190,8 +186,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Flow control threshold on topic level, default value is -1(Unlimited)
* <p>
* The value of {@code pullThresholdForQueue} will be overwrote and calculated based on
* {@code pullThresholdForTopic} if it is't unlimited
* The value of {@code pullThresholdForQueue} will be overwrote and calculated based on {@code
* pullThresholdForTopic} if it is't unlimited
* <p>
* For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer,
* then pullThresholdForQueue will be set to 100
......@@ -201,11 +197,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Limit the cached message size on topic level, default value is -1 MiB(Unlimited)
* <p>
* The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on
* {@code pullThresholdSizeForTopic} if it is't unlimited
* The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on {@code
* pullThresholdSizeForTopic} if it is't unlimited
* <p>
* For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are
* assigned to this consumer, then pullThresholdSizeForQueue will be set to 100 MiB
* For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are assigned to this
* consumer, then pullThresholdSizeForQueue will be set to 100 MiB
*/
private int pullThresholdSizeForTopic = -1;
......@@ -280,16 +276,37 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
}
/**
* Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and customized trace topic name.
* Constructor specifying consumer group, RPC hook and message queue allocating algorithm.
*
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy Message queue allocating algorithm.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean realPushModel) {
this.consumerGroup = consumerGroup;
if (allocateMessageQueueStrategy == null) {
this.allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
} else {
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
}
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook, realPushModel);
}
/**
* Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and
* customized trace topic name.
*
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
* trace topic name.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace,
final String customizedTraceTopic) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
......@@ -315,7 +332,6 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely());
}
/**
* Constructor specifying consumer group and enabled msg trace flag.
*
......@@ -331,9 +347,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*
* @param consumerGroup Consumer group.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
* trace topic name.
*/
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace,
final String customizedTraceTopic) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);
}
......@@ -632,8 +650,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* Subscribe a topic to consuming subscription.
*
* @param topic topic to subscribe.
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br>
* if null or * expression,meaning subscribe all
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
* null or * expression,meaning subscribe all
* @throws MQClientException if there is any client error.
*/
@Override
......
......@@ -114,6 +114,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private boolean realPushModel = true;
private final ConcurrentHashMap<String, AtomicLong> localConsumerOffset = new ConcurrentHashMap<String, AtomicLong>();
private final ConcurrentHashMap<String, AtomicBoolean> pullStopped = new ConcurrentHashMap<String, AtomicBoolean>();
private final ConcurrentHashMap<String, ProcessQueue> processQueues = new ConcurrentHashMap<String, ProcessQueue>();
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
this(defaultMQPushConsumer, rpcHook, true);
......@@ -1224,6 +1225,17 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
//update local offset
localOffset.set(offset);
//submit to process queue
List<MessageExt> messageExtList = new ArrayList<MessageExt>();
messageExtList.add(msg);
ProcessQueue processQueue = processQueues.get(localOffsetKey);
if (processQueue == null){
processQueues.put(localOffsetKey,new ProcessQueue());
processQueue = processQueues.get(localOffsetKey);
}
processQueue.putMessage(messageExtList);
MessageQueue messageQueue = new MessageQueue(topic,"",queueID);
this.consumeMessageService.submitConsumeRequest(messageExtList,processQueue,messageQueue,true);
log.info(".......submitConsumeRequest:{},Offset:{}...",localOffsetKey,offset);
}
return true;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册