From c7def6fed42f85886eb747bdd8b9e9aed5d7a908 Mon Sep 17 00:00:00 2001 From: Li Zhanhui Date: Tue, 26 Sep 2017 13:20:39 +0800 Subject: [PATCH] [ROCKETMQ-255] Fix offsetStore being null after consumers start() Author: Li Zhanhui Closes #142 from lizhanhui/offset_store. --- .../client/impl/consumer/DefaultMQPullConsumerImpl.java | 1 + .../client/impl/consumer/DefaultMQPushConsumerImpl.java | 1 + .../rocketmq/client/consumer/DefaultMQPullConsumerTest.java | 6 ++++++ .../rocketmq/client/consumer/DefaultMQPushConsumerTest.java | 6 ++++++ 4 files changed, 14 insertions(+) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index 8640d2d6..6eca3818 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -569,6 +569,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { default: break; } + this.defaultMQPullConsumer.setOffsetStore(this.offsetStore); } this.offsetStore.load(); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 7eda7c1b..4ba6216b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -577,6 +577,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { default: break; } + this.defaultMQPushConsumer.setOffsetStore(this.offsetStore); } this.offsetStore.load(); diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java index 9cdeda8e..569055de 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java @@ -32,6 +32,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -83,6 +84,11 @@ public class DefaultMQPullConsumerTest { pullConsumer.shutdown(); } + @Test + public void testStart_OffsetShouldNotNUllAfterStart() { + Assert.assertNotNull(pullConsumer.getOffsetStore()); + } + @Test public void testPullMessage_Success() throws Exception { doAnswer(new Answer() { diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java index 7e69cc14..94b3f0f9 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java @@ -52,6 +52,7 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.remoting.exception.RemotingException; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -155,6 +156,11 @@ public class DefaultMQPushConsumerTest { pushConsumer.shutdown(); } + @Test + public void testStart_OffsetShouldNotNUllAfterStart() { + Assert.assertNotNull(pushConsumer.getOffsetStore()); + } + @Test public void testPullMessage_Success() throws InterruptedException, RemotingException, MQBrokerException { final CountDownLatch countDownLatch = new CountDownLatch(1); -- GitLab