diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java index f89ae4c752e4aacddcc267d3f4970db929cdbb0c..ac7f7c479b0c6b6ee2fc0e8fba85d3710fa95ac1 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java @@ -28,7 +28,7 @@ import java.nio.charset.Charset; public class SimpleProducer { public static void main(String[] args) { final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory - .getMessagingAccessPoint("openmessaging:rocketmq://10.125.3.140:9876,10.189.232.59:9876/namespace"); + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); final Producer producer = messagingAccessPoint.createProducer(); diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java index 86cb696e05f9688f471b45ab88128d0dd691baa3..36b6b1d12a2d18c03f756540aee20d5f68596dab 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java @@ -27,7 +27,7 @@ import io.openmessaging.rocketmq.domain.NonStandardKeys; public class SimplePullConsumer { public static void main(String[] args) { final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory - .getMessagingAccessPoint("openmessaging:rocketmq://10.125.3.140:9876,10.189.232.59:9876/namespace"); + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC", OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java index 813e301d1b61326cb8c17ab807f937eb01dbc3c8..84c1b15243c31b35a48739b1af94fc2712f4e3ba 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java @@ -29,7 +29,7 @@ import io.openmessaging.rocketmq.domain.NonStandardKeys; public class SimplePushConsumer { public static void main(String[] args) { final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory - .getMessagingAccessPoint("openmessaging:rocketmq://10.125.3.140:9876,10.189.232.59:9876/namespace"); + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); final PushConsumer consumer = messagingAccessPoint. createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java index 0ffd36c0dd6e5731ae369665691d03a57cd90e4c..9afc4c931ba38977d24ee5f5773925d43689f927 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java @@ -18,27 +18,39 @@ package io.openmessaging.rocketmq.consumer; import io.openmessaging.KeyValue; import io.openmessaging.PropertyKeys; +import io.openmessaging.ServiceLifecycle; import io.openmessaging.rocketmq.ClientConfig; import io.openmessaging.rocketmq.domain.ConsumeRequest; import java.util.Collections; import java.util.Map; +import java.util.TreeMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.consumer.ProcessQueue; import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.slf4j.Logger; -class LocalMessageCache { +class LocalMessageCache implements ServiceLifecycle { private final BlockingQueue consumeRequestCache; private final Map consumedRequest; private final ConcurrentHashMap pullOffsetTable; private final DefaultMQPullConsumer rocketmqPullConsumer; private final ClientConfig clientConfig; + private final ScheduledExecutorService cleanExpireMsgExecutors; + private final static Logger log = ClientLogger.getLog(); LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final ClientConfig clientConfig) { @@ -47,6 +59,8 @@ class LocalMessageCache { this.pullOffsetTable = new ConcurrentHashMap<>(); this.rocketmqPullConsumer = rocketmqPullConsumer; this.clientConfig = clientConfig; + this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( + "OMS_CleanExpireMsgScheduledThread_")); } int nextPullBatchNums() { @@ -92,9 +106,11 @@ class LocalMessageCache { try { ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.MILLISECONDS); if (consumeRequest != null) { + MessageExt messageExt = consumeRequest.getMessageExt(); consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis()); - consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), consumeRequest); - return consumeRequest.getMessageExt(); + MessageAccessor.setConsumeStartTimeStamp(messageExt, String.valueOf(consumeRequest.getStartConsumeTimeMillis())); + consumedRequest.put(messageExt.getMsgId(), consumeRequest); + return messageExt; } } catch (InterruptedException ignore) { } @@ -112,4 +128,87 @@ class LocalMessageCache { } } } + + void ack(final MessageQueue messageQueue, final ProcessQueue processQueue, final MessageExt messageExt) { + consumedRequest.remove(messageExt.getMsgId()); + long offset = processQueue.removeMessage(Collections.singletonList(messageExt)); + try { + rocketmqPullConsumer.updateConsumeOffset(messageQueue, offset); + } catch (MQClientException e) { + log.error("A error occurred in update consume offset process.", e); + } + } + + @Override + public void startup() { + this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + cleanExpireMsg(); + } + }, clientConfig.getRmqMessageConsumeTimeout(), clientConfig.getRmqMessageConsumeTimeout(), TimeUnit.MINUTES); + } + + @Override + public void shutdown() { + ThreadUtils.shutdownGracefully(cleanExpireMsgExecutors, 5000, TimeUnit.MILLISECONDS); + } + + private void cleanExpireMsg() { + for (final Map.Entry next : rocketmqPullConsumer.getDefaultMQPullConsumerImpl() + .getRebalanceImpl().getProcessQueueTable().entrySet()) { + ProcessQueue pq = next.getValue(); + MessageQueue mq = next.getKey(); + ReadWriteLock lockTreeMap = getLockInProcessQueue(pq); + if (lockTreeMap == null) { + log.error("Gets tree map lock in process queue error, may be has compatibility issue"); + return; + } + + TreeMap msgTreeMap = pq.getMsgTreeMap(); + + int loop = msgTreeMap.size(); + for (int i = 0; i < loop; i++) { + MessageExt msg = null; + try { + lockTreeMap.readLock().lockInterruptibly(); + try { + if (!msgTreeMap.isEmpty()) { + msg = msgTreeMap.firstEntry().getValue(); + System.out.println(msg); + if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg)) + > clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000) { + //Expired, ack and remove it. + } else { + break; + } + } else { + break; + } + } finally { + lockTreeMap.readLock().unlock(); + } + } catch (InterruptedException e) { + log.error("Gets expired message exception", e); + } + + try { + rocketmqPullConsumer.sendMessageBack(msg, 3); + log.info("Send expired msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", + msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset()); + ack(mq, pq, msg); + } catch (Exception e) { + log.error("Send back expired msg exception", e); + } + } + } + } + + private ReadWriteLock getLockInProcessQueue(ProcessQueue pq) { + try { + return (ReadWriteLock) FieldUtils.readDeclaredField(pq, "lockTreeMap", true); + } catch (IllegalAccessException e) { + return null; + } + } } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java index 56a49a49dde5c7906756d380660bf7013d9642fe..5d4e7d9f7b1f7d756cd6c7529e65c7d2bde4d4f5 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java @@ -114,6 +114,7 @@ public class PullConsumerImpl implements PullConsumer { try { registerPullTaskCallback(); this.pullConsumerScheduleService.start(); + this.localMessageCache.startup(); } catch (MQClientException e) { throw new OMSRuntimeException("-1", e); } @@ -136,6 +137,7 @@ public class PullConsumerImpl implements PullConsumer { switch (pullResult.getPullStatus()) { case FOUND: if (pq != null) { + pq.putMessage(pullResult.getMsgFoundList()); for (final MessageExt messageExt : pullResult.getMsgFoundList()) { localMessageCache.submitConsumeRequest(new ConsumeRequest(messageExt, mq, pq)); } @@ -155,6 +157,7 @@ public class PullConsumerImpl implements PullConsumer { @Override public synchronized void shutdown() { if (this.started) { + this.localMessageCache.shutdown(); this.pullConsumerScheduleService.shutdown(); this.rocketmqPullConsumer.shutdown(); }