提交 2aae40f9 编写于 作者: D duhenglucky

Add pull schedual service

上级 d66243c0
......@@ -151,7 +151,7 @@ public class MQPullConsumerScheduleService {
}
}
class PullTaskImpl implements Runnable {
public class PullTaskImpl implements Runnable {
private final MessageQueue messageQueue;
private volatile boolean cancelled = false;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
public class AssignedMessageQueue {
private ConcurrentHashMap<MessageQueue, MessageQueueStat> assignedMessageQueueState;
public AssignedMessageQueue() {
assignedMessageQueueState = new ConcurrentHashMap<MessageQueue, MessageQueueStat>();
}
public boolean isPaused(MessageQueue messageQueue) {
MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
if (messageQueueStat != null) {
return messageQueueStat.isPaused();
}
return false;
}
public void pause(Collection<MessageQueue> messageQueues) {
for (MessageQueue messageQueue : messageQueues) {
MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
if (assignedMessageQueueState.get(messageQueue) != null) {
messageQueueStat.setPaused(true);
}
}
}
public void resume(Collection<MessageQueue> messageQueueCollection) {
for (MessageQueue messageQueue : messageQueueCollection) {
MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
if (assignedMessageQueueState.get(messageQueue) != null) {
messageQueueStat.setPaused(false);
}
}
}
public long getNextOffset(MessageQueue messageQueue) throws MQClientException {
MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
if (assignedMessageQueueState.get(messageQueue) != null) {
return messageQueueStat.getNextOffset();
}
return -1;
}
public void updateNextOffset(MessageQueue messageQueue, long offset) throws MQClientException {
MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
if (messageQueue == null) {
messageQueueStat = new MessageQueueStat(messageQueue, offset);
assignedMessageQueueState.putIfAbsent(messageQueue, messageQueueStat);
}
assignedMessageQueueState.get(messageQueue).setNextOffset(offset);
}
public void updateAssignedMessageQueue(Set<MessageQueue> assigned) {
synchronized (this.assignedMessageQueueState) {
Iterator<Map.Entry<MessageQueue, MessageQueueStat>> it = this.assignedMessageQueueState.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<MessageQueue, MessageQueueStat> next = it.next();
if (!assigned.contains(next.getKey())) {
it.remove();
}
}
for (MessageQueue messageQueue : assigned) {
if (!this.assignedMessageQueueState.containsKey(messageQueue)) {
MessageQueueStat messageQueueStat = new MessageQueueStat(messageQueue);
this.assignedMessageQueueState.put(messageQueue, messageQueueStat);
}
}
}
}
public void removeAssignedMessageQueue(String topic) {
synchronized (this.assignedMessageQueueState) {
Iterator<Map.Entry<MessageQueue, MessageQueueStat>> it = this.assignedMessageQueueState.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<MessageQueue, MessageQueueStat> next = it.next();
if (next.getKey().getTopic().equals(topic)) {
it.remove();
}
}
}
}
public Map<MessageQueue, Long> getNeedCommitOffsets() {
Map<MessageQueue, Long> map = new HashMap<MessageQueue, Long>();
Set<Map.Entry<MessageQueue, MessageQueueStat>> entries = this.assignedMessageQueueState.entrySet();
for (Map.Entry<MessageQueue, MessageQueueStat> entry : entries) {
map.put(entry.getKey(), entry.getValue().getNextOffset());
}
return map;
}
public class MessageQueueStat {
private MessageQueue messageQueue;
private boolean paused = false;
private long nextOffset = -1;
public MessageQueueStat(MessageQueue messageQueue) {
this.messageQueue = messageQueue;
}
public MessageQueueStat(MessageQueue messageQueue, long nextOffset) {
this.messageQueue = messageQueue;
this.nextOffset = nextOffset;
}
public MessageQueue getMessageQueue() {
return messageQueue;
}
public void setMessageQueue(MessageQueue messageQueue) {
this.messageQueue = messageQueue;
}
public boolean isPaused() {
return paused;
}
public void setPaused(boolean paused) {
this.paused = paused;
}
public long getNextOffset() {
return nextOffset;
}
public void setNextOffset(long nextOffset) {
this.nextOffset = nextOffset;
}
}
}
......@@ -68,16 +68,16 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
public class DefaultMQPullConsumerImpl implements MQConsumerInner {
private final InternalLogger log = ClientLogger.getLog();
private final DefaultMQPullConsumer defaultMQPullConsumer;
protected final DefaultMQPullConsumer defaultMQPullConsumer;
private final long consumerStartTimestamp = System.currentTimeMillis();
private final RPCHook rpcHook;
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
private MQClientInstance mQClientFactory;
protected MQClientInstance mQClientFactory;
private PullAPIWrapper pullAPIWrapper;
private OffsetStore offsetStore;
private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);
protected RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);
public DefaultMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) {
this.defaultMQPullConsumer = defaultMQPullConsumer;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
private final InternalLogger log = ClientLogger.getLog();
private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
private AssignedMessageQueue assignedMessageQueue = new AssignedMessageQueue();
private List<ConsumeRequest> allConsumed = new ArrayList<ConsumeRequest>(256);
private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();
;
private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
public LiteMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) {
super(defaultMQPullConsumer, rpcHook);
}
public void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue) {
this.assignedMessageQueue.updateAssignedMessageQueue(assignedMessageQueue);
updatePullTask(topic, assignedMessageQueue);
}
public void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {
Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
if (next.getKey().getTopic().equals(topic)) {
if (!mqNewSet.contains(next.getKey())) {
next.getValue().setCancelled(true);
it.remove();
}
}
}
for (MessageQueue messageQueue : mqNewSet) {
if (!this.taskTable.containsKey(messageQueue)) {
PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
this.taskTable.put(messageQueue, pullTask);
this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);
}
}
}
class MessageQueueListenerImpl implements MessageQueueListener {
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
MessageModel messageModel = defaultMQPullConsumer.getMessageModel();
switch (messageModel) {
case BROADCASTING:
updateAssignedMessageQueue(topic, mqAll);
break;
case CLUSTERING:
updateAssignedMessageQueue(topic, mqDivided);
break;
default:
break;
}
}
}
int nextPullBatchNums() {
return Math.min(10, consumeRequestCache.remainingCapacity());
}
@Override
public synchronized void start() throws MQClientException {
super.start();
final String group = this.defaultMQPullConsumer.getConsumerGroup();
this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
10, //this.pullThreadNums,
new ThreadFactoryImpl("PullMsgThread-" + group)
);
this.defaultMQPullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
}
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(defaultMQPullConsumer.getConsumerGroup(),
topic, subExpression);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) {
try {
assignedMessageQueue.updateNextOffset(remoteQueue, nextPullOffset);
} catch (MQClientException e) {
log.error("A error occurred in update consume: {} offset process.", remoteQueue, e);
}
}
private void addToConsumed(ConsumeRequest consumeRequest) {
synchronized (this.allConsumed) {
allConsumed.add(consumeRequest);
}
}
void submitConsumeRequest(ConsumeRequest consumeRequest) {
try {
consumeRequestCache.put(consumeRequest);
addToConsumed(consumeRequest);
} catch (InterruptedException ex) {
log.error("Submit consumeRequest error", ex);
}
}
long nextPullOffset(MessageQueue remoteQueue) {
long offset = -1;
try {
offset = assignedMessageQueue.getNextOffset(remoteQueue);
if (offset == -1) {
offset = this.defaultMQPullConsumer.fetchConsumeOffset(remoteQueue, false);
assignedMessageQueue.updateNextOffset(remoteQueue, offset);
}
} catch (MQClientException e) {
log.error("An error occurred in fetch consume offset process.", e);
}
return offset;
}
public class PullTaskImpl implements Runnable {
private final MessageQueue messageQueue;
private volatile boolean cancelled = false;
public PullTaskImpl(final MessageQueue messageQueue) {
this.messageQueue = messageQueue;
}
@Override
public void run() {
String topic = this.messageQueue.getTopic();
if (!this.isCancelled()) {
if (assignedMessageQueue.isPaused(messageQueue)) {
log.debug("Message Queue: {} has been paused!", messageQueue);
return;
}
SubscriptionData subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
long offset = nextPullOffset(messageQueue);
try {
PullResult pullResult = defaultMQPullConsumer.pull(messageQueue, subscriptionData.getSubString(), offset, nextPullBatchNums());
ProcessQueue processQueue = rebalanceImpl.getProcessQueueTable().get(messageQueue);
switch (pullResult.getPullStatus()) {
case FOUND:
if (processQueue != null) {
processQueue.putMessage(pullResult.getMsgFoundList());
submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
}
break;
default:
break;
}
updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
} catch (Exception e) {
log.error("An error occurred in pull message process.", e);
}
}
}
public boolean isCancelled() {
return cancelled;
}
public void setCancelled(boolean cancelled) {
this.cancelled = cancelled;
}
public MessageQueue getMessageQueue() {
return messageQueue;
}
}
public class ConsumeRequest {
private final List<MessageExt> messageExts;
private final MessageQueue messageQueue;
private final ProcessQueue processQueue;
private long startConsumeTimeMillis;
public ConsumeRequest(final List<MessageExt> messageExts, final MessageQueue messageQueue,
final ProcessQueue processQueue) {
this.messageExts = messageExts;
this.messageQueue = messageQueue;
this.processQueue = processQueue;
}
public List<MessageExt> getMessageExts() {
return messageExts;
}
public MessageQueue getMessageQueue() {
return messageQueue;
}
public ProcessQueue getProcessQueue() {
return processQueue;
}
public long getStartConsumeTimeMillis() {
return startConsumeTimeMillis;
}
public void setStartConsumeTimeMillis(final long startConsumeTimeMillis) {
this.startConsumeTimeMillis = startConsumeTimeMillis;
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册