未验证 提交 ba48f6a2 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #739 from ShannonDing/snode

[ISSUE #737]Polish Push Consumer
......@@ -124,14 +124,14 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
if (offset != null) {
if (mqs.contains(mq)) {
try {
this.updateConsumeOffsetToBroker(mq, offset.get());
log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
this.updateConsumeOffsetToSnode(mq, offset.get());
log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToSnode {} {}",
this.groupName,
this.mQClientFactory.getClientId(),
mq,
offset.get());
} catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
log.error("updateConsumeOffsetToSnode exception, " + mq.toString(), e);
}
} else {
unusedMQ.add(mq);
......@@ -153,14 +153,14 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
AtomicLong offset = this.offsetTable.get(mq);
if (offset != null) {
try {
this.updateConsumeOffsetToBroker(mq, offset.get());
log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
this.updateConsumeOffsetToSnode(mq, offset.get());
log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetToSnode {} {}",
this.groupName,
this.mQClientFactory.getClientId(),
mq,
offset.get());
} catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
log.error("updateConsumeOffsetToSnode exception, " + mq.toString(), e);
}
}
}
......@@ -193,6 +193,10 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
MQBrokerException, InterruptedException, MQClientException {
updateConsumeOffsetToBroker(mq, offset, true);
}
private void updateConsumeOffsetToSnode(MessageQueue mq, long offset) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
updateConsumeOffsetToBroker(mq, offset, true);
}
/**
* Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized.
......@@ -200,14 +204,14 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
if (null == snodeAddr){
this.mQClientFactory.updateSnodeInfoFromNameServer();
snodeAddr= this.mQClientFactory.findSnodeAddressInPublish();
}
if (findBrokerResult != null) {
if (snodeAddr != null) {
UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
......@@ -216,35 +220,34 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
requestHeader.setEnodeName(mq.getBrokerName());
if (isOneway) {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
snodeAddr, requestHeader, 1000 * 5);
} else {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
snodeAddr, requestHeader, 1000 * 5);
}
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
throw new MQClientException("Update offset to Broker[" + mq.getBrokerName() + "] failed, Snode is null.", null);
}
}
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
if (null == snodeAddr){
this.mQClientFactory.updateSnodeInfoFromNameServer();
snodeAddr= this.mQClientFactory.findSnodeAddressInPublish();
}
if (findBrokerResult != null) {
if (snodeAddr != null) {
QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setEnodeName(mq.getBrokerName());
return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
snodeAddr, requestHeader, 1000 * 5);
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
throw new MQClientException("Get Offset from broker[" + mq.getBrokerName() + "] failed, Snode is not exist", null);
}
}
}
......@@ -627,10 +627,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
default:
break;
}
this.tryToFindSnodePublishInfo();
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.sendHeartbeatToAllSnodeWithLock();
this.mQClientFactory.rebalanceImmediately();
}
......@@ -1138,4 +1138,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
this.consumeMessageService = consumeMessageService;
}
private void tryToFindSnodePublishInfo() {
this.mQClientFactory.updateSnodeInfoFromNameServer();
}
}
......@@ -84,7 +84,8 @@ public class RebalancePushImpl extends RebalanceImpl {
}
// notify broker
this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
//this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
this.getmQClientFactory().sendHeartbeatToAllSnodeWithLock();
}
@Override
......
......@@ -501,7 +501,8 @@ public class MQClientInstance {
}
// may need to check one broker every cluster...
// assume that the configs of every broker in cluster are the the same.
String addr = findBrokerAddrByTopic(subscriptionData.getTopic());
//String addr = findBrokerAddrByTopic(subscriptionData.getTopic());
String addr = findSnodeAddressInPublish();
if (addr != null) {
try {
......@@ -1191,17 +1192,17 @@ public class MQClientInstance {
}
public List<String> findConsumerIdList(final String topic, final String group) {
String brokerAddr = this.findBrokerAddrByTopic(topic);
if (null == brokerAddr) {
this.updateTopicRouteInfoFromNameServer(topic);
brokerAddr = this.findBrokerAddrByTopic(topic);
String snodeAddr = this.findSnodeAddressInPublish();
if (null == snodeAddr) {
this.updateSnodeInfoFromNameServer();
snodeAddr = this.findSnodeAddressInPublish();
}
if (null != brokerAddr) {
if (null != snodeAddr) {
try {
return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);
return this.mQClientAPIImpl.getConsumerIdListByGroup(snodeAddr, group, 3000);
} catch (Exception e) {
log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
log.warn("getConsumerIdListByGroup exception, " + snodeAddr + " " + group, e);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册