提交 c3e81544 编写于 作者: S ShannonDing

Modify code style for push consumer class

上级 84bbb8f3
...@@ -126,7 +126,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -126,7 +126,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
this.rpcHook = rpcHook; this.rpcHook = rpcHook;
this.realPushModel = realPushModel; this.realPushModel = realPushModel;
if (realPushModel) { if (realPushModel) {
log.info("Open Real Push Model for {}",defaultMQPushConsumer.getConsumerGroup()); log.info("Open Real Push Model for {}", defaultMQPushConsumer.getConsumerGroup());
rebalanceImpl = new RebalanceRealPushImpl(this); rebalanceImpl = new RebalanceRealPushImpl(this);
} else { } else {
rebalanceImpl = new RebalancePushImpl(this); rebalanceImpl = new RebalancePushImpl(this);
...@@ -1208,19 +1208,19 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -1208,19 +1208,19 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
} }
if (localOffset.get() < offset) { if (localOffset.get() < offset) {
//should start pull message process //should start pull message process
log.debug("Current Local key:{} and offset:{} and push offset:{}",localOffsetKey, localOffset.get(), offset); log.debug("Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
return false; return false;
} else { } else {
//Stop pull request //Stop pull request
AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey); AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
if (pullStop == null) { if (pullStop == null) {
this.pullStopped.put(localOffsetKey, new AtomicBoolean(true)); this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
log.info("Pull stop flag of {} is not set, initialize to TRUE",localOffsetKey); log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey);
} }
pullStop = this.pullStopped.get(localOffsetKey); pullStop = this.pullStopped.get(localOffsetKey);
if (!pullStop.get()) { if (!pullStop.get()) {
pullStop.set(true); pullStop.set(true);
log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...",localOffsetKey); log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...", localOffsetKey);
} }
//update local offset //update local offset
localOffset.set(offset); localOffset.set(offset);
...@@ -1228,14 +1228,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -1228,14 +1228,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
List<MessageExt> messageExtList = new ArrayList<MessageExt>(); List<MessageExt> messageExtList = new ArrayList<MessageExt>();
messageExtList.add(msg); messageExtList.add(msg);
ProcessQueue processQueue = processQueues.get(localOffsetKey); ProcessQueue processQueue = processQueues.get(localOffsetKey);
if (processQueue == null){ if (processQueue == null) {
processQueues.put(localOffsetKey,new ProcessQueue()); processQueues.put(localOffsetKey, new ProcessQueue());
processQueue = processQueues.get(localOffsetKey); processQueue = processQueues.get(localOffsetKey);
} }
processQueue.putMessage(messageExtList); processQueue.putMessage(messageExtList);
MessageQueue messageQueue = new MessageQueue(topic,"",queueID); MessageQueue messageQueue = new MessageQueue(topic, "", queueID);
this.consumeMessageService.submitConsumeRequest(messageExtList,processQueue,messageQueue,true); this.consumeMessageService.submitConsumeRequest(messageExtList, processQueue, messageQueue, true);
log.info(".......submitConsumeRequest:{},Offset:{}...",localOffsetKey,offset); log.info(".......submitConsumeRequest:{},Offset:{}...", localOffsetKey, offset);
} }
return true; return true;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册