From 9707846a05ce81029ef4f74bb0d318e7d76ecd20 Mon Sep 17 00:00:00 2001 From: odbozhou <877036922@qq.com> Date: Thu, 16 May 2019 00:30:45 +0800 Subject: [PATCH] Consumer synchronized start or stop --- .../openmessaging/rocketmq/config/DefaultQueueMetaData.java | 2 +- .../openmessaging/rocketmq/consumer/PullConsumerImpl.java | 6 +++--- .../openmessaging/rocketmq/consumer/PushConsumerImpl.java | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/DefaultQueueMetaData.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/DefaultQueueMetaData.java index ddbb2d68..b2695bfa 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/DefaultQueueMetaData.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/DefaultQueueMetaData.java @@ -38,7 +38,7 @@ public class DefaultQueueMetaData implements QueueMetaData { @Override public List partitions() { - return null; + return partitions; } public static class DefaultPartition implements Partition { diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java index 521dfc8b..81b80b9e 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java @@ -93,7 +93,7 @@ public class PullConsumerImpl implements Consumer { String consumerId = OMSUtil.buildInstanceName(); this.rocketmqPullConsumer.setInstanceName(consumerId); - properties.put("TIMEOUT", consumerId); + properties.put("CONSUMER_ID", consumerId); this.rocketmqPullConsumer.setLanguage(LanguageCode.OMS); @@ -325,7 +325,7 @@ public class PullConsumerImpl implements Consumer { } @Override - public void start() { + public synchronized void start() { if (!started) { try { this.pullConsumerScheduleService.start(); @@ -339,7 +339,7 @@ public class PullConsumerImpl implements Consumer { } @Override - public void stop() { + public synchronized void stop() { if (this.started) { this.localMessageCache.stop(); this.pullConsumerScheduleService.shutdown(); diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java index 6837e73a..91650f28 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java @@ -255,7 +255,7 @@ public class PushConsumerImpl implements Consumer { } @Override - public void start() { + public synchronized void start() { currentState = ServiceLifeState.STARTING; if (!started) { try { @@ -269,7 +269,7 @@ public class PushConsumerImpl implements Consumer { } @Override - public void stop() { + public synchronized void stop() { currentState = ServiceLifeState.STOPPING; if (this.started) { this.rocketmqPushConsumer.shutdown(); -- GitLab