From 245146b209d2e90e09b6cc75f1a669b1734163c7 Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Wed, 30 Jan 2019 23:23:00 +0800 Subject: [PATCH] Support link to snode --- .../client/exception/MQSnodeException.java | 27 +++ .../rocketmq/client/impl/MQClientAPIImpl.java | 29 ++- .../consumer/DefaultMQPushConsumerImpl.java | 5 +- .../client/impl/consumer/PullAPIWrapper.java | 17 +- .../client/impl/factory/MQClientInstance.java | 184 +++++++++++++++--- .../impl/producer/DefaultMQProducerImpl.java | 30 +-- .../rocketmq/example/quickstart/Consumer.java | 1 + .../rocketmq/example/quickstart/Producer.java | 4 +- .../example/simple/AsyncProducer.java | 1 + 9 files changed, 248 insertions(+), 50 deletions(-) create mode 100644 client/src/main/java/org/apache/rocketmq/client/exception/MQSnodeException.java diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/MQSnodeException.java b/client/src/main/java/org/apache/rocketmq/client/exception/MQSnodeException.java new file mode 100644 index 00000000..440e5779 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/exception/MQSnodeException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.exception; + +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.help.FAQUrl; + +public class MQSnodeException extends MQBrokerException { + + public MQSnodeException(int responseCode, String errorMessage) { + super(responseCode,errorMessage); + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 5b4d3181..0080f1ee 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -32,6 +32,7 @@ import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.exception.MQSnodeException; import org.apache.rocketmq.client.hook.SendMessageContext; import org.apache.rocketmq.client.impl.consumer.PullResultExt; import org.apache.rocketmq.client.impl.factory.MQClientInstance; @@ -75,6 +76,7 @@ import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody; import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody; import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; import org.apache.rocketmq.common.protocol.body.ResetOffsetBody; +import org.apache.rocketmq.common.protocol.body.SnodeClusterInfo; import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicList; @@ -561,7 +563,7 @@ public class MQClientAPIImpl { final long timeoutMillis, final CommunicationMode communicationMode, final PullCallback pullCallback - ) throws RemotingException, MQBrokerException, InterruptedException { + ) throws RemotingException, MQSnodeException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SNODE_PULL_MESSAGE, requestHeader); switch (communicationMode) { case ONEWAY: @@ -616,14 +618,14 @@ public class MQClientAPIImpl { final String addr, final RemotingCommand request, final long timeoutMillis - ) throws RemotingException, InterruptedException, MQBrokerException { + ) throws RemotingException, InterruptedException, MQSnodeException { RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response != null; return this.processPullResponse(response); } private PullResult processPullResponse( - final RemotingCommand response) throws MQBrokerException, RemotingCommandException { + final RemotingCommand response) throws MQSnodeException, RemotingCommandException { PullStatus pullStatus = PullStatus.NO_NEW_MSG; switch (response.getCode()) { case ResponseCode.SUCCESS: @@ -640,7 +642,7 @@ public class MQClientAPIImpl { break; default: - throw new MQBrokerException(response.getCode(), response.getRemark()); + throw new MQSnodeException(response.getCode(), response.getRemark()); } PullMessageResponseHeader responseHeader = @@ -1189,6 +1191,25 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } + public SnodeClusterInfo getSnodeClusterInfo( + //Todo Redifine snode exception + final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, + RemotingSendRequestException, RemotingConnectException , MQBrokerException{ + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SNODE_CLUSTER_INFO, null); + + RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return SnodeClusterInfo.decode(response.getBody(), SnodeClusterInfo.class); + } + default: + break; + } + + throw new MQSnodeException(response.getCode(), response.getRemark()); + } + public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 2f3cc97e..30c9a4d9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -497,9 +497,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { try { - String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) - : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); - this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerName, brokerAddr, msg, + String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); + this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerName, snodeAddr, msg, this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes()); } catch (Exception e) { log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java index b650e35e..e549ce5e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java @@ -177,7 +177,14 @@ public class PullAPIWrapper { if (findBrokerResult.isSlave()) { sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner); } - + String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); + if (snodeAddr == null) { + this.mQClientFactory.updateSnodeInfoFromNameServer(); + snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); + } + if (snodeAddr == null) { + throw new MQClientException("The snode addr is null.",null); + } PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); requestHeader.setConsumerGroup(this.consumerGroup); requestHeader.setTopic(mq.getTopic()); @@ -190,14 +197,10 @@ public class PullAPIWrapper { requestHeader.setSubscription(subExpression); requestHeader.setSubVersion(subVersion); requestHeader.setExpressionType(expressionType); - - String brokerAddr = findBrokerResult.getBrokerAddr(); - if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { - brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr); - } + requestHeader.setEnodeName(mq.getBrokerName()); PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( - brokerAddr, + snodeAddr, requestHeader, timeoutMillis, communicationMode, diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 321f804d..3c34fdba 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.rocketmq.client.admin.MQAdminExtInner; +import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.ClientRemotingProcessor; @@ -66,6 +67,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.common.protocol.body.SnodeClusterInfo; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData; import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData; @@ -73,6 +75,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.ProducerData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; +import org.apache.rocketmq.common.protocol.route.SnodeData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.ClientConfig; @@ -99,8 +102,12 @@ public class MQClientInstance { private final Lock lockHeartbeat = new ReentrantLock(); private final ConcurrentMap> brokerAddrTable = new ConcurrentHashMap>(); + private final ConcurrentMap snodeAddrTable = + new ConcurrentHashMap(); private final ConcurrentMap> brokerVersionTable = new ConcurrentHashMap>(); + private final ConcurrentMap> snodeVersionTable = + new ConcurrentHashMap>(); private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -116,6 +123,7 @@ public class MQClientInstance { private ServiceState serviceState = ServiceState.CREATE_JUST; private DatagramSocket datagramSocket; private Random random = new Random(); + private volatile ThreadLocalIndex whitchSnodeIndex = new ThreadLocalIndex(); public MQClientInstance(org.apache.rocketmq.client.ClientConfig clientConfig, int instanceIndex, String clientId) { this(clientConfig, instanceIndex, clientId, null); @@ -255,6 +263,10 @@ public class MQClientInstance { } } + private void doFetchNameServerAddr() { + + } + private void startScheduledTask() { if (null == this.clientConfig.getNamesrvAddr()) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @@ -279,6 +291,11 @@ public class MQClientInstance { } catch (Exception e) { log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); } + try { + MQClientInstance.this.updateSnodeInfoFromNameServer(); + } catch (Exception e) { + log.error("ScheduledTask updateSnodeInfoFromNameServer exception", e); + } } }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS); @@ -287,14 +304,27 @@ public class MQClientInstance { @Override public void run() { try { - MQClientInstance.this.cleanOfflineBroker(); - MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); + //MQClientInstance.this.cleanOfflineSnode(); + MQClientInstance.this.sendHeartbeatToAllSnodeWithLock(); } catch (Exception e) { - log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); + log.error("ScheduledTask updateSnodeInfoFromNameServer exception", e); } } }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS); +// this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { +// +// @Override +// public void run() { +// try { +// MQClientInstance.this.cleanOfflineBroker(); +// MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); +// } catch (Exception e) { +// log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); +// } +// } +// }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS); + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override @@ -324,6 +354,52 @@ public class MQClientInstance { return clientId; } + public boolean updateSnodeInfoFromNameServer() { + try { + if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + SnodeClusterInfo snodeClusterInfo; + snodeClusterInfo = this.mQClientAPIImpl.getSnodeClusterInfo(1000 * 3); + if (snodeClusterInfo != null) { + HashMap snodeTable = snodeClusterInfo.getSnodeTable(); + Iterator> snodeIter = this.snodeAddrTable.entrySet().iterator(); + while (snodeIter.hasNext()) { + Entry entry = snodeIter.next(); + String snodeName = entry.getKey(); + if (!snodeTable.containsKey(snodeName)) { + snodeIter.remove(); + log.info("snodeAddrTable.remove. Snode Name = {}, Snode Addr:[{}]", entry.getKey(), entry.getKey()); + } + } + for (Map.Entry entry : snodeTable.entrySet()) { + SnodeData snodeData = entry.getValue(); + if (snodeData != null) { + this.snodeAddrTable.put(entry.getKey(), snodeData.getAddress()); + log.debug("snodeAddrTable.put. Snode Name = {}, Snode Addr:[{}]", entry.getKey(), snodeData.getAddress()); + } + } + return true; + } else { + //this.snodeAddrTable.clear(); + log.warn("updateSnodeInfoFromNameServer, getSnodeInfoFromNameServer return null."); + //return true; + } + } catch (Exception e) { + log.warn("updateSnodeInfoFromNameServer Exception", e); + } finally { + this.lockNamesrv.unlock(); + } + } else { + log.warn("updateSnodeInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS); + } + } catch (InterruptedException e) { + log.warn("updateSnodeInfoFromNameServer Exception", e); + } + + return false; + + } + public void updateTopicRouteInfoFromNameServer() { Set topicList = new HashSet(); @@ -462,6 +538,20 @@ public class MQClientInstance { } } + public void sendHeartbeatToAllSnodeWithLock() { + if (this.lockHeartbeat.tryLock()) { + try { + this.sendHeartbeatToAllSnode(); + } catch (final Exception e) { + log.error("sendHeartbeatToAllSnodeWithLock exception", e); + } finally { + this.lockHeartbeat.unlock(); + } + } else { + log.warn("lock heartBeat, but failed."); + } + } + private void persistAllConsumerOffset() { Iterator> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { @@ -510,6 +600,46 @@ public class MQClientInstance { return false; } + private void sendHeartbeatToAllSnode() { + final HeartbeatData heartbeatData = this.prepareHeartbeatData(); + final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty(); + final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty(); + if (producerEmpty && consumerEmpty) { + log.warn("sending heartbeat, but no consumer and no producer"); + return; + } + + if (!this.snodeAddrTable.isEmpty()) { + long times = this.sendHeartbeatTimesTotal.getAndIncrement(); + Iterator> it = this.snodeAddrTable.entrySet().iterator(); + while (it.hasNext()) { + Entry entry = it.next(); + String snodeName = entry.getKey(); + String snodeAddr = entry.getValue(); + if (snodeAddr != null) { + if (consumerEmpty) { + continue; + } + + try { + int version = this.mQClientAPIImpl.sendHearbeat(snodeAddr, heartbeatData, 3000); + if (!this.snodeVersionTable.containsKey(snodeName)) { + this.snodeVersionTable.put(snodeName, new HashMap(4)); + } + this.snodeVersionTable.get(snodeName).put(snodeAddr, version); + if (times % 20 == 0) { + log.info("send heart beat to Snode[{} {}] success", snodeName, snodeAddr); + log.info(heartbeatData.toString()); + } + } catch (Exception e) { + log.info("send heart beat to Snode[{} {}] failed", snodeName, snodeAddr); + } + + } + } + } + } + private void sendHeartbeatToAllBroker() { final HeartbeatData heartbeatData = this.prepareHeartbeatData(); final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty(); @@ -886,29 +1016,20 @@ public class MQClientInstance { } private void unregisterClient(final String producerGroup, final String consumerGroup) { - Iterator>> it = this.brokerAddrTable.entrySet().iterator(); + Iterator> it = this.snodeAddrTable.entrySet().iterator(); while (it.hasNext()) { - Entry> entry = it.next(); - String brokerName = entry.getKey(); - HashMap oneTable = entry.getValue(); - - if (oneTable != null) { - for (Map.Entry entry1 : oneTable.entrySet()) { - String addr = entry1.getValue(); - if (addr != null) { - try { - this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000); - log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr); - } catch (RemotingException e) { - log.error("unregister client exception from broker: " + addr, e); - } catch (InterruptedException e) { - log.error("unregister client exception from broker: " + addr, e); - } catch (MQBrokerException e) { - log.error("unregister client exception from broker: " + addr, e); - } - } + Entry entry = it.next(); + String snodeName = entry.getKey(); + String snodeAddr = entry.getValue(); + if (!entry.getValue().isEmpty()) { + try { + this.mQClientAPIImpl.unregisterClient(snodeAddr, this.clientId, producerGroup, consumerGroup, 3000); + log.info("unregister client[Producer: {} Consumer: {}] from snode[{} {}] success", producerGroup, consumerGroup, snodeName, snodeAddr); + } catch (Exception e) { + log.error("unregister client exception from snode: " + snodeAddr, e); } } + } } @@ -1013,6 +1134,23 @@ public class MQClientInstance { return null; } + public String findSnodeAddressInPublish() { + if (this.snodeAddrTable.size() == 0) { + return null; + } + int index = this.whitchSnodeIndex.getAndIncrement(); + int pos = Math.abs(index) % this.snodeAddrTable.size(); + if (pos < 0) { + pos = 0; + } + for (String snode : this.snodeAddrTable.keySet()) { + if (pos == 0) + return this.snodeAddrTable.get(snode); + pos--; + } + return null; + } + public FindBrokerResult findBrokerAddressInSubscribe( final String brokerName, final long brokerId, diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 291834f0..822fd81c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -171,6 +171,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { if (startFactory) { mQClientFactory.start(); + log.info("Update Snode Info for the first time."); + mQClientFactory.updateSnodeInfoFromNameServer(); + log.info("Send heartbeat to Snode Info for the first time."); + mQClientFactory.sendHeartbeatToAllSnodeWithLock(); } log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), @@ -188,7 +192,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { break; } - this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); +// this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); } private void checkConfig() throws MQClientException { @@ -652,6 +656,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { return topicPublishInfo; } } + private void tryToFindSnodePublishInfo() { + this.mQClientFactory.updateSnodeInfoFromNameServer(); + } private SendResult sendKernelImpl(final Message msg, final MessageQueue mq, @@ -660,14 +667,15 @@ public class DefaultMQProducerImpl implements MQProducerInner { final TopicPublishInfo topicPublishInfo, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); - String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); - if (null == brokerAddr) { - tryToFindTopicPublishInfo(mq.getTopic()); - brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); + + String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); + if (null == snodeAddr) { + tryToFindSnodePublishInfo(); + snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); } SendMessageContext context = null; - if (brokerAddr != null) { - brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); + if (snodeAddr != null) { + //brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); byte[] prevBody = msg.getBody(); try { @@ -693,7 +701,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr()); checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup()); checkForbiddenContext.setCommunicationMode(communicationMode); - checkForbiddenContext.setBrokerAddr(brokerAddr); + checkForbiddenContext.setBrokerAddr(snodeAddr); checkForbiddenContext.setMessage(msg); checkForbiddenContext.setMq(mq); checkForbiddenContext.setUnitMode(this.isUnitMode()); @@ -706,7 +714,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); - context.setBrokerAddr(brokerAddr); + context.setBrokerAddr(snodeAddr); context.setMessage(msg); context.setMq(mq); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); @@ -764,7 +772,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( - brokerAddr, + snodeAddr, mq.getBrokerName(), tmpMessage, requestHeader, @@ -784,7 +792,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( - brokerAddr, + snodeAddr, mq.getBrokerName(), msg, requestHeader, diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java index f9798d0e..665fede2 100644 --- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java @@ -36,6 +36,7 @@ public class Consumer { * Instantiate with specified consumer group name. */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("RocketMQ5"); + consumer.setNamesrvAddr("139.196.101.149:9876"); /* * Specify name server addresses. diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java index b87aada2..8cb7548d 100644 --- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java @@ -32,11 +32,11 @@ public class Producer { * Instantiate with a producer group name. */ DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); + producer.setNamesrvAddr("139.196.101.149:9876"); /* * Specify name server addresses. *

- * * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR *

          * {@code
@@ -76,7 +76,7 @@ public class Producer {
         /*
          * Shut down once the producer instance is not longer in use.
          */
-        Thread.sleep(100000000000L);
+        Thread.sleep(3000L);
         producer.shutdown();
     }
 }
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java b/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java
index d40739c8..9150974a 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java
@@ -31,6 +31,7 @@ public class AsyncProducer {
         String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
 
         DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
+        producer.setNamesrvAddr("139.196.101.149:9876");
         producer.start();
         producer.setRetryTimesWhenSendAsyncFailed(0);
 
-- 
GitLab