entry = it.next();
+ String snodeName = entry.getKey();
+ String snodeAddr = entry.getValue();
+ if (!entry.getValue().isEmpty()) {
+ try {
+ this.mQClientAPIImpl.unregisterClient(snodeAddr, this.clientId, producerGroup, consumerGroup, 3000);
+ log.info("unregister client[Producer: {} Consumer: {}] from snode[{} {}] success", producerGroup, consumerGroup, snodeName, snodeAddr);
+ } catch (Exception e) {
+ log.error("unregister client exception from snode: " + snodeAddr, e);
}
}
+
}
}
@@ -1013,6 +1134,23 @@ public class MQClientInstance {
return null;
}
+ public String findSnodeAddressInPublish() {
+ if (this.snodeAddrTable.size() == 0) {
+ return null;
+ }
+ int index = this.whitchSnodeIndex.getAndIncrement();
+ int pos = Math.abs(index) % this.snodeAddrTable.size();
+ if (pos < 0) {
+ pos = 0;
+ }
+ for (String snode : this.snodeAddrTable.keySet()) {
+ if (pos == 0)
+ return this.snodeAddrTable.get(snode);
+ pos--;
+ }
+ return null;
+ }
+
public FindBrokerResult findBrokerAddressInSubscribe(
final String brokerName,
final long brokerId,
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 291834f01b603b2f3e311088731d05370fa8eb21..822fd81cbf46142415c364c227803dbb8e487bf5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -171,6 +171,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (startFactory) {
mQClientFactory.start();
+ log.info("Update Snode Info for the first time.");
+ mQClientFactory.updateSnodeInfoFromNameServer();
+ log.info("Send heartbeat to Snode Info for the first time.");
+ mQClientFactory.sendHeartbeatToAllSnodeWithLock();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
@@ -188,7 +192,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
break;
}
- this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
+// this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
private void checkConfig() throws MQClientException {
@@ -652,6 +656,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
return topicPublishInfo;
}
}
+ private void tryToFindSnodePublishInfo() {
+ this.mQClientFactory.updateSnodeInfoFromNameServer();
+ }
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
@@ -660,14 +667,15 @@ public class DefaultMQProducerImpl implements MQProducerInner {
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
- String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
- if (null == brokerAddr) {
- tryToFindTopicPublishInfo(mq.getTopic());
- brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
+
+ String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
+ if (null == snodeAddr) {
+ tryToFindSnodePublishInfo();
+ snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
}
SendMessageContext context = null;
- if (brokerAddr != null) {
- brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
+ if (snodeAddr != null) {
+ //brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody();
try {
@@ -693,7 +701,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
- checkForbiddenContext.setBrokerAddr(brokerAddr);
+ checkForbiddenContext.setBrokerAddr(snodeAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
@@ -706,7 +714,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
- context.setBrokerAddr(brokerAddr);
+ context.setBrokerAddr(snodeAddr);
context.setMessage(msg);
context.setMq(mq);
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
@@ -764,7 +772,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
- brokerAddr,
+ snodeAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
@@ -784,7 +792,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
- brokerAddr,
+ snodeAddr,
mq.getBrokerName(),
msg,
requestHeader,
diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
index f9798d0ea44f27fda305af6623fe822f2756fec8..665fede21e35afa6e6a0dbbde1c0e139ec261844 100644
--- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
@@ -36,6 +36,7 @@ public class Consumer {
* Instantiate with specified consumer group name.
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("RocketMQ5");
+ consumer.setNamesrvAddr("139.196.101.149:9876");
/*
* Specify name server addresses.
diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
index b87aada28203241d4223776ec52e150f99c649f7..8cb7548db7efd277370076830b93f895de00c50a 100644
--- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
@@ -32,11 +32,11 @@ public class Producer {
* Instantiate with a producer group name.
*/
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
+ producer.setNamesrvAddr("139.196.101.149:9876");
/*
* Specify name server addresses.
*
- *
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
*
* {@code
@@ -76,7 +76,7 @@ public class Producer {
/*
* Shut down once the producer instance is not longer in use.
*/
- Thread.sleep(100000000000L);
+ Thread.sleep(3000L);
producer.shutdown();
}
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java b/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java
index d40739c813d4ad2a7925a76d2f3b6164e025fb8e..9150974a5c5cf8a12a60e4a1ce4d8276a4ae22cd 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java
@@ -31,6 +31,7 @@ public class AsyncProducer {
String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
+ producer.setNamesrvAddr("139.196.101.149:9876");
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);