提交 9707846a 编写于 作者: O odbozhou

Consumer synchronized start or stop

上级 09aeea34
...@@ -38,7 +38,7 @@ public class DefaultQueueMetaData implements QueueMetaData { ...@@ -38,7 +38,7 @@ public class DefaultQueueMetaData implements QueueMetaData {
@Override @Override
public List<QueueMetaData.Partition> partitions() { public List<QueueMetaData.Partition> partitions() {
return null; return partitions;
} }
public static class DefaultPartition implements Partition { public static class DefaultPartition implements Partition {
......
...@@ -93,7 +93,7 @@ public class PullConsumerImpl implements Consumer { ...@@ -93,7 +93,7 @@ public class PullConsumerImpl implements Consumer {
String consumerId = OMSUtil.buildInstanceName(); String consumerId = OMSUtil.buildInstanceName();
this.rocketmqPullConsumer.setInstanceName(consumerId); this.rocketmqPullConsumer.setInstanceName(consumerId);
properties.put("TIMEOUT", consumerId); properties.put("CONSUMER_ID", consumerId);
this.rocketmqPullConsumer.setLanguage(LanguageCode.OMS); this.rocketmqPullConsumer.setLanguage(LanguageCode.OMS);
...@@ -325,7 +325,7 @@ public class PullConsumerImpl implements Consumer { ...@@ -325,7 +325,7 @@ public class PullConsumerImpl implements Consumer {
} }
@Override @Override
public void start() { public synchronized void start() {
if (!started) { if (!started) {
try { try {
this.pullConsumerScheduleService.start(); this.pullConsumerScheduleService.start();
...@@ -339,7 +339,7 @@ public class PullConsumerImpl implements Consumer { ...@@ -339,7 +339,7 @@ public class PullConsumerImpl implements Consumer {
} }
@Override @Override
public void stop() { public synchronized void stop() {
if (this.started) { if (this.started) {
this.localMessageCache.stop(); this.localMessageCache.stop();
this.pullConsumerScheduleService.shutdown(); this.pullConsumerScheduleService.shutdown();
......
...@@ -255,7 +255,7 @@ public class PushConsumerImpl implements Consumer { ...@@ -255,7 +255,7 @@ public class PushConsumerImpl implements Consumer {
} }
@Override @Override
public void start() { public synchronized void start() {
currentState = ServiceLifeState.STARTING; currentState = ServiceLifeState.STARTING;
if (!started) { if (!started) {
try { try {
...@@ -269,7 +269,7 @@ public class PushConsumerImpl implements Consumer { ...@@ -269,7 +269,7 @@ public class PushConsumerImpl implements Consumer {
} }
@Override @Override
public void stop() { public synchronized void stop() {
currentState = ServiceLifeState.STOPPING; currentState = ServiceLifeState.STOPPING;
if (this.started) { if (this.started) {
this.rocketmqPushConsumer.shutdown(); this.rocketmqPushConsumer.shutdown();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册