提交 2669a5c4 编写于 作者: S ShannonDing

Support origin 4.x

上级 510bb34f
......@@ -50,6 +50,7 @@ public class ClientConfig {
private boolean useTLS = TlsSystemConfig.tlsEnable;
private LanguageCode language = LanguageCode.JAVA;
private boolean realPush = false;
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
......@@ -100,6 +101,7 @@ public class ClientConfig {
this.vipChannelEnabled = cc.vipChannelEnabled;
this.useTLS = cc.useTLS;
this.language = cc.language;
this.realPush = cc.realPush;
}
public ClientConfig cloneClientConfig() {
......@@ -116,6 +118,7 @@ public class ClientConfig {
cc.vipChannelEnabled = vipChannelEnabled;
cc.useTLS = useTLS;
cc.language = language;
cc.realPush = realPush;
return cc;
}
......@@ -199,12 +202,20 @@ public class ClientConfig {
this.language = language;
}
public boolean isRealPush() {
return realPush;
}
public void setRealPush(boolean realPush) {
this.realPush = realPush;
}
@Override
public String toString() {
return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
+ ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval
+ ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
+ persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
+ vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + "]";
+ vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", realPush=" + realPush + "]";
}
}
......@@ -273,6 +273,7 @@ public class DefaultMQRealPushConsumer extends ClientConfig implements MQRealPus
public DefaultMQRealPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.setRealPush(true);
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQRealPushConsumerImpl(this, rpcHook);
}
......@@ -287,6 +288,7 @@ public class DefaultMQRealPushConsumer extends ClientConfig implements MQRealPus
public DefaultMQRealPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean realPushModel) {
this.consumerGroup = consumerGroup;
this.setRealPush(true);
if (allocateMessageQueueStrategy == null) {
this.allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
} else {
......@@ -310,6 +312,7 @@ public class DefaultMQRealPushConsumer extends ClientConfig implements MQRealPus
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace,
final String customizedTraceTopic) {
this.consumerGroup = consumerGroup;
this.setRealPush(true);
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQRealPushConsumerImpl(this, rpcHook);
if (enableMsgTrace) {
......
......@@ -88,7 +88,12 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
}
case READ_FROM_STORE: {
try {
long brokerOffset = this.fetchConsumeOffsetFromSnode(mq);
long brokerOffset = 0L;
if (this.mQClientFactory.getClientConfig().isRealPush()) {
brokerOffset = this.fetchConsumeOffsetFromSnode(mq);
} else {
brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
}
AtomicLong offset = new AtomicLong(brokerOffset);
this.updateOffset(mq, offset.get(), false);
return brokerOffset;
......@@ -124,14 +129,18 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
if (offset != null) {
if (mqs.contains(mq)) {
try {
this.updateConsumeOffsetToSnode(mq, offset.get());
log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToSnode {} {}",
if (this.mQClientFactory.getClientConfig().isRealPush()) {
this.updateConsumeOffsetToSnode(mq, offset.get());
} else {
this.updateConsumeOffsetToBroker(mq, offset.get());
}
log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
this.groupName,
this.mQClientFactory.getClientId(),
mq,
offset.get());
} catch (Exception e) {
log.error("updateConsumeOffsetToSnode exception, " + mq.toString(), e);
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
}
} else {
unusedMQ.add(mq);
......@@ -153,14 +162,18 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
AtomicLong offset = this.offsetTable.get(mq);
if (offset != null) {
try {
this.updateConsumeOffsetToSnode(mq, offset.get());
log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetToSnode {} {}",
if (this.mQClientFactory.getClientConfig().isRealPush()) {
this.updateConsumeOffsetToSnode(mq, offset.get());
} else {
this.updateConsumeOffsetToBroker(mq, offset.get());
}
log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetTobroker {} {}",
this.groupName,
this.mQClientFactory.getClientId(),
mq,
offset.get());
} catch (Exception e) {
log.error("updateConsumeOffsetToSnode exception, " + mq.toString(), e);
log.error("updateConsumeOffsetTobroker exception, " + mq.toString(), e);
}
}
}
......@@ -288,22 +301,23 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
*/
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
if (null == snodeAddr) {
this.mQClientFactory.updateSnodeInfoFromNameServer();
snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
if (snodeAddr != null) {
if (findBrokerResult != null) {
QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setEnodeName(mq.getBrokerName());
return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
snodeAddr, requestHeader, 1000 * 5);
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
} else {
throw new MQClientException("Get Offset from broker[" + mq.getBrokerName() + "] failed, Snode is not exist", null);
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}
}
......@@ -567,7 +567,7 @@ public class MQClientAPIImpl {
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws RemotingException, MQSnodeException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SNODE_PULL_MESSAGE, requestHeader);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
switch (communicationMode) {
case ONEWAY:
assert false;
......
......@@ -177,13 +177,24 @@ public class PullAPIWrapper {
if (findBrokerResult.isSlave()) {
sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
}
String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
if (snodeAddr == null) {
this.mQClientFactory.updateSnodeInfoFromNameServer();
snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
}
if (snodeAddr == null) {
throw new MQClientException("The snode addr is null.",null);
String addr = null;
if (this.mQClientFactory.getClientConfig().isRealPush()) {
addr = this.mQClientFactory.findSnodeAddressInPublish();
if (addr == null) {
this.mQClientFactory.updateSnodeInfoFromNameServer();
addr = this.mQClientFactory.findSnodeAddressInPublish();
}
if (addr == null) {
throw new MQClientException("The snode addr is null.", null);
}
} else {
addr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
addr = computPullFromWhichFilterServer(mq.getTopic(), addr);
}
if (addr == null) {
throw new MQClientException("The broker addr is null.", null);
}
}
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
......@@ -200,7 +211,7 @@ public class PullAPIWrapper {
requestHeader.setEnodeName(mq.getBrokerName());
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
snodeAddr,
addr,
requestHeader,
timeoutMillis,
communicationMode,
......
......@@ -38,6 +38,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.client.admin.MQAdminExtInner;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
import org.apache.rocketmq.client.impl.FindBrokerResult;
......@@ -80,6 +81,7 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
......@@ -290,39 +292,44 @@ public class MQClientInstance {
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
try {
MQClientInstance.this.updateSnodeInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateSnodeInfoFromNameServer exception", e);
if (MQClientInstance.this.clientConfig.isRealPush()) {
try {
MQClientInstance.this.updateSnodeInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateSnodeInfoFromNameServer exception", e);
}
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
if (this.clientConfig.isRealPush()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//MQClientInstance.this.cleanOfflineSnode();
MQClientInstance.this.sendHeartbeatToAllSnodeWithLock();
} catch (Exception e) {
log.error("ScheduledTask updateSnodeInfoFromNameServer exception", e);
@Override
public void run() {
try {
//MQClientInstance.this.cleanOfflineSnode();
MQClientInstance.this.sendHeartbeatToAllSnodeWithLock();
} catch (Exception e) {
log.error("ScheduledTask updateSnodeInfoFromNameServer exception", e);
}
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
// this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
//
// @Override
// public void run() {
// try {
// MQClientInstance.this.cleanOfflineBroker();
// MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
// } catch (Exception e) {
// log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
// }
// }
// }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
} else {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
......@@ -500,9 +507,12 @@ public class MQClientInstance {
}
// may need to check one broker every cluster...
// assume that the configs of every broker in cluster are the the same.
//String addr = findBrokerAddrByTopic(subscriptionData.getTopic());
String addr = findSnodeAddressInPublish();
String addr = null;
if (this.clientConfig.isRealPush()) {
addr = findSnodeAddressInPublish();
} else {
addr = findBrokerAddrByTopic(subscriptionData.getTopic());
}
if (addr != null) {
try {
this.getMQClientAPIImpl().checkClientInBroker(
......@@ -1016,20 +1026,47 @@ public class MQClientInstance {
}
private void unregisterClient(final String producerGroup, final String consumerGroup) {
Iterator<Entry<String, String>> it = this.snodeAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, String> entry = it.next();
String snodeName = entry.getKey();
String snodeAddr = entry.getValue();
if (!entry.getValue().isEmpty()) {
try {
this.mQClientAPIImpl.unregisterClient(snodeAddr, this.clientId, producerGroup, consumerGroup, 3000);
log.info("unregister client[Producer: {} Consumer: {}] from snode[{} {}] success", producerGroup, consumerGroup, snodeName, snodeAddr);
} catch (Exception e) {
log.error("unregister client exception from snode: " + snodeAddr, e);
if (!this.clientConfig.isRealPush()) {
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, HashMap<Long, String>> entry = it.next();
String brokerName = entry.getKey();
HashMap<Long, String> oneTable = entry.getValue();
if (oneTable != null) {
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
String addr = entry1.getValue();
if (addr != null) {
try {
this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000);
log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr);
} catch (RemotingException e) {
log.error("unregister client exception from broker: " + addr, e);
} catch (InterruptedException e) {
log.error("unregister client exception from broker: " + addr, e);
} catch (MQBrokerException e) {
log.error("unregister client exception from broker: " + addr, e);
}
}
}
}
}
} else {
Iterator<Entry<String, String>> it = this.snodeAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, String> entry = it.next();
String snodeName = entry.getKey();
String snodeAddr = entry.getValue();
if (!entry.getValue().isEmpty()) {
try {
this.mQClientAPIImpl.unregisterClient(snodeAddr, this.clientId, producerGroup, consumerGroup, 3000);
log.info("unregister client[Producer: {} Consumer: {}] from snode[{} {}] success", producerGroup, consumerGroup, snodeName, snodeAddr);
} catch (Exception e) {
log.error("unregister client exception from snode: " + snodeAddr, e);
}
}
}
}
}
......@@ -1192,17 +1229,26 @@ public class MQClientInstance {
}
public List<String> findConsumerIdList(final String topic, final String group) {
String snodeAddr = this.findSnodeAddressInPublish();
if (null == snodeAddr) {
this.updateSnodeInfoFromNameServer();
snodeAddr = this.findSnodeAddressInPublish();
String addr = null;
if (this.clientConfig.isRealPush()) {
addr = this.findSnodeAddressInPublish();
if (null == addr) {
this.updateSnodeInfoFromNameServer();
addr = this.findSnodeAddressInPublish();
}
} else {
addr = this.findBrokerAddrByTopic(topic);
if (null == addr) {
this.updateTopicRouteInfoFromNameServer(topic);
addr = this.findBrokerAddrByTopic(topic);
}
}
if (null != snodeAddr) {
if (null != addr) {
try {
return this.mQClientAPIImpl.getConsumerIdListByGroup(snodeAddr, group, 3000);
return this.mQClientAPIImpl.getConsumerIdListByGroup(addr, group, 3000);
} catch (Exception e) {
log.warn("getConsumerIdListByGroup exception, " + snodeAddr + " " + group, e);
log.warn("getConsumerIdListByGroup exception, " + addr + " " + group, e);
}
}
......@@ -1395,4 +1441,8 @@ public class MQClientInstance {
return false;
}
public org.apache.rocketmq.client.ClientConfig getClientConfig() {
return clientConfig;
}
}
......@@ -193,10 +193,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (startFactory) {
mQClientFactory.start();
log.info("Update Snode Info for the first time.");
mQClientFactory.updateSnodeInfoFromNameServer();
log.info("Send heartbeat to Snode Info for the first time.");
mQClientFactory.sendHeartbeatToAllSnodeWithLock();
if (this.defaultMQProducer.isRealPush()) {
log.info("Update Snode Info for the first time.");
mQClientFactory.updateSnodeInfoFromNameServer();
log.info("Send heartbeat to Snode Info for the first time.");
mQClientFactory.sendHeartbeatToAllSnodeWithLock();
}
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
......@@ -214,7 +216,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
break;
}
// this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
if (!this.defaultMQProducer.isRealPush()) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
}
private void checkConfig() throws MQClientException {
......@@ -690,14 +694,22 @@ public class DefaultMQProducerImpl implements MQProducerInner {
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
if (null == snodeAddr) {
tryToFindSnodePublishInfo();
snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
String addr = null;
if (this.defaultMQProducer.isRealPush()) {
addr = this.mQClientFactory.findSnodeAddressInPublish();
if (null == addr) {
tryToFindSnodePublishInfo();
addr = this.mQClientFactory.findSnodeAddressInPublish();
}
} else {
addr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == addr) {
tryToFindTopicPublishInfo(mq.getTopic());
addr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
}
SendMessageContext context = null;
if (snodeAddr != null) {
if (addr != null) {
//brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody();
......@@ -724,7 +736,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(snodeAddr);
checkForbiddenContext.setBrokerAddr(addr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
......@@ -737,7 +749,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(snodeAddr);
context.setBrokerAddr(addr);
context.setMessage(msg);
context.setMq(mq);
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
......@@ -795,7 +807,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
snodeAddr,
addr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
......@@ -815,7 +827,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
snodeAddr,
addr,
mq.getBrokerName(),
msg,
requestHeader,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册