提交 82b18d5c 编写于 作者: S ShannonDing

Add RealPush rebalance as default

上级 0f0b5653
...@@ -92,7 +92,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -92,7 +92,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30; private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
private final InternalLogger log = ClientLogger.getLog(); private final InternalLogger log = ClientLogger.getLog();
private final DefaultMQPushConsumer defaultMQPushConsumer; private final DefaultMQPushConsumer defaultMQPushConsumer;
private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this); //private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
private final RebalanceImpl rebalanceImpl;
private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>(); private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
private final long consumerStartTimestamp = System.currentTimeMillis(); private final long consumerStartTimestamp = System.currentTimeMillis();
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>(); private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
...@@ -107,10 +108,22 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -107,10 +108,22 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private ConsumeMessageService consumeMessageService; private ConsumeMessageService consumeMessageService;
private long queueFlowControlTimes = 0; private long queueFlowControlTimes = 0;
private long queueMaxSpanFlowControlTimes = 0; private long queueMaxSpanFlowControlTimes = 0;
private boolean realPushModel = true;
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) { public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
this(defaultMQPushConsumer, rpcHook, true);
}
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook,
boolean realPushModel) {
this.defaultMQPushConsumer = defaultMQPushConsumer; this.defaultMQPushConsumer = defaultMQPushConsumer;
this.rpcHook = rpcHook; this.rpcHook = rpcHook;
this.realPushModel = realPushModel;
if (realPushModel) {
rebalanceImpl = new RebalanceRealPushImpl(this);
} else {
rebalanceImpl = new RebalancePushImpl(this);
}
} }
public void registerFilterMessageHook(final FilterMessageHook hook) { public void registerFilterMessageHook(final FilterMessageHook hook) {
...@@ -971,7 +984,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -971,7 +984,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
@Override @Override
public ConsumeType consumeType() { public ConsumeType consumeType() {
return ConsumeType.CONSUME_PASSIVELY; if (realPushModel) {
return ConsumeType.CONSUME_PUSH;
} else {
return ConsumeType.CONSUME_PASSIVELY;
}
} }
@Override @Override
......
...@@ -137,6 +137,7 @@ public class RebalancePushImpl extends RebalanceImpl { ...@@ -137,6 +137,7 @@ public class RebalancePushImpl extends RebalanceImpl {
@Override @Override
public ConsumeType consumeType() { public ConsumeType consumeType() {
return ConsumeType.CONSUME_PASSIVELY; return ConsumeType.CONSUME_PASSIVELY;
//return ConsumeType.CONSUME_PUSH;
} }
@Override @Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
public class RebalanceRealPushImpl extends RebalancePushImpl {
public RebalanceRealPushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
super(defaultMQPushConsumerImpl);
}
@Override
public ConsumeType consumeType() {
return ConsumeType.CONSUME_PUSH;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册