提交 245146b2 编写于 作者: S ShannonDing

Support link to snode

上级 36481471
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.exception;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.help.FAQUrl;
public class MQSnodeException extends MQBrokerException {
public MQSnodeException(int responseCode, String errorMessage) {
super(responseCode,errorMessage);
}
}
...@@ -32,6 +32,7 @@ import org.apache.rocketmq.client.consumer.PullResult; ...@@ -32,6 +32,7 @@ import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.MQSnodeException;
import org.apache.rocketmq.client.hook.SendMessageContext; import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.impl.consumer.PullResultExt; import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.factory.MQClientInstance;
...@@ -75,6 +76,7 @@ import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody; ...@@ -75,6 +76,7 @@ import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody; import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.ResetOffsetBody; import org.apache.rocketmq.common.protocol.body.ResetOffsetBody;
import org.apache.rocketmq.common.protocol.body.SnodeClusterInfo;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.body.TopicList;
...@@ -561,7 +563,7 @@ public class MQClientAPIImpl { ...@@ -561,7 +563,7 @@ public class MQClientAPIImpl {
final long timeoutMillis, final long timeoutMillis,
final CommunicationMode communicationMode, final CommunicationMode communicationMode,
final PullCallback pullCallback final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException { ) throws RemotingException, MQSnodeException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SNODE_PULL_MESSAGE, requestHeader); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SNODE_PULL_MESSAGE, requestHeader);
switch (communicationMode) { switch (communicationMode) {
case ONEWAY: case ONEWAY:
...@@ -616,14 +618,14 @@ public class MQClientAPIImpl { ...@@ -616,14 +618,14 @@ public class MQClientAPIImpl {
final String addr, final String addr,
final RemotingCommand request, final RemotingCommand request,
final long timeoutMillis final long timeoutMillis
) throws RemotingException, InterruptedException, MQBrokerException { ) throws RemotingException, InterruptedException, MQSnodeException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null; assert response != null;
return this.processPullResponse(response); return this.processPullResponse(response);
} }
private PullResult processPullResponse( private PullResult processPullResponse(
final RemotingCommand response) throws MQBrokerException, RemotingCommandException { final RemotingCommand response) throws MQSnodeException, RemotingCommandException {
PullStatus pullStatus = PullStatus.NO_NEW_MSG; PullStatus pullStatus = PullStatus.NO_NEW_MSG;
switch (response.getCode()) { switch (response.getCode()) {
case ResponseCode.SUCCESS: case ResponseCode.SUCCESS:
...@@ -640,7 +642,7 @@ public class MQClientAPIImpl { ...@@ -640,7 +642,7 @@ public class MQClientAPIImpl {
break; break;
default: default:
throw new MQBrokerException(response.getCode(), response.getRemark()); throw new MQSnodeException(response.getCode(), response.getRemark());
} }
PullMessageResponseHeader responseHeader = PullMessageResponseHeader responseHeader =
...@@ -1189,6 +1191,25 @@ public class MQClientAPIImpl { ...@@ -1189,6 +1191,25 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark()); throw new MQBrokerException(response.getCode(), response.getRemark());
} }
public SnodeClusterInfo getSnodeClusterInfo(
//Todo Redifine snode exception
final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException , MQBrokerException{
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SNODE_CLUSTER_INFO, null);
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return SnodeClusterInfo.decode(response.getBody(), SnodeClusterInfo.class);
}
default:
break;
}
throw new MQSnodeException(response.getCode(), response.getRemark());
}
public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis) public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
throws RemotingException, MQClientException, InterruptedException { throws RemotingException, MQClientException, InterruptedException {
......
...@@ -497,9 +497,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -497,9 +497,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException { throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try { try {
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerName, snodeAddr, msg,
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerName, brokerAddr, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes()); this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) { } catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e); log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
......
...@@ -177,7 +177,14 @@ public class PullAPIWrapper { ...@@ -177,7 +177,14 @@ public class PullAPIWrapper {
if (findBrokerResult.isSlave()) { if (findBrokerResult.isSlave()) {
sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner); sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
} }
String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
if (snodeAddr == null) {
this.mQClientFactory.updateSnodeInfoFromNameServer();
snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
}
if (snodeAddr == null) {
throw new MQClientException("The snode addr is null.",null);
}
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup); requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic()); requestHeader.setTopic(mq.getTopic());
...@@ -190,14 +197,10 @@ public class PullAPIWrapper { ...@@ -190,14 +197,10 @@ public class PullAPIWrapper {
requestHeader.setSubscription(subExpression); requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion); requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType); requestHeader.setExpressionType(expressionType);
requestHeader.setEnodeName(mq.getBrokerName());
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
brokerAddr, snodeAddr,
requestHeader, requestHeader,
timeoutMillis, timeoutMillis,
communicationMode, communicationMode,
......
...@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicLong; ...@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.client.admin.MQAdminExtInner; import org.apache.rocketmq.client.admin.MQAdminExtInner;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.ClientRemotingProcessor; import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
...@@ -66,6 +67,7 @@ import org.apache.rocketmq.common.message.MessageExt; ...@@ -66,6 +67,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.SnodeClusterInfo;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData; import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData; import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
...@@ -73,6 +75,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.ProducerData; ...@@ -73,6 +75,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.SnodeData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ClientConfig;
...@@ -99,8 +102,12 @@ public class MQClientInstance { ...@@ -99,8 +102,12 @@ public class MQClientInstance {
private final Lock lockHeartbeat = new ReentrantLock(); private final Lock lockHeartbeat = new ReentrantLock();
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable = private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
new ConcurrentHashMap<String, HashMap<Long, String>>(); new ConcurrentHashMap<String, HashMap<Long, String>>();
private final ConcurrentMap<String/* Snode Name */, String/* address */> snodeAddrTable =
new ConcurrentHashMap<String, String>();
private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable = private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
new ConcurrentHashMap<String, HashMap<String, Integer>>(); new ConcurrentHashMap<String, HashMap<String, Integer>>();
private final ConcurrentMap<String/* Snode Name */, HashMap<String/* address */, Integer>> snodeVersionTable =
new ConcurrentHashMap<String, HashMap<String, Integer>>();
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
...@@ -116,6 +123,7 @@ public class MQClientInstance { ...@@ -116,6 +123,7 @@ public class MQClientInstance {
private ServiceState serviceState = ServiceState.CREATE_JUST; private ServiceState serviceState = ServiceState.CREATE_JUST;
private DatagramSocket datagramSocket; private DatagramSocket datagramSocket;
private Random random = new Random(); private Random random = new Random();
private volatile ThreadLocalIndex whitchSnodeIndex = new ThreadLocalIndex();
public MQClientInstance(org.apache.rocketmq.client.ClientConfig clientConfig, int instanceIndex, String clientId) { public MQClientInstance(org.apache.rocketmq.client.ClientConfig clientConfig, int instanceIndex, String clientId) {
this(clientConfig, instanceIndex, clientId, null); this(clientConfig, instanceIndex, clientId, null);
...@@ -255,6 +263,10 @@ public class MQClientInstance { ...@@ -255,6 +263,10 @@ public class MQClientInstance {
} }
} }
private void doFetchNameServerAddr() {
}
private void startScheduledTask() { private void startScheduledTask() {
if (null == this.clientConfig.getNamesrvAddr()) { if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
...@@ -279,6 +291,11 @@ public class MQClientInstance { ...@@ -279,6 +291,11 @@ public class MQClientInstance {
} catch (Exception e) { } catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
} }
try {
MQClientInstance.this.updateSnodeInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateSnodeInfoFromNameServer exception", e);
}
} }
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS); }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
...@@ -287,14 +304,27 @@ public class MQClientInstance { ...@@ -287,14 +304,27 @@ public class MQClientInstance {
@Override @Override
public void run() { public void run() {
try { try {
MQClientInstance.this.cleanOfflineBroker(); //MQClientInstance.this.cleanOfflineSnode();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); MQClientInstance.this.sendHeartbeatToAllSnodeWithLock();
} catch (Exception e) { } catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); log.error("ScheduledTask updateSnodeInfoFromNameServer exception", e);
} }
} }
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS); }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
// this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
//
// @Override
// public void run() {
// try {
// MQClientInstance.this.cleanOfflineBroker();
// MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
// } catch (Exception e) {
// log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
// }
// }
// }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override @Override
...@@ -324,6 +354,52 @@ public class MQClientInstance { ...@@ -324,6 +354,52 @@ public class MQClientInstance {
return clientId; return clientId;
} }
public boolean updateSnodeInfoFromNameServer() {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
SnodeClusterInfo snodeClusterInfo;
snodeClusterInfo = this.mQClientAPIImpl.getSnodeClusterInfo(1000 * 3);
if (snodeClusterInfo != null) {
HashMap<String, SnodeData> snodeTable = snodeClusterInfo.getSnodeTable();
Iterator<Entry<String, String>> snodeIter = this.snodeAddrTable.entrySet().iterator();
while (snodeIter.hasNext()) {
Entry<String, String> entry = snodeIter.next();
String snodeName = entry.getKey();
if (!snodeTable.containsKey(snodeName)) {
snodeIter.remove();
log.info("snodeAddrTable.remove. Snode Name = {}, Snode Addr:[{}]", entry.getKey(), entry.getKey());
}
}
for (Map.Entry<String, SnodeData> entry : snodeTable.entrySet()) {
SnodeData snodeData = entry.getValue();
if (snodeData != null) {
this.snodeAddrTable.put(entry.getKey(), snodeData.getAddress());
log.debug("snodeAddrTable.put. Snode Name = {}, Snode Addr:[{}]", entry.getKey(), snodeData.getAddress());
}
}
return true;
} else {
//this.snodeAddrTable.clear();
log.warn("updateSnodeInfoFromNameServer, getSnodeInfoFromNameServer return null.");
//return true;
}
} catch (Exception e) {
log.warn("updateSnodeInfoFromNameServer Exception", e);
} finally {
this.lockNamesrv.unlock();
}
} else {
log.warn("updateSnodeInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
}
} catch (InterruptedException e) {
log.warn("updateSnodeInfoFromNameServer Exception", e);
}
return false;
}
public void updateTopicRouteInfoFromNameServer() { public void updateTopicRouteInfoFromNameServer() {
Set<String> topicList = new HashSet<String>(); Set<String> topicList = new HashSet<String>();
...@@ -462,6 +538,20 @@ public class MQClientInstance { ...@@ -462,6 +538,20 @@ public class MQClientInstance {
} }
} }
public void sendHeartbeatToAllSnodeWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
this.sendHeartbeatToAllSnode();
} catch (final Exception e) {
log.error("sendHeartbeatToAllSnodeWithLock exception", e);
} finally {
this.lockHeartbeat.unlock();
}
} else {
log.warn("lock heartBeat, but failed.");
}
}
private void persistAllConsumerOffset() { private void persistAllConsumerOffset() {
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator(); Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) { while (it.hasNext()) {
...@@ -510,6 +600,46 @@ public class MQClientInstance { ...@@ -510,6 +600,46 @@ public class MQClientInstance {
return false; return false;
} }
private void sendHeartbeatToAllSnode() {
final HeartbeatData heartbeatData = this.prepareHeartbeatData();
final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
if (producerEmpty && consumerEmpty) {
log.warn("sending heartbeat, but no consumer and no producer");
return;
}
if (!this.snodeAddrTable.isEmpty()) {
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
Iterator<Entry<String, String>> it = this.snodeAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, String> entry = it.next();
String snodeName = entry.getKey();
String snodeAddr = entry.getValue();
if (snodeAddr != null) {
if (consumerEmpty) {
continue;
}
try {
int version = this.mQClientAPIImpl.sendHearbeat(snodeAddr, heartbeatData, 3000);
if (!this.snodeVersionTable.containsKey(snodeName)) {
this.snodeVersionTable.put(snodeName, new HashMap<String, Integer>(4));
}
this.snodeVersionTable.get(snodeName).put(snodeAddr, version);
if (times % 20 == 0) {
log.info("send heart beat to Snode[{} {}] success", snodeName, snodeAddr);
log.info(heartbeatData.toString());
}
} catch (Exception e) {
log.info("send heart beat to Snode[{} {}] failed", snodeName, snodeAddr);
}
}
}
}
}
private void sendHeartbeatToAllBroker() { private void sendHeartbeatToAllBroker() {
final HeartbeatData heartbeatData = this.prepareHeartbeatData(); final HeartbeatData heartbeatData = this.prepareHeartbeatData();
final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty(); final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
...@@ -886,29 +1016,20 @@ public class MQClientInstance { ...@@ -886,29 +1016,20 @@ public class MQClientInstance {
} }
private void unregisterClient(final String producerGroup, final String consumerGroup) { private void unregisterClient(final String producerGroup, final String consumerGroup) {
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator(); Iterator<Entry<String, String>> it = this.snodeAddrTable.entrySet().iterator();
while (it.hasNext()) { while (it.hasNext()) {
Entry<String, HashMap<Long, String>> entry = it.next(); Entry<String, String> entry = it.next();
String brokerName = entry.getKey(); String snodeName = entry.getKey();
HashMap<Long, String> oneTable = entry.getValue(); String snodeAddr = entry.getValue();
if (!entry.getValue().isEmpty()) {
if (oneTable != null) { try {
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) { this.mQClientAPIImpl.unregisterClient(snodeAddr, this.clientId, producerGroup, consumerGroup, 3000);
String addr = entry1.getValue(); log.info("unregister client[Producer: {} Consumer: {}] from snode[{} {}] success", producerGroup, consumerGroup, snodeName, snodeAddr);
if (addr != null) { } catch (Exception e) {
try { log.error("unregister client exception from snode: " + snodeAddr, e);
this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000);
log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr);
} catch (RemotingException e) {
log.error("unregister client exception from broker: " + addr, e);
} catch (InterruptedException e) {
log.error("unregister client exception from broker: " + addr, e);
} catch (MQBrokerException e) {
log.error("unregister client exception from broker: " + addr, e);
}
}
} }
} }
} }
} }
...@@ -1013,6 +1134,23 @@ public class MQClientInstance { ...@@ -1013,6 +1134,23 @@ public class MQClientInstance {
return null; return null;
} }
public String findSnodeAddressInPublish() {
if (this.snodeAddrTable.size() == 0) {
return null;
}
int index = this.whitchSnodeIndex.getAndIncrement();
int pos = Math.abs(index) % this.snodeAddrTable.size();
if (pos < 0) {
pos = 0;
}
for (String snode : this.snodeAddrTable.keySet()) {
if (pos == 0)
return this.snodeAddrTable.get(snode);
pos--;
}
return null;
}
public FindBrokerResult findBrokerAddressInSubscribe( public FindBrokerResult findBrokerAddressInSubscribe(
final String brokerName, final String brokerName,
final long brokerId, final long brokerId,
......
...@@ -171,6 +171,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -171,6 +171,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (startFactory) { if (startFactory) {
mQClientFactory.start(); mQClientFactory.start();
log.info("Update Snode Info for the first time.");
mQClientFactory.updateSnodeInfoFromNameServer();
log.info("Send heartbeat to Snode Info for the first time.");
mQClientFactory.sendHeartbeatToAllSnodeWithLock();
} }
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
...@@ -188,7 +192,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -188,7 +192,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
break; break;
} }
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); // this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
} }
private void checkConfig() throws MQClientException { private void checkConfig() throws MQClientException {
...@@ -652,6 +656,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -652,6 +656,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
return topicPublishInfo; return topicPublishInfo;
} }
} }
private void tryToFindSnodePublishInfo() {
this.mQClientFactory.updateSnodeInfoFromNameServer();
}
private SendResult sendKernelImpl(final Message msg, private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq, final MessageQueue mq,
...@@ -660,14 +667,15 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -660,14 +667,15 @@ public class DefaultMQProducerImpl implements MQProducerInner {
final TopicPublishInfo topicPublishInfo, final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis(); long beginStartTime = System.currentTimeMillis();
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) { String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
tryToFindTopicPublishInfo(mq.getTopic()); if (null == snodeAddr) {
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); tryToFindSnodePublishInfo();
snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
} }
SendMessageContext context = null; SendMessageContext context = null;
if (brokerAddr != null) { if (snodeAddr != null) {
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); //brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody(); byte[] prevBody = msg.getBody();
try { try {
...@@ -693,7 +701,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -693,7 +701,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr()); checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup()); checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode); checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr); checkForbiddenContext.setBrokerAddr(snodeAddr);
checkForbiddenContext.setMessage(msg); checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq); checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode()); checkForbiddenContext.setUnitMode(this.isUnitMode());
...@@ -706,7 +714,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -706,7 +714,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode); context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr); context.setBrokerAddr(snodeAddr);
context.setMessage(msg); context.setMessage(msg);
context.setMq(mq); context.setMq(mq);
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
...@@ -764,7 +772,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -764,7 +772,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
} }
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr, snodeAddr,
mq.getBrokerName(), mq.getBrokerName(),
tmpMessage, tmpMessage,
requestHeader, requestHeader,
...@@ -784,7 +792,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -784,7 +792,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
} }
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr, snodeAddr,
mq.getBrokerName(), mq.getBrokerName(),
msg, msg,
requestHeader, requestHeader,
......
...@@ -36,6 +36,7 @@ public class Consumer { ...@@ -36,6 +36,7 @@ public class Consumer {
* Instantiate with specified consumer group name. * Instantiate with specified consumer group name.
*/ */
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("RocketMQ5"); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("RocketMQ5");
consumer.setNamesrvAddr("139.196.101.149:9876");
/* /*
* Specify name server addresses. * Specify name server addresses.
......
...@@ -32,11 +32,11 @@ public class Producer { ...@@ -32,11 +32,11 @@ public class Producer {
* Instantiate with a producer group name. * Instantiate with a producer group name.
*/ */
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("139.196.101.149:9876");
/* /*
* Specify name server addresses. * Specify name server addresses.
* <p/> * <p/>
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* <pre> * <pre>
* {@code * {@code
...@@ -76,7 +76,7 @@ public class Producer { ...@@ -76,7 +76,7 @@ public class Producer {
/* /*
* Shut down once the producer instance is not longer in use. * Shut down once the producer instance is not longer in use.
*/ */
Thread.sleep(100000000000L); Thread.sleep(3000L);
producer.shutdown(); producer.shutdown();
} }
} }
...@@ -31,6 +31,7 @@ public class AsyncProducer { ...@@ -31,6 +31,7 @@ public class AsyncProducer {
String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException { String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test"); DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
producer.setNamesrvAddr("139.196.101.149:9876");
producer.start(); producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0); producer.setRetryTimesWhenSendAsyncFailed(0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册