From e74422252719cfae89e13b0231f52f1a6538db48 Mon Sep 17 00:00:00 2001 From: fuyou001 Date: Wed, 7 Mar 2018 14:38:54 +0800 Subject: [PATCH] [ROCKETMQ-319] Improve broker register performance and reduce memory usage (#205) --- .../rocketmq/broker/BrokerController.java | 62 ++++-- .../rocketmq/broker/out/BrokerOuterAPI.java | 127 ++++++++++-- .../processor/AdminBrokerProcessor.java | 4 +- .../broker/topic/TopicConfigManager.java | 8 +- .../rocketmq/broker/BrokerOuterAPITest.java | 191 ++++++++++++++++++ .../apache/rocketmq/common/BrokerConfig.java | 25 ++- .../apache/rocketmq/common/DataVersion.java | 9 + .../rocketmq/common/ThreadFactoryImpl.java | 11 +- .../rocketmq/common/protocol/RequestCode.java | 2 + .../protocol/body/RegisterBrokerBody.java | 153 ++++++++++++++ .../QueryDataVersionRequestHeader.java | 70 +++++++ .../QueryDataVersionResponseHeader.java | 48 +++++ .../namesrv/RegisterBrokerRequestHeader.java | 11 +- .../common/RegisterBrokerBodyTest.java | 51 +++++ .../processor/DefaultRequestProcessor.java | 35 +++- .../namesrv/routeinfo/RouteInfoManager.java | 19 +- 16 files changed, 775 insertions(+), 51 deletions(-) create mode 100644 broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionRequestHeader.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionResponseHeader.java create mode 100644 common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 409b1d06..60f287af 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -729,14 +729,14 @@ public class BrokerController { this.filterServerManager.start(); } - this.registerBrokerAll(true, false); + this.registerBrokerAll(true, false, true); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { - BrokerController.this.registerBrokerAll(true, false); + BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } @@ -752,7 +752,7 @@ public class BrokerController { } } - public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) { + public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) @@ -767,28 +767,56 @@ public class BrokerController { topicConfigWrapper.setTopicConfigTable(topicConfigTable); } - RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll( - this.brokerConfig.getBrokerClusterName(), + if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), - this.getHAServerAddr(), - topicConfigWrapper, - this.filterServerManager.buildNewFilterServerList(), - oneway, - this.brokerConfig.getRegisterBrokerTimeoutMills()); - - if (registerBrokerResult != null) { - if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) { - this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr()); + this.brokerConfig.getRegisterBrokerTimeoutMills())) { + List registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll( + this.brokerConfig.getBrokerClusterName(), + this.getBrokerAddr(), + this.brokerConfig.getBrokerName(), + this.brokerConfig.getBrokerId(), + this.getHAServerAddr(), + topicConfigWrapper, + this.filterServerManager.buildNewFilterServerList(), + oneway, + this.brokerConfig.getRegisterBrokerTimeoutMills(), + this.brokerConfig.isCompressedRegister()); + + if (registerBrokerResultList.size() > 0) { + RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0); + if (registerBrokerResult != null) { + if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) { + this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr()); + } + + this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr()); + + if (checkOrderConfig) { + this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable()); + } + } } + } + } - this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr()); + private boolean needRegister(final String clusterName, + final String brokerAddr, + final String brokerName, + final long brokerId, + final int timeoutMills) { - if (checkOrderConfig) { - this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable()); + TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); + List changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills); + boolean needRegister = false; + for (Boolean changed : changeList) { + if (changed) { + needRegister = true; + break; } } + return needRegister; } public TopicConfigManager getTopicConfigManager() { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 87c00a3f..262e2d2c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -16,11 +16,19 @@ */ package org.apache.rocketmq.broker.out; +import com.google.common.collect.Lists; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor; import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; @@ -33,6 +41,8 @@ import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody; import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader; import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader; import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader; @@ -52,6 +62,8 @@ public class BrokerOuterAPI { private final RemotingClient remotingClient; private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr()); private String nameSrvAddr = null; + private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES, + new ArrayBlockingQueue(32), new ThreadFactoryImpl("brokerOutApi_thread_", true)); public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) { this(nettyClientConfig, null); @@ -97,7 +109,7 @@ public class BrokerOuterAPI { this.remotingClient.updateNameServerAddressList(lst); } - public RegisterBrokerResult registerBrokerAll( + public List registerBrokerAll( final String clusterName, final String brokerAddr, final String brokerName, @@ -106,27 +118,41 @@ public class BrokerOuterAPI { final TopicConfigSerializeWrapper topicConfigWrapper, final List filterServerList, final boolean oneway, - final int timeoutMills) { - RegisterBrokerResult registerBrokerResult = null; + final int timeoutMills, + final boolean compressed) { + final List registerBrokerResultList = Lists.newArrayList(); List nameServerAddressList = this.remotingClient.getNameServerAddressList(); - if (nameServerAddressList != null) { - for (String namesrvAddr : nameServerAddressList) { - try { - RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId, - haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills); - if (result != null) { - registerBrokerResult = result; + if (nameServerAddressList != null && nameServerAddressList.size() > 0) { + final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); + for (final String namesrvAddr : nameServerAddressList) { + brokerOuterExecutor.execute(new Runnable() { + @Override + public void run() { + try { + RegisterBrokerResult result = registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId, + haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills, compressed); + if (result != null) { + registerBrokerResultList.add(result); + } + + log.info("register broker to name server {} OK", namesrvAddr); + } catch (Exception e) { + log.warn("registerBroker Exception, {}", namesrvAddr, e); + } finally { + countDownLatch.countDown(); + } } + }); + } - log.info("register broker to name server {} OK", namesrvAddr); - } catch (Exception e) { - log.warn("registerBroker Exception, {}", namesrvAddr, e); - } + try { + countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { } } - return registerBrokerResult; + return registerBrokerResultList; } private RegisterBrokerResult registerBroker( @@ -139,7 +165,8 @@ public class BrokerOuterAPI { final TopicConfigSerializeWrapper topicConfigWrapper, final List filterServerList, final boolean oneway, - final int timeoutMills + final int timeoutMills, + final boolean compressed ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException { RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); @@ -148,12 +175,13 @@ public class BrokerOuterAPI { requestHeader.setBrokerName(brokerName); requestHeader.setClusterName(clusterName); requestHeader.setHaServerAddr(haServerAddr); + requestHeader.setCompressed(compressed); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader); RegisterBrokerBody requestBody = new RegisterBrokerBody(); requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); - request.setBody(requestBody.encode()); + request.setBody(requestBody.encode(requestHeader.isCompressed())); if (oneway) { try { @@ -231,6 +259,71 @@ public class BrokerOuterAPI { throw new MQBrokerException(response.getCode(), response.getRemark()); } + public List needRegister( + final String clusterName, + final String brokerAddr, + final String brokerName, + final long brokerId, + final TopicConfigSerializeWrapper topicConfigWrapper, + final int timeoutMills) { + final List changedList = new CopyOnWriteArrayList<>(); + List nameServerAddressList = this.remotingClient.getNameServerAddressList(); + if (nameServerAddressList != null && nameServerAddressList.size() > 0) { + final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); + for (final String namesrvAddr : nameServerAddressList) { + brokerOuterExecutor.execute(new Runnable() { + @Override + public void run() { + try { + QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader(); + requestHeader.setBrokerAddr(brokerAddr); + requestHeader.setBrokerId(brokerId); + requestHeader.setBrokerName(brokerName); + requestHeader.setClusterName(clusterName); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader); + request.setBody(topicConfigWrapper.getDataVersion().encode()); + RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills); + DataVersion nameServerDataVersion = null; + Boolean changed = false; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + QueryDataVersionResponseHeader queryDataVersionResponseHeader = + (QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class); + changed = queryDataVersionResponseHeader.getChanged(); + byte[] body = response.getBody(); + if (body != null) { + nameServerDataVersion = DataVersion.decode(body, DataVersion.class); + if (!topicConfigWrapper.getDataVersion().equals(nameServerDataVersion)) { + changed = true; + } + } + if (changed == null || changed) { + changedList.add(Boolean.TRUE); + } + } + default: + break; + } + log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion); + } catch (Exception e) { + changedList.add(Boolean.TRUE); + log.error("Query data version from name server {} Exception, {}", namesrvAddr, e); + } finally { + countDownLatch.countDown(); + } + } + }); + + } + try { + countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + log.error("query dataversion from nameserver countDownLatch await Exception", e); + } + } + return changedList; + } + public TopicConfigSerializeWrapper getAllTopicConfig( final String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index e8be2d4f..a9e54aa3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -245,7 +245,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag()); this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); - this.brokerController.registerBrokerAll(false, true); + this.brokerController.registerBrokerAll(false, true, true); return null; } @@ -310,8 +310,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { log.info("updateBrokerConfig, new config: [{}] client: {} ", properties, ctx.channel().remoteAddress()); this.brokerController.getConfiguration().update(properties); if (properties.containsKey("brokerPermission")) { - this.brokerController.registerBrokerAll(false, false); this.brokerController.getTopicConfigManager().getDataVersion().nextVersion(); + this.brokerController.registerBrokerAll(false, false, true); } } else { log.error("string2Properties error"); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 29e22808..cdae66f2 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -210,7 +210,7 @@ public class TopicConfigManager extends ConfigManager { } if (createNew) { - this.brokerController.registerBrokerAll(false, true); + this.brokerController.registerBrokerAll(false, true,true); } return topicConfig; @@ -254,7 +254,7 @@ public class TopicConfigManager extends ConfigManager { } if (createNew) { - this.brokerController.registerBrokerAll(false, true); + this.brokerController.registerBrokerAll(false, true,true); } return topicConfig; @@ -279,7 +279,7 @@ public class TopicConfigManager extends ConfigManager { this.dataVersion.nextVersion(); this.persist(); - this.brokerController.registerBrokerAll(false, true); + this.brokerController.registerBrokerAll(false, true,true); } } @@ -299,7 +299,7 @@ public class TopicConfigManager extends ConfigManager { this.dataVersion.nextVersion(); this.persist(); - this.brokerController.registerBrokerAll(false, true); + this.brokerController.registerBrokerAll(false, true,true); } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java new file mode 100644 index 00000000..69e0dd38 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import io.netty.channel.ChannelHandlerContext; +import java.lang.reflect.Field; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.broker.out.BrokerOuterAPI; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.namesrv.RegisterBrokerResult; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyRemotingClient; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import static org.junit.Assert.assertEquals; +import org.junit.Test; +import org.junit.runner.RunWith; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import org.mockito.Mock; +import static org.mockito.Mockito.when; +import org.mockito.Spy; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; + +@RunWith(MockitoJUnitRunner.class) +public class BrokerOuterAPITest { + @Mock + private ChannelHandlerContext handlerContext; + @Spy + private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig()); + @Mock + private MessageStore messageStore; + private String clusterName = "clusterName"; + private String brokerName = "brokerName"; + private String brokerAddr = "brokerAddr"; + private long brokerId = 0L; + private String nameserver1 = "127.0.0.1"; + private String nameserver2 = "127.0.0.2"; + private String nameserver3 = "127.0.0.3"; + private int timeOut = 3000; + + @Mock + private NettyRemotingClient nettyRemotingClient; + + private BrokerOuterAPI brokerOuterAPI; + + public void init() throws Exception { + brokerOuterAPI = new BrokerOuterAPI(new NettyClientConfig(), null); + Field field = BrokerOuterAPI.class.getDeclaredField("remotingClient"); + field.setAccessible(true); + field.set(brokerOuterAPI, nettyRemotingClient); + } + + @Test + public void test_needRegister_normal() throws Exception { + init(); + brokerOuterAPI.start(); + final RemotingCommand response = buildResponse(Boolean.TRUE); + + TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); + + when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1, nameserver2, new String[] {nameserver3})); + when(nettyRemotingClient.invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenReturn(response); + List booleanList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigSerializeWrapper, timeOut); + assertEquals(3, booleanList.size()); + assertEquals(false, booleanList.contains(Boolean.FALSE)); + } + + @Test + public void test_needRegister_timeout() throws Exception { + init(); + brokerOuterAPI.start(); + + TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); + + when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1, nameserver2, new String[] {nameserver3})); + + when(nettyRemotingClient.invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenAnswer(new Answer() { + @Override + public RemotingCommand answer(InvocationOnMock invocation) throws Throwable { + if (invocation.getArgument(0) == nameserver1) { + return buildResponse(Boolean.TRUE); + } else if (invocation.getArgument(0) == nameserver2) { + return buildResponse(Boolean.FALSE); + } else if (invocation.getArgument(0) == nameserver3) { + TimeUnit.MILLISECONDS.sleep(timeOut + 20); + return buildResponse(Boolean.TRUE); + } + return buildResponse(Boolean.TRUE); + } + }); + List booleanList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigSerializeWrapper, timeOut); + assertEquals(2, booleanList.size()); + boolean success = Iterables.any(booleanList, + new Predicate() { + public boolean apply(Boolean input) { + return input ? true : false; + } + }); + + assertEquals(true, success); + + } + + @Test + public void test_register_normal() throws Exception { + init(); + brokerOuterAPI.start(); + + final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class); + final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader(); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + + TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); + + when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1, nameserver2, new String[] {nameserver3})); + when(nettyRemotingClient.invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenReturn(response); + List registerBrokerResultList = brokerOuterAPI.registerBrokerAll(clusterName, brokerAddr, brokerName, brokerId, "hasServerAddr", topicConfigSerializeWrapper, Lists.newArrayList(), false, timeOut, true); + + assertEquals(3, registerBrokerResultList.size()); + } + + @Test + public void test_register_timeout() throws Exception { + init(); + brokerOuterAPI.start(); + + final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + + TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); + + when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1, nameserver2, new String[] {nameserver3})); + when(nettyRemotingClient.invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenAnswer(new Answer() { + @Override + public RemotingCommand answer(InvocationOnMock invocation) throws Throwable { + if (invocation.getArgument(0) == nameserver1) { + return response; + } else if (invocation.getArgument(0) == nameserver2) { + return response; + } else if (invocation.getArgument(0) == nameserver3) { + TimeUnit.MILLISECONDS.sleep(timeOut + 20); + return response; + } + return response; + } + }); + List registerBrokerResultList = brokerOuterAPI.registerBrokerAll(clusterName, brokerAddr, brokerName, brokerId, "hasServerAddr", topicConfigSerializeWrapper, Lists.newArrayList(), false, timeOut, true); + + assertEquals(2, registerBrokerResultList.size()); + } + + private RemotingCommand buildResponse(Boolean changed) { + final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class); + final QueryDataVersionResponseHeader responseHeader = (QueryDataVersionResponseHeader) response.readCustomHeader(); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + responseHeader.setChanged(changed); + return response; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 4d7eb469..4468b2d5 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -16,6 +16,8 @@ */ package org.apache.rocketmq.common; +import java.net.InetAddress; +import java.net.UnknownHostException; import org.apache.rocketmq.common.annotation.ImportantField; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; @@ -23,9 +25,6 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingUtil; -import java.net.InetAddress; -import java.net.UnknownHostException; - public class BrokerConfig { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); @@ -133,6 +132,10 @@ public class BrokerConfig { private boolean filterSupportRetry = false; private boolean enablePropertyFilter = false; + private boolean compressedRegister = false; + + private boolean forceRegister = false; + public boolean isTraceOn() { return traceOn; } @@ -598,4 +601,20 @@ public class BrokerConfig { public void setEnablePropertyFilter(boolean enablePropertyFilter) { this.enablePropertyFilter = enablePropertyFilter; } + + public boolean isCompressedRegister() { + return compressedRegister; + } + + public void setCompressedRegister(boolean compressedRegister) { + this.compressedRegister = compressedRegister; + } + + public boolean isForceRegister() { + return forceRegister; + } + + public void setForceRegister(boolean forceRegister) { + this.forceRegister = forceRegister; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/DataVersion.java b/common/src/main/java/org/apache/rocketmq/common/DataVersion.java index 71b00fdd..e54000de 100644 --- a/common/src/main/java/org/apache/rocketmq/common/DataVersion.java +++ b/common/src/main/java/org/apache/rocketmq/common/DataVersion.java @@ -78,4 +78,13 @@ public class DataVersion extends RemotingSerializable { } return result; } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("DataVersion["); + sb.append("timestamp=").append(timestamp); + sb.append(", counter=").append(counter); + sb.append(']'); + return sb.toString(); + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java b/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java index 3860ec3c..564d60c5 100644 --- a/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java +++ b/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java @@ -23,14 +23,21 @@ import java.util.concurrent.atomic.AtomicLong; public class ThreadFactoryImpl implements ThreadFactory { private final AtomicLong threadIndex = new AtomicLong(0); private final String threadNamePrefix; + private final boolean daemon; public ThreadFactoryImpl(final String threadNamePrefix) { + this(threadNamePrefix, false); + } + + public ThreadFactoryImpl(final String threadNamePrefix, boolean daemon) { this.threadNamePrefix = threadNamePrefix; + this.daemon = daemon; } @Override public Thread newThread(Runnable r) { - return new Thread(r, threadNamePrefix + this.threadIndex.incrementAndGet()); - + Thread thread = new Thread(r, threadNamePrefix + this.threadIndex.incrementAndGet()); + thread.setDaemon(daemon); + return thread; } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java index 5900c0b9..8cf2d46a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java @@ -165,4 +165,6 @@ public class RequestCode { public static final int SEND_BATCH_MESSAGE = 320; public static final int QUERY_CONSUME_QUEUE = 321; + + public static final int QUERY_DATA_VERSION = 322; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java index c220927c..2b49b6d1 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java @@ -17,14 +17,155 @@ package org.apache.rocketmq.common.protocol.body; +import com.alibaba.fastjson.JSON; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.zip.Deflater; +import java.util.zip.DeflaterOutputStream; +import java.util.zip.InflaterInputStream; +import org.apache.rocketmq.common.DataVersion; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RegisterBrokerBody extends RemotingSerializable { + + private static final Logger LOGGER = LoggerFactory.getLogger(RegisterBrokerBody.class); private TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); private List filterServerList = new ArrayList(); + public byte[] encode(boolean compress) { + + if (!compress) { + return super.encode(); + } + long start = System.currentTimeMillis(); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DeflaterOutputStream outputStream = new DeflaterOutputStream(byteArrayOutputStream, new Deflater(Deflater.BEST_COMPRESSION)); + DataVersion dataVersion = topicConfigSerializeWrapper.getDataVersion(); + ConcurrentMap topicConfigTable = cloneTopicConfigTable(topicConfigSerializeWrapper.getTopicConfigTable()); + assert topicConfigTable != null; + try { + byte[] buffer = dataVersion.encode(); + + // write data version + outputStream.write(convertIntToByteArray(buffer.length)); + outputStream.write(buffer); + + int topicNumber = topicConfigTable.size(); + + // write number of topic configs + outputStream.write(convertIntToByteArray(topicNumber)); + + // write topic config entry one by one. + for (ConcurrentMap.Entry next : topicConfigTable.entrySet()) { + buffer = next.getValue().encode().getBytes(MixAll.DEFAULT_CHARSET); + outputStream.write(convertIntToByteArray(buffer.length)); + outputStream.write(buffer); + } + + buffer = JSON.toJSONString(filterServerList).getBytes(MixAll.DEFAULT_CHARSET); + + // write filter server list json length + outputStream.write(convertIntToByteArray(buffer.length)); + + // write filter server list json + outputStream.write(buffer); + + outputStream.finish(); + long interval = System.currentTimeMillis() - start; + if (interval > 50) { + LOGGER.info("Compressing takes {}ms", interval); + } + return byteArrayOutputStream.toByteArray(); + } catch (IOException e) { + LOGGER.error("Failed to compress RegisterBrokerBody object", e); + } + + return null; + } + + public static RegisterBrokerBody decode(byte[] data, boolean compressed) throws IOException { + if (!compressed) { + return RegisterBrokerBody.decode(data, RegisterBrokerBody.class); + } + long start = System.currentTimeMillis(); + InflaterInputStream inflaterInputStream = new InflaterInputStream(new ByteArrayInputStream(data)); + int dataVersionLength = readInt(inflaterInputStream); + byte[] dataVersionBytes = readBytes(inflaterInputStream, dataVersionLength); + DataVersion dataVersion = DataVersion.decode(dataVersionBytes, DataVersion.class); + + RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody(); + registerBrokerBody.getTopicConfigSerializeWrapper().setDataVersion(dataVersion); + ConcurrentMap topicConfigTable = registerBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable(); + + int topicConfigNumber = readInt(inflaterInputStream); + LOGGER.debug("{} topic configs to extract", topicConfigNumber); + + for (int i = 0; i < topicConfigNumber; i++) { + int topicConfigJsonLength = readInt(inflaterInputStream); + + byte[] buffer = readBytes(inflaterInputStream, topicConfigJsonLength); + TopicConfig topicConfig = new TopicConfig(); + String topicConfigJson = new String(buffer, MixAll.DEFAULT_CHARSET); + topicConfig.decode(topicConfigJson); + topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + } + + int filterServerListJsonLength = readInt(inflaterInputStream); + + byte[] filterServerListBuffer = readBytes(inflaterInputStream, filterServerListJsonLength); + String filterServerListJson = new String(filterServerListBuffer, MixAll.DEFAULT_CHARSET); + List filterServerList = new ArrayList(); + try { + filterServerList = JSON.parseArray(filterServerListJson, String.class); + } catch (Exception e) { + LOGGER.error("Decompressing occur Exception {}", filterServerListJson); + } + + registerBrokerBody.setFilterServerList(filterServerList); + long interval = System.currentTimeMillis() - start; + if (interval > 50) { + LOGGER.info("Decompressing takes {}ms", interval); + } + return registerBrokerBody; + } + + private static byte[] convertIntToByteArray(int n) { + ByteBuffer byteBuffer = ByteBuffer.allocate(4); + byteBuffer.putInt(n); + return byteBuffer.array(); + } + + private static byte[] readBytes(InflaterInputStream inflaterInputStream, int length) throws IOException { + byte[] buffer = new byte[length]; + int bytesRead = 0; + while (bytesRead < length) { + int len = inflaterInputStream.read(buffer, bytesRead, length - bytesRead); + if (len == -1) { + throw new IOException("End of compressed data has reached"); + } else { + bytesRead += len; + } + } + return buffer; + } + + private static int readInt(InflaterInputStream inflaterInputStream) throws IOException { + byte[] buffer = readBytes(inflaterInputStream, 4); + ByteBuffer byteBuffer = ByteBuffer.wrap(buffer); + return byteBuffer.getInt(); + } + public TopicConfigSerializeWrapper getTopicConfigSerializeWrapper() { return topicConfigSerializeWrapper; } @@ -40,4 +181,16 @@ public class RegisterBrokerBody extends RemotingSerializable { public void setFilterServerList(List filterServerList) { this.filterServerList = filterServerList; } + + public static ConcurrentMap cloneTopicConfigTable( + ConcurrentMap topicConfigConcurrentMap) { + ConcurrentHashMap result = new ConcurrentHashMap(); + if (topicConfigConcurrentMap != null) { + for (Map.Entry entry : topicConfigConcurrentMap.entrySet()) { + result.put(entry.getKey(), entry.getValue()); + } + } + return result; + + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionRequestHeader.java new file mode 100644 index 00000000..ac6a617d --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionRequestHeader.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.namesrv; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class QueryDataVersionRequestHeader implements CommandCustomHeader { + @CFNotNull + private String brokerName; + @CFNotNull + private String brokerAddr; + @CFNotNull + private String clusterName; + @CFNotNull + private Long brokerId; + + @Override + public void checkFields() throws RemotingCommandException { + + } + + public String getBrokerName() { + return brokerName; + } + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } + + public String getBrokerAddr() { + return brokerAddr; + } + + public void setBrokerAddr(String brokerAddr) { + this.brokerAddr = brokerAddr; + } + + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public Long getBrokerId() { + return brokerId; + } + + public void setBrokerId(Long brokerId) { + this.brokerId = brokerId; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionResponseHeader.java new file mode 100644 index 00000000..90741e5f --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionResponseHeader.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.header.namesrv; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class QueryDataVersionResponseHeader implements CommandCustomHeader { + @CFNotNull + private Boolean changed; + + @Override + public void checkFields() throws RemotingCommandException { + + } + + public Boolean getChanged() { + return changed; + } + + public void setChanged(Boolean changed) { + this.changed = changed; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("QueryDataVersionResponseHeader{"); + sb.append("changed=").append(changed); + sb.append('}'); + return sb.toString(); + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java index 45d5b6e9..7ed7a403 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java @@ -36,7 +36,8 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader { @CFNotNull private Long brokerId; - @Override + private boolean compressed; + public void checkFields() throws RemotingCommandException { } @@ -79,4 +80,12 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader { public void setBrokerId(Long brokerId) { this.brokerId = brokerId; } + + public boolean isCompressed() { + return compressed; + } + + public void setCompressed(boolean compressed) { + this.compressed = compressed; + } } diff --git a/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java b/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java new file mode 100644 index 00000000..87a0fc00 --- /dev/null +++ b/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody; +import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; +import static org.junit.Assert.assertEquals; +import org.junit.Test; + +public class RegisterBrokerBodyTest { + @Test + public void test_encode_decode() throws IOException { + RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody(); + TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); + registerBrokerBody.setTopicConfigSerializeWrapper(topicConfigSerializeWrapper); + + ConcurrentMap topicConfigTable = new ConcurrentHashMap(); + for (int i = 0; i < 10000; i++) { + topicConfigTable.put(String.valueOf(i), new TopicConfig(String.valueOf(i))); + } + + topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable); + + byte[] compareEncode = registerBrokerBody.encode(true); + byte[] encode2 = registerBrokerBody.encode(false); + System.out.println(compareEncode.length); + System.out.println(encode2.length); + RegisterBrokerBody decodeRegisterBrokerBody = RegisterBrokerBody.decode(compareEncode, true); + + assertEquals(registerBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable().size(), decodeRegisterBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable().size()); + + } +} diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java index 49068860..236e6a12 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java @@ -20,6 +20,7 @@ import io.netty.channel.ChannelHandlerContext; import java.io.UnsupportedEncodingException; import java.util.Properties; import java.util.concurrent.atomic.AtomicLong; +import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MQVersion.Version; import org.apache.rocketmq.common.MixAll; @@ -41,6 +42,8 @@ import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHea import org.apache.rocketmq.common.protocol.header.namesrv.GetKVListByNamespaceRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader; import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader; import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader; @@ -81,6 +84,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return this.getKVConfig(ctx, request); case RequestCode.DELETE_KV_CONFIG: return this.deleteKVConfig(ctx, request); + case RequestCode.QUERY_DATA_VERSION: + return queryBrokerTopicConfig(ctx, request); case RequestCode.REGISTER_BROKER: Version brokerVersion = MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { @@ -194,7 +199,11 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody(); if (request.getBody() != null) { - registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), RegisterBrokerBody.class); + try { + registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed()); + } catch (Exception e) { + throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e); + } } else { registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0)); registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0); @@ -221,6 +230,30 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return response; } + public RemotingCommand queryBrokerTopicConfig(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class); + final QueryDataVersionResponseHeader responseHeader = (QueryDataVersionResponseHeader) response.readCustomHeader(); + final QueryDataVersionRequestHeader requestHeader = + (QueryDataVersionRequestHeader) request.decodeCommandCustomHeader(QueryDataVersionRequestHeader.class); + DataVersion dataVersion = DataVersion.decode(request.getBody(), DataVersion.class); + + Boolean changed = this.namesrvController.getRouteInfoManager().isBrokerTopicConfigChanged(requestHeader.getBrokerAddr(), dataVersion); + if (!changed) { + this.namesrvController.getRouteInfoManager().updateBrokerInfoUpdateTimestamp(requestHeader.getBrokerAddr()); + } + + DataVersion nameSeverDataVersion = this.namesrvController.getRouteInfoManager().queryBrokerTopicConfig(requestHeader.getBrokerAddr()); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + + if (nameSeverDataVersion != null) { + response.setBody(nameSeverDataVersion.encode()); + } + responseHeader.setChanged(changed); + return response; + } + public RemotingCommand registerBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class); diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java index ef02dd00..00962ef2 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java @@ -183,13 +183,24 @@ public class RouteInfoManager { return result; } - private boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) { + public boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) { + DataVersion prev = queryBrokerTopicConfig(brokerAddr); + return null == prev || !prev.equals(dataVersion); + } + + public DataVersion queryBrokerTopicConfig(final String brokerAddr) { BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr); - if (null == prev || !prev.getDataVersion().equals(dataVersion)) { - return true; + if (prev != null) { + return prev.getDataVersion(); } + return null; + } - return false; + public void updateBrokerInfoUpdateTimestamp(final String brokerAddr) { + BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr); + if (prev != null) { + prev.setLastUpdateTimestamp(System.currentTimeMillis()); + } } private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) { -- GitLab