提交 e7442225 编写于 作者: F fuyou001 提交者: yukon

[ROCKETMQ-319] Improve broker register performance and reduce memory usage (#205)

上级 46f91479
......@@ -729,14 +729,14 @@ public class BrokerController {
this.filterServerManager.start();
}
this.registerBrokerAll(true, false);
this.registerBrokerAll(true, false, true);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false);
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
......@@ -752,7 +752,7 @@ public class BrokerController {
}
}
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) {
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
......@@ -767,7 +767,12 @@ public class BrokerController {
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll(
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
......@@ -776,8 +781,11 @@ public class BrokerController {
topicConfigWrapper,
this.filterServerManager.buildNewFilterServerList(),
oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills());
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isCompressedRegister());
if (registerBrokerResultList.size() > 0) {
RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
if (registerBrokerResult != null) {
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
......@@ -790,6 +798,26 @@ public class BrokerController {
}
}
}
}
}
private boolean needRegister(final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final int timeoutMills) {
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills);
boolean needRegister = false;
for (Boolean changed : changeList) {
if (changed) {
needRegister = true;
break;
}
}
return needRegister;
}
public TopicConfigManager getTopicConfigManager() {
return topicConfigManager;
......
......@@ -16,11 +16,19 @@
*/
package org.apache.rocketmq.broker.out;
import com.google.common.collect.Lists;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
......@@ -33,6 +41,8 @@ import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
......@@ -52,6 +62,8 @@ public class BrokerOuterAPI {
private final RemotingClient remotingClient;
private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr());
private String nameSrvAddr = null;
private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
this(nettyClientConfig, null);
......@@ -97,7 +109,7 @@ public class BrokerOuterAPI {
this.remotingClient.updateNameServerAddressList(lst);
}
public RegisterBrokerResult registerBrokerAll(
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
......@@ -106,27 +118,41 @@ public class BrokerOuterAPI {
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills) {
RegisterBrokerResult registerBrokerResult = null;
final int timeoutMills,
final boolean compressed) {
final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null) {
for (String namesrvAddr : nameServerAddressList) {
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
RegisterBrokerResult result = registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills, compressed);
if (result != null) {
registerBrokerResult = result;
registerBrokerResultList.add(result);
}
log.info("register broker to name server {} OK", namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
return registerBrokerResult;
return registerBrokerResultList;
}
private RegisterBrokerResult registerBroker(
......@@ -139,7 +165,8 @@ public class BrokerOuterAPI {
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills
final int timeoutMills,
final boolean compressed
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException {
RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
......@@ -148,12 +175,13 @@ public class BrokerOuterAPI {
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
request.setBody(requestBody.encode());
request.setBody(requestBody.encode(requestHeader.isCompressed()));
if (oneway) {
try {
......@@ -231,6 +259,71 @@ public class BrokerOuterAPI {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public List<Boolean> needRegister(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final TopicConfigSerializeWrapper topicConfigWrapper,
final int timeoutMills) {
final List<Boolean> changedList = new CopyOnWriteArrayList<>();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
request.setBody(topicConfigWrapper.getDataVersion().encode());
RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
DataVersion nameServerDataVersion = null;
Boolean changed = false;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
QueryDataVersionResponseHeader queryDataVersionResponseHeader =
(QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
changed = queryDataVersionResponseHeader.getChanged();
byte[] body = response.getBody();
if (body != null) {
nameServerDataVersion = DataVersion.decode(body, DataVersion.class);
if (!topicConfigWrapper.getDataVersion().equals(nameServerDataVersion)) {
changed = true;
}
}
if (changed == null || changed) {
changedList.add(Boolean.TRUE);
}
}
default:
break;
}
log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion);
} catch (Exception e) {
changedList.add(Boolean.TRUE);
log.error("Query data version from name server {} Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("query dataversion from nameserver countDownLatch await Exception", e);
}
}
return changedList;
}
public TopicConfigSerializeWrapper getAllTopicConfig(
final String addr) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException, MQBrokerException {
......
......@@ -245,7 +245,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
this.brokerController.registerBrokerAll(false, true);
this.brokerController.registerBrokerAll(false, true, true);
return null;
}
......@@ -310,8 +310,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
log.info("updateBrokerConfig, new config: [{}] client: {} ", properties, ctx.channel().remoteAddress());
this.brokerController.getConfiguration().update(properties);
if (properties.containsKey("brokerPermission")) {
this.brokerController.registerBrokerAll(false, false);
this.brokerController.getTopicConfigManager().getDataVersion().nextVersion();
this.brokerController.registerBrokerAll(false, false, true);
}
} else {
log.error("string2Properties error");
......
......@@ -210,7 +210,7 @@ public class TopicConfigManager extends ConfigManager {
}
if (createNew) {
this.brokerController.registerBrokerAll(false, true);
this.brokerController.registerBrokerAll(false, true,true);
}
return topicConfig;
......@@ -254,7 +254,7 @@ public class TopicConfigManager extends ConfigManager {
}
if (createNew) {
this.brokerController.registerBrokerAll(false, true);
this.brokerController.registerBrokerAll(false, true,true);
}
return topicConfig;
......@@ -279,7 +279,7 @@ public class TopicConfigManager extends ConfigManager {
this.dataVersion.nextVersion();
this.persist();
this.brokerController.registerBrokerAll(false, true);
this.brokerController.registerBrokerAll(false, true,true);
}
}
......@@ -299,7 +299,7 @@ public class TopicConfigManager extends ConfigManager {
this.dataVersion.nextVersion();
this.persist();
this.brokerController.registerBrokerAll(false, true);
this.brokerController.registerBrokerAll(false, true,true);
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.netty.channel.ChannelHandlerContext;
import java.lang.reflect.Field;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
import org.junit.runner.RunWith;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import org.mockito.Mock;
import static org.mockito.Mockito.when;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
@RunWith(MockitoJUnitRunner.class)
public class BrokerOuterAPITest {
@Mock
private ChannelHandlerContext handlerContext;
@Spy
private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
@Mock
private MessageStore messageStore;
private String clusterName = "clusterName";
private String brokerName = "brokerName";
private String brokerAddr = "brokerAddr";
private long brokerId = 0L;
private String nameserver1 = "127.0.0.1";
private String nameserver2 = "127.0.0.2";
private String nameserver3 = "127.0.0.3";
private int timeOut = 3000;
@Mock
private NettyRemotingClient nettyRemotingClient;
private BrokerOuterAPI brokerOuterAPI;
public void init() throws Exception {
brokerOuterAPI = new BrokerOuterAPI(new NettyClientConfig(), null);
Field field = BrokerOuterAPI.class.getDeclaredField("remotingClient");
field.setAccessible(true);
field.set(brokerOuterAPI, nettyRemotingClient);
}
@Test
public void test_needRegister_normal() throws Exception {
init();
brokerOuterAPI.start();
final RemotingCommand response = buildResponse(Boolean.TRUE);
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1, nameserver2, new String[] {nameserver3}));
when(nettyRemotingClient.invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenReturn(response);
List<Boolean> booleanList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigSerializeWrapper, timeOut);
assertEquals(3, booleanList.size());
assertEquals(false, booleanList.contains(Boolean.FALSE));
}
@Test
public void test_needRegister_timeout() throws Exception {
init();
brokerOuterAPI.start();
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1, nameserver2, new String[] {nameserver3}));
when(nettyRemotingClient.invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenAnswer(new Answer<RemotingCommand>() {
@Override
public RemotingCommand answer(InvocationOnMock invocation) throws Throwable {
if (invocation.getArgument(0) == nameserver1) {
return buildResponse(Boolean.TRUE);
} else if (invocation.getArgument(0) == nameserver2) {
return buildResponse(Boolean.FALSE);
} else if (invocation.getArgument(0) == nameserver3) {
TimeUnit.MILLISECONDS.sleep(timeOut + 20);
return buildResponse(Boolean.TRUE);
}
return buildResponse(Boolean.TRUE);
}
});
List<Boolean> booleanList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigSerializeWrapper, timeOut);
assertEquals(2, booleanList.size());
boolean success = Iterables.any(booleanList,
new Predicate<Boolean>() {
public boolean apply(Boolean input) {
return input ? true : false;
}
});
assertEquals(true, success);
}
@Test
public void test_register_normal() throws Exception {
init();
brokerOuterAPI.start();
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1, nameserver2, new String[] {nameserver3}));
when(nettyRemotingClient.invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenReturn(response);
List<RegisterBrokerResult> registerBrokerResultList = brokerOuterAPI.registerBrokerAll(clusterName, brokerAddr, brokerName, brokerId, "hasServerAddr", topicConfigSerializeWrapper, Lists.<String>newArrayList(), false, timeOut, true);
assertEquals(3, registerBrokerResultList.size());
}
@Test
public void test_register_timeout() throws Exception {
init();
brokerOuterAPI.start();
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1, nameserver2, new String[] {nameserver3}));
when(nettyRemotingClient.invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenAnswer(new Answer<RemotingCommand>() {
@Override
public RemotingCommand answer(InvocationOnMock invocation) throws Throwable {
if (invocation.getArgument(0) == nameserver1) {
return response;
} else if (invocation.getArgument(0) == nameserver2) {
return response;
} else if (invocation.getArgument(0) == nameserver3) {
TimeUnit.MILLISECONDS.sleep(timeOut + 20);
return response;
}
return response;
}
});
List<RegisterBrokerResult> registerBrokerResultList = brokerOuterAPI.registerBrokerAll(clusterName, brokerAddr, brokerName, brokerId, "hasServerAddr", topicConfigSerializeWrapper, Lists.<String>newArrayList(), false, timeOut, true);
assertEquals(2, registerBrokerResultList.size());
}
private RemotingCommand buildResponse(Boolean changed) {
final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);
final QueryDataVersionResponseHeader responseHeader = (QueryDataVersionResponseHeader) response.readCustomHeader();
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
responseHeader.setChanged(changed);
return response;
}
}
......@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.common;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
......@@ -23,9 +25,6 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import java.net.InetAddress;
import java.net.UnknownHostException;
public class BrokerConfig {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
......@@ -133,6 +132,10 @@ public class BrokerConfig {
private boolean filterSupportRetry = false;
private boolean enablePropertyFilter = false;
private boolean compressedRegister = false;
private boolean forceRegister = false;
public boolean isTraceOn() {
return traceOn;
}
......@@ -598,4 +601,20 @@ public class BrokerConfig {
public void setEnablePropertyFilter(boolean enablePropertyFilter) {
this.enablePropertyFilter = enablePropertyFilter;
}
public boolean isCompressedRegister() {
return compressedRegister;
}
public void setCompressedRegister(boolean compressedRegister) {
this.compressedRegister = compressedRegister;
}
public boolean isForceRegister() {
return forceRegister;
}
public void setForceRegister(boolean forceRegister) {
this.forceRegister = forceRegister;
}
}
......@@ -78,4 +78,13 @@ public class DataVersion extends RemotingSerializable {
}
return result;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("DataVersion[");
sb.append("timestamp=").append(timestamp);
sb.append(", counter=").append(counter);
sb.append(']');
return sb.toString();
}
}
......@@ -23,14 +23,21 @@ import java.util.concurrent.atomic.AtomicLong;
public class ThreadFactoryImpl implements ThreadFactory {
private final AtomicLong threadIndex = new AtomicLong(0);
private final String threadNamePrefix;
private final boolean daemon;
public ThreadFactoryImpl(final String threadNamePrefix) {
this(threadNamePrefix, false);
}
public ThreadFactoryImpl(final String threadNamePrefix, boolean daemon) {
this.threadNamePrefix = threadNamePrefix;
this.daemon = daemon;
}
@Override
public Thread newThread(Runnable r) {
return new Thread(r, threadNamePrefix + this.threadIndex.incrementAndGet());
Thread thread = new Thread(r, threadNamePrefix + this.threadIndex.incrementAndGet());
thread.setDaemon(daemon);
return thread;
}
}
......@@ -165,4 +165,6 @@ public class RequestCode {
public static final int SEND_BATCH_MESSAGE = 320;
public static final int QUERY_CONSUME_QUEUE = 321;
public static final int QUERY_DATA_VERSION = 322;
}
......@@ -17,14 +17,155 @@
package org.apache.rocketmq.common.protocol.body;
import com.alibaba.fastjson.JSON;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RegisterBrokerBody extends RemotingSerializable {
private static final Logger LOGGER = LoggerFactory.getLogger(RegisterBrokerBody.class);
private TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
private List<String> filterServerList = new ArrayList<String>();
public byte[] encode(boolean compress) {
if (!compress) {
return super.encode();
}
long start = System.currentTimeMillis();
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DeflaterOutputStream outputStream = new DeflaterOutputStream(byteArrayOutputStream, new Deflater(Deflater.BEST_COMPRESSION));
DataVersion dataVersion = topicConfigSerializeWrapper.getDataVersion();
ConcurrentMap<String, TopicConfig> topicConfigTable = cloneTopicConfigTable(topicConfigSerializeWrapper.getTopicConfigTable());
assert topicConfigTable != null;
try {
byte[] buffer = dataVersion.encode();
// write data version
outputStream.write(convertIntToByteArray(buffer.length));
outputStream.write(buffer);
int topicNumber = topicConfigTable.size();
// write number of topic configs
outputStream.write(convertIntToByteArray(topicNumber));
// write topic config entry one by one.
for (ConcurrentMap.Entry<String, TopicConfig> next : topicConfigTable.entrySet()) {
buffer = next.getValue().encode().getBytes(MixAll.DEFAULT_CHARSET);
outputStream.write(convertIntToByteArray(buffer.length));
outputStream.write(buffer);
}
buffer = JSON.toJSONString(filterServerList).getBytes(MixAll.DEFAULT_CHARSET);
// write filter server list json length
outputStream.write(convertIntToByteArray(buffer.length));
// write filter server list json
outputStream.write(buffer);
outputStream.finish();
long interval = System.currentTimeMillis() - start;
if (interval > 50) {
LOGGER.info("Compressing takes {}ms", interval);
}
return byteArrayOutputStream.toByteArray();
} catch (IOException e) {
LOGGER.error("Failed to compress RegisterBrokerBody object", e);
}
return null;
}
public static RegisterBrokerBody decode(byte[] data, boolean compressed) throws IOException {
if (!compressed) {
return RegisterBrokerBody.decode(data, RegisterBrokerBody.class);
}
long start = System.currentTimeMillis();
InflaterInputStream inflaterInputStream = new InflaterInputStream(new ByteArrayInputStream(data));
int dataVersionLength = readInt(inflaterInputStream);
byte[] dataVersionBytes = readBytes(inflaterInputStream, dataVersionLength);
DataVersion dataVersion = DataVersion.decode(dataVersionBytes, DataVersion.class);
RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
registerBrokerBody.getTopicConfigSerializeWrapper().setDataVersion(dataVersion);
ConcurrentMap<String, TopicConfig> topicConfigTable = registerBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable();
int topicConfigNumber = readInt(inflaterInputStream);
LOGGER.debug("{} topic configs to extract", topicConfigNumber);
for (int i = 0; i < topicConfigNumber; i++) {
int topicConfigJsonLength = readInt(inflaterInputStream);
byte[] buffer = readBytes(inflaterInputStream, topicConfigJsonLength);
TopicConfig topicConfig = new TopicConfig();
String topicConfigJson = new String(buffer, MixAll.DEFAULT_CHARSET);
topicConfig.decode(topicConfigJson);
topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
int filterServerListJsonLength = readInt(inflaterInputStream);
byte[] filterServerListBuffer = readBytes(inflaterInputStream, filterServerListJsonLength);
String filterServerListJson = new String(filterServerListBuffer, MixAll.DEFAULT_CHARSET);
List<String> filterServerList = new ArrayList<String>();
try {
filterServerList = JSON.parseArray(filterServerListJson, String.class);
} catch (Exception e) {
LOGGER.error("Decompressing occur Exception {}", filterServerListJson);
}
registerBrokerBody.setFilterServerList(filterServerList);
long interval = System.currentTimeMillis() - start;
if (interval > 50) {
LOGGER.info("Decompressing takes {}ms", interval);
}
return registerBrokerBody;
}
private static byte[] convertIntToByteArray(int n) {
ByteBuffer byteBuffer = ByteBuffer.allocate(4);
byteBuffer.putInt(n);
return byteBuffer.array();
}
private static byte[] readBytes(InflaterInputStream inflaterInputStream, int length) throws IOException {
byte[] buffer = new byte[length];
int bytesRead = 0;
while (bytesRead < length) {
int len = inflaterInputStream.read(buffer, bytesRead, length - bytesRead);
if (len == -1) {
throw new IOException("End of compressed data has reached");
} else {
bytesRead += len;
}
}
return buffer;
}
private static int readInt(InflaterInputStream inflaterInputStream) throws IOException {
byte[] buffer = readBytes(inflaterInputStream, 4);
ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
return byteBuffer.getInt();
}
public TopicConfigSerializeWrapper getTopicConfigSerializeWrapper() {
return topicConfigSerializeWrapper;
}
......@@ -40,4 +181,16 @@ public class RegisterBrokerBody extends RemotingSerializable {
public void setFilterServerList(List<String> filterServerList) {
this.filterServerList = filterServerList;
}
public static ConcurrentMap<String, TopicConfig> cloneTopicConfigTable(
ConcurrentMap<String, TopicConfig> topicConfigConcurrentMap) {
ConcurrentHashMap<String, TopicConfig> result = new ConcurrentHashMap<String, TopicConfig>();
if (topicConfigConcurrentMap != null) {
for (Map.Entry<String, TopicConfig> entry : topicConfigConcurrentMap.entrySet()) {
result.put(entry.getKey(), entry.getValue());
}
}
return result;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header.namesrv;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class QueryDataVersionRequestHeader implements CommandCustomHeader {
@CFNotNull
private String brokerName;
@CFNotNull
private String brokerAddr;
@CFNotNull
private String clusterName;
@CFNotNull
private Long brokerId;
@Override
public void checkFields() throws RemotingCommandException {
}
public String getBrokerName() {
return brokerName;
}
public void setBrokerName(String brokerName) {
this.brokerName = brokerName;
}
public String getBrokerAddr() {
return brokerAddr;
}
public void setBrokerAddr(String brokerAddr) {
this.brokerAddr = brokerAddr;
}
public String getClusterName() {
return clusterName;
}
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
public Long getBrokerId() {
return brokerId;
}
public void setBrokerId(Long brokerId) {
this.brokerId = brokerId;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.header.namesrv;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class QueryDataVersionResponseHeader implements CommandCustomHeader {
@CFNotNull
private Boolean changed;
@Override
public void checkFields() throws RemotingCommandException {
}
public Boolean getChanged() {
return changed;
}
public void setChanged(Boolean changed) {
this.changed = changed;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("QueryDataVersionResponseHeader{");
sb.append("changed=").append(changed);
sb.append('}');
return sb.toString();
}
}
......@@ -36,7 +36,8 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader {
@CFNotNull
private Long brokerId;
@Override
private boolean compressed;
public void checkFields() throws RemotingCommandException {
}
......@@ -79,4 +80,12 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader {
public void setBrokerId(Long brokerId) {
this.brokerId = brokerId;
}
public boolean isCompressed() {
return compressed;
}
public void setCompressed(boolean compressed) {
this.compressed = compressed;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
public class RegisterBrokerBodyTest {
@Test
public void test_encode_decode() throws IOException {
RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
registerBrokerBody.setTopicConfigSerializeWrapper(topicConfigSerializeWrapper);
ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
for (int i = 0; i < 10000; i++) {
topicConfigTable.put(String.valueOf(i), new TopicConfig(String.valueOf(i)));
}
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
byte[] compareEncode = registerBrokerBody.encode(true);
byte[] encode2 = registerBrokerBody.encode(false);
System.out.println(compareEncode.length);
System.out.println(encode2.length);
RegisterBrokerBody decodeRegisterBrokerBody = RegisterBrokerBody.decode(compareEncode, true);
assertEquals(registerBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable().size(), decodeRegisterBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable().size());
}
}
......@@ -20,6 +20,7 @@ import io.netty.channel.ChannelHandlerContext;
import java.io.UnsupportedEncodingException;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MQVersion.Version;
import org.apache.rocketmq.common.MixAll;
......@@ -41,6 +42,8 @@ import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHea
import org.apache.rocketmq.common.protocol.header.namesrv.GetKVListByNamespaceRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
......@@ -81,6 +84,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
......@@ -194,7 +199,11 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
if (request.getBody() != null) {
registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), RegisterBrokerBody.class);
try {
registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
} catch (Exception e) {
throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);
}
} else {
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
......@@ -221,6 +230,30 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return response;
}
public RemotingCommand queryBrokerTopicConfig(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);
final QueryDataVersionResponseHeader responseHeader = (QueryDataVersionResponseHeader) response.readCustomHeader();
final QueryDataVersionRequestHeader requestHeader =
(QueryDataVersionRequestHeader) request.decodeCommandCustomHeader(QueryDataVersionRequestHeader.class);
DataVersion dataVersion = DataVersion.decode(request.getBody(), DataVersion.class);
Boolean changed = this.namesrvController.getRouteInfoManager().isBrokerTopicConfigChanged(requestHeader.getBrokerAddr(), dataVersion);
if (!changed) {
this.namesrvController.getRouteInfoManager().updateBrokerInfoUpdateTimestamp(requestHeader.getBrokerAddr());
}
DataVersion nameSeverDataVersion = this.namesrvController.getRouteInfoManager().queryBrokerTopicConfig(requestHeader.getBrokerAddr());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
if (nameSeverDataVersion != null) {
response.setBody(nameSeverDataVersion.encode());
}
responseHeader.setChanged(changed);
return response;
}
public RemotingCommand registerBroker(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
......
......@@ -183,13 +183,24 @@ public class RouteInfoManager {
return result;
}
private boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) {
public boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) {
DataVersion prev = queryBrokerTopicConfig(brokerAddr);
return null == prev || !prev.equals(dataVersion);
}
public DataVersion queryBrokerTopicConfig(final String brokerAddr) {
BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
if (null == prev || !prev.getDataVersion().equals(dataVersion)) {
return true;
if (prev != null) {
return prev.getDataVersion();
}
return null;
}
return false;
public void updateBrokerInfoUpdateTimestamp(final String brokerAddr) {
BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
if (prev != null) {
prev.setLastUpdateTimestamp(System.currentTimeMillis());
}
}
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册