提交 62ca947a 编写于 作者: K King 提交者: Heng Du

Polish LitePullConsumer (#1332)

* fix unsubscribe code

* fix commit consumed offset

* fix commit consumed offset

* fix commit consumed offset

* fix commit consumed offset

* polish commit consumed offset

* pass checkstyle

* pass checkstyle
上级 11b686e4
......@@ -42,7 +42,7 @@ public class DefaultLiteMQPullConsumer extends DefaultMQPullConsumer implements
/**
* Maximum commit offset interval time in seconds.
*/
private long autoCommitInterval = 20;
private long autoCommitInterval = 5;
public DefaultLiteMQPullConsumer(String consumerGroup, RPCHook rpcHook) {
this.setConsumerGroup(consumerGroup);
......@@ -55,7 +55,7 @@ public class DefaultLiteMQPullConsumer extends DefaultMQPullConsumer implements
}
@Override
public void start() throws MQClientException{
public void start() throws MQClientException {
this.liteMQPullConsumer.start();
}
......@@ -95,7 +95,7 @@ public class DefaultLiteMQPullConsumer extends DefaultMQPullConsumer implements
@Override
public void commitSync() {
this.liteMQPullConsumer.commit();
this.liteMQPullConsumer.commitSync();
}
public long getConsumeTimeout() {
......
......@@ -16,9 +16,7 @@
*/
package org.apache.rocketmq.client.consumer;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
......@@ -40,6 +38,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
* Default pulling consumer
*/
public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer {
protected final transient DefaultMQPullConsumerImpl defaultMQPullConsumerImpl;
/**
......
......@@ -16,8 +16,6 @@
*/
package org.apache.rocketmq.client.consumer;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
......
......@@ -17,7 +17,6 @@
package org.apache.rocketmq.client.impl.consumer;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
......@@ -107,15 +106,6 @@ public class AssignedMessageQueue {
}
}
public Map<MessageQueue, Long> getNeedCommitOffsets() {
Map<MessageQueue, Long> map = new HashMap<MessageQueue, Long>();
Set<Map.Entry<MessageQueue, MessageQueueStat>> entries = this.assignedMessageQueueState.entrySet();
for (Map.Entry<MessageQueue, MessageQueueStat> entry : entries) {
map.put(entry.getKey(), entry.getValue().getNextOffset());
}
return map;
}
public class MessageQueueStat {
private MessageQueue messageQueue;
private boolean paused = false;
......
......@@ -16,9 +16,9 @@
*/
package org.apache.rocketmq.client.impl.consumer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
......@@ -33,6 +33,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.consumer.DefaultLiteMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
......@@ -50,6 +51,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
private final InternalLogger log = ClientLogger.getLog();
private DefaultLiteMQPullConsumer defaultLiteMQPullConsumer;
......@@ -59,7 +61,7 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
private AssignedMessageQueue assignedMessageQueue = new AssignedMessageQueue();
private List<ConsumeRequest> allConsumed = new ArrayList<ConsumeRequest>(256);
private volatile Set<ConsumeRequest> consumedSet = new HashSet<ConsumeRequest>();
private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();
......@@ -69,6 +71,8 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
private ScheduledExecutorService autoCommitExecutors;
private final ThreadLocal<ConsumeRequest> preConsumeRequestLocal = new ThreadLocal<ConsumeRequest>();
public LiteMQPullConsumerImpl(final DefaultLiteMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) {
super(defaultMQPullConsumer, rpcHook);
this.defaultLiteMQPullConsumer = defaultMQPullConsumer;
......@@ -145,7 +149,7 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
@Override
public void run() {
if (defaultLiteMQPullConsumer.isAutoCommit()) {
commit();
commitAll();
}
}
}, this.defaultLiteMQPullConsumer.getAutoCommitInterval(), this.defaultLiteMQPullConsumer.getAutoCommitInterval(), TimeUnit.SECONDS);
......@@ -164,7 +168,9 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
public List<MessageExt> poll(long timeout) {
try {
ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.SECONDS);
addToConsumed(preConsumeRequestLocal.get());
ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.MILLISECONDS);
preConsumeRequestLocal.set(consumeRequest);
if (consumeRequest != null) {
List<MessageExt> messages = consumeRequest.getMessageExts();
for (MessageExt messageExt : messages) {
......@@ -173,7 +179,8 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
return messages;
}
} catch (InterruptedException ignore) {
} catch (InterruptedException e) {
log.error("poll ComsumeRequest error.", e);
}
return null;
}
......@@ -197,7 +204,7 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
}
public void unsubscribe(final String topic) {
unsubscribe(topic);
super.unsubscribe(topic);
removePullTaskCallback(topic);
assignedMessageQueue.removeAssignedMessageQueue(topic);
}
......@@ -212,30 +219,39 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
while (it.hasNext()) {
Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
if (next.getKey().getTopic().equals(topic)) {
next.getValue().setCancelled(true);
it.remove();
}
}
}
}
public void commit() {
List<ConsumeRequest> consumeRequests;
synchronized (this.allConsumed) {
consumeRequests = this.allConsumed;
this.allConsumed = new ArrayList<ConsumeRequest>();
public void commitSync() {
addToConsumed(preConsumeRequestLocal.get());
preConsumeRequestLocal.set(null);
commitAll();
}
public void commitAll() {
Set<ConsumeRequest> consumedRequests;
synchronized (this.consumedSet) {
consumedRequests = this.consumedSet;
this.consumedSet = new HashSet<ConsumeRequest>();
}
for (ConsumeRequest consumeRequest : consumeRequests) {
for (ConsumeRequest consumeRequest : consumedRequests) {
consumeRequest.getProcessQueue().removeMessage(consumeRequest.messageExts);
}
Set<Map.Entry<MessageQueue, Long>> entrySet = assignedMessageQueue.getNeedCommitOffsets().entrySet();
for (Map.Entry<MessageQueue, Long> entry : entrySet) {
Set<Map.Entry<MessageQueue, ProcessQueue>> entrySet = this.rebalanceImpl.getProcessQueueTable().entrySet();
for (Map.Entry<MessageQueue, ProcessQueue> entry : entrySet) {
try {
updateConsumeOffset(entry.getKey(), entry.getValue());
long consumeOffset = entry.getValue().getConsumeOffset();
if (consumeOffset != -1)
updateConsumeOffset(entry.getKey(), consumeOffset);
} catch (MQClientException e) {
log.error("A error occurred in update consume offset process.", e);
}
}
this.getOffsetStore().persistAll(assignedMessageQueue.getNeedCommitOffsets().keySet());
this.getOffsetStore().persistAll(this.rebalanceImpl.getProcessQueueTable().keySet());
}
private void commit(final MessageQueue messageQueue, final ProcessQueue processQueue, final MessageExt messageExt) {
......@@ -260,7 +276,7 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
}
}
void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) {
private void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) {
try {
assignedMessageQueue.updateNextOffset(remoteQueue, nextPullOffset);
} catch (MQClientException e) {
......@@ -269,21 +285,23 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
}
private void addToConsumed(ConsumeRequest consumeRequest) {
synchronized (this.allConsumed) {
allConsumed.add(consumeRequest);
if (consumeRequest != null) {
synchronized (this.consumedSet) {
if (!consumedSet.contains(consumeRequest))
consumedSet.add(consumeRequest);
}
}
}
void submitConsumeRequest(ConsumeRequest consumeRequest) {
private void submitConsumeRequest(ConsumeRequest consumeRequest) {
try {
consumeRequestCache.put(consumeRequest);
addToConsumed(consumeRequest);
} catch (InterruptedException ex) {
log.error("Submit consumeRequest error", ex);
}
}
long nextPullOffset(MessageQueue remoteQueue) {
private long nextPullOffset(MessageQueue remoteQueue) {
long offset = -1;
try {
offset = assignedMessageQueue.getNextOffset(remoteQueue);
......@@ -337,7 +355,7 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
this.defaultMQPullConsumer.sendMessageBack(msg, 3);
log.info("Send expired msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}",
msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
System.out.println("Send expired msg back.");
log.info("Send expired msg back.");
commit(mq, pq, msg);
} catch (Exception e) {
log.error("Send back expired msg exception", e);
......@@ -364,7 +382,6 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
@Override
public void run() {
System.out.println("begin pull message");
String topic = this.messageQueue.getTopic();
if (!this.isCancelled()) {
if (assignedMessageQueue.isPaused(messageQueue)) {
......
......@@ -26,6 +26,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
......@@ -431,4 +432,15 @@ public class ProcessQueue {
public void setLastConsumeTimestamp(long lastConsumeTimestamp) {
this.lastConsumeTimestamp = lastConsumeTimestamp;
}
public long getConsumeOffset() {
if (msgTreeMap.isEmpty() && queueOffsetMax == 0L)
return -1;
if (!msgTreeMap.isEmpty())
return msgTreeMap.firstKey();
else
return queueOffsetMax + 1;
}
}
......@@ -25,24 +25,24 @@ import org.apache.rocketmq.common.message.MessageQueue;
public class LitePullConsumerTest {
public static void main(String[] args) throws Exception {
DefaultLiteMQPullConsumer litePullConsumer = new DefaultLiteMQPullConsumer("test", null);
litePullConsumer.subscribe("test", null);
litePullConsumer.setNamesrvAddr("localhost:9876");
litePullConsumer.subscribe("litepullconsumertest9", null);
litePullConsumer.start();
MessageQueue messageQueue = new MessageQueue("test", "duhengdeMacBook-Pro.local", 1);
MessageQueue messageQueue = new MessageQueue("test", "IT-C02YW28FLVDL.local", 1);
int i = 0;
while (true) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.println("-----------");
System.out.println(messageExts);
System.out.println("-----------");
System.out.printf("%s%n", messageExts);
i++;
if (i == 3) {
System.out.println("pause");
System.out.printf("pause%n");
litePullConsumer.pause(Arrays.asList(messageQueue));
}
if (i == 10) {
System.out.println("resume");
System.out.printf("resume%n");
litePullConsumer.resume(Arrays.asList(messageQueue));
}
//
litePullConsumer.commitSync();
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册