未验证 提交 da5d30b4 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #3153 from chenzlalvin/logicqueue

[RIP-21] RocketMQ Logic Queue
......@@ -16,13 +16,18 @@
*/
package org.apache.rocketmq.broker;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
......@@ -33,6 +38,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
......@@ -42,6 +49,7 @@ import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
import org.apache.rocketmq.broker.dledger.DLedgerRoleChangeHandler;
import org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker;
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
......@@ -79,9 +87,11 @@ import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageC
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
import org.apache.rocketmq.broker.util.ServiceProvider;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
......@@ -89,13 +99,18 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.stats.MomentStatsItem;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
......@@ -159,6 +174,7 @@ public class BrokerController {
private final BrokerStatsManager brokerStatsManager;
private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
private final ConcurrentMap<String, String> brokerName2AddrMap = Maps.newConcurrentMap();
private MessageStore messageStore;
private RemotingServer remotingServer;
private RemotingServer fastRemotingServer;
......@@ -277,9 +293,9 @@ public class BrokerController {
if (result) {
try {
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
DefaultMessageStore messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig);
messageStore.registerCleanFileHook(topicConfigManager.getLogicalQueueCleanHook());
this.messageStore = messageStore;
if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
......@@ -467,6 +483,14 @@ public class BrokerController {
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
BrokerController.this.refreshBrokerNameMapping();
} catch (Exception e) {
log.error("ScheduledTask examineBrokerClusterInfo exception", e);
}
}, 10, 10, TimeUnit.SECONDS);
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
......@@ -593,6 +617,18 @@ public class BrokerController {
}
}
private void refreshBrokerNameMapping() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
ClusterInfo brokerClusterInfo = this.brokerOuterAPI.getBrokerClusterInfo();
brokerClusterInfo.getBrokerAddrTable().forEach((brokerName, data) -> {
String masterBrokerAddr = data.getBrokerAddrs().get(MixAll.MASTER_ID);
this.brokerName2AddrMap.put(brokerName, masterBrokerAddr);
});
}
public String getBrokerAddrByName(String brokerName) {
return this.brokerName2AddrMap.get(brokerName);
}
public void registerProcessor() {
/**
* SendMessageProcessor
......@@ -1009,20 +1045,54 @@ public class BrokerController {
}
public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
TopicConfig registerTopicConfig = topicConfig;
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
registerTopicConfig =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
this.registerIncrementBrokerData(Collections.singletonList(topicConfig), dataVersion);
}
public synchronized void registerIncrementBrokerData(List<TopicConfig> topicConfigList, DataVersion dataVersion) {
if (topicConfigList == null || topicConfigList.isEmpty()) {
return;
}
ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig);
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setDataVersion(dataVersion);
ConcurrentMap<String, TopicConfig> topicConfigTable = topicConfigList.stream()
.map(topicConfig -> {
TopicConfig registerTopicConfig;
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
registerTopicConfig =
new TopicConfig(topicConfig.getTopicName(),
topicConfig.getReadQueueNums(),
topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
} else {
registerTopicConfig = new TopicConfig(topicConfig);
}
return registerTopicConfig;
})
.collect(Collectors.toConcurrentMap(TopicConfig::getTopicName, Function.identity()));
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
String brokerName = this.brokerConfig.getBrokerName();
Map<String, LogicalQueuesInfo> logicalQueuesInfoMap = topicConfigList.stream()
.map(TopicConfig::getTopicName)
.map(topicName -> Optional.ofNullable(this.topicConfigManager.selectLogicalQueuesInfo(topicName))
.map(info -> {
info.readLock().lock();
try {
return new AbstractMap.SimpleImmutableEntry<>(topicName, new LogicalQueuesInfoInBroker(info, data -> Objects.equals(data.getBrokerName(), brokerName)));
} finally {
info.readLock().unlock();
}
})
.orElse(null))
.filter(Objects::nonNull)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (!logicalQueuesInfoMap.isEmpty()) {
topicConfigSerializeWrapper.setLogicalQueuesInfoMap(logicalQueuesInfoMap);
}
doRegisterBrokerAll(true, false, topicConfigSerializeWrapper);
}
......@@ -1032,13 +1102,21 @@ public class BrokerController {
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
Map<String, LogicalQueuesInfo> logicalQueuesInfoMap = Maps.newHashMapWithExpectedSize(topicConfigWrapper.getTopicConfigTable().size());
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
String topicName = topicConfig.getTopicName();
TopicConfig tmp =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
new TopicConfig(topicName, topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
topicConfigTable.put(topicName, tmp);
LogicalQueuesInfoInBroker logicalQueuesInfo = this.topicConfigManager.selectLogicalQueuesInfo(topicName);
if (logicalQueuesInfo != null) {
String brokerName = this.brokerConfig.getBrokerName();
logicalQueuesInfoMap.put(topicName, new LogicalQueuesInfoInBroker(logicalQueuesInfo, data -> Objects.equals(data.getBrokerName(), brokerName)));
}
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
topicConfigWrapper.setLogicalQueuesInfoMap(logicalQueuesInfoMap);
}
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.domain;
import com.alibaba.fastjson.parser.ParserConfig;
import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.rocketmq.common.fastjson.GenericMapSuperclassDeserializer;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.srvutil.ConcurrentHashMapUtil;
import static java.util.Optional.ofNullable;
public class LogicalQueuesInfoInBroker extends LogicalQueuesInfo {
private final ConcurrentMap<Integer, ConcurrentNavigableMap<Long, LogicalQueueRouteData>> queueId2LogicalQueueMap = Maps.newConcurrentMap();
public LogicalQueuesInfoInBroker() {
}
public LogicalQueuesInfoInBroker(LogicalQueuesInfoInBroker other) {
this(other, null);
}
// deep copy
public LogicalQueuesInfoInBroker(LogicalQueuesInfoInBroker other, Predicate<LogicalQueueRouteData> predicate) {
other.readLock().lock();
try {
for (Entry<Integer, List<LogicalQueueRouteData>> entry : other.entrySet()) {
Stream<LogicalQueueRouteData> stream = entry.getValue().stream();
if (predicate != null) {
stream = stream.filter(predicate);
}
this.put(entry.getKey(), stream.map(LogicalQueueRouteData::new).collect(Collectors.toList()));
}
} finally {
other.readLock().unlock();
}
}
public void updateQueueRouteDataByQueueId(int queueId, LogicalQueueRouteData queueRouteData) {
if (queueRouteData == null) {
return;
}
ConcurrentHashMapUtil.computeIfAbsent(queueId2LogicalQueueMap, queueId, k -> new ConcurrentSkipListMap<>()).put(queueRouteData.getOffsetDelta(), queueRouteData);
}
/**
* find logical queue route data for message queues owned by this broker
*/
public LogicalQueueRouteData queryQueueRouteDataByQueueId(int queueId, long offset) {
ConcurrentNavigableMap<Long, LogicalQueueRouteData> m = this.queueId2LogicalQueueMap.get(queueId);
if (m == null || m.isEmpty()) {
return null;
}
Entry<Long, LogicalQueueRouteData> entry = m.floorEntry(offset);
if (entry == null) {
return null;
}
return entry.getValue();
}
public void deleteQueueRouteData(LogicalQueueRouteData logicalQueueRouteData) {
ConcurrentNavigableMap<Long, LogicalQueueRouteData> m = this.queueId2LogicalQueueMap.get(logicalQueueRouteData.getQueueId());
if (m != null) {
m.remove(logicalQueueRouteData.getOffsetDelta(), logicalQueueRouteData);
}
}
public LogicalQueueRouteData nextAvailableLogicalRouteData(LogicalQueueRouteData queueRouteData,
Predicate<LogicalQueueRouteData> predicate) {
this.readLock().lock();
try {
List<LogicalQueueRouteData> queueRouteDataList = ofNullable(this.get(queueRouteData.getLogicalQueueIndex())).orElse(Collections.emptyList());
int idx = Collections.binarySearch(queueRouteDataList, queueRouteData);
if (idx >= 0) {
for (int i = idx + 1, size = queueRouteDataList.size(); i < size; i++) {
LogicalQueueRouteData tmp = queueRouteDataList.get(i);
if (predicate.test(tmp)) {
return tmp;
}
}
}
} finally {
this.readLock().unlock();
}
return null;
}
static {
// workaround https://github.com/alibaba/fastjson/issues/3730
ParserConfig.getGlobalInstance().putDeserializer(LogicalQueuesInfoInBroker.class, GenericMapSuperclassDeserializer.INSTANCE);
}
}
......@@ -34,6 +34,7 @@ import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
......@@ -48,6 +49,7 @@ import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerReques
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......@@ -432,4 +434,23 @@ public class BrokerOuterAPI {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public ClusterInfo getBrokerClusterInfo() throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
RemotingCommand response = this.remotingClient.invokeSync(null, request, 3_000);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return ClusterInfo.decode(response.getBody(), ClusterInfo.class);
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public void forwardRequest(String brokerAddr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException, RemotingTooMuchRequestException, RemotingConnectException {
this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, invokeCallback);
}
}
......@@ -109,6 +109,11 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
return next.getMaxOffsetInQueue(topic, queueId);
}
@Override
public long getMaxOffsetInQueue(String topic, int queueId, boolean committed) {
return next.getMaxOffsetInQueue(topic, queueId, committed);
}
@Override
public long getMinOffsetInQueue(String topic, int queueId) {
return next.getMinOffsetInQueue(topic, queueId);
......
......@@ -16,19 +16,28 @@
*/
package org.apache.rocketmq.broker.processor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.TopicQueueId;
import org.apache.rocketmq.common.constant.DBMsgConstants;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
......@@ -42,17 +51,23 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.ChannelUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.srvutil.ConcurrentHashMapUtil;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
......@@ -63,6 +78,8 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc
protected final SocketAddress storeHost;
private List<SendMessageHook> sendMessageHookList;
private final ConcurrentMap<TopicQueueId, LongAdder> inFlyWritingCounterMap = Maps.newConcurrentMap();
public AbstractSendMessageProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
this.storeHost =
......@@ -330,4 +347,158 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc
public boolean rejectRequest() {
return false;
}
public ConcurrentMap<TopicQueueId, LongAdder> getInFlyWritingCounterMap() {
return inFlyWritingCounterMap;
}
protected LogicalQueueContext buildLogicalQueueContext(String topic, int queueId,
RemotingCommand response) {
TopicConfigManager topicConfigManager = this.brokerController.getTopicConfigManager();
LogicalQueuesInfoInBroker logicalQueuesInfo = topicConfigManager.selectLogicalQueuesInfo(topic);
if (logicalQueuesInfo == null) {
return noopLogicalQueueContext;
}
// writable route data will has largest offset
LogicalQueueRouteData curQueueRouteData = logicalQueuesInfo.queryQueueRouteDataByQueueId(queueId, Long.MAX_VALUE);
if (curQueueRouteData == null) {
// topic enabled logical queue, but some message queues are not converted or being converted
String msg = String.format(Locale.ENGLISH, "queueId %d not included in logical queue", queueId);
log.debug("buildLogicalQueueContext unexpected error, topic {} {}", topic, msg);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(msg);
return noopLogicalQueueContext;
}
LongAdder inFlyWritingCounter = ConcurrentHashMapUtil.computeIfAbsent(inFlyWritingCounterMap, new TopicQueueId(topic, queueId), ignore -> new LongAdder());
return new LogicalQueueContext(topic, queueId, logicalQueuesInfo, curQueueRouteData, inFlyWritingCounter);
}
protected class LogicalQueueContext {
private final String topic;
private final int queueId;
private final LogicalQueuesInfoInBroker logicalQueuesInfo;
private final LogicalQueueRouteData curQueueRouteData;
private final LongAdder inFlyWritingCounter;
public LogicalQueueContext(String topic, int queueId,
LogicalQueuesInfoInBroker logicalQueuesInfo,
LogicalQueueRouteData curQueueRouteData, LongAdder inFlyWritingCounter) {
this.topic = topic;
this.queueId = queueId;
this.logicalQueuesInfo = logicalQueuesInfo;
this.curQueueRouteData = curQueueRouteData;
this.inFlyWritingCounter = inFlyWritingCounter;
}
public CompletableFuture<RemotingCommand> hookBeforePut(ChannelHandlerContext ctx, SendMessageRequestHeader requestHeader,
RemotingCommand request, RemotingCommand response) {
if (curQueueRouteData.isWritable()) {
this.inFlyWritingCounter.increment();
return null;
}
int logicalQueueIdx = curQueueRouteData.getLogicalQueueIndex();
List<LogicalQueueRouteData> queueRouteDataList = logicalQueuesInfo.get(logicalQueueIdx);
LogicalQueueRouteData writableQueueRouteData = null;
for (int i = queueRouteDataList.size() - 1; i >= 0; i--) {
LogicalQueueRouteData queueRouteData = queueRouteDataList.get(i);
if (queueRouteData.isWritable()) {
writableQueueRouteData = queueRouteData;
break;
}
}
if (writableQueueRouteData == null) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format(Locale.ENGLISH, "broker[%s] topic[%s] queueId[%d] logicalQueueIdx[%d] not writable", AbstractSendMessageProcessor.this.brokerController.getBrokerConfig().getBrokerIP1(), topic, queueId, logicalQueueIdx));
return CompletableFuture.completedFuture(response);
}
if ((Optional.ofNullable(requestHeader.getSysFlag()).orElse(0) & MessageSysFlag.LOGICAL_QUEUE_FLAG) > 0) {
// new client, use redirect
response.setCode(ResponseCode.NO_PERMISSION);
response.addExtField(MessageConst.PROPERTY_REDIRECT, "1");
response.setBody(RemotingSerializable.encode(ImmutableList.of(curQueueRouteData, writableQueueRouteData)));
return CompletableFuture.completedFuture(response);
} else {
// old client, use forward
this.logicalQueueHookForward(ctx, writableQueueRouteData, requestHeader, request, response);
}
if (response.getCode() != -1) {
return CompletableFuture.completedFuture(response);
} else if (response.getCode() == ResponseCode.ASYNC_AND_RETURN_NULL) {
return CompletableFuture.completedFuture(null);
}
return null;
}
private void logicalQueueHookForward(ChannelHandlerContext ctx,
LogicalQueueRouteData writableQueueRouteData,
SendMessageRequestHeader requestHeader, RemotingCommand request,
RemotingCommand response) {
response.setCode(ResponseCode.SUCCESS);
requestHeader.setQueueId(writableQueueRouteData.getQueueId());
request.writeCustomHeader(requestHeader);
String brokerName = writableQueueRouteData.getBrokerName();
BrokerController brokerController = AbstractSendMessageProcessor.this.brokerController;
String brokerAddr = brokerController.getBrokerAddrByName(brokerName);
if (brokerAddr == null) {
log.warn("getBrokerAddrByName brokerName={} got null, fallback to queueRouteData.getBrokerAddr()", brokerName);
brokerAddr = writableQueueRouteData.getBrokerAddr();
}
if (brokerAddr == null) {
response.setCode(ResponseCode.SYSTEM_ERROR);
String msg = String.format(Locale.ENGLISH, "unknown brokerName %s", brokerName);
response.setRemark(msg);
log.warn("logicalQueueHookForward can not look up brokerName={}: {}", brokerName, requestHeader);
return;
}
try {
String finalBrokerAddr = brokerAddr;
brokerController.getBrokerOuterAPI().forwardRequest(brokerAddr, request, brokerController.getBrokerConfig().getForwardTimeout(), responseFuture -> {
RemotingCommand forwardResponse = responseFuture.getResponseCommand();
if (forwardResponse == null) {
forwardResponse = response;
forwardResponse.setCode(ResponseCode.SYSTEM_ERROR);
if (!responseFuture.isSendRequestOK()) {
forwardResponse.setRemark(String.format(Locale.ENGLISH, "send request failed to %s: %s", finalBrokerAddr, responseFuture.getCause()));
} else if (responseFuture.isTimeout()) {
forwardResponse.setRemark(String.format(Locale.ENGLISH, "wait response from %s timeout: %dms", finalBrokerAddr, responseFuture.getTimeoutMillis()));
} else {
forwardResponse.setRemark(String.format(Locale.ENGLISH, "unknown reason. addr: %s, timeoutMillis: %d: %s", finalBrokerAddr, responseFuture.getTimeoutMillis(), responseFuture.getCause()));
}
} else {
CommandCustomHeader customHeader = forwardResponse.readCustomHeader();
if (customHeader instanceof SendMessageResponseHeader) {
SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) customHeader;
Integer forwardQueueId = responseHeader.getQueueId();
forwardResponse.addExtField(MessageConst.PROPERTY_FORWARD_QUEUE_ID, forwardQueueId != null ? Integer.toString(forwardQueueId) : "null");
responseHeader.setQueueId(requestHeader.getQueueId());
// queueOffset should not be changed since forwarded broker will add delta to it.
}
}
AbstractSendMessageProcessor.this.doResponse(ctx, request, forwardResponse);
});
response.setCode(ResponseCode.ASYNC_AND_RETURN_NULL);
} catch (Exception e) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("forward error");
log.warn(String.format(Locale.ENGLISH, "logicalQueueHookForward to %s error", brokerAddr), e);
}
}
public void hookAfterPut(CompletableFuture<PutMessageResult> putMessageResult) {
Optional.ofNullable(putMessageResult).orElse(CompletableFuture.completedFuture(null)).whenComplete((result, throwable) -> {
this.inFlyWritingCounter.decrement();
});
}
}
private final LogicalQueueContext noopLogicalQueueContext = new LogicalQueueContext(null, 0, null, null, null) {
@Override public CompletableFuture<RemotingCommand> hookBeforePut(ChannelHandlerContext ctx, SendMessageRequestHeader requestHeader,
RemotingCommand request, RemotingCommand response) {
return null;
}
@Override public void hookAfterPut(CompletableFuture<PutMessageResult> putMessageResult) {
}
};
}
......@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.broker.processor;
import com.google.common.collect.ImmutableList;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
......@@ -25,6 +26,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filter.ExpressionForRetryMessageFilter;
......@@ -41,6 +43,7 @@ import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
......@@ -48,6 +51,8 @@ import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
......@@ -62,6 +67,7 @@ import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageFilter;
......@@ -126,23 +132,25 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
String topic = requestHeader.getTopic();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
if (null == topicConfig) {
log.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
log.error("the topic {} not exist, consumer: {}", topic, RemotingHelper.parseChannelRemoteAddr(channel));
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
response.setRemark(String.format("topic[%s] not exist, apply first please! %s", topic, FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
return response;
}
if (!PermName.isReadable(topicConfig.getPerm())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
response.setRemark("the topic[" + topic + "] pulling message is forbidden");
return response;
}
if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
int queueId = requestHeader.getQueueId();
if (queueId < 0 || queueId >= topicConfig.getReadQueueNums()) {
String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
queueId, topic, topicConfig.getReadQueueNums(), channel.remoteAddress());
log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
......@@ -154,11 +162,11 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
if (hasSubscriptionFlag) {
try {
subscriptionData = FilterAPI.build(
requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType()
topic, requestHeader.getSubscription(), requestHeader.getExpressionType()
);
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
consumerFilterData = ConsumerFilterManager.build(
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
topic, requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
requestHeader.getExpressionType(), requestHeader.getSubVersion()
);
assert consumerFilterData != null;
......@@ -187,9 +195,9 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
return response;
}
subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
subscriptionData = consumerGroupInfo.findSubscriptionData(topic);
if (null == subscriptionData) {
log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), topic);
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
......@@ -203,7 +211,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
return response;
}
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(),
consumerFilterData = this.brokerController.getConsumerFilterManager().get(topic,
requestHeader.getConsumerGroup());
if (consumerFilterData == null) {
response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST);
......@@ -212,7 +220,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
}
if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) {
log.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}",
requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion());
requestHeader.getConsumerGroup(), topic, consumerFilterData.getClientVersion(), requestHeader.getSubVersion());
response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);
response.setRemark("the consumer's consumer filter data not latest");
return response;
......@@ -236,13 +244,74 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
this.brokerController.getConsumerFilterManager());
}
long offset = requestHeader.getQueueOffset();
int maxMsgNums = requestHeader.getMaxMsgNums();
LogicalQueuesInfoInBroker logicalQueuesInfo = this.brokerController.getTopicConfigManager().selectLogicalQueuesInfo(topic);
LogicalQueueRouteData queueRouteData = null;
if (logicalQueuesInfo != null) {
int responseErrorCode = ResponseCode.SUCCESS;
queueRouteData = logicalQueuesInfo.queryQueueRouteDataByQueueId(queueId, offset);
if (queueRouteData != null) {
if (queueRouteData.isWriteOnly()) {
responseErrorCode = ResponseCode.PULL_NOT_FOUND;
response.setRemark("logical queue write only");
} else if (queueRouteData.isExpired()) {
responseErrorCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
response.setRemark("logical queue expired");
prepareRedirectResponse(response, logicalQueuesInfo, queueRouteData);
} else if (MessageQueueRouteState.ReadOnly.equals(queueRouteData.getState()) && queueRouteData.getOffsetMax() >= 0) {
if (offset >= queueRouteData.getOffsetMax()) {
responseErrorCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
response.setRemark("queue offset exceed offsetMax");
prepareRedirectResponse(response, logicalQueuesInfo, queueRouteData);
} else if (offset + maxMsgNums > queueRouteData.getOffsetMax()) {
if ((queueRouteData.getOffsetMax() - 1 <= this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId)) &&
(this.brokerController.getMessageStore().getCommitLogOffsetInQueue(topic, queueId, queueRouteData.getOffsetMax() - 1) < this.brokerController.getMessageStore().getMinPhyOffset())) {
responseErrorCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
response.setRemark("queue offset removed");
prepareRedirectResponse(response, logicalQueuesInfo, queueRouteData);
} else {
maxMsgNums = (int) (queueRouteData.getOffsetMax() - offset);
if (maxMsgNums <= 0) {
responseErrorCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
response.setRemark("queue offset out of range");
prepareRedirectResponse(response, logicalQueuesInfo, queueRouteData);
}
}
}
}
} else {
responseErrorCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
response.setRemark("no suitable queue");
response.addExtField(MessageConst.PROPERTY_REDIRECT, "1");
// instruct client to refresh all
response.setBody(null);
queueRouteData = logicalQueuesInfo.queryQueueRouteDataByQueueId(queueId, 0L);
}
if (responseErrorCode != ResponseCode.SUCCESS) {
response.setCode(responseErrorCode);
responseHeader.setMinOffset(offset);
responseHeader.setMaxOffset(queueRouteData != null ? queueRouteData.getOffsetMax() : offset);
responseHeader.setNextBeginOffset(queueRouteData != null ? queueRouteData.getOffsetMax() : offset);
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
return response;
}
}
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), topic,
queueId, offset, maxMsgNums, messageFilter);
if (getMessageResult != null) {
response.setRemark(getMessageResult.getStatus().name());
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
long nextBeginOffset = getMessageResult.getNextBeginOffset();
if (queueRouteData != null && queueRouteData.getOffsetMax() >= 0 && nextBeginOffset > queueRouteData.getOffsetMax()) {
// prevent from pulling messages from next logical queue route data
nextBeginOffset = queueRouteData.getOffsetMax();
}
responseHeader.setNextBeginOffset(nextBeginOffset);
responseHeader.setMinOffset(getMessageResult.getMinOffset());
// this does not need to be modified since it's not an accurate value under logical queue.
responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
if (getMessageResult.isSuggestPullingFromSlave()) {
......@@ -291,9 +360,9 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
// XXX: warn and notify me
log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
requestHeader.getQueueOffset(),
getMessageResult.getNextBeginOffset(),
requestHeader.getTopic(),
requestHeader.getQueueId(),
nextBeginOffset,
topic,
queueId,
requestHeader.getConsumerGroup()
);
} else {
......@@ -318,7 +387,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
case OFFSET_TOO_SMALL:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
requestHeader.getConsumerGroup(), topic, requestHeader.getQueueOffset(),
getMessageResult.getMinOffset(), channel.remoteAddress());
break;
default:
......@@ -329,8 +398,8 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
if (this.hasConsumeMessageHook()) {
ConsumeMessageContext context = new ConsumeMessageContext();
context.setConsumerGroup(requestHeader.getConsumerGroup());
context.setTopic(requestHeader.getTopic());
context.setQueueId(requestHeader.getQueueId());
context.setTopic(topic);
context.setQueueId(queueId);
String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
......@@ -414,9 +483,6 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
......@@ -424,6 +490,20 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
break;
}
if (queueRouteData != null) {
logicalQueuesInfo.readLock().lock();
try {
List<LogicalQueueRouteData> queueRouteDataList = logicalQueuesInfo.get(queueRouteData.getLogicalQueueIndex());
MessageQueue latestMessageQueue = queueRouteDataList.get(queueRouteDataList.size() - 1).getMessageQueue();
if (!latestMessageQueue.getBrokerName().equals(brokerController.getBrokerConfig().getBrokerName()) || latestMessageQueue.getQueueId() != queueId) {
// There are other newer message queue, instruct client to refresh meta-data to access these
prepareRedirectResponse(response, logicalQueuesInfo, queueRouteData);
}
} finally {
logicalQueuesInfo.readLock().unlock();
}
}
case ResponseCode.PULL_RETRY_IMMEDIATELY:
break;
case ResponseCode.PULL_OFFSET_MOVED:
......@@ -438,7 +518,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
event.setConsumerGroup(requestHeader.getConsumerGroup());
event.setMessageQueue(mq);
event.setOffsetRequest(requestHeader.getQueueOffset());
event.setOffsetNew(getMessageResult.getNextBeginOffset());
event.setOffsetNew(nextBeginOffset);
this.generateOffsetMovedEvent(event);
log.warn(
"PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
......@@ -467,11 +547,20 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
requestHeader.getConsumerGroup(), topic, queueId, requestHeader.getCommitOffset());
}
return response;
}
private void prepareRedirectResponse(RemotingCommand response, LogicalQueuesInfoInBroker logicalQueuesInfo,
LogicalQueueRouteData queueRouteData) {
LogicalQueueRouteData nextReadableLogicalQueueRouteData = logicalQueuesInfo.nextAvailableLogicalRouteData(queueRouteData, LogicalQueueRouteData::isReadable);
if (nextReadableLogicalQueueRouteData != null) {
response.addExtField(MessageConst.PROPERTY_REDIRECT, "1");
response.setBody(RemotingSerializable.encode(ImmutableList.of(queueRouteData, nextReadableLogicalQueueRouteData)));
}
}
public boolean hasConsumeMessageHook() {
return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty();
}
......
......@@ -287,6 +287,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
LogicalQueueContext logicalQueueContext = super.buildLogicalQueueContext(msgInner.getTopic(), msgInner.getQueueId(), response);
CompletableFuture<RemotingCommand> future = logicalQueueContext.hookBeforePut(ctx, requestHeader, request, response);
if (future != null) {
return future;
}
CompletableFuture<PutMessageResult> putMessageResult = null;
Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
......@@ -296,12 +302,14 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
logicalQueueContext.hookAfterPut(null);
return CompletableFuture.completedFuture(response);
}
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
logicalQueueContext.hookAfterPut(putMessageResult);
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}
......@@ -362,82 +370,6 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
return true;
}
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
response.setOpaque(request.getOpaque());
response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
log.debug("receive SendMessage request command, {}", request);
final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
if (this.brokerController.getMessageStore().now() < startTimstamp) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
return response;
}
response.setCode(-1);
super.msgCheck(ctx, requestHeader, response);
if (response.getCode() != -1) {
return response;
}
final byte[] body = request.getBody();
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) {
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
}
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
return response;
}
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
PutMessageResult putMessageResult = null;
Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)
&& !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
}
private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
RemotingCommand request, MessageExt msg,
SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
......@@ -587,7 +519,16 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(messageExtBatch, MessageConst.PROPERTY_CLUSTER, clusterName);
LogicalQueueContext logicalQueueContext = super.buildLogicalQueueContext(messageExtBatch.getTopic(), messageExtBatch.getQueueId(), response);
CompletableFuture<RemotingCommand> future = logicalQueueContext.hookBeforePut(ctx, requestHeader, request, response);
if (future != null) {
return future;
}
CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessages(messageExtBatch);
logicalQueueContext.hookAfterPut(putMessageResult);
return handlePutMessageResultFuture(putMessageResult, response, request, messageExtBatch, responseHeader, mqtraceContext, ctx, queueIdInt);
}
......
......@@ -17,16 +17,20 @@
package org.apache.rocketmq.broker.topic;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
......@@ -35,10 +39,15 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.srvutil.ConcurrentHashMapUtil;
import org.apache.rocketmq.store.CleanFilesHook;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageStore;
public class TopicConfigManager extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
......@@ -52,8 +61,19 @@ public class TopicConfigManager extends ConfigManager {
private final DataVersion dataVersion = new DataVersion();
private transient BrokerController brokerController;
public TopicConfigManager() {
}
private final ConcurrentMap<String, LogicalQueuesInfoInBroker> logicalQueuesInfoTable = new ConcurrentHashMap<>();
private final CleanFilesHook logicalQueueCleanHook = new CleanFilesHook() {
@Override public void execute(DefaultMessageStore defaultMessageStore, long deleteCount) {
if (deleteCount == 0) {
return;
}
TopicConfigManager.this.logicalQueueClean();
}
@Override public String getName() {
return TopicConfigManager.class.getSimpleName() + ".logicalQueueCleanHook";
}
};
public TopicConfigManager(BrokerController brokerController) {
this.brokerController = brokerController;
......@@ -362,7 +382,7 @@ public class TopicConfigManager extends ConfigManager {
this.dataVersion.nextVersion();
this.persist();
this.persist(topicConfig.getTopicName(), topicConfig);
}
public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) {
......@@ -421,6 +441,8 @@ public class TopicConfigManager extends ConfigManager {
public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);
String brokerName = this.brokerController.getBrokerConfig().getBrokerName();
topicConfigSerializeWrapper.setLogicalQueuesInfoMap(this.logicalQueuesInfoTable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new LogicalQueuesInfoInBroker(e.getValue(), data -> Objects.equals(data.getBrokerName(), brokerName)))));
topicConfigSerializeWrapper.setDataVersion(this.dataVersion);
return topicConfigSerializeWrapper;
}
......@@ -452,6 +474,7 @@ public class TopicConfigManager extends ConfigManager {
public String encode(final boolean prettyFormat) {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);
topicConfigSerializeWrapper.setLogicalQueuesInfoMap(this.logicalQueuesInfoTable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new LogicalQueuesInfoInBroker(e.getValue()))));
topicConfigSerializeWrapper.setDataVersion(this.dataVersion);
return topicConfigSerializeWrapper.toJson(prettyFormat);
}
......@@ -471,4 +494,92 @@ public class TopicConfigManager extends ConfigManager {
public ConcurrentMap<String, TopicConfig> getTopicConfigTable() {
return topicConfigTable;
}
public LogicalQueuesInfoInBroker selectLogicalQueuesInfo(String topicName) {
return this.logicalQueuesInfoTable.get(topicName);
}
public LogicalQueuesInfoInBroker getOrCreateLogicalQueuesInfo(String topicName) {
return ConcurrentHashMapUtil.computeIfAbsent(this.logicalQueuesInfoTable, topicName, ignored -> new LogicalQueuesInfoInBroker());
}
public boolean replaceTopicConfig(String topic, TopicConfig oldTopicConfig, TopicConfig newTopicConfig) {
boolean ok = this.topicConfigTable.replace(topic, oldTopicConfig, newTopicConfig);
if (ok) {
this.dataVersion.nextVersion();
persist(topic, newTopicConfig);
}
return ok;
}
public CleanFilesHook getLogicalQueueCleanHook() {
return logicalQueueCleanHook;
}
void logicalQueueClean() {
String brokerName = this.brokerController.getBrokerConfig().getBrokerName();
MessageStore messageStore = this.brokerController.getMessageStore();
for (Entry<String, LogicalQueuesInfoInBroker> entry : this.logicalQueuesInfoTable.entrySet()) {
String topic = entry.getKey();
LogicalQueuesInfoInBroker logicalQueuesInfo = entry.getValue();
Lock readLock = logicalQueuesInfo.readLock();
Lock writeLock = logicalQueuesInfo.writeLock();
boolean changed = false;
readLock.lock();
try {
for (List<LogicalQueueRouteData> list : logicalQueuesInfo.values()) {
while (!list.isEmpty()) {
LogicalQueueRouteData logicalQueueRouteData = list.get(0);
String brokerBelongs;
if (brokerName.equals(logicalQueueRouteData.getBrokerName())) {
if (logicalQueueRouteData.isWritable()) {
break;
}
boolean canRemove = logicalQueueRouteData.isExpired() || logicalQueueRouteData.getMessagesCount() == 0;
if (!canRemove) {
// do not use getMinOffsetInQueue method, since it is using ConsumeQueue data, but not CommitLog, CQ data is not accurate after CommitLog cleaning.
long commitLogOffset = messageStore.getCommitLogOffsetInQueue(topic, logicalQueueRouteData.getQueueId(), logicalQueueRouteData.getOffsetMax() - 1);
canRemove = commitLogOffset == 0 || messageStore.getMinPhyOffset() > commitLogOffset;
}
if (!canRemove) {
break;
}
brokerBelongs = "self";
} else {
brokerBelongs = "other";
}
readLock.unlock();
writeLock.lock();
try {
list.remove(0);
} finally {
readLock.lock();
writeLock.unlock();
}
log.info("logicalQueueClean remove {} broker {}", brokerBelongs, logicalQueueRouteData);
changed = true;
}
}
if (changed) {
logicalQueuesInfo = new LogicalQueuesInfoInBroker(logicalQueuesInfo);
}
} finally {
readLock.unlock();
}
if (changed) {
this.dataVersion.nextVersion();
this.persist(topic, logicalQueuesInfo);
this.brokerController.registerIncrementBrokerData(this.selectTopicConfig(topic), this.dataVersion);
log.info("registerIncrementBrokerData because logicalQueueClean: {}", topic);
}
}
}
public void deleteQueueRouteData(String topic) {
if (this.logicalQueuesInfoTable.remove(topic) != null) {
log.info("delete queueRouteData config OK, topic: {}", topic);
this.dataVersion.nextVersion();
persist(topic, (LogicalQueuesInfoInBroker) null);
}
}
}
......@@ -22,19 +22,26 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.netty.channel.ChannelHandlerContext;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Test;
......@@ -50,6 +57,8 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
......@@ -182,6 +191,26 @@ public class BrokerOuterAPITest {
assertEquals(2, registerBrokerResultList.size());
}
@Test
public void testGetBrokerClusterInfo() throws Exception {
init();
brokerOuterAPI.start();
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
ClusterInfo want = new ClusterInfo();
want.setBrokerAddrTable(new HashMap<>(Collections.singletonMap("key", new BrokerData("cluster", "broker", new HashMap<>(Collections.singletonMap(MixAll.MASTER_ID, "127.0.0.1:10911"))))));
response.setBody(RemotingSerializable.encode(want));
when(nettyRemotingClient.invokeSync(isNull(), argThat(argument -> argument.getCode() == RequestCode.GET_BROKER_CLUSTER_INFO), anyLong())).thenReturn(response);
ClusterInfo got = brokerOuterAPI.getBrokerClusterInfo();
assertEquals(want, got);
}
private RemotingCommand buildResponse(Boolean changed) {
final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);
final QueryDataVersionResponseHeader responseHeader = (QueryDataVersionResponseHeader) response.readCustomHeader();
......
......@@ -20,24 +20,31 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
......@@ -46,6 +53,7 @@ import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.assertj.core.util.Lists;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -53,11 +61,15 @@ import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import static java.util.Optional.ofNullable;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.intThat;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
......@@ -92,6 +104,7 @@ public class PullMessageProcessorTest {
consumerData.getConsumeFromWhere(),
consumerData.getSubscriptionDataSet(),
false);
brokerController.getTopicConfigManager().updateTopicConfig(new TopicConfig(topic));
}
@Test
......@@ -192,6 +205,94 @@ public class PullMessageProcessorTest {
assertThat(response.getCode()).isEqualTo(ResponseCode.PULL_OFFSET_MOVED);
}
@Test
public void testProcessRequest_LogicalQueue() throws Exception {
String brokerName = brokerController.getBrokerConfig().getBrokerName();
int queueId = 1;
GetMessageResult getMessageResult = createGetMessageResult();
when(messageStore.getMessage(anyString(), eq(topic), eq(queueId), eq(456L), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult);
when(messageStore.getMaxOffsetInQueue(eq(topic), eq(queueId))).thenReturn(2000L);
when(messageStore.getMinPhyOffset()).thenReturn(0L);
LogicalQueuesInfoInBroker logicalQueuesInfo = brokerController.getTopicConfigManager().getOrCreateLogicalQueuesInfo(topic);
LogicalQueueRouteData queueRouteData1 = new LogicalQueueRouteData(0, 0, new MessageQueue(topic, brokerName, queueId), MessageQueueRouteState.Normal, 0, -1, -1, -1, brokerController.getBrokerAddr());
logicalQueuesInfo.put(0, Lists.newArrayList(queueRouteData1));
logicalQueuesInfo.updateQueueRouteDataByQueueId(queueRouteData1.getQueueId(), queueRouteData1);
// normal
{
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
// write only
queueRouteData1.setState(MessageQueueRouteState.WriteOnly);
{
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.PULL_NOT_FOUND);
}
// no message and redirect
queueRouteData1.setState(MessageQueueRouteState.ReadOnly);
queueRouteData1.setOffsetMax(460);
queueRouteData1.setFirstMsgTimeMillis(100);
queueRouteData1.setLastMsgTimeMillis(200);
LogicalQueueRouteData queueRouteData2 = new LogicalQueueRouteData(0, 460, new MessageQueue(topic, "broker2", 1), MessageQueueRouteState.Normal, 0, -1, -1, -1, brokerController.getBrokerAddr());
logicalQueuesInfo.get(0).add(queueRouteData2);
getMessageResult.setStatus(GetMessageStatus.OFFSET_FOUND_NULL);
when(messageStore.getCommitLogOffsetInQueue(eq(topic), eq(queueId), eq(460L - 1L))).thenReturn(1000L);
{
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.PULL_NOT_FOUND);
assertThat(response.getExtFields()).containsKey(MessageConst.PROPERTY_REDIRECT);
}
// same message queue has two routes
queueRouteData2.setState(MessageQueueRouteState.ReadOnly);
queueRouteData2.setOffsetMax(50);
queueRouteData2.setFirstMsgTimeMillis(300);
queueRouteData2.setLastMsgTimeMillis(400);
LogicalQueueRouteData queueRouteData3 = new LogicalQueueRouteData(0, 510, new MessageQueue(topic, queueRouteData2.getBrokerName(), queueId), MessageQueueRouteState.Normal, 460, -1, -1, -1, queueRouteData1.getBrokerAddr());
logicalQueuesInfo.get(0).add(queueRouteData3);
logicalQueuesInfo.updateQueueRouteDataByQueueId(queueRouteData3.getQueueId(), queueRouteData3);
{
GetMessageResult getMessageResult2 = createGetMessageResult();
getMessageResult2.setStatus(GetMessageStatus.FOUND);
getMessageResult2.setNextBeginOffset(460);
when(messageStore.getMessage(anyString(), eq(queueRouteData1.getTopic()), eq(queueRouteData1.getQueueId()), eq(456L), eq(4), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult2);
}
{
GetMessageResult getMessageResult2 = createGetMessageResult();
getMessageResult2.setStatus(GetMessageStatus.FOUND);
getMessageResult2.setNextBeginOffset(470);
lenient().when(messageStore.getMessage(anyString(), eq(queueRouteData1.getTopic()), eq(queueRouteData1.getQueueId()), eq(456L), intThat(i -> i > 4), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult2);
}
{
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(ofNullable(response.getExtFields()).orElse(new HashMap<>())).doesNotContainKey(MessageConst.PROPERTY_REDIRECT);
PullMessageResponseHeader header = (PullMessageResponseHeader) response.readCustomHeader();
assertThat(header.getNextBeginOffset()).isEqualTo(460);
}
{
when(messageStore.getMinPhyOffset()).thenReturn(100000L);
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.PULL_RETRY_IMMEDIATELY);
assertThat(ofNullable(response.getExtFields()).orElse(new HashMap<>())).containsKey(MessageConst.PROPERTY_REDIRECT);
PullMessageResponseHeader header = (PullMessageResponseHeader) response.readCustomHeader();
assertThat(header.getNextBeginOffset()).isEqualTo(460);
}
}
private RemotingCommand createPullMsgCommand(int requestCode) {
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setCommitOffset(123L);
......
......@@ -18,18 +18,30 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......@@ -43,6 +55,7 @@ import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.assertj.core.util.Lists;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -52,12 +65,6 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
......@@ -90,6 +97,8 @@ public class SendMessageProcessorTest {
when(handlerContext.channel()).thenReturn(mockChannel);
when(messageStore.lookMessageByOffset(anyLong())).thenReturn(new MessageExt());
sendMessageProcessor = new SendMessageProcessor(brokerController);
brokerController.getTopicConfigManager().updateTopicConfig(new TopicConfig(topic, 8, 8, PermName.PERM_WRITE|PermName.PERM_READ));
}
@Test
......@@ -220,6 +229,62 @@ public class SendMessageProcessorTest {
assertThat(response[0].getCode()).isEqualTo(ResponseCode.SUCCESS);
}
@Test
public void testProcessRequest_LogicalQueue() throws Exception {
when(messageStore.asyncPutMessage(any(MessageExtBrokerInner.class)))
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))));
LogicalQueuesInfoInBroker logicalQueuesInfo = brokerController.getTopicConfigManager().getOrCreateLogicalQueuesInfo(topic);
LogicalQueueRouteData queueRouteData1 = new LogicalQueueRouteData(0, 0, new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 1), MessageQueueRouteState.Normal, 0, -1, -1, -1, brokerController.getBrokerAddr());
logicalQueuesInfo.put(0, Lists.newArrayList(queueRouteData1));
logicalQueuesInfo.updateQueueRouteDataByQueueId(queueRouteData1.getQueueId(), queueRouteData1);
SendMessageRequestHeader requestHeader = createSendMsgRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
request.setBody(new byte[] {'a'});
request.makeCustomHeaderToNet();
// normal
RemotingCommand responseToReturn;
{
CompletableFuture<RemotingCommand> responseFuture = new CompletableFuture<>();
doAnswer(invocation -> {
responseFuture.complete(invocation.getArgument(0));
return null;
}).when(handlerContext).writeAndFlush(any(Object.class));
responseToReturn = sendMessageProcessor.processRequest(handlerContext, request);
if (responseToReturn == null) {
responseToReturn = responseFuture.get(3, TimeUnit.SECONDS);
}
}
assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque());
// read only
queueRouteData1.setState(MessageQueueRouteState.ReadOnly);
responseToReturn = sendMessageProcessor.processRequest(handlerContext, request);
assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
assertThat(responseToReturn.getRemark()).contains("not writable");
// read only and forward
logicalQueuesInfo.get(0).add(new LogicalQueueRouteData(0, 100, new MessageQueue(topic, "broker2", 1), MessageQueueRouteState.Normal, 0, -1, -1, -1, brokerController.getBrokerAddr()));
responseToReturn = sendMessageProcessor.processRequest(handlerContext, request);
assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
assertThat(responseToReturn.getRemark()).contains("forward error");
// read only and redirect
requestHeader = (SendMessageRequestHeader) request.readCustomHeader();
requestHeader.setSysFlag(MessageSysFlag.LOGICAL_QUEUE_FLAG);
request.makeCustomHeaderToNet();
responseToReturn = sendMessageProcessor.processRequest(handlerContext, request);
assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
assertThat(responseToReturn.getExtFields()).containsKey(MessageConst.PROPERTY_REDIRECT);
}
private RemotingCommand createSendTransactionMsgCommand(int requestCode) {
SendMessageRequestHeader header = createSendMsgRequestHeader();
int sysFlag = header.getSysFlag();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.topic;
import com.google.common.collect.Lists;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class TopicConfigManagerTest {
@Mock
private DefaultMessageStore messageStore;
@Mock
private BrokerController brokerController;
private TopicConfigManager topicConfigManager;
private static final String topic = "FooBar";
private static final String broker1Name = "broker1";
private static final String broker1Addr = "127.0.0.1:12345";
private static final int queueId1 = 1;
private static final String broker2Name = "broker2";
private static final String broker2Addr = "127.0.0.2:12345";
private static final int queueId2 = 2;
@Before
public void before() {
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setBrokerName(broker1Name);
when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
when(brokerController.getMessageStore()).thenReturn(messageStore);
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir"));
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
this.topicConfigManager = new TopicConfigManager(brokerController);
this.topicConfigManager.getTopicConfigTable().put(topic, new TopicConfig(topic));
}
@After
public void after() throws Exception {
if (topicConfigManager != null) {
Files.deleteIfExists(Paths.get(topicConfigManager.configFilePath()));
}
}
@Test
public void logicalQueueCleanTest() {
LogicalQueuesInfoInBroker info = this.topicConfigManager.getOrCreateLogicalQueuesInfo(topic);
topicConfigManager.logicalQueueClean();
assertThat(info).isEmpty();
final int logicalQueueIndex = 0;
LogicalQueueRouteData queueRouteData1 = new LogicalQueueRouteData(logicalQueueIndex, 0, new MessageQueue(topic, broker1Name, queueId1), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker1Addr);
List<LogicalQueueRouteData> l = Lists.newArrayList(new LogicalQueueRouteData(queueRouteData1));
info.put(logicalQueueIndex, l);
topicConfigManager.logicalQueueClean();
assertThat(info.get(logicalQueueIndex)).isEqualTo(Collections.singletonList(queueRouteData1));
verify(messageStore, never()).getCommitLogOffsetInQueue(eq(topic), eq(queueId1), anyLong());
verify(messageStore, never()).getMinPhyOffset();
verify(brokerController, never()).registerIncrementBrokerData(ArgumentMatchers.<TopicConfig>argThat(arg -> topic.equals(arg.getTopicName())), any(DataVersion.class));
LogicalQueueRouteData queueRouteData2 = new LogicalQueueRouteData(logicalQueueIndex, 100, new MessageQueue(topic, broker2Name, queueId2), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker2Addr);
l.add(new LogicalQueueRouteData(queueRouteData2));
queueRouteData1 = l.get(0);
queueRouteData1.setState(MessageQueueRouteState.ReadOnly);
queueRouteData1.setOffsetMax(100);
queueRouteData1.setFirstMsgTimeMillis(200);
queueRouteData1.setLastMsgTimeMillis(300);
queueRouteData1 = new LogicalQueueRouteData(queueRouteData1);
LogicalQueueRouteData queueRouteData3 = new LogicalQueueRouteData(logicalQueueIndex, 200, new MessageQueue(topic, broker1Name, queueId1), MessageQueueRouteState.Normal, 100, -1, -1, -1, broker1Addr);
l.add(new LogicalQueueRouteData(queueRouteData3));
queueRouteData2 = l.get(1);
queueRouteData2.setState(MessageQueueRouteState.ReadOnly);
queueRouteData2.setOffsetMax(100);
queueRouteData2.setFirstMsgTimeMillis(400);
queueRouteData2.setLastMsgTimeMillis(500);
queueRouteData2 = new LogicalQueueRouteData(queueRouteData2);
when(messageStore.getCommitLogOffsetInQueue(eq(topic), eq(queueId1), eq(queueRouteData1.getOffsetMax() - 1))).thenReturn(1000L);
when(messageStore.getMinPhyOffset()).thenReturn(0L);
topicConfigManager.logicalQueueClean();
assertThat(info.get(logicalQueueIndex)).isEqualTo(Arrays.asList(queueRouteData1, queueRouteData2, queueRouteData3));
verify(messageStore).getCommitLogOffsetInQueue(eq(topic), eq(queueId1), eq(queueRouteData1.getOffsetMax() - 1));
verify(messageStore).getMinPhyOffset();
verify(brokerController, never()).registerIncrementBrokerData(ArgumentMatchers.<TopicConfig>argThat(arg -> topic.equals(arg.getTopicName())), any(DataVersion.class));
when(messageStore.getMinPhyOffset()).thenReturn(2000L);
topicConfigManager.logicalQueueClean();
assertThat(info.get(logicalQueueIndex)).isEqualTo(Collections.singletonList(queueRouteData3));
verify(brokerController).registerIncrementBrokerData(ArgumentMatchers.<TopicConfig>argThat(arg -> topic.equals(arg.getTopicName())), any(DataVersion.class));
}
}
\ No newline at end of file
......@@ -66,6 +66,11 @@
</encoder>
</appender>
<logger name="STDOUT" additivity="false">
<level value="INFO"/>
<appender-ref ref="STDOUT"/>
</logger>
<logger name="RocketmqTools" additivity="false">
<level value="INFO"/>
<appender-ref ref="RocketmqToolsAppender"/>
......
......@@ -48,5 +48,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
......@@ -56,7 +56,7 @@ public class ClusterTestRequestProcessor extends DefaultRequestProcessor {
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic(), false);
if (topicRouteData != null) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
......
......@@ -19,6 +19,7 @@ package org.apache.rocketmq.namesrv.processor;
import io.netty.channel.ChannelHandlerContext;
import java.io.UnsupportedEncodingException;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MQVersion;
......@@ -27,8 +28,6 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.NamesrvUtil;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
......@@ -50,7 +49,11 @@ import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponse
import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfoUnordered;
import org.apache.rocketmq.common.protocol.route.TopicRouteDataNameSrv;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......@@ -346,7 +349,9 @@ public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implemen
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
boolean includeLogicalQueuesInfo = (requestHeader.getSysFlag() & MessageSysFlag.LOGICAL_QUEUE_FLAG) > 0;
TopicRouteDataNameSrv topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic(), includeLogicalQueuesInfo);
if (topicRouteData != null) {
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
......@@ -356,6 +361,16 @@ public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implemen
topicRouteData.setOrderTopicConf(orderTopicConf);
}
Set<Integer> logicalQueueIdsFilter = requestHeader.getLogicalQueueIdsFilter();
if (logicalQueueIdsFilter != null) {
LogicalQueuesInfoUnordered logicalQueuesInfo = topicRouteData.getLogicalQueuesInfoUnordered();
if (logicalQueuesInfo != null) {
LogicalQueuesInfoUnordered filtered = new LogicalQueuesInfoUnordered(logicalQueueIdsFilter.size());
logicalQueueIdsFilter.forEach(integer -> filtered.put(integer, logicalQueuesInfo.get(integer)));
topicRouteData.setLogicalQueuesInfoUnordered(filtered);
}
}
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
......
......@@ -17,6 +17,7 @@
package org.apache.rocketmq.namesrv.routeinfo;
import io.netty.channel.Channel;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
......@@ -24,26 +25,32 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfoUnordered;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.protocol.route.TopicRouteDataNameSrv;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.srvutil.ConcurrentHashMapUtil;
public class RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
......@@ -54,6 +61,7 @@ public class RouteInfoManager {
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
private final ConcurrentMap<String/* topic */, LogicalQueuesInfoUnordered> logicalQueuesInfoTable;
public RouteInfoManager() {
this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
......@@ -61,6 +69,7 @@ public class RouteInfoManager {
this.clusterAddrTable = new HashMap<String, Set<String>>(32);
this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
this.filterServerTable = new HashMap<String, List<String>>(256);
this.logicalQueuesInfoTable = new ConcurrentHashMap<>(1024);
}
public byte[] getAllClusterInfo() {
......@@ -148,18 +157,28 @@ public class RouteInfoManager {
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
Map<String, LogicalQueuesInfo> logicalQueuesInfoMap = topicConfigWrapper.getLogicalQueuesInfoMap();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
if (logicalQueuesInfoMap != null) {
long startTime = System.nanoTime();
for (Map.Entry<String, LogicalQueuesInfo> entry : logicalQueuesInfoMap.entrySet()) {
String topicName = entry.getKey();
LogicalQueuesInfoUnordered logicalQueuesInfo = ConcurrentHashMapUtil.computeIfAbsent(this.logicalQueuesInfoTable, topicName, ignore -> new LogicalQueuesInfoUnordered());
mergeLogicalQueuesInfo(brokerName, topicName, logicalQueuesInfo, entry.getValue());
}
log.debug("mergeQueueRouteDataTable topic={} time={}ns", System.nanoTime() - startTime);
}
}
}
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
topicConfigWrapper != null ? topicConfigWrapper.getDataVersion() : new DataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
......@@ -371,8 +390,12 @@ public class RouteInfoManager {
}
}
public TopicRouteData pickupTopicRouteData(final String topic) {
TopicRouteData topicRouteData = new TopicRouteData();
public TopicRouteDataNameSrv pickupTopicRouteData(final String topic) {
return pickupTopicRouteData(topic, false);
}
public TopicRouteDataNameSrv pickupTopicRouteData(final String topic, boolean includeLogicalQueuesInfo) {
TopicRouteDataNameSrv topicRouteData = new TopicRouteDataNameSrv();
boolean foundQueueData = false;
boolean foundBrokerData = false;
Set<String> brokerNameSet = new HashSet<String>();
......@@ -420,6 +443,10 @@ public class RouteInfoManager {
log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
if (foundBrokerData && foundQueueData) {
if (includeLogicalQueuesInfo) {
topicRouteData.setLogicalQueuesInfoUnordered(logicalQueuesInfoTable.get(topic));
}
return topicRouteData;
}
......@@ -750,6 +777,34 @@ public class RouteInfoManager {
return topicList.encode();
}
private static void mergeLogicalQueuesInfo(String brokerName, String topicName,
LogicalQueuesInfoUnordered logicalQueuesInfoInNamesrv,
LogicalQueuesInfo logicalQueuesInfoFromBroker) {
Set<LogicalQueuesInfoUnordered.Key> newKeys = logicalQueuesInfoFromBroker.values()
.stream()
.flatMap(Collection::stream)
.filter(v -> Objects.equals(brokerName, v.getBrokerName()))
.map(v -> new LogicalQueuesInfoUnordered.Key(null, v.getQueueId(), v.getOffsetDelta()))
.collect(Collectors.toSet());
logicalQueuesInfoInNamesrv.values().forEach(m ->
m.values().removeIf(queueRouteData ->
Objects.equals(brokerName, queueRouteData.getBrokerName()) &&
!newKeys.contains(new LogicalQueuesInfoUnordered.Key(null, queueRouteData.getQueueId(), queueRouteData.getOffsetDelta()))));
logicalQueuesInfoFromBroker.forEach((logicalQueueId, queueRouteDataListFromBroker) -> {
if (logicalQueueId == null) {
log.warn("queueRouteDataTable topic {} contains null logicalQueueId: {}", topicName, logicalQueuesInfoFromBroker);
return;
}
queueRouteDataListFromBroker.stream()
.filter(queueRouteDataFromBroker -> Objects.equals(brokerName, queueRouteDataFromBroker.getBrokerName()))
.forEach(queueRouteDataFromBroker ->
ConcurrentHashMapUtil.computeIfAbsent(logicalQueuesInfoInNamesrv, logicalQueueId, ignored -> new ConcurrentHashMap<>(queueRouteDataListFromBroker.size()))
.put(new LogicalQueuesInfoUnordered.Key(brokerName, queueRouteDataFromBroker.getQueueId(), queueRouteDataFromBroker.getOffsetDelta()),
queueRouteDataFromBroker)
);
});
}
}
class BrokerLiveInfo {
......
......@@ -16,16 +16,22 @@
*/
package org.apache.rocketmq.namesrv.processor;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
......@@ -34,14 +40,23 @@ import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfoUnordered;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.route.TopicRouteDataNameSrv;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.assertj.core.util.Maps;
import org.junit.Before;
import org.junit.Test;
......@@ -184,6 +199,98 @@ public class DefaultRequestProcessorTest {
.contains(new HashMap.SimpleEntry("broker", broker));
}
@Test
public void testProcessRequest_RegisterBrokerLogicalQueue() throws Exception {
String cluster = "cluster";
String broker1Name = "broker1";
String broker1Addr = "10.10.1.1";
String broker2Name = "broker2";
String broker2Addr = "10.10.1.2";
String topic = "foobar";
LogicalQueueRouteData queueRouteData1 = new LogicalQueueRouteData(0, 0, new MessageQueue(topic, broker1Name, 0), MessageQueueRouteState.ReadOnly, 0, 10, 100, 100, broker1Addr);
{
RegisterBrokerRequestHeader header = new RegisterBrokerRequestHeader();
header.setBrokerName(broker1Name);
RemotingCommand request = RemotingCommand.createRequestCommand(
RequestCode.REGISTER_BROKER, header);
request.addExtField("brokerName", broker1Name);
request.addExtField("brokerAddr", broker1Addr);
request.addExtField("clusterName", cluster);
request.addExtField("haServerAddr", "10.10.2.1");
request.addExtField("brokerId", String.valueOf(MixAll.MASTER_ID));
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setTopicConfigTable(new ConcurrentHashMap<>(Collections.singletonMap(topic, new TopicConfig(topic))));
topicConfigSerializeWrapper.setLogicalQueuesInfoMap(Maps.newHashMap(topic, new LogicalQueuesInfo(Collections.singletonMap(0, Lists.newArrayList(
queueRouteData1
)))));
topicConfigSerializeWrapper.setDataVersion(new DataVersion());
request.setBody(RemotingSerializable.encode(topicConfigSerializeWrapper));
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
when(ctx.channel()).thenReturn(null);
RemotingCommand response = defaultRequestProcessor.processRequest(ctx, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(response.getRemark()).isNull();
}
LogicalQueueRouteData queueRouteData2 = new LogicalQueueRouteData(0, 100, new MessageQueue(topic, broker2Name, 0), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker2Addr);
LogicalQueueRouteData queueRouteData3 = new LogicalQueueRouteData(1, 100, new MessageQueue(topic, broker2Name, 0), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker2Addr);
{
RegisterBrokerRequestHeader header = new RegisterBrokerRequestHeader();
header.setBrokerName(broker2Name);
RemotingCommand request = RemotingCommand.createRequestCommand(
RequestCode.REGISTER_BROKER, header);
request.addExtField("brokerName", broker2Name);
request.addExtField("brokerAddr", broker2Addr);
request.addExtField("clusterName", cluster);
request.addExtField("haServerAddr", "10.10.2.1");
request.addExtField("brokerId", String.valueOf(MixAll.MASTER_ID));
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setTopicConfigTable(new ConcurrentHashMap<>(Collections.singletonMap(topic, new TopicConfig(topic))));
topicConfigSerializeWrapper.setLogicalQueuesInfoMap(Maps.newHashMap(topic, new LogicalQueuesInfo(ImmutableMap.of(
0, Collections.singletonList(queueRouteData2),
1, Collections.singletonList(queueRouteData3)
))));
topicConfigSerializeWrapper.setDataVersion(new DataVersion());
request.setBody(RemotingSerializable.encode(topicConfigSerializeWrapper));
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
when(ctx.channel()).thenReturn(null);
RemotingCommand response = defaultRequestProcessor.processRequest(ctx, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(response.getRemark()).isNull();
}
{
GetRouteInfoRequestHeader header = new GetRouteInfoRequestHeader();
header.setTopic(topic);
header.setSysFlag(MessageSysFlag.LOGICAL_QUEUE_FLAG);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, header);
request.makeCustomHeaderToNet();
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
when(ctx.channel()).thenReturn(null);
RemotingCommand response = defaultRequestProcessor.processRequest(ctx, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
TopicRouteDataNameSrv topicRouteDataNameSrv = JSON.parseObject(response.getBody(), TopicRouteDataNameSrv.class);
assertThat(topicRouteDataNameSrv).isNotNull();
LogicalQueuesInfoUnordered logicalQueuesInfoUnordered = new LogicalQueuesInfoUnordered();
logicalQueuesInfoUnordered.put(0, ImmutableMap.of(
new LogicalQueuesInfoUnordered.Key(queueRouteData1.getBrokerName(), queueRouteData1.getQueueId(), queueRouteData1.getOffsetDelta()), queueRouteData1,
new LogicalQueuesInfoUnordered.Key(queueRouteData2.getBrokerName(), queueRouteData2.getQueueId(), queueRouteData2.getOffsetDelta()), queueRouteData2
));
logicalQueuesInfoUnordered.put(1, ImmutableMap.of(new LogicalQueuesInfoUnordered.Key(queueRouteData3.getBrokerName(), queueRouteData3.getQueueId(), queueRouteData3.getOffsetDelta()), queueRouteData3));
assertThat(topicRouteDataNameSrv.getLogicalQueuesInfoUnordered()).isEqualTo(logicalQueuesInfoUnordered);
}
}
@Test
public void testProcessRequest_RegisterBrokerWithFilterServer() throws RemotingCommandException,
NoSuchFieldException, IllegalAccessException {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.srvutil;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
public class ConcurrentHashMapUtil {
private static final boolean IS_JDK8;
static {
// Java 8 or lower: 1.6.0_23, 1.7.0, 1.7.0_80, 1.8.0_211
// Java 9 or higher: 9.0.1, 11.0.4, 12, 12.0.1
IS_JDK8 = System.getProperty("java.version").startsWith("1.8.");
}
private ConcurrentHashMapUtil() {
}
/**
* A temporary workaround for Java 8 specific performance issue JDK-8161372 .<br> Use implementation of
* ConcurrentMap.computeIfAbsent instead.
*
* @see <a href="https://bugs.openjdk.java.net/browse/JDK-8161372">https://bugs.openjdk.java.net/browse/JDK-8161372</a>
*/
public static <K, V> V computeIfAbsent(ConcurrentMap<K, V> map, K key, Function<? super K, ? extends V> func) {
if (IS_JDK8) {
V v, newValue;
return ((v = map.get(key)) == null &&
(newValue = func.apply(key)) != null &&
(v = map.putIfAbsent(key, newValue)) == null) ? newValue : v;
} else {
return map.computeIfAbsent(key, func);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;
public interface CleanFilesHook {
void execute(DefaultMessageStore defaultMessageStore, long deleteCount);
String getName();
}
......@@ -140,7 +140,17 @@ public class CommitLog {
final long intervalForcibly,
final boolean cleanImmediately
) {
return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
return deleteExpiredFile(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately, 0);
}
public int deleteExpiredFile(
final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately,
final int deleteFileBatchMax
) {
return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately, deleteFileBatchMax);
}
/**
......
......@@ -28,12 +28,14 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
......@@ -118,6 +120,8 @@ public class DefaultMessageStore implements MessageStore {
private final ScheduledExecutorService diskCheckScheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DiskCheckScheduledThread"));
private final List<CleanFilesHook> cleanFilesHooks = new CopyOnWriteArrayList<>();
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
this.messageArrivingListener = messageArrivingListener;
......@@ -720,10 +724,20 @@ public class DefaultMessageStore implements MessageStore {
}
public long getMaxOffsetInQueue(String topic, int queueId) {
ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
long offset = logic.getMaxOffsetInQueue();
return offset;
return getMaxOffsetInQueue(topic, queueId, true);
}
public long getMaxOffsetInQueue(String topic, int queueId, boolean committed) {
if (committed) {
ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
return logic.getMaxOffsetInQueue();
}
} else {
Long offset = this.commitLog.getTopicQueueTable().get(topic + "-" + queueId);
if (offset != null) {
return offset;
}
}
return 0;
......@@ -1301,12 +1315,23 @@ public class DefaultMessageStore implements MessageStore {
log.info(fileName + (result ? " create OK" : " already exists"));
}
public void registerCleanFileHook(CleanFilesHook hook) {
this.cleanFilesHooks.add(hook);
}
private void addScheduleTask() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
DefaultMessageStore.this.cleanFilesPeriodically();
long deleteCount = DefaultMessageStore.this.cleanFilesPeriodically();
DefaultMessageStore.this.cleanFilesHooks.forEach(hook -> {
try {
hook.execute(DefaultMessageStore.this, deleteCount);
} catch (Throwable t) {
log.error("execute CleanFilesHook[{}] error", hook.getName(), t);
}
});
}
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
......@@ -1351,9 +1376,11 @@ public class DefaultMessageStore implements MessageStore {
}, 1000L, 10000L, TimeUnit.MILLISECONDS);
}
private void cleanFilesPeriodically() {
this.cleanCommitLogService.run();
this.cleanConsumeQueueService.run();
private long cleanFilesPeriodically() {
long deleteCount = 0L;
deleteCount += this.cleanCommitLogService.run();
deleteCount += this.cleanConsumeQueueService.run();
return deleteCount;
}
private void checkSelf() {
......@@ -1611,17 +1638,19 @@ public class DefaultMessageStore implements MessageStore {
DefaultMessageStore.log.info("executeDeleteFilesManually was invoked");
}
public void run() {
public long run() {
int deleteCount = 0;
try {
this.deleteExpiredFiles();
deleteCount = this.deleteExpiredFiles();
this.redeleteHangedFile();
} catch (Throwable e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
return deleteCount;
}
private void deleteExpiredFiles() {
private int deleteExpiredFiles() {
int deleteCount = 0;
long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
......@@ -1654,6 +1683,7 @@ public class DefaultMessageStore implements MessageStore {
log.warn("disk space will be full soon, but delete file failed.");
}
}
return deleteCount;
}
private void redeleteHangedFile() {
......@@ -1775,17 +1805,20 @@ public class DefaultMessageStore implements MessageStore {
class CleanConsumeQueueService {
private long lastPhysicalMinOffset = 0;
public void run() {
public long run() {
long deleteCount = 0;
try {
this.deleteExpiredFiles();
deleteCount = this.deleteExpiredFiles();
} catch (Throwable e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
return deleteCount;
}
private void deleteExpiredFiles() {
private long deleteExpiredFiles() {
int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
long deleteCountSum = 0L;
long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
if (minOffset > this.lastPhysicalMinOffset) {
this.lastPhysicalMinOffset = minOffset;
......@@ -1795,7 +1828,7 @@ public class DefaultMessageStore implements MessageStore {
for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
for (ConsumeQueue logic : maps.values()) {
int deleteCount = logic.deleteExpiredFile(minOffset);
deleteCountSum += deleteCount;
if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
try {
Thread.sleep(deleteLogicsFilesInterval);
......@@ -1807,6 +1840,7 @@ public class DefaultMessageStore implements MessageStore {
DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
}
return deleteCountSum;
}
public String getServiceName() {
......
......@@ -56,8 +56,8 @@ public class MappedFileQueue {
}
public void checkSelf() {
if (!this.mappedFiles.isEmpty()) {
List<MappedFile> mappedFiles = new ArrayList<>(this.mappedFiles);
if (!mappedFiles.isEmpty()) {
Iterator<MappedFile> iterator = mappedFiles.iterator();
MappedFile pre = null;
while (iterator.hasNext()) {
......@@ -238,21 +238,8 @@ public class MappedFileQueue {
}
public MappedFile getLastMappedFile() {
MappedFile mappedFileLast = null;
while (!this.mappedFiles.isEmpty()) {
try {
mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
break;
} catch (IndexOutOfBoundsException e) {
//continue;
} catch (Exception e) {
log.error("getLastMappedFile has exception.", e);
break;
}
}
return mappedFileLast;
MappedFile[] mappedFiles = this.mappedFiles.toArray(new MappedFile[0]);
return mappedFiles.length == 0 ? null : mappedFiles[mappedFiles.length - 1];
}
public boolean resetOffset(long offset) {
......@@ -336,7 +323,11 @@ public class MappedFileQueue {
public int deleteExpiredFileByTime(final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately) {
final boolean cleanImmediately,
int deleteFileBatchMax) {
if (deleteFileBatchMax == 0) {
deleteFileBatchMax = DELETE_FILES_BATCH_MAX;
}
Object[] mfs = this.copyMappedFiles(0);
if (null == mfs)
......@@ -354,7 +345,7 @@ public class MappedFileQueue {
files.add(mappedFile);
deleteCount++;
if (files.size() >= DELETE_FILES_BATCH_MAX) {
if (files.size() >= deleteFileBatchMax) {
break;
}
......
......@@ -115,6 +115,16 @@ public interface MessageStore {
*/
long getMaxOffsetInQueue(final String topic, final int queueId);
/**
* Get maximum offset of the topic queue.
*
* @param topic Topic name.
* @param queueId Queue ID.
* @param committed If only count committed
* @return Maximum offset at present.
*/
long getMaxOffsetInQueue(final String topic, final int queueId, final boolean committed);
/**
* Get the minimum offset of the topic queue.
*
......
......@@ -225,7 +225,7 @@ public class MappedFileQueueTest {
mappedFile.getFile().setLastModified(System.currentTimeMillis() - expiredTime * 2);
}
}
mappedFileQueue.deleteExpiredFileByTime(expiredTime, 0, 0, false);
mappedFileQueue.deleteExpiredFileByTime(expiredTime, 0, 0, false, Integer.MAX_VALUE);
assertThat(mappedFileQueue.getMappedFiles().size()).isEqualTo(45);
}
......
......@@ -20,6 +20,7 @@ package org.apache.rocketmq.test.util;
import java.util.HashMap;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ForkJoinPool;
import org.apache.log4j.Logger;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
......@@ -60,7 +61,7 @@ public class MQAdmin {
}
}
mqAdminExt.shutdown();
ForkJoinPool.commonPool().execute(mqAdminExt::shutdown);
return createResult;
}
......@@ -99,7 +100,7 @@ public class MQAdmin {
createResult = false;
e.printStackTrace();
}
mqAdminExt.shutdown();
ForkJoinPool.commonPool().execute(mqAdminExt::shutdown);
return createResult;
}
......@@ -113,7 +114,7 @@ public class MQAdmin {
} catch (Exception e) {
e.printStackTrace();
}
mqAdminExt.shutdown();
ForkJoinPool.commonPool().execute(mqAdminExt::shutdown);
return clusterInfo;
}
......@@ -159,7 +160,7 @@ public class MQAdmin {
createResult = false;
e.printStackTrace();
}
mqAdminExt.shutdown();
ForkJoinPool.commonPool().execute(mqAdminExt::shutdown);
}
}
......@@ -17,13 +17,23 @@
package org.apache.rocketmq.test.base;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.log4j.Logger;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.MQPullConsumer;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
......@@ -36,21 +46,28 @@ import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.listener.AbstractListener;
import org.apache.rocketmq.test.util.MQAdmin;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import static org.awaitility.Awaitility.await;
public class BaseConf {
public static String nsAddr;
protected static String broker1Name;
protected static String broker2Name;
protected static String clusterName;
protected static int brokerNum;
protected static int waitTime = 5;
protected static int consumeTime = 2 * 60 * 1000;
protected static NamesrvController namesrvController;
protected static BrokerController brokerController1;
protected static BrokerController brokerController2;
protected static List<Object> mqClients = new ArrayList<Object>();
protected static boolean debug = false;
private static Logger log = Logger.getLogger(BaseConf.class);
public final static String nsAddr;
protected final static String broker1Name;
protected final static String broker2Name;
protected final static String clusterName;
protected final static int brokerNum;
protected final static int waitTime = 5;
protected final static int consumeTime = 2 * 60 * 1000;
protected final static int QUEUE_NUMBERS = 8;
protected final static NamesrvController namesrvController;
protected final static BrokerController brokerController1;
protected final static BrokerController brokerController2;
protected final static List<BrokerController> brokerControllerList;
protected final static Map<String, BrokerController> brokerControllerMap;
protected final static List<Object> mqClients = new ArrayList<Object>();
protected final static boolean debug = false;
private final static Logger log = Logger.getLogger(BaseConf.class);
static {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
......@@ -62,14 +79,32 @@ public class BaseConf {
broker1Name = brokerController1.getBrokerConfig().getBrokerName();
broker2Name = brokerController2.getBrokerConfig().getBrokerName();
brokerNum = 2;
brokerControllerList = ImmutableList.of(brokerController1, brokerController2);
brokerControllerMap = brokerControllerList.stream().collect(Collectors.toMap(input -> input.getBrokerConfig().getBrokerName(), Function.identity()));
}
public BaseConf() {
}
// This method can't be placed in the static block of BaseConf, which seems to lead to a strange dead lock.
public static void waitBrokerRegistered(final String nsAddr, final String clusterName) {
final DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(500);
mqAdminExt.setNamesrvAddr(nsAddr);
try {
mqAdminExt.start();
await().atMost(30, TimeUnit.SECONDS).until(() -> {
List<BrokerData> brokerDatas = mqAdminExt.examineTopicRouteInfo(clusterName).getBrokerDatas();
return brokerDatas.size() == brokerNum;
});
} catch (MQClientException e) {
log.error("init failed, please check BaseConf");
}
ForkJoinPool.commonPool().execute(mqAdminExt::shutdown);
}
public static String initTopic() {
String topic = MQRandomUtils.getRandomTopic();
String topic = "tt-" + MQRandomUtils.getRandomTopic();
IntegrationTestBase.initTopic(topic, nsAddr, clusterName);
return topic;
......@@ -157,18 +192,26 @@ public class BaseConf {
}
public static void shutdown() {
try {
for (Object mqClient : mqClients) {
if (mqClient instanceof AbstractMQProducer) {
((AbstractMQProducer) mqClient).shutdown();
ImmutableList<Object> mqClients = ImmutableList.copyOf(BaseConf.mqClients);
BaseConf.mqClients.clear();
shutdown(mqClients);
}
} else {
((AbstractMQConsumer) mqClient).shutdown();
}
public static void shutdown(List<Object> mqClients) {
mqClients.forEach(mqClient -> ForkJoinPool.commonPool().execute(() -> {
if (mqClient instanceof AbstractMQProducer) {
((AbstractMQProducer) mqClient).shutdown();
} else if (mqClient instanceof AbstractMQConsumer) {
((AbstractMQConsumer) mqClient).shutdown();
} else if (mqClient instanceof MQAdminExt) {
((MQAdminExt) mqClient).shutdown();
} else if (mqClient instanceof MQProducer) {
((MQProducer) mqClient).shutdown();
} else if (mqClient instanceof MQPullConsumer) {
((MQPullConsumer) mqClient).shutdown();
} else if (mqClient instanceof MQPushConsumer) {
((MQPushConsumer) mqClient).shutdown();
}
} catch (Exception e) {
e.printStackTrace();
}
}));
}
}
......@@ -47,7 +47,7 @@ public class IntegrationTestBase {
protected static final List<BrokerController> BROKER_CONTROLLERS = new ArrayList<>();
protected static final List<NamesrvController> NAMESRV_CONTROLLERS = new ArrayList<>();
protected static int topicCreateTime = 30 * 1000;
protected static final int COMMIT_LOG_SIZE = 1024 * 1024 * 100;
public static volatile int COMMIT_LOG_SIZE = 1024 * 1024 * 100;
protected static final int INDEX_NUM = 1000;
private static final AtomicInteger port = new AtomicInteger(40000);
......@@ -183,7 +183,7 @@ public class IntegrationTestBase {
}
public static boolean initTopic(String topic, String nsAddr, String clusterName) {
return initTopic(topic, nsAddr, clusterName, 8);
return initTopic(topic, nsAddr, clusterName, BaseConf.QUEUE_NUMBERS);
}
public static void deleteFile(File file) {
......
......@@ -64,5 +64,9 @@
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
</dependencies>
</project>
......@@ -42,6 +42,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.MigrateLogicalQueueBody;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
......@@ -49,6 +50,9 @@ import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
......@@ -96,7 +100,8 @@ public interface MQAdminExt extends MQAdmin {
SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group);
TopicConfig examineTopicConfig(final String addr, final String topic);
TopicConfig examineTopicConfig(final String addr,
final String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
TopicStatsTable examineTopicStats(
final String topic) throws RemotingException, MQClientException, InterruptedException,
......@@ -295,4 +300,28 @@ public interface MQAdminExt extends MQAdmin {
MessageRequestMode mode, final int popWorkGroupSize, final long timeoutMillis)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQClientException;
void updateTopicLogicalQueueMapping(String brokerAddr, String topic, int queueId, int logicalQueueIndex) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException;
LogicalQueuesInfo queryTopicLogicalQueueMapping(String brokerAddr, String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
void deleteTopicLogicalQueueMapping(String brokerAddr, String topic) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException;
LogicalQueueRouteData sealTopicLogicalQueue(String brokerAddr, LogicalQueueRouteData queueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
LogicalQueueRouteData reuseTopicLogicalQueue(String brokerAddr, String topic, int queueId, int logicalQueueIdx,
MessageQueueRouteState messageQueueRouteState) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
LogicalQueueRouteData createMessageQueueForLogicalQueue(String brokerAddr, String topic, int logicalQueueIdx,
MessageQueueRouteState messageQueueStatus) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
MigrateLogicalQueueBody migrateTopicLogicalQueuePrepare(LogicalQueueRouteData fromQueueRouteData, LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
MigrateLogicalQueueBody migrateTopicLogicalQueueCommit(
LogicalQueueRouteData fromQueueRouteData,
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
void migrateTopicLogicalQueueNotify(String brokerAddr,
LogicalQueueRouteData fromQueueRouteData,
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException;
}
......@@ -26,6 +26,10 @@ public class SubCommandException extends Exception {
super(msg);
}
public SubCommandException(String format, Object... args) {
super(String.format(format, args));
}
/**
* @param msg Message.
* @param cause Cause.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册