提交 11b686e4 编写于 作者: D duhenglucky

Add lite pull consumer example

上级 af04557a
......@@ -27,18 +27,50 @@ import org.apache.rocketmq.remoting.RPCHook;
public class DefaultLiteMQPullConsumer extends DefaultMQPullConsumer implements LiteMQPullConsumer {
private LiteMQPullConsumerImpl liteMQPullConsumer;
/**
* Maximum amount of time in minutes a message may block the consuming thread.
*/
private long consumeTimeout = 15;
/**
* Is auto commit offset
*/
private boolean autoCommit = true;
private int pullThreadNumbers = 20;
/**
* Maximum commit offset interval time in seconds.
*/
private long autoCommitInterval = 20;
public DefaultLiteMQPullConsumer(String consumerGroup, RPCHook rpcHook) {
this.setConsumerGroup(consumerGroup);
this.liteMQPullConsumer = new LiteMQPullConsumerImpl(this, rpcHook);
}
@Override public void subscribe(String topic, String subExpression) throws MQClientException{
public DefaultLiteMQPullConsumer(String consumerGroup) {
this.setConsumerGroup(consumerGroup);
this.liteMQPullConsumer = new LiteMQPullConsumerImpl(this, null);
}
@Override
public void start() throws MQClientException{
this.liteMQPullConsumer.start();
}
@Override
public void subscribe(String topic, String subExpression) throws MQClientException {
this.liteMQPullConsumer.subscribe(topic, subExpression);
}
@Override public void unsubscribe(String topic) {
@Override
public void unsubscribe(String topic) {
this.liteMQPullConsumer.unsubscribe(topic);
}
@Override public List<MessageExt> poll() {
@Override
public List<MessageExt> poll() {
return poll(this.getConsumerPullTimeoutMillis());
}
......@@ -46,19 +78,55 @@ public class DefaultLiteMQPullConsumer extends DefaultMQPullConsumer implements
return liteMQPullConsumer.poll(timeout);
}
@Override public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
@Override
public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
this.liteMQPullConsumer.seek(messageQueue, offset);
}
@Override
public void pause(Collection<MessageQueue> messageQueues) {
this.liteMQPullConsumer.pause(messageQueues);
}
@Override
public void resume(Collection<MessageQueue> messageQueues) {
this.liteMQPullConsumer.resume(messageQueues);
}
@Override
public void commitSync() {
this.liteMQPullConsumer.commit();
}
public long getConsumeTimeout() {
return consumeTimeout;
}
@Override public void pause(Collection<MessageQueue> messageQueueCollection) {
public void setConsumeTimeout(long consumeTimeout) {
this.consumeTimeout = consumeTimeout;
}
public boolean isAutoCommit() {
return autoCommit;
}
@Override public void resume(Collection<MessageQueue> partitions) {
public void setAutoCommit(boolean autoCommit) {
this.autoCommit = autoCommit;
}
public int getPullThreadNumbers() {
return pullThreadNumbers;
}
@Override public void commitSync() {
public void setPullThreadNumbers(int pullThreadNumbers) {
this.pullThreadNumbers = pullThreadNumbers;
}
public long getAutoCommitInterval() {
return autoCommitInterval;
}
public void setAutoCommitInterval(long autoCommitInterval) {
this.autoCommitInterval = autoCommitInterval;
}
}
......@@ -38,9 +38,6 @@ public interface LiteMQPullConsumer {
*/
void unsubscribe(final String topic);
/**
* @return
*/
List<MessageExt> poll();
List<MessageExt> poll(long timeout);
......
......@@ -27,12 +27,14 @@ import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultLiteMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
......@@ -50,6 +52,8 @@ import org.apache.rocketmq.remoting.RPCHook;
public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
private final InternalLogger log = ClientLogger.getLog();
private DefaultLiteMQPullConsumer defaultLiteMQPullConsumer;
private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
......@@ -58,12 +62,21 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
private List<ConsumeRequest> allConsumed = new ArrayList<ConsumeRequest>(256);
private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();
;
private final ScheduledExecutorService cleanExpireMsgExecutors;
private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
public LiteMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) {
private ScheduledExecutorService autoCommitExecutors;
public LiteMQPullConsumerImpl(final DefaultLiteMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) {
super(defaultMQPullConsumer, rpcHook);
this.defaultLiteMQPullConsumer = defaultMQPullConsumer;
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"Lite_CleanExpireMsgScheduledThread_"));
this.autoCommitExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"Lite_AutoCommitScheduledThread_"));
}
public void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue) {
......@@ -115,18 +128,43 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
@Override
public synchronized void start() throws MQClientException {
this.defaultMQPullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
super.start();
final String group = this.defaultMQPullConsumer.getConsumerGroup();
this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
10, //this.pullThreadNums,
this.defaultLiteMQPullConsumer.getPullThreadNumbers(),
new ThreadFactoryImpl("PullMsgThread-" + group)
);
this.defaultMQPullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
cleanExpireMsg();
}
}, this.defaultLiteMQPullConsumer.getConsumeTimeout(), this.defaultLiteMQPullConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
this.autoCommitExecutors.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if (defaultLiteMQPullConsumer.isAutoCommit()) {
commit();
}
}
}, this.defaultLiteMQPullConsumer.getAutoCommitInterval(), this.defaultLiteMQPullConsumer.getAutoCommitInterval(), TimeUnit.SECONDS);
updateTopicSubscribeInfoWhenSubscriptionChanged();
}
private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
Map<String, SubscriptionData> subTable = rebalanceImpl.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
}
}
}
public List<MessageExt> poll(long timeout) {
try {
ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.MILLISECONDS);
ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.SECONDS);
if (consumeRequest != null) {
List<MessageExt> messages = consumeRequest.getMessageExts();
for (MessageExt messageExt : messages) {
......@@ -148,6 +186,16 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
assignedMessageQueue.resume(messageQueues);
}
public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
this.updatePullOffset(messageQueue, offset);
try {
updateConsumeOffset(messageQueue, offset);
} catch (MQClientException ex) {
log.error("Seek offset to remote message queue error!", ex);
throw ex;
}
}
public void unsubscribe(final String topic) {
unsubscribe(topic);
removePullTaskCallback(topic);
......@@ -270,7 +318,7 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
if (!msgTreeMap.isEmpty()) {
msg = msgTreeMap.firstEntry().getValue();
if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg))
> 10 * 60 * 1000) {
> this.defaultLiteMQPullConsumer.getConsumeTimeout() * 60 * 1000) {
//Expired, ack and remove it.
} else {
break;
......@@ -316,16 +364,19 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
@Override
public void run() {
System.out.println("begin pull message");
String topic = this.messageQueue.getTopic();
if (!this.isCancelled()) {
if (assignedMessageQueue.isPaused(messageQueue)) {
scheduledThreadPoolExecutor.schedule(this, 1000, TimeUnit.MILLISECONDS);
log.debug("Message Queue: {} has been paused!", messageQueue);
return;
}
SubscriptionData subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
long offset = nextPullOffset(messageQueue);
long pullDelayTimeMills = 0;
try {
PullResult pullResult = defaultMQPullConsumer.pull(messageQueue, subscriptionData.getSubString(), offset, nextPullBatchNums());
PullResult pullResult = pull(messageQueue, subscriptionData.getSubString(), offset, nextPullBatchNums());
ProcessQueue processQueue = rebalanceImpl.getProcessQueueTable().get(messageQueue);
switch (pullResult.getPullStatus()) {
case FOUND:
......@@ -338,9 +389,17 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
break;
}
updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
} catch (Exception e) {
} catch (Throwable e) {
pullDelayTimeMills = 1000;
e.printStackTrace();
log.error("An error occurred in pull message process.", e);
}
if (!this.isCancelled()) {
scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
} else {
log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
}
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.simple;
import java.util.Arrays;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultLiteMQPullConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
public class LitePullConsumerTest {
public static void main(String[] args) throws Exception {
DefaultLiteMQPullConsumer litePullConsumer = new DefaultLiteMQPullConsumer("test", null);
litePullConsumer.subscribe("test", null);
litePullConsumer.start();
MessageQueue messageQueue = new MessageQueue("test", "duhengdeMacBook-Pro.local", 1);
int i = 0;
while (true) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.println("-----------");
System.out.println(messageExts);
System.out.println("-----------");
i++;
if (i == 3) {
System.out.println("pause");
litePullConsumer.pause(Arrays.asList(messageQueue));
}
if (i == 10) {
System.out.println("resume");
litePullConsumer.resume(Arrays.asList(messageQueue));
}
litePullConsumer.commitSync();
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册