提交 60e12f08 编写于 作者: S ShannonDing

Polish Push Comsumer

上级 38d71d9a
...@@ -124,14 +124,14 @@ public class RemoteBrokerOffsetStore implements OffsetStore { ...@@ -124,14 +124,14 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
if (offset != null) { if (offset != null) {
if (mqs.contains(mq)) { if (mqs.contains(mq)) {
try { try {
this.updateConsumeOffsetToBroker(mq, offset.get()); this.updateConsumeOffsetToSnode(mq, offset.get());
log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToSnode {} {}",
this.groupName, this.groupName,
this.mQClientFactory.getClientId(), this.mQClientFactory.getClientId(),
mq, mq,
offset.get()); offset.get());
} catch (Exception e) { } catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e); log.error("updateConsumeOffsetToSnode exception, " + mq.toString(), e);
} }
} else { } else {
unusedMQ.add(mq); unusedMQ.add(mq);
...@@ -153,14 +153,14 @@ public class RemoteBrokerOffsetStore implements OffsetStore { ...@@ -153,14 +153,14 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
AtomicLong offset = this.offsetTable.get(mq); AtomicLong offset = this.offsetTable.get(mq);
if (offset != null) { if (offset != null) {
try { try {
this.updateConsumeOffsetToBroker(mq, offset.get()); this.updateConsumeOffsetToSnode(mq, offset.get());
log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetToSnode {} {}",
this.groupName, this.groupName,
this.mQClientFactory.getClientId(), this.mQClientFactory.getClientId(),
mq, mq,
offset.get()); offset.get());
} catch (Exception e) { } catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e); log.error("updateConsumeOffsetToSnode exception, " + mq.toString(), e);
} }
} }
} }
...@@ -193,6 +193,10 @@ public class RemoteBrokerOffsetStore implements OffsetStore { ...@@ -193,6 +193,10 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
MQBrokerException, InterruptedException, MQClientException { MQBrokerException, InterruptedException, MQClientException {
updateConsumeOffsetToBroker(mq, offset, true); updateConsumeOffsetToBroker(mq, offset, true);
} }
private void updateConsumeOffsetToSnode(MessageQueue mq, long offset) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
updateConsumeOffsetToBroker(mq, offset, true);
}
/** /**
* Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized. * Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized.
...@@ -200,14 +204,14 @@ public class RemoteBrokerOffsetStore implements OffsetStore { ...@@ -200,14 +204,14 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
@Override @Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException { MQBrokerException, InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); if (null == snodeAddr){
this.mQClientFactory.updateSnodeInfoFromNameServer();
snodeAddr= this.mQClientFactory.findSnodeAddressInPublish();
} }
if (findBrokerResult != null) { if (snodeAddr != null) {
UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader(); UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic()); requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName); requestHeader.setConsumerGroup(this.groupName);
...@@ -216,35 +220,34 @@ public class RemoteBrokerOffsetStore implements OffsetStore { ...@@ -216,35 +220,34 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
requestHeader.setEnodeName(mq.getBrokerName()); requestHeader.setEnodeName(mq.getBrokerName());
if (isOneway) { if (isOneway) {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway( this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); snodeAddr, requestHeader, 1000 * 5);
} else { } else {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset( this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); snodeAddr, requestHeader, 1000 * 5);
} }
} else { } else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); throw new MQClientException("Update offset to Broker[" + mq.getBrokerName() + "] failed, Snode is null.", null);
} }
} }
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException, private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException { InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
if (null == findBrokerResult) { if (null == snodeAddr){
this.mQClientFactory.updateSnodeInfoFromNameServer();
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); snodeAddr= this.mQClientFactory.findSnodeAddressInPublish();
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
} }
if (findBrokerResult != null) { if (snodeAddr != null) {
QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader(); QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic()); requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName); requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId()); requestHeader.setQueueId(mq.getQueueId());
requestHeader.setEnodeName(mq.getBrokerName()); requestHeader.setEnodeName(mq.getBrokerName());
return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset( return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); snodeAddr, requestHeader, 1000 * 5);
} else { } else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); throw new MQClientException("Get Offset from broker[" + mq.getBrokerName() + "] failed, Snode is not exist", null);
} }
} }
} }
...@@ -627,10 +627,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -627,10 +627,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
default: default:
break; break;
} }
this.tryToFindSnodePublishInfo();
this.updateTopicSubscribeInfoWhenSubscriptionChanged(); this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker(); this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); this.mQClientFactory.sendHeartbeatToAllSnodeWithLock();
this.mQClientFactory.rebalanceImmediately(); this.mQClientFactory.rebalanceImmediately();
} }
...@@ -1138,4 +1138,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -1138,4 +1138,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
this.consumeMessageService = consumeMessageService; this.consumeMessageService = consumeMessageService;
} }
private void tryToFindSnodePublishInfo() {
this.mQClientFactory.updateSnodeInfoFromNameServer();
}
} }
...@@ -84,7 +84,8 @@ public class RebalancePushImpl extends RebalanceImpl { ...@@ -84,7 +84,8 @@ public class RebalancePushImpl extends RebalanceImpl {
} }
// notify broker // notify broker
this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock(); //this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
this.getmQClientFactory().sendHeartbeatToAllSnodeWithLock();
} }
@Override @Override
......
...@@ -501,7 +501,8 @@ public class MQClientInstance { ...@@ -501,7 +501,8 @@ public class MQClientInstance {
} }
// may need to check one broker every cluster... // may need to check one broker every cluster...
// assume that the configs of every broker in cluster are the the same. // assume that the configs of every broker in cluster are the the same.
String addr = findBrokerAddrByTopic(subscriptionData.getTopic()); //String addr = findBrokerAddrByTopic(subscriptionData.getTopic());
String addr = findSnodeAddressInPublish();
if (addr != null) { if (addr != null) {
try { try {
...@@ -1191,17 +1192,17 @@ public class MQClientInstance { ...@@ -1191,17 +1192,17 @@ public class MQClientInstance {
} }
public List<String> findConsumerIdList(final String topic, final String group) { public List<String> findConsumerIdList(final String topic, final String group) {
String brokerAddr = this.findBrokerAddrByTopic(topic); String snodeAddr = this.findSnodeAddressInPublish();
if (null == brokerAddr) { if (null == snodeAddr) {
this.updateTopicRouteInfoFromNameServer(topic); this.updateSnodeInfoFromNameServer();
brokerAddr = this.findBrokerAddrByTopic(topic); snodeAddr = this.findSnodeAddressInPublish();
} }
if (null != brokerAddr) { if (null != snodeAddr) {
try { try {
return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000); return this.mQClientAPIImpl.getConsumerIdListByGroup(snodeAddr, group, 3000);
} catch (Exception e) { } catch (Exception e) {
log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e); log.warn("getConsumerIdListByGroup exception, " + snodeAddr + " " + group, e);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册