diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 293e51e9b8b5edec4b7509632c6ad339eb176b54..ed85a679c055ecafb972b11fd7db601f7ff82a62 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -62,6 +63,7 @@ import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.Configuration; +import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; @@ -768,6 +770,24 @@ public class BrokerController { } } + public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) { + TopicConfig registerTopicConfig = topicConfig; + if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) + || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { + registerTopicConfig = + new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), + this.brokerConfig.getBrokerPermission()); + } + + ConcurrentMap topicConfigTable = new ConcurrentHashMap(); + topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig); + TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); + topicConfigSerializeWrapper.setDataVersion(dataVersion); + topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable); + + doRegisterBrokerAll(true, false, topicConfigSerializeWrapper); + } + public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); @@ -788,30 +808,35 @@ public class BrokerController { this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), this.brokerConfig.getRegisterBrokerTimeoutMills())) { - List registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll( - this.brokerConfig.getBrokerClusterName(), - this.getBrokerAddr(), - this.brokerConfig.getBrokerName(), - this.brokerConfig.getBrokerId(), - this.getHAServerAddr(), - topicConfigWrapper, - this.filterServerManager.buildNewFilterServerList(), - oneway, - this.brokerConfig.getRegisterBrokerTimeoutMills(), - this.brokerConfig.isCompressedRegister()); - - if (registerBrokerResultList.size() > 0) { - RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0); - if (registerBrokerResult != null) { - if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) { - this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr()); - } + doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper); + } + } + + private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway, + TopicConfigSerializeWrapper topicConfigWrapper) { + List registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll( + this.brokerConfig.getBrokerClusterName(), + this.getBrokerAddr(), + this.brokerConfig.getBrokerName(), + this.brokerConfig.getBrokerId(), + this.getHAServerAddr(), + topicConfigWrapper, + this.filterServerManager.buildNewFilterServerList(), + oneway, + this.brokerConfig.getRegisterBrokerTimeoutMills(), + this.brokerConfig.isCompressedRegister()); + + if (registerBrokerResultList.size() > 0) { + RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0); + if (registerBrokerResult != null) { + if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) { + this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr()); + } - this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr()); + this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr()); - if (checkOrderConfig) { - this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable()); - } + if (checkOrderConfig) { + this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable()); } } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 2825a34cd725416cfaaa030fd9a88d2a2051f8b1..4dee01cbff0dcd3b21020ee0b7ff81c031649d44 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -29,6 +29,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; @@ -125,14 +126,28 @@ public class BrokerOuterAPI { final List registerBrokerResultList = Lists.newArrayList(); List nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null && nameServerAddressList.size() > 0) { + + final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); + requestHeader.setBrokerAddr(brokerAddr); + requestHeader.setBrokerId(brokerId); + requestHeader.setBrokerName(brokerName); + requestHeader.setClusterName(clusterName); + requestHeader.setHaServerAddr(haServerAddr); + requestHeader.setCompressed(compressed); + + RegisterBrokerBody requestBody = new RegisterBrokerBody(); + requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); + requestBody.setFilterServerList(filterServerList); + final byte[] body = requestBody.encode(compressed); + final int bodyCrc32 = UtilAll.crc32(body); + requestHeader.setBodyCrc32(bodyCrc32); final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { @Override public void run() { try { - RegisterBrokerResult result = registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId, - haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills, compressed); + RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body); if (result != null) { registerBrokerResultList.add(result); } @@ -158,31 +173,14 @@ public class BrokerOuterAPI { private RegisterBrokerResult registerBroker( final String namesrvAddr, - final String clusterName, - final String brokerAddr, - final String brokerName, - final long brokerId, - final String haServerAddr, - final TopicConfigSerializeWrapper topicConfigWrapper, - final List filterServerList, final boolean oneway, final int timeoutMills, - final boolean compressed + final RegisterBrokerRequestHeader requestHeader, + final byte[] body ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException { - RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); - requestHeader.setBrokerAddr(brokerAddr); - requestHeader.setBrokerId(brokerId); - requestHeader.setBrokerName(brokerName); - requestHeader.setClusterName(clusterName); - requestHeader.setHaServerAddr(haServerAddr); - requestHeader.setCompressed(compressed); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader); - - RegisterBrokerBody requestBody = new RegisterBrokerBody(); - requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); - requestBody.setFilterServerList(filterServerList); - request.setBody(requestBody.encode(requestHeader.isCompressed())); + request.setBody(body); if (oneway) { try { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index c0a4b20211016a2ebc045b565a33c6c63ee45acf..1a704a8c6bcc4ee207ce1d48b55b4e48081f10f6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -212,7 +212,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return false; } - private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, + private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final CreateTopicRequestHeader requestHeader = @@ -246,14 +246,12 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); - if (brokerController.getBrokerConfig().getRegisterNameServerPeriod() == 0) { - this.brokerController.registerBrokerAll(false, true, true); - } + this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion()); return null; } - private RemotingCommand deleteTopic(ChannelHandlerContext ctx, + private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); DeleteTopicRequestHeader requestHeader = @@ -299,7 +297,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) { + private synchronized RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); log.info("updateBrokerConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 7caf83037c27fc6159f69f03b1397d29ec09b580..203431aee1a9d2697aab955b7d4de5f6ee104e3c 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -141,8 +141,6 @@ public class BrokerConfig { * This configurable item defines interval of topics registration of broker to name server. Allowing values are * between 10, 000 and 60, 000 milliseconds. * - * If set to 0, newly created topics will be immediately reported to name servers and interval of periodical - * registration defaults to 10, 000 in milliseconds. */ private int registerNameServerPeriod = 1000 * 30; diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java index 7ed7a403d6b29e94e2b9d3c9352da3b75846bf52..19175b04b3eb2c236da756cc8fc6e92e397a5bbb 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java @@ -38,6 +38,8 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader { private boolean compressed; + private Integer bodyCrc32 = 0; + public void checkFields() throws RemotingCommandException { } @@ -88,4 +90,12 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader { public void setCompressed(boolean compressed) { this.compressed = compressed; } + + public Integer getBodyCrc32() { + return bodyCrc32; + } + + public void setBodyCrc32(Integer bodyCrc32) { + this.bodyCrc32 = bodyCrc32; + } } diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java index 236e6a12ca042ce3596dd16ccaac60a8d1e425eb..467078c44f84f220ef1860fa2cfc7ea3b5cd59fc 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java @@ -24,6 +24,7 @@ import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MQVersion.Version; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.logging.InternalLogger; @@ -196,6 +197,12 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { final RegisterBrokerRequestHeader requestHeader = (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class); + if (!checksum(ctx, request, requestHeader)) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("crc32 not match"); + return response; + } + RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody(); if (request.getBody() != null) { @@ -230,6 +237,19 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return response; } + private boolean checksum(ChannelHandlerContext ctx, RemotingCommand request, + RegisterBrokerRequestHeader requestHeader) { + if (requestHeader.getBodyCrc32() != 0) { + final int crc32 = UtilAll.crc32(request.getBody()); + if (crc32 != requestHeader.getBodyCrc32()) { + log.warn(String.format("receive registerBroker request,crc32 not match,from %s", + RemotingHelper.parseChannelRemoteAddr(ctx.channel()))); + return false; + } + } + return true; + } + public RemotingCommand queryBrokerTopicConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class); @@ -261,6 +281,12 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { final RegisterBrokerRequestHeader requestHeader = (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class); + if (!checksum(ctx, request, requestHeader)) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("crc32 not match"); + return response; + } + TopicConfigSerializeWrapper topicConfigWrapper; if (request.getBody() != null) { topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);