未验证 提交 1e8e7282 编写于 作者: youlixishia's avatar youlixishia 提交者: GitHub

Merge pull request #3295 from lwclover/develop

[ISSUE #3308] production level pull api demo
...@@ -16,63 +16,131 @@ ...@@ -16,63 +16,131 @@
*/ */
package org.apache.rocketmq.example.simple; package org.apache.rocketmq.example.simple;
import java.util.HashMap; import java.util.HashSet;
import java.util.Map; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
@SuppressWarnings("deprecation")
public class PullConsumer { public class PullConsumer {
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws MQClientException { public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setNamesrvAddr("127.0.0.1:9876");
Set<String> topics = new HashSet<>();
//You would better to register topics,It will use in rebalance when starting
topics.add("TopicTest");
consumer.setRegisterTopics(topics);
consumer.start(); consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("broker-a"); ExecutorService executors = Executors.newFixedThreadPool(topics.size(), new ThreadFactory() {
for (MessageQueue mq : mqs) { @Override
System.out.printf("Consume from the queue: %s%n", mq); public Thread newThread(Runnable r) {
SINGLE_MQ: return new Thread(r, "PullConsumerThread");
while (true) {
try {
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
} }
});
for(String topic : consumer.getRegisterTopics()){
executors.execute(new Runnable() {
public void doSomething(List<MessageExt> msgs){
//do you business
System.out.println(msgs);
}
@Override
public void run() {
while(true){
try {
Set<MessageQueue> messageQueues = consumer.fetchMessageQueuesInBalance(topic);
if(messageQueues == null || messageQueues.isEmpty()){
Thread.sleep(1000);
continue;
}
PullResult pullResult = null;
for(MessageQueue messageQueue : messageQueues){
try {
long offset = this.consumeFromOffset(messageQueue);
pullResult = consumer.pull(messageQueue, "*", offset, 32);
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> msgs = pullResult.getMsgFoundList();
if(msgs != null && !msgs.isEmpty()){
this.doSomething(msgs);
//update offset to broker
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
//print pull tps
this.incPullTPS(topic, pullResult.getMsgFoundList().size());
}
break;
case OFFSET_ILLEGAL:
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
break;
case NO_NEW_MSG:
Thread.sleep(1);
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
break;
case NO_MATCHED_MSG:
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
break;
default:
}
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (Exception e){
e.printStackTrace();
}
}
} catch (MQClientException e) {
//reblance error
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e){
e.printStackTrace();
}
}
}
public long consumeFromOffset(MessageQueue messageQueue) throws MQClientException{
//-1 when started
long offset = consumer.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY);
if(offset < 0){
//query from broker
offset = consumer.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE);
}
if (offset < 0){
//first time start from last offset
offset = consumer.maxOffset(messageQueue);
}
//make sure
if (offset < 0){
offset = 0;
}
return offset;
}
public void incPullTPS(String topic, int pullSize) {
consumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory()
.getConsumerStatsManager().incPullTPS(consumer.getConsumerGroup(), topic, pullSize);
}
});
} }
// executors.shutdown();
consumer.shutdown(); // consumer.shutdown();
} }
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null)
return offset;
return 0;
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册