diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/DefaultQueueMetaData.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/DefaultQueueMetaData.java index ddbb2d681ad04d01ca56bf4d5f203c09f6fecb3c..b2695bfa0efefe025e9b45700f2171e9c7de4098 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/DefaultQueueMetaData.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/DefaultQueueMetaData.java @@ -38,7 +38,7 @@ public class DefaultQueueMetaData implements QueueMetaData { @Override public List partitions() { - return null; + return partitions; } public static class DefaultPartition implements Partition { diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java index 521dfc8b67e01295499bf4406457e2a819c68a84..81b80b9ecd46bf7ad7b7ccc91aaa3305966ba3f0 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java @@ -93,7 +93,7 @@ public class PullConsumerImpl implements Consumer { String consumerId = OMSUtil.buildInstanceName(); this.rocketmqPullConsumer.setInstanceName(consumerId); - properties.put("TIMEOUT", consumerId); + properties.put("CONSUMER_ID", consumerId); this.rocketmqPullConsumer.setLanguage(LanguageCode.OMS); @@ -325,7 +325,7 @@ public class PullConsumerImpl implements Consumer { } @Override - public void start() { + public synchronized void start() { if (!started) { try { this.pullConsumerScheduleService.start(); @@ -339,7 +339,7 @@ public class PullConsumerImpl implements Consumer { } @Override - public void stop() { + public synchronized void stop() { if (this.started) { this.localMessageCache.stop(); this.pullConsumerScheduleService.shutdown(); diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java index 6837e73a6347b5e4a95d4785f58a679e1f83fba9..91650f2814fd8946531ea627550debd6a0373732 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java @@ -255,7 +255,7 @@ public class PushConsumerImpl implements Consumer { } @Override - public void start() { + public synchronized void start() { currentState = ServiceLifeState.STARTING; if (!started) { try { @@ -269,7 +269,7 @@ public class PushConsumerImpl implements Consumer { } @Override - public void stop() { + public synchronized void stop() { currentState = ServiceLifeState.STOPPING; if (this.started) { this.rocketmqPushConsumer.shutdown();