提交 cabc3838 编写于 作者: D dongeforever

Finish the remapping command

上级 e9cafe9f
......@@ -88,4 +88,22 @@ public class LogicQueueMappingItem {
public long getTimeOfEnd() {
return timeOfEnd;
}
public void setLogicOffset(long logicOffset) {
this.logicOffset = logicOffset;
}
@Override
public String toString() {
return "LogicQueueMappingItem{" +
"gen=" + gen +
", queueId=" + queueId +
", bname='" + bname + '\'' +
", logicOffset=" + logicOffset +
", startOffset=" + startOffset +
", endOffset=" + endOffset +
", timeOfStart=" + timeOfStart +
", timeOfEnd=" + timeOfEnd +
'}';
}
}
......@@ -227,6 +227,12 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
return defaultMQAdminExtImpl.examineTopicStats(topic);
}
@Override
public TopicStatsTable examineTopicStats(String brokerAddr, String topic) throws RemotingException, MQClientException, InterruptedException,
MQBrokerException {
return defaultMQAdminExtImpl.examineTopicStats(brokerAddr, topic);
}
@Override
public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
return this.defaultMQAdminExtImpl.fetchAllTopicList();
......
......@@ -274,6 +274,13 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return topicStatsTable;
}
@Override
public TopicStatsTable examineTopicStats(String brokerAddr, String topic) throws RemotingException, MQClientException, InterruptedException,
MQBrokerException {
return this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(brokerAddr, topic, timeoutMillis);
}
@Override
public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
return this.mqClientInstance.getMQClientAPIImpl().getTopicListFromNameServer(timeoutMillis);
......@@ -1092,7 +1099,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail) throws MQClientException {
this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail);
}
@Override
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp);
......
......@@ -108,6 +108,9 @@ public interface MQAdminExt extends MQAdmin {
final String topic) throws RemotingException, MQClientException, InterruptedException,
MQBrokerException;
TopicStatsTable examineTopicStats(String brokerAddr, final String topic) throws RemotingException, MQClientException, InterruptedException,
MQBrokerException;
TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException;
TopicList fetchTopicsByCLuster(
......
......@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.tools.command.topic;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.ImmutableList;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
......@@ -27,6 +28,9 @@ import org.apache.rocketmq.common.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.common.TopicQueueMappingOne;
import org.apache.rocketmq.common.TopicQueueMappingUtils;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
......@@ -200,8 +204,8 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
Map<String, Integer> expectedBrokerNumMap = allocator.getBrokerNumMap();
Queue<Integer> waitAssignQueues = new ArrayDeque<Integer>();
Map<Integer, String> expectedIdToBroker = new HashMap<>();
//the following logic will make sure that, for one broker, only "take in" or "take out" queues
//It can't, take in some queues but alse take out some queues.
//the following logic will make sure that, for one broker, either "map in" or "map out"
//It can't both, map in some queues but also map out some queues.
globalIdMap.forEach((queueId, mappingOne) -> {
String leaderBroker = mappingOne.getBname();
if (expectedBrokerNumMap.containsKey(leaderBroker)) {
......@@ -251,6 +255,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, -1, 0, -1, -1, -1));
ImmutableList<LogicQueueMappingItem> resultItems = ImmutableList.copyOf(items);
//Use the same object
mapInConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
mapOutConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
}
......@@ -260,13 +265,53 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
configMapping.getMappingDetail().setEpoch(epoch);
configMapping.getMappingDetail().setTotalQueues(maxNum);
});
//decide the new offset
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : existedTopicConfigMap.entrySet()) {
String broker = entry.getKey();
// now do the remapping
//Step1: let the new leader can be write without the logicOffset
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = existedTopicConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
}
//Step2: forbid the write of old leader
for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = existedTopicConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
}
//Step3: decide the logic offset
for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicStatsTable statsTable = defaultMQAdminExt.examineTopicStats(addr, topic);
TopicConfigAndQueueMapping mapOutConfig = existedTopicConfigMap.get(broker);
for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
ImmutableList<LogicQueueMappingItem> items = entry.getValue();
Integer globalId = entry.getKey();
if (items.size() < 2) {
continue;
}
LogicQueueMappingItem newLeader = items.get(items.size() - 1);
LogicQueueMappingItem oldLeader = items.get(items.size() - 2);
if (newLeader.getLogicOffset() > 0) {
continue;
}
TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId()));
if (topicOffset == null) {
throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader);
}
//TODO check the max offset, will it return -1?
if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
}
newLeader.setLogicOffset(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset() + 10000));
TopicConfigAndQueueMapping mapInConfig = existedTopicConfigMap.get(newLeader.getBname());
//fresh the new leader
mapInConfig.getMappingDetail().putMappingInfo(globalId, items);
}
}
//Step4: write to the new leader with logic offset
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue();
TopicConfigAndQueueMapping configMapping = existedTopicConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
}
} catch (Exception e) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册