提交 92f0e1f5 编写于 作者: 傅冲 提交者: Li Zhanhui

Optimize broker topic route registration to relieve stress on Java GC

上级 9932819e
......@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
......@@ -62,6 +63,7 @@ import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
......@@ -768,6 +770,24 @@ public class BrokerController {
}
}
public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
TopicConfig registerTopicConfig = topicConfig;
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
registerTopicConfig =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
}
ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig);
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setDataVersion(dataVersion);
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
doRegisterBrokerAll(true, false, topicConfigSerializeWrapper);
}
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
......@@ -788,6 +808,12 @@ public class BrokerController {
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
TopicConfigSerializeWrapper topicConfigWrapper) {
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
......@@ -815,7 +841,6 @@ public class BrokerController {
}
}
}
}
private boolean needRegister(final String clusterName,
final String brokerAddr,
......
......@@ -29,6 +29,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
......@@ -125,14 +126,28 @@ public class BrokerOuterAPI {
final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
RegisterBrokerResult result = registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills, compressed);
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
if (result != null) {
registerBrokerResultList.add(result);
}
......@@ -158,31 +173,14 @@ public class BrokerOuterAPI {
private RegisterBrokerResult registerBroker(
final String namesrvAddr,
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean compressed
final RegisterBrokerRequestHeader requestHeader,
final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException {
RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
request.setBody(requestBody.encode(requestHeader.isCompressed()));
request.setBody(body);
if (oneway) {
try {
......
......@@ -212,7 +212,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return false;
}
private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final CreateTopicRequestHeader requestHeader =
......@@ -246,14 +246,12 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
if (brokerController.getBrokerConfig().getRegisterNameServerPeriod() == 0) {
this.brokerController.registerBrokerAll(false, true, true);
}
this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion());
return null;
}
private RemotingCommand deleteTopic(ChannelHandlerContext ctx,
private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
DeleteTopicRequestHeader requestHeader =
......@@ -299,7 +297,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
private RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
private synchronized RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
log.info("updateBrokerConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
......
......@@ -141,8 +141,6 @@ public class BrokerConfig {
* This configurable item defines interval of topics registration of broker to name server. Allowing values are
* between 10, 000 and 60, 000 milliseconds.
*
* If set to 0, newly created topics will be immediately reported to name servers and interval of periodical
* registration defaults to 10, 000 in milliseconds.
*/
private int registerNameServerPeriod = 1000 * 30;
......
......@@ -38,6 +38,8 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader {
private boolean compressed;
private Integer bodyCrc32 = 0;
public void checkFields() throws RemotingCommandException {
}
......@@ -88,4 +90,12 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader {
public void setCompressed(boolean compressed) {
this.compressed = compressed;
}
public Integer getBodyCrc32() {
return bodyCrc32;
}
public void setBodyCrc32(Integer bodyCrc32) {
this.bodyCrc32 = bodyCrc32;
}
}
......@@ -24,6 +24,7 @@ import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MQVersion.Version;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger;
......@@ -196,6 +197,12 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
if (!checksum(ctx, request, requestHeader)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("crc32 not match");
return response;
}
RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
if (request.getBody() != null) {
......@@ -230,6 +237,19 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return response;
}
private boolean checksum(ChannelHandlerContext ctx, RemotingCommand request,
RegisterBrokerRequestHeader requestHeader) {
if (requestHeader.getBodyCrc32() != 0) {
final int crc32 = UtilAll.crc32(request.getBody());
if (crc32 != requestHeader.getBodyCrc32()) {
log.warn(String.format("receive registerBroker request,crc32 not match,from %s",
RemotingHelper.parseChannelRemoteAddr(ctx.channel())));
return false;
}
}
return true;
}
public RemotingCommand queryBrokerTopicConfig(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);
......@@ -261,6 +281,12 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
if (!checksum(ctx, request, requestHeader)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("crc32 not match");
return response;
}
TopicConfigSerializeWrapper topicConfigWrapper;
if (request.getBody() != null) {
topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册