提交 c7def6fe 编写于 作者: L Li Zhanhui 提交者: dongeforever

[ROCKETMQ-255] Fix offsetStore being null after consumers start()

Author: Li Zhanhui <lizhanhui@gmail.com>

Closes #142 from lizhanhui/offset_store.
上级 5ac45fc5
...@@ -569,6 +569,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { ...@@ -569,6 +569,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
default: default:
break; break;
} }
this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);
} }
this.offsetStore.load(); this.offsetStore.load();
......
...@@ -577,6 +577,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -577,6 +577,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
default: default:
break; break;
} }
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
} }
this.offsetStore.load(); this.offsetStore.load();
......
...@@ -32,6 +32,7 @@ import org.apache.rocketmq.common.message.MessageExt; ...@@ -32,6 +32,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
...@@ -83,6 +84,11 @@ public class DefaultMQPullConsumerTest { ...@@ -83,6 +84,11 @@ public class DefaultMQPullConsumerTest {
pullConsumer.shutdown(); pullConsumer.shutdown();
} }
@Test
public void testStart_OffsetShouldNotNUllAfterStart() {
Assert.assertNotNull(pullConsumer.getOffsetStore());
}
@Test @Test
public void testPullMessage_Success() throws Exception { public void testPullMessage_Success() throws Exception {
doAnswer(new Answer() { doAnswer(new Answer() {
......
...@@ -52,6 +52,7 @@ import org.apache.rocketmq.common.message.MessageQueue; ...@@ -52,6 +52,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
...@@ -155,6 +156,11 @@ public class DefaultMQPushConsumerTest { ...@@ -155,6 +156,11 @@ public class DefaultMQPushConsumerTest {
pushConsumer.shutdown(); pushConsumer.shutdown();
} }
@Test
public void testStart_OffsetShouldNotNUllAfterStart() {
Assert.assertNotNull(pushConsumer.getOffsetStore());
}
@Test @Test
public void testPullMessage_Success() throws InterruptedException, RemotingException, MQBrokerException { public void testPullMessage_Success() throws InterruptedException, RemotingException, MQBrokerException {
final CountDownLatch countDownLatch = new CountDownLatch(1); final CountDownLatch countDownLatch = new CountDownLatch(1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册