From f508f131f7dee2bcb86e66a6beb7bbdedbe31bc6 Mon Sep 17 00:00:00 2001 From: Jaskey Date: Wed, 19 Apr 2017 11:58:48 +0800 Subject: [PATCH] [ROCKETMQ-107] Fix possible concurrency problem on ServiceState when consumer start/shutdown, closes apache/incubator-rocketmq#68 --- .../consumer/DefaultMQPullConsumerImpl.java | 21 ++++++++++++------- .../consumer/DefaultMQPushConsumerImpl.java | 15 ++++++------- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index b26d062b..7d43b372 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -70,7 +70,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { private final RPCHook rpcHook; private final ArrayList consumeMessageHookList = new ArrayList(); private final ArrayList filterMessageHookList = new ArrayList(); - private ServiceState serviceState = ServiceState.CREATE_JUST; + private volatile ServiceState serviceState = ServiceState.CREATE_JUST; private MQClientInstance mQClientFactory; private PullAPIWrapper pullAPIWrapper; private OffsetStore offsetStore; @@ -161,7 +161,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { return this.pullSyncImpl(mq, subExpression, offset, maxNums, false, timeout); } - private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block, long timeout) + private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block, + long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); @@ -365,7 +366,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { pull(mq, subExpression, offset, maxNums, pullCallback, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis()); } - public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, long timeout) + public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, + long timeout) throws MQClientException, RemotingException, InterruptedException { this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false, timeout); } @@ -449,7 +451,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { return defaultMQPullConsumer; } - public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback) + public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, + PullCallback pullCallback) throws MQClientException, RemotingException, InterruptedException { this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis()); @@ -510,7 +513,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { } } - public void shutdown() { + public synchronized void shutdown() { switch (this.serviceState) { case CREATE_JUST: break; @@ -528,7 +531,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { } } - public void start() throws MQClientException { + public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; @@ -593,6 +596,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { default: break; } + } private void checkConfig() throws MQClientException { @@ -662,7 +666,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { this.offsetStore.updateOffset(mq, offset, false); } - public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + public MessageExt viewMessage(String msgId) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { this.makeSureStateOK(); return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId); } @@ -692,6 +697,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { return serviceState; } + //Don't use this deprecated setter, which will be removed soon. + @Deprecated public void setServiceState(ServiceState serviceState) { this.serviceState = serviceState; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 4f33732d..67f3ebe2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -97,7 +97,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { private final long consumerStartTimestamp = System.currentTimeMillis(); private final ArrayList consumeMessageHookList = new ArrayList(); private final RPCHook rpcHook; - private ServiceState serviceState = ServiceState.CREATE_JUST; + private volatile ServiceState serviceState = ServiceState.CREATE_JUST; private MQClientInstance mQClientFactory; private PullAPIWrapper pullAPIWrapper; private volatile boolean pause = false; @@ -515,7 +515,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } } - public void shutdown() { + public synchronized void shutdown() { switch (this.serviceState) { case CREATE_JUST: break; @@ -535,7 +535,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } } - public void start() throws MQClientException { + public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(), @@ -615,9 +615,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } this.updateTopicSubscribeInfoWhenSubscriptionChanged(); - this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); - this.mQClientFactory.rebalanceImmediately(); } @@ -855,7 +853,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { this.consumeMessageService.updateCorePoolSize(corePoolSize); } - public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + public MessageExt viewMessage(String msgId) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId); } @@ -1014,7 +1013,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { return serviceState; } - public void setServiceState(ServiceState serviceState) { + //Don't use this deprecated setter, which will be removed soon. + @Deprecated + public synchronized void setServiceState(ServiceState serviceState) { this.serviceState = serviceState; } -- GitLab