提交 28a6deac 编写于 作者: Y yukon

Clean expired message in oms pull consumer.

上级 2e3c1b00
...@@ -28,7 +28,7 @@ import java.nio.charset.Charset; ...@@ -28,7 +28,7 @@ import java.nio.charset.Charset;
public class SimpleProducer { public class SimpleProducer {
public static void main(String[] args) { public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://10.125.3.140:9876,10.189.232.59:9876/namespace"); .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
final Producer producer = messagingAccessPoint.createProducer(); final Producer producer = messagingAccessPoint.createProducer();
......
...@@ -27,7 +27,7 @@ import io.openmessaging.rocketmq.domain.NonStandardKeys; ...@@ -27,7 +27,7 @@ import io.openmessaging.rocketmq.domain.NonStandardKeys;
public class SimplePullConsumer { public class SimplePullConsumer {
public static void main(String[] args) { public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://10.125.3.140:9876,10.189.232.59:9876/namespace"); .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC", final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
......
...@@ -29,7 +29,7 @@ import io.openmessaging.rocketmq.domain.NonStandardKeys; ...@@ -29,7 +29,7 @@ import io.openmessaging.rocketmq.domain.NonStandardKeys;
public class SimplePushConsumer { public class SimplePushConsumer {
public static void main(String[] args) { public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://10.125.3.140:9876,10.189.232.59:9876/namespace"); .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
final PushConsumer consumer = messagingAccessPoint. final PushConsumer consumer = messagingAccessPoint.
createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
......
...@@ -18,27 +18,39 @@ package io.openmessaging.rocketmq.consumer; ...@@ -18,27 +18,39 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.KeyValue; import io.openmessaging.KeyValue;
import io.openmessaging.PropertyKeys; import io.openmessaging.PropertyKeys;
import io.openmessaging.ServiceLifecycle;
import io.openmessaging.rocketmq.ClientConfig; import io.openmessaging.rocketmq.ClientConfig;
import io.openmessaging.rocketmq.domain.ConsumeRequest; import io.openmessaging.rocketmq.domain.ConsumeRequest;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
class LocalMessageCache { class LocalMessageCache implements ServiceLifecycle {
private final BlockingQueue<ConsumeRequest> consumeRequestCache; private final BlockingQueue<ConsumeRequest> consumeRequestCache;
private final Map<String, ConsumeRequest> consumedRequest; private final Map<String, ConsumeRequest> consumedRequest;
private final ConcurrentHashMap<MessageQueue, Long> pullOffsetTable; private final ConcurrentHashMap<MessageQueue, Long> pullOffsetTable;
private final DefaultMQPullConsumer rocketmqPullConsumer; private final DefaultMQPullConsumer rocketmqPullConsumer;
private final ClientConfig clientConfig; private final ClientConfig clientConfig;
private final ScheduledExecutorService cleanExpireMsgExecutors;
private final static Logger log = ClientLogger.getLog(); private final static Logger log = ClientLogger.getLog();
LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final ClientConfig clientConfig) { LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final ClientConfig clientConfig) {
...@@ -47,6 +59,8 @@ class LocalMessageCache { ...@@ -47,6 +59,8 @@ class LocalMessageCache {
this.pullOffsetTable = new ConcurrentHashMap<>(); this.pullOffsetTable = new ConcurrentHashMap<>();
this.rocketmqPullConsumer = rocketmqPullConsumer; this.rocketmqPullConsumer = rocketmqPullConsumer;
this.clientConfig = clientConfig; this.clientConfig = clientConfig;
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"OMS_CleanExpireMsgScheduledThread_"));
} }
int nextPullBatchNums() { int nextPullBatchNums() {
...@@ -92,9 +106,11 @@ class LocalMessageCache { ...@@ -92,9 +106,11 @@ class LocalMessageCache {
try { try {
ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.MILLISECONDS); ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.MILLISECONDS);
if (consumeRequest != null) { if (consumeRequest != null) {
MessageExt messageExt = consumeRequest.getMessageExt();
consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis()); consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), consumeRequest); MessageAccessor.setConsumeStartTimeStamp(messageExt, String.valueOf(consumeRequest.getStartConsumeTimeMillis()));
return consumeRequest.getMessageExt(); consumedRequest.put(messageExt.getMsgId(), consumeRequest);
return messageExt;
} }
} catch (InterruptedException ignore) { } catch (InterruptedException ignore) {
} }
...@@ -112,4 +128,87 @@ class LocalMessageCache { ...@@ -112,4 +128,87 @@ class LocalMessageCache {
} }
} }
} }
void ack(final MessageQueue messageQueue, final ProcessQueue processQueue, final MessageExt messageExt) {
consumedRequest.remove(messageExt.getMsgId());
long offset = processQueue.removeMessage(Collections.singletonList(messageExt));
try {
rocketmqPullConsumer.updateConsumeOffset(messageQueue, offset);
} catch (MQClientException e) {
log.error("A error occurred in update consume offset process.", e);
}
}
@Override
public void startup() {
this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
cleanExpireMsg();
}
}, clientConfig.getRmqMessageConsumeTimeout(), clientConfig.getRmqMessageConsumeTimeout(), TimeUnit.MINUTES);
}
@Override
public void shutdown() {
ThreadUtils.shutdownGracefully(cleanExpireMsgExecutors, 5000, TimeUnit.MILLISECONDS);
}
private void cleanExpireMsg() {
for (final Map.Entry<MessageQueue, ProcessQueue> next : rocketmqPullConsumer.getDefaultMQPullConsumerImpl()
.getRebalanceImpl().getProcessQueueTable().entrySet()) {
ProcessQueue pq = next.getValue();
MessageQueue mq = next.getKey();
ReadWriteLock lockTreeMap = getLockInProcessQueue(pq);
if (lockTreeMap == null) {
log.error("Gets tree map lock in process queue error, may be has compatibility issue");
return;
}
TreeMap<Long, MessageExt> msgTreeMap = pq.getMsgTreeMap();
int loop = msgTreeMap.size();
for (int i = 0; i < loop; i++) {
MessageExt msg = null;
try {
lockTreeMap.readLock().lockInterruptibly();
try {
if (!msgTreeMap.isEmpty()) {
msg = msgTreeMap.firstEntry().getValue();
System.out.println(msg);
if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg))
> clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000) {
//Expired, ack and remove it.
} else {
break;
}
} else {
break;
}
} finally {
lockTreeMap.readLock().unlock();
}
} catch (InterruptedException e) {
log.error("Gets expired message exception", e);
}
try {
rocketmqPullConsumer.sendMessageBack(msg, 3);
log.info("Send expired msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}",
msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
ack(mq, pq, msg);
} catch (Exception e) {
log.error("Send back expired msg exception", e);
}
}
}
}
private ReadWriteLock getLockInProcessQueue(ProcessQueue pq) {
try {
return (ReadWriteLock) FieldUtils.readDeclaredField(pq, "lockTreeMap", true);
} catch (IllegalAccessException e) {
return null;
}
}
} }
...@@ -114,6 +114,7 @@ public class PullConsumerImpl implements PullConsumer { ...@@ -114,6 +114,7 @@ public class PullConsumerImpl implements PullConsumer {
try { try {
registerPullTaskCallback(); registerPullTaskCallback();
this.pullConsumerScheduleService.start(); this.pullConsumerScheduleService.start();
this.localMessageCache.startup();
} catch (MQClientException e) { } catch (MQClientException e) {
throw new OMSRuntimeException("-1", e); throw new OMSRuntimeException("-1", e);
} }
...@@ -136,6 +137,7 @@ public class PullConsumerImpl implements PullConsumer { ...@@ -136,6 +137,7 @@ public class PullConsumerImpl implements PullConsumer {
switch (pullResult.getPullStatus()) { switch (pullResult.getPullStatus()) {
case FOUND: case FOUND:
if (pq != null) { if (pq != null) {
pq.putMessage(pullResult.getMsgFoundList());
for (final MessageExt messageExt : pullResult.getMsgFoundList()) { for (final MessageExt messageExt : pullResult.getMsgFoundList()) {
localMessageCache.submitConsumeRequest(new ConsumeRequest(messageExt, mq, pq)); localMessageCache.submitConsumeRequest(new ConsumeRequest(messageExt, mq, pq));
} }
...@@ -155,6 +157,7 @@ public class PullConsumerImpl implements PullConsumer { ...@@ -155,6 +157,7 @@ public class PullConsumerImpl implements PullConsumer {
@Override @Override
public synchronized void shutdown() { public synchronized void shutdown() {
if (this.started) { if (this.started) {
this.localMessageCache.shutdown();
this.pullConsumerScheduleService.shutdown(); this.pullConsumerScheduleService.shutdown();
this.rocketmqPullConsumer.shutdown(); this.rocketmqPullConsumer.shutdown();
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册