diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 392a2f08153bd4b36d68d98e2e10a6aa4811de4f..000d5692fff792bed9d77fd4823f90c2516ea216 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -126,7 +126,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { this.rpcHook = rpcHook; this.realPushModel = realPushModel; if (realPushModel) { - log.info("Open Real Push Model for {}",defaultMQPushConsumer.getConsumerGroup()); + log.info("Open Real Push Model for {}", defaultMQPushConsumer.getConsumerGroup()); rebalanceImpl = new RebalanceRealPushImpl(this); } else { rebalanceImpl = new RebalancePushImpl(this); @@ -1208,19 +1208,19 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } if (localOffset.get() < offset) { //should start pull message process - log.debug("Current Local key:{} and offset:{} and push offset:{}",localOffsetKey, localOffset.get(), offset); + log.debug("Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset); return false; } else { //Stop pull request AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey); if (pullStop == null) { this.pullStopped.put(localOffsetKey, new AtomicBoolean(true)); - log.info("Pull stop flag of {} is not set, initialize to TRUE",localOffsetKey); + log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey); } pullStop = this.pullStopped.get(localOffsetKey); if (!pullStop.get()) { pullStop.set(true); - log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...",localOffsetKey); + log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...", localOffsetKey); } //update local offset localOffset.set(offset); @@ -1228,14 +1228,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { List messageExtList = new ArrayList(); messageExtList.add(msg); ProcessQueue processQueue = processQueues.get(localOffsetKey); - if (processQueue == null){ - processQueues.put(localOffsetKey,new ProcessQueue()); + if (processQueue == null) { + processQueues.put(localOffsetKey, new ProcessQueue()); processQueue = processQueues.get(localOffsetKey); } processQueue.putMessage(messageExtList); - MessageQueue messageQueue = new MessageQueue(topic,"",queueID); - this.consumeMessageService.submitConsumeRequest(messageExtList,processQueue,messageQueue,true); - log.info(".......submitConsumeRequest:{},Offset:{}...",localOffsetKey,offset); + MessageQueue messageQueue = new MessageQueue(topic, "", queueID); + this.consumeMessageService.submitConsumeRequest(messageExtList, processQueue, messageQueue, true); + log.info(".......submitConsumeRequest:{},Offset:{}...", localOffsetKey, offset); } return true; }