提交 4a4a464b 编写于 作者: K King 提交者: Heng Du

Polish lite pull consumer and fix bug (#1402)

1. Fix the problem of seek function blocking.
2. Fix shutdown function bug.
3. Make the code cleaner
4. Add unit test.
5. Fix seek bug
上级 cfca4401
...@@ -53,14 +53,17 @@ public class Validators { ...@@ -53,14 +53,17 @@ public class Validators {
if (UtilAll.isBlank(group)) { if (UtilAll.isBlank(group)) {
throw new MQClientException("the specified group is blank", null); throw new MQClientException("the specified group is blank", null);
} }
if (group.length() > CHARACTER_MAX_LENGTH) {
throw new MQClientException("the specified group is longer than group max length 255.", null);
}
if (!regularExpressionMatcher(group, PATTERN)) { if (!regularExpressionMatcher(group, PATTERN)) {
throw new MQClientException(String.format( throw new MQClientException(String.format(
"the specified group[%s] contains illegal characters, allowing only %s", group, "the specified group[%s] contains illegal characters, allowing only %s", group,
VALID_PATTERN_STR), null); VALID_PATTERN_STR), null);
} }
if (group.length() > CHARACTER_MAX_LENGTH) {
throw new MQClientException("the specified group is longer than group max length 255.", null);
}
} }
/** /**
......
...@@ -128,6 +128,11 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon ...@@ -128,6 +128,11 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
*/ */
private long pollTimeoutMillis = 1000 * 5; private long pollTimeoutMillis = 1000 * 5;
/**
* Interval time in in milliseconds for checking changes in topic metadata.
*/
private long topicMetadataCheckIntervalMillis = 30 * 1000;
/** /**
* Default constructor. * Default constructor.
*/ */
...@@ -175,202 +180,92 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon ...@@ -175,202 +180,92 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook); defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
} }
/**
* Start the consumer
*/
@Override @Override
public void start() throws MQClientException { public void start() throws MQClientException {
this.defaultLitePullConsumerImpl.start(); this.defaultLitePullConsumerImpl.start();
} }
/**
* Shutdown the consumer
*/
@Override @Override
public void shutdown() { public void shutdown() {
this.defaultLitePullConsumerImpl.shutdown(); this.defaultLitePullConsumerImpl.shutdown();
} }
/**
* Subscribe some topic with subExpression
*
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
* null or * expression,meaning subscribe all
* @throws MQClientException if there is any client error.
*/
@Override @Override
public void subscribe(String topic, String subExpression) throws MQClientException { public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression); this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression);
} }
/**
* Subscribe some topic with selector.
*
* @param messageSelector message selector({@link MessageSelector}), can be null.
* @throws MQClientException if there is any client error.
*/
@Override @Override
public void subscribe(String topic, MessageSelector messageSelector) throws MQClientException { public void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), messageSelector); this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), messageSelector);
} }
/**
* Unsubscribe consumption some topic
*
* @param topic Message topic that needs to be unsubscribe.
*/
@Override @Override
public void unsubscribe(String topic) { public void unsubscribe(String topic) {
this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic)); this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic));
} }
/**
* Manually assign a list of message queues to this consumer. This interface does not allow for incremental
* assignment and will replace the previous assignment (if there is one).
*
* @param messageQueues Message queues that needs to be assigned.
*/
@Override @Override
public void assign(Collection<MessageQueue> messageQueues) { public void assign(Collection<MessageQueue> messageQueues) {
defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues)); defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues));
} }
/**
* Fetch data for the topics or partitions specified using assign API
*
* @return list of message, can be null.
*/
@Override @Override
public List<MessageExt> poll() { public List<MessageExt> poll() {
return defaultLitePullConsumerImpl.poll(this.getPollTimeoutMillis()); return defaultLitePullConsumerImpl.poll(this.getPollTimeoutMillis());
} }
/**
* Fetch data for the topics or partitions specified using assign API
*
* @param timeout The amount time, in milliseconds, spent waiting in poll if data is not available. Must not be
* negative
* @return list of message, can be null.
*/
@Override @Override
public List<MessageExt> poll(long timeout) { public List<MessageExt> poll(long timeout) {
return defaultLitePullConsumerImpl.poll(timeout); return defaultLitePullConsumerImpl.poll(timeout);
} }
/**
* Overrides the fetch offsets that the consumer will use on the next poll. If this API is invoked for the same
* message queue more than once, the latest offset will be used on the next poll(). Note that you may lose data if
* this API is arbitrarily used in the middle of consumption.
*
* @param messageQueue
* @param offset
*/
@Override @Override
public void seek(MessageQueue messageQueue, long offset) throws MQClientException { public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
this.defaultLitePullConsumerImpl.seek(queueWithNamespace(messageQueue), offset); this.defaultLitePullConsumerImpl.seek(queueWithNamespace(messageQueue), offset);
} }
/**
* Suspend pulling from the requested message queues.
*
* Because of the implementation of pre-pull, fetch data in {@link #poll()} will not stop immediately until the
* messages of the requested message queues drain.
*
* Note that this method does not affect message queue subscription. In particular, it does not cause a group
* rebalance.
*
* @param messageQueues Message queues that needs to be paused.
*/
@Override @Override
public void pause(Collection<MessageQueue> messageQueues) { public void pause(Collection<MessageQueue> messageQueues) {
this.defaultLitePullConsumerImpl.pause(queuesWithNamespace(messageQueues)); this.defaultLitePullConsumerImpl.pause(queuesWithNamespace(messageQueues));
} }
/**
* Resume specified message queues which have been paused with {@link #pause(Collection)}.
*
* @param messageQueues Message queues that needs to be resumed.
*/
@Override @Override
public void resume(Collection<MessageQueue> messageQueues) { public void resume(Collection<MessageQueue> messageQueues) {
this.defaultLitePullConsumerImpl.resume(queuesWithNamespace(messageQueues)); this.defaultLitePullConsumerImpl.resume(queuesWithNamespace(messageQueues));
} }
/**
* Get metadata about the message queues for a given topic.
*
* @param topic The topic that need to get metadata.
* @return collection of message queues
* @throws MQClientException if there is any client error.
*/
@Override @Override
public Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException { public Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException {
return this.defaultLitePullConsumerImpl.fetchMessageQueues(withNamespace(topic)); return this.defaultLitePullConsumerImpl.fetchMessageQueues(withNamespace(topic));
} }
/**
* Look up the offsets for the given message queue by timestamp. The returned offset for each message queue is the
* earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding message
* queue.
*
* @param messageQueue Message queues that needs to get offset by timestamp.
* @param timestamp
* @return offset
* @throws MQClientException if there is any client error.
*/
@Override @Override
public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException { public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException {
return this.defaultLitePullConsumerImpl.searchOffset(queueWithNamespace(messageQueue), timestamp); return this.defaultLitePullConsumerImpl.searchOffset(queueWithNamespace(messageQueue), timestamp);
} }
/**
* Register a callback for sensing topic metadata changes.
*
* @param topic The topic that need to monitor.
* @param topicMessageQueueChangeListener Callback when topic metadata changes.
* @throws MQClientException if there is any client error.
*/
@Override @Override
public void registerTopicMessageQueueChangeListener(String topic, public void registerTopicMessageQueueChangeListener(String topic,
TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException { TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException {
this.defaultLitePullConsumerImpl.registerTopicMessageQueueChangeListener(withNamespace(topic), topicMessageQueueChangeListener); this.defaultLitePullConsumerImpl.registerTopicMessageQueueChangeListener(withNamespace(topic), topicMessageQueueChangeListener);
} }
/**
* Manually commit consume offset.
*/
@Override @Override
public void commitSync() { public void commitSync() {
this.defaultLitePullConsumerImpl.commitSync(); this.defaultLitePullConsumerImpl.commitSync();
} }
/**
* Get the last committed offset for the given message queue.
*
* @param messageQueue
* @return offset, if offset equals -1 means no offset in broker.
* @throws MQClientException if there is any client error.
*/
@Override @Override
public Long committed(MessageQueue messageQueue) throws MQClientException { public Long committed(MessageQueue messageQueue) throws MQClientException {
return this.defaultLitePullConsumerImpl.committed(messageQueue); return this.defaultLitePullConsumerImpl.committed(messageQueue);
} }
/**
* Whether to enable auto-commit consume offset.
*
* @return true if enable auto-commit, false if disable auto-commit.
*/
@Override @Override
public boolean isAutoCommit() { public boolean isAutoCommit() {
return autoCommit; return autoCommit;
} }
/**
* Set whether to enable auto-commit consume offset.
*
* @param autoCommit Whether to enable auto-commit.
*/
@Override @Override
public void setAutoCommit(boolean autoCommit) { public void setAutoCommit(boolean autoCommit) {
this.autoCommit = autoCommit; this.autoCommit = autoCommit;
...@@ -392,12 +287,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon ...@@ -392,12 +287,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
this.autoCommitIntervalMillis = autoCommitIntervalMillis; this.autoCommitIntervalMillis = autoCommitIntervalMillis;
} }
public int getPullBatchNums() { public int getPullBatchSize() {
return pullBatchSize; return pullBatchSize;
} }
public void setPullBatchNums(int pullBatchNums) { public void setPullBatchSize(int pullBatchSize) {
this.pullBatchSize = pullBatchNums; this.pullBatchSize = pullBatchSize;
} }
public long getPullThresholdForAll() { public long getPullThresholdForAll() {
...@@ -503,4 +398,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon ...@@ -503,4 +398,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
public void setConsumerTimeoutMillisWhenSuspend(long consumerTimeoutMillisWhenSuspend) { public void setConsumerTimeoutMillisWhenSuspend(long consumerTimeoutMillisWhenSuspend) {
this.consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend; this.consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend;
} }
public long getTopicMetadataCheckIntervalMillis() {
return topicMetadataCheckIntervalMillis;
}
public void setTopicMetadataCheckIntervalMillis(long topicMetadataCheckIntervalMillis) {
this.topicMetadataCheckIntervalMillis = topicMetadataCheckIntervalMillis;
}
} }
...@@ -117,25 +117,24 @@ public class RemoteBrokerOffsetStore implements OffsetStore { ...@@ -117,25 +117,24 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
return; return;
final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>(); final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
if (!mqs.isEmpty()) {
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) { for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
MessageQueue mq = entry.getKey(); MessageQueue mq = entry.getKey();
AtomicLong offset = entry.getValue(); AtomicLong offset = entry.getValue();
if (offset != null) { if (offset != null) {
if (mqs.contains(mq)) { if (mqs.contains(mq)) {
try { try {
this.updateConsumeOffsetToBroker(mq, offset.get()); this.updateConsumeOffsetToBroker(mq, offset.get());
log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
this.groupName, this.groupName,
this.mQClientFactory.getClientId(), this.mQClientFactory.getClientId(),
mq, mq,
offset.get()); offset.get());
} catch (Exception e) { } catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e); log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
}
} else {
unusedMQ.add(mq);
} }
} else {
unusedMQ.add(mq);
} }
} }
} }
...@@ -187,8 +186,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { ...@@ -187,8 +186,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
} }
/** /**
* Update the Consumer Offset in one way, once the Master is off, updated to Slave, * Update the Consumer Offset in one way, once the Master is off, updated to Slave, here need to be optimized.
* here need to be optimized.
*/ */
private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException, private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException { MQBrokerException, InterruptedException, MQClientException {
...@@ -196,8 +194,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { ...@@ -196,8 +194,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
} }
/** /**
* Update the Consumer Offset synchronously, once the Master is off, updated to Slave, * Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized.
* here need to be optimized.
*/ */
@Override @Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
......
...@@ -21,12 +21,11 @@ import java.util.Iterator; ...@@ -21,12 +21,11 @@ import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.CountDownLatch2;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
public class AssignedMessageQueue { public class AssignedMessageQueue {
private ConcurrentHashMap<MessageQueue, MessageQueueState> assignedMessageQueueState; private final ConcurrentHashMap<MessageQueue, MessageQueueState> assignedMessageQueueState;
private RebalanceImpl rebalanceImpl; private RebalanceImpl rebalanceImpl;
...@@ -54,7 +53,6 @@ public class AssignedMessageQueue { ...@@ -54,7 +53,6 @@ public class AssignedMessageQueue {
for (MessageQueue messageQueue : messageQueues) { for (MessageQueue messageQueue : messageQueues) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue); MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (assignedMessageQueueState.get(messageQueue) != null) { if (assignedMessageQueueState.get(messageQueue) != null) {
messageQueueState.getPausedLatch().reset();
messageQueueState.setPaused(true); messageQueueState.setPaused(true);
} }
} }
...@@ -65,7 +63,6 @@ public class AssignedMessageQueue { ...@@ -65,7 +63,6 @@ public class AssignedMessageQueue {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue); MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (assignedMessageQueueState.get(messageQueue) != null) { if (assignedMessageQueueState.get(messageQueue) != null) {
messageQueueState.setPaused(false); messageQueueState.setPaused(false);
messageQueueState.getPausedLatch().reset();
} }
} }
} }
...@@ -123,14 +120,6 @@ public class AssignedMessageQueue { ...@@ -123,14 +120,6 @@ public class AssignedMessageQueue {
return -1; return -1;
} }
public CountDownLatch2 getPausedLatch(MessageQueue messageQueue) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueState != null) {
return messageQueueState.getPausedLatch();
}
return null;
}
public void updateAssignedMessageQueue(String topic, Collection<MessageQueue> assigned) { public void updateAssignedMessageQueue(String topic, Collection<MessageQueue> assigned) {
synchronized (this.assignedMessageQueueState) { synchronized (this.assignedMessageQueueState) {
Iterator<Map.Entry<MessageQueue, MessageQueueState>> it = this.assignedMessageQueueState.entrySet().iterator(); Iterator<Map.Entry<MessageQueue, MessageQueueState>> it = this.assignedMessageQueueState.entrySet().iterator();
...@@ -195,9 +184,8 @@ public class AssignedMessageQueue { ...@@ -195,9 +184,8 @@ public class AssignedMessageQueue {
private volatile long pullOffset = -1; private volatile long pullOffset = -1;
private volatile long consumeOffset = -1; private volatile long consumeOffset = -1;
private volatile long seekOffset = -1; private volatile long seekOffset = -1;
private CountDownLatch2 pausedLatch = new CountDownLatch2(1);
public MessageQueueState(MessageQueue messageQueue, ProcessQueue processQueue) { private MessageQueueState(MessageQueue messageQueue, ProcessQueue processQueue) {
this.messageQueue = messageQueue; this.messageQueue = messageQueue;
this.processQueue = processQueue; this.processQueue = processQueue;
} }
...@@ -249,9 +237,5 @@ public class AssignedMessageQueue { ...@@ -249,9 +237,5 @@ public class AssignedMessageQueue {
public void setSeekOffset(long seekOffset) { public void setSeekOffset(long seekOffset) {
this.seekOffset = seekOffset; this.seekOffset = seekOffset;
} }
public CountDownLatch2 getPausedLatch() {
return pausedLatch;
}
} }
} }
...@@ -246,10 +246,6 @@ public class MQClientInstance { ...@@ -246,10 +246,6 @@ public class MQClientInstance {
log.info("the client factory [{}] start OK", this.clientId); log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING; this.serviceState = ServiceState.RUNNING;
break; break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED: case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default: default:
......
...@@ -202,7 +202,6 @@ public class DefaultLitePullConsumerTest { ...@@ -202,7 +202,6 @@ public class DefaultLitePullConsumerTest {
field.setAccessible(true); field.setAccessible(true);
AssignedMessageQueue assignedMessageQueue = (AssignedMessageQueue) field.get(litePullConsumerImpl); AssignedMessageQueue assignedMessageQueue = (AssignedMessageQueue) field.get(litePullConsumerImpl);
assertEquals(assignedMessageQueue.getSeekOffset(messageQueue), 50); assertEquals(assignedMessageQueue.getSeekOffset(messageQueue), 50);
assertEquals(assignedMessageQueue.getConusmerOffset(messageQueue), 50);
litePullConsumer.shutdown(); litePullConsumer.shutdown();
} }
...@@ -219,6 +218,26 @@ public class DefaultLitePullConsumerTest { ...@@ -219,6 +218,26 @@ public class DefaultLitePullConsumerTest {
} }
} }
@Test
public void testPauseAndResume_Success() throws Exception {
DefaultLitePullConsumer litePullConsumer = createNotStartLitePullConsumer();
try {
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.assign(Collections.singletonList(messageQueue));
litePullConsumer.pause(Collections.singletonList(messageQueue));
litePullConsumer.start();
initDefaultLitePullConsumer(litePullConsumer);
List<MessageExt> result = litePullConsumer.poll();
assertThat(result.isEmpty()).isTrue();
litePullConsumer.resume(Collections.singletonList(messageQueue));
result = litePullConsumer.poll();
assertThat(result.get(0).getTopic()).isEqualTo(topic);
assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
} finally {
litePullConsumer.shutdown();
}
}
private MessageQueue createMessageQueue() { private MessageQueue createMessageQueue() {
MessageQueue messageQueue = new MessageQueue(); MessageQueue messageQueue = new MessageQueue();
messageQueue.setBrokerName(brokerName); messageQueue.setBrokerName(brokerName);
...@@ -328,5 +347,4 @@ public class DefaultLitePullConsumerTest { ...@@ -328,5 +347,4 @@ public class DefaultLitePullConsumerTest {
} }
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray()); return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
} }
} }
...@@ -31,9 +31,7 @@ public class LitePullConsumerSubscribe { ...@@ -31,9 +31,7 @@ public class LitePullConsumerSubscribe {
try { try {
while (running) { while (running) {
List<MessageExt> messageExts = litePullConsumer.poll(); List<MessageExt> messageExts = litePullConsumer.poll();
if (messageExts != null) { System.out.printf("%s%n", messageExts);
System.out.printf("%s%n", messageExts);
}
} }
} finally { } finally {
litePullConsumer.shutdown(); litePullConsumer.shutdown();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册