...
 
Commits (9)
    https://gitcode.net/apacherocketmq/rocketmq/-/commit/45946d475697f0b78e4d3b31105f75bb76e7ee19 Ignore the existed queueId in static topi creation 2021-11-18T10:15:44+08:00 dongeforever dongeforever@apache.org https://gitcode.net/apacherocketmq/rocketmq/-/commit/25a588b81fd58dccb1e5f81e8a65859570a76a89 Init the remapping command 2021-11-18T12:00:15+08:00 dongeforever dongeforever@apache.org https://gitcode.net/apacherocketmq/rocketmq/-/commit/e9cafe9fef748b92ea4d020e16bb95192ffaadff Polish the remapping logic 2021-11-18T14:36:47+08:00 dongeforever dongeforever@apache.org https://gitcode.net/apacherocketmq/rocketmq/-/commit/cabc3838ff68792498631da9e133c711167ff922 Finish the remapping command 2021-11-18T15:13:41+08:00 dongeforever dongeforever@apache.org https://gitcode.net/apacherocketmq/rocketmq/-/commit/ffcc54897f76bc6754673fd686f170ea17fffec8 Refactor the code package for static topic 2021-11-18T15:20:16+08:00 dongeforever dongeforever@apache.org https://gitcode.net/apacherocketmq/rocketmq/-/commit/82364fc28592980c8215edfaf4e20e254c39a861 Polish the tools 2021-11-18T15:52:54+08:00 dongeforever dongeforever@apache.org https://gitcode.net/apacherocketmq/rocketmq/-/commit/165e133a64ae34fa992223f3472f6398d3c9cb2a Enable to run from file for createStaticTopic command 2021-11-18T16:57:19+08:00 dongeforever dongeforever@apache.org https://gitcode.net/apacherocketmq/rocketmq/-/commit/f225fd05de44f5e46dc5da34a27fe0b1e947f0ef Polish the code for command 2021-11-18T17:31:41+08:00 dongeforever dongeforever@apache.org https://gitcode.net/apacherocketmq/rocketmq/-/commit/f05cfb9295bc8439ee65f5d364e5f47ab7330e65 Polish the static topic commands 2021-11-18T17:36:58+08:00 dongeforever dongeforever@apache.org
...@@ -95,7 +95,7 @@ import org.apache.rocketmq.common.DataVersion; ...@@ -95,7 +95,7 @@ import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicQueueMappingInfo; import org.apache.rocketmq.common.statictopic.TopicQueueMappingInfo;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.constant.PermName;
......
...@@ -112,6 +112,10 @@ import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServer ...@@ -112,6 +112,10 @@ import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServer
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData; import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState; import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.stats.StatsItem; import org.apache.rocketmq.common.stats.StatsItem;
import org.apache.rocketmq.common.stats.StatsSnapshot; import org.apache.rocketmq.common.stats.StatsSnapshot;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
......
...@@ -20,7 +20,7 @@ import io.netty.channel.ChannelHandlerContext; ...@@ -20,7 +20,7 @@ import io.netty.channel.ChannelHandlerContext;
import java.util.List; import java.util.List;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.common.TopicQueueMappingContext; import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
......
...@@ -36,12 +36,12 @@ import org.apache.rocketmq.broker.longpolling.PullRequest; ...@@ -36,12 +36,12 @@ import org.apache.rocketmq.broker.longpolling.PullRequest;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer; import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
import org.apache.rocketmq.common.LogicQueueMappingItem; import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.TopicQueueMappingContext; import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.common.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.filter.ExpressionType;
......
...@@ -28,13 +28,13 @@ import org.apache.rocketmq.broker.BrokerController; ...@@ -28,13 +28,13 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext; import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.common.LogicQueueMappingItem; import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.TopicQueueMappingContext; import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.common.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.common.help.FAQUrl;
......
...@@ -22,12 +22,12 @@ import org.apache.rocketmq.broker.BrokerController; ...@@ -22,12 +22,12 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager; import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.LogicQueueMappingItem; import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.TopicQueueMappingContext; import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
import org.apache.rocketmq.common.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader; import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
......
...@@ -28,7 +28,7 @@ import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; ...@@ -28,7 +28,7 @@ import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageConst;
......
...@@ -57,8 +57,8 @@ import org.apache.rocketmq.common.MQVersion; ...@@ -57,8 +57,8 @@ import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.admin.TopicStatsTable;
......
...@@ -61,7 +61,6 @@ import org.apache.rocketmq.client.stat.ConsumerStatsManager; ...@@ -61,7 +61,6 @@ import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.TopicQueueMappingInfo;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.filter.ExpressionType;
......
...@@ -33,8 +33,7 @@ import java.util.zip.InflaterInputStream; ...@@ -33,8 +33,7 @@ import java.util.zip.InflaterInputStream;
import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingInfo;
import org.apache.rocketmq.common.TopicQueueMappingInfo;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
package org.apache.rocketmq.common.protocol.body; package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.TopicQueueMappingInfo; import org.apache.rocketmq.common.statictopic.TopicQueueMappingInfo;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
......
...@@ -17,13 +17,13 @@ ...@@ -17,13 +17,13 @@
package org.apache.rocketmq.common.protocol.body; package org.apache.rocketmq.common.protocol.body;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class TopicConfigSerializeWrapper extends RemotingSerializable { public class TopicConfigSerializeWrapper extends RemotingSerializable {
private ConcurrentMap<String, TopicConfig> topicConfigTable = private ConcurrentMap<String, TopicConfig> topicConfigTable =
new ConcurrentHashMap<String, TopicConfig>(); new ConcurrentHashMap<String, TopicConfig>();
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
*/ */
package org.apache.rocketmq.common.protocol.body; package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class TopicQueueMappingBody extends RemotingSerializable { public class TopicQueueMappingBody extends RemotingSerializable {
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
package org.apache.rocketmq.common.protocol.body; package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.Map; import java.util.Map;
......
...@@ -25,7 +25,7 @@ import java.util.HashMap; ...@@ -25,7 +25,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.rocketmq.common.TopicQueueMappingInfo; import org.apache.rocketmq.common.statictopic.TopicQueueMappingInfo;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class TopicRouteData extends RemotingSerializable { public class TopicRouteData extends RemotingSerializable {
......
package org.apache.rocketmq.common.rpc; package org.apache.rocketmq.common.rpc;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicQueueMappingInfo; import org.apache.rocketmq.common.statictopic.TopicQueueMappingInfo;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.ClusterInfo;
......
package org.apache.rocketmq.common; package org.apache.rocketmq.common.statictopic;
public class LogicQueueMappingItem { public class LogicQueueMappingItem {
...@@ -88,4 +88,22 @@ public class LogicQueueMappingItem { ...@@ -88,4 +88,22 @@ public class LogicQueueMappingItem {
public long getTimeOfEnd() { public long getTimeOfEnd() {
return timeOfEnd; return timeOfEnd;
} }
public void setLogicOffset(long logicOffset) {
this.logicOffset = logicOffset;
}
@Override
public String toString() {
return "LogicQueueMappingItem{" +
"gen=" + gen +
", queueId=" + queueId +
", bname='" + bname + '\'' +
", logicOffset=" + logicOffset +
", startOffset=" + startOffset +
", endOffset=" + endOffset +
", timeOfStart=" + timeOfStart +
", timeOfEnd=" + timeOfEnd +
'}';
}
} }
...@@ -14,7 +14,9 @@ ...@@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.common; package org.apache.rocketmq.common.statictopic;
import org.apache.rocketmq.common.TopicConfig;
public class TopicConfigAndQueueMapping extends TopicConfig { public class TopicConfigAndQueueMapping extends TopicConfig {
private TopicQueueMappingDetail mappingDetail; private TopicQueueMappingDetail mappingDetail;
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.common; package org.apache.rocketmq.common.statictopic;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.common; package org.apache.rocketmq.common.statictopic;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.common; package org.apache.rocketmq.common.statictopic;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.statictopic;
import com.google.common.collect.ImmutableList;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class TopicQueueMappingOne extends RemotingSerializable {
String topic; // redundant field
String bname; //identify the hosted broker name
Integer globalId;
ImmutableList<LogicQueueMappingItem> items;
public TopicQueueMappingOne(String topic, String bname, Integer globalId, ImmutableList<LogicQueueMappingItem> items) {
this.topic = topic;
this.bname = bname;
this.globalId = globalId;
this.items = items;
}
public String getTopic() {
return topic;
}
public String getBname() {
return bname;
}
public Integer getGlobalId() {
return globalId;
}
public ImmutableList<LogicQueueMappingItem> getItems() {
return items;
}
}
...@@ -14,12 +14,15 @@ ...@@ -14,12 +14,15 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.common; package org.apache.rocketmq.common.statictopic;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.apache.rocketmq.common.MixAll;
import java.io.File;
import java.util.AbstractMap; import java.util.AbstractMap;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
...@@ -103,7 +106,63 @@ public class TopicQueueMappingUtils { ...@@ -103,7 +106,63 @@ public class TopicQueueMappingUtils {
return new AbstractMap.SimpleImmutableEntry<Long, Integer>(epoch, queueNum); return new AbstractMap.SimpleImmutableEntry<Long, Integer>(epoch, queueNum);
} }
public static Map<Integer, ImmutableList<LogicQueueMappingItem>> buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace) { public static List<TopicQueueMappingDetail> getMappingDetailFromConfig(Collection<TopicConfigAndQueueMapping> configs) {
List<TopicQueueMappingDetail> detailList = new ArrayList<TopicQueueMappingDetail>();
for (TopicConfigAndQueueMapping configMapping : configs) {
if (configMapping.getMappingDetail() != null) {
detailList.add(configMapping.getMappingDetail());
}
}
return detailList;
}
public static Map.Entry<Long, Integer> checkConsistenceOfTopicConfigAndQueueMapping(String topic, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
if (brokerConfigMap == null
|| brokerConfigMap.isEmpty()) {
return null;
}
//make sure it it not null
long maxEpoch = -1;
int maxNum = -1;
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
String broker = entry.getKey();
TopicConfigAndQueueMapping configMapping = entry.getValue();
if (configMapping.getMappingDetail() == null) {
throw new RuntimeException("Mapping info should not be null in broker " + broker);
}
TopicQueueMappingDetail mappingDetail = configMapping.getMappingDetail();
if (!broker.equals(mappingDetail.getBname())) {
throw new RuntimeException(String.format("The broker name is not equal %s != %s ", broker, mappingDetail.getBname()));
}
if (mappingDetail.isDirty()) {
throw new RuntimeException("The mapping info is dirty in broker " + broker);
}
if (!configMapping.getTopicName().equals(mappingDetail.getTopic())) {
throw new RuntimeException("The topic name is inconsistent in broker " + broker);
}
if (topic != null
&& !topic.equals(mappingDetail.getTopic())) {
throw new RuntimeException("The topic name is not match for broker " + broker);
}
if (maxEpoch != -1
&& maxEpoch != mappingDetail.getEpoch()) {
throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", maxEpoch, mappingDetail.getEpoch(), mappingDetail.getBname()));
} else {
maxEpoch = mappingDetail.getEpoch();
}
if (maxNum != -1
&& maxNum != mappingDetail.getTotalQueues()) {
throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", maxNum, mappingDetail.getTotalQueues(), mappingDetail.getBname()));
} else {
maxNum = mappingDetail.getTotalQueues();
}
}
return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum);
}
public static Map<Integer, TopicQueueMappingOne> checkAndBuildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace, boolean checkConsistence) {
Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() { Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() {
@Override @Override
public int compare(TopicQueueMappingDetail o1, TopicQueueMappingDetail o2) { public int compare(TopicQueueMappingDetail o1, TopicQueueMappingDetail o2) {
...@@ -111,8 +170,12 @@ public class TopicQueueMappingUtils { ...@@ -111,8 +170,12 @@ public class TopicQueueMappingUtils {
} }
}); });
Map<Integer, ImmutableList<LogicQueueMappingItem>> globalIdMap = new HashMap<Integer, ImmutableList<LogicQueueMappingItem>>(); int maxNum = 0;
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<Integer, TopicQueueMappingOne>();
for (TopicQueueMappingDetail mappingDetail : mappingDetailList) { for (TopicQueueMappingDetail mappingDetail : mappingDetailList) {
if (mappingDetail.totalQueues > maxNum) {
maxNum = mappingDetail.totalQueues;
}
for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mappingDetail.getHostedQueues().entrySet()) { for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mappingDetail.getHostedQueues().entrySet()) {
Integer globalid = entry.getKey(); Integer globalid = entry.getKey();
String leaderBrokerName = getLeaderBroker(entry.getValue()); String leaderBrokerName = getLeaderBroker(entry.getValue());
...@@ -125,7 +188,17 @@ public class TopicQueueMappingUtils { ...@@ -125,7 +188,17 @@ public class TopicQueueMappingUtils {
throw new RuntimeException(String.format("The queue id is duplicated in broker %s %s", leaderBrokerName, mappingDetail.getBname())); throw new RuntimeException(String.format("The queue id is duplicated in broker %s %s", leaderBrokerName, mappingDetail.getBname()));
} }
} else { } else {
globalIdMap.put(globalid, entry.getValue()); globalIdMap.put(globalid, new TopicQueueMappingOne(mappingDetail.topic, mappingDetail.bname, globalid, entry.getValue()));
}
}
}
if (checkConsistence) {
if (maxNum != globalIdMap.size()) {
throw new RuntimeException(String.format("The total queue number in config dose not match the real hosted queues %d != %d", maxNum, globalIdMap.size()));
}
for (int i = 0; i < maxNum; i++) {
if (!globalIdMap.containsKey(i)) {
throw new RuntimeException(String.format("The queue number %s is not in globalIdMap", i));
} }
} }
} }
...@@ -139,4 +212,23 @@ public class TopicQueueMappingUtils { ...@@ -139,4 +212,23 @@ public class TopicQueueMappingUtils {
assert items.size() > 0; assert items.size() > 0;
return items.get(items.size() - 1); return items.get(items.size() - 1);
} }
public static String writeToTemp(TopicRemappingDetailWrapper wrapper, boolean after) {
String topic = wrapper.getTopic();
String data = wrapper.toJson();
String suffix = TopicRemappingDetailWrapper.SUFFIX_BEFORE;
if (after) {
suffix = TopicRemappingDetailWrapper.SUFFIX_AFTER;
}
String fileName = System.getProperty("java.io.tmpdir") + File.separator + topic + "-" + wrapper.getEpoch() + suffix;
try {
MixAll.string2File(data, fileName);
return fileName;
} catch (Exception e) {
throw new RuntimeException("write file failed " + fileName,e);
}
}
} }
package org.apache.rocketmq.common.statictopic;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class TopicRemappingDetailWrapper extends RemotingSerializable {
public static final String TYPE_CREATE_OR_UPDATE = "CREATE_OR_UPDATE";
public static final String TYPE_REMAPPING = "REMAPPING";
public static final String SUFFIX_BEFORE = ".before";
public static final String SUFFIX_AFTER = ".after";
private final String topic;
private final String type;
private final long epoch;
private Map<Integer, String> expectedIdToBroker = new HashMap<Integer, String>();
private Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<String, TopicConfigAndQueueMapping>();
private Set<String> brokerToMapIn = new HashSet<String>();
private Set<String> brokerToMapOut = new HashSet<String>();
public TopicRemappingDetailWrapper(String topic, String type, long epoch, Map<Integer, String> expectedIdToBroker, Map<String, TopicConfigAndQueueMapping> brokerConfigMap) {
this.topic = topic;
this.type = type;
this.epoch = epoch;
this.expectedIdToBroker = expectedIdToBroker;
this.brokerConfigMap = brokerConfigMap;
}
public String getTopic() {
return topic;
}
public String getType() {
return type;
}
public long getEpoch() {
return epoch;
}
public Map<Integer, String> getExpectedIdToBroker() {
return expectedIdToBroker;
}
public Map<String, TopicConfigAndQueueMapping> getBrokerConfigMap() {
return brokerConfigMap;
}
public Set<String> getBrokerToMapIn() {
return brokerToMapIn;
}
public void setBrokerToMapIn(Set<String> brokerToMapIn) {
this.brokerToMapIn = brokerToMapIn;
}
public Set<String> getBrokerToMapOut() {
return brokerToMapOut;
}
public void setBrokerToMapOut(Set<String> brokerToMapOut) {
this.brokerToMapOut = brokerToMapOut;
}
}
package org.apache.rocketmq.common; package org.apache.rocketmq.common.statictopic;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.File;
import java.util.Map; import java.util.Map;
public class TopicQueueMappingTest { public class TopicQueueMappingTest {
@Test
public void testWriteToFile() {
System.out.println(System.getProperty("java.io.tmpdir"));
System.out.println(File.separator);
}
@Test @Test
public void testJsonSerialize() { public void testJsonSerialize() {
LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(1, 2, "broker01", 33333333333333333L, 44444444444444444L, 555555555555555555L, 6666666666666666L, 77777777777777777L); LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(1, 2, "broker01", 33333333333333333L, 44444444444444444L, 555555555555555555L, 6666666666666666L, 77777777777777777L);
...@@ -28,7 +38,7 @@ public class TopicQueueMappingTest { ...@@ -28,7 +38,7 @@ public class TopicQueueMappingTest {
Assert.assertEquals(mappingItemMap.get("timeOfStart"), mappingItem.getTimeOfStart()); Assert.assertEquals(mappingItemMap.get("timeOfStart"), mappingItem.getTimeOfStart());
Assert.assertEquals(mappingItemMap.get("timeOfEnd"), mappingItem.getTimeOfEnd()); Assert.assertEquals(mappingItemMap.get("timeOfEnd"), mappingItem.getTimeOfEnd());
TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail("test", 1, "broker01"); TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail("test", 1, "broker01", System.currentTimeMillis());
mappingDetail.putMappingInfo(0, ImmutableList.of(mappingItem)); mappingDetail.putMappingInfo(0, ImmutableList.of(mappingItem));
String mappingDetailJson = JSON.toJSONString(mappingDetail); String mappingDetailJson = JSON.toJSONString(mappingDetail);
......
...@@ -31,7 +31,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; ...@@ -31,7 +31,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicQueueMappingInfo; import org.apache.rocketmq.common.statictopic.TopicQueueMappingInfo;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
......
...@@ -28,7 +28,7 @@ import org.apache.rocketmq.client.exception.MQClientException; ...@@ -28,7 +28,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats; import org.apache.rocketmq.common.admin.RollbackStats;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.admin.TopicStatsTable;
...@@ -227,6 +227,12 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { ...@@ -227,6 +227,12 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
return defaultMQAdminExtImpl.examineTopicStats(topic); return defaultMQAdminExtImpl.examineTopicStats(topic);
} }
@Override
public TopicStatsTable examineTopicStats(String brokerAddr, String topic) throws RemotingException, MQClientException, InterruptedException,
MQBrokerException {
return defaultMQAdminExtImpl.examineTopicStats(brokerAddr, topic);
}
@Override @Override
public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException { public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
return this.defaultMQAdminExtImpl.fetchAllTopicList(); return this.defaultMQAdminExtImpl.fetchAllTopicList();
......
...@@ -42,7 +42,7 @@ import org.apache.rocketmq.common.MixAll; ...@@ -42,7 +42,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper; import org.apache.rocketmq.common.admin.OffsetWrapper;
...@@ -274,6 +274,13 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { ...@@ -274,6 +274,13 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return topicStatsTable; return topicStatsTable;
} }
@Override
public TopicStatsTable examineTopicStats(String brokerAddr, String topic) throws RemotingException, MQClientException, InterruptedException,
MQBrokerException {
return this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(brokerAddr, topic, timeoutMillis);
}
@Override @Override
public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException { public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
return this.mqClientInstance.getMQClientAPIImpl().getTopicListFromNameServer(timeoutMillis); return this.mqClientInstance.getMQClientAPIImpl().getTopicListFromNameServer(timeoutMillis);
...@@ -1092,7 +1099,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { ...@@ -1092,7 +1099,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail) throws MQClientException { public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail) throws MQClientException {
this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail); this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail);
} }
@Override @Override
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp); return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp);
......
...@@ -27,7 +27,7 @@ import org.apache.rocketmq.client.exception.MQClientException; ...@@ -27,7 +27,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats; import org.apache.rocketmq.common.admin.RollbackStats;
import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.admin.TopicStatsTable;
...@@ -108,6 +108,9 @@ public interface MQAdminExt extends MQAdmin { ...@@ -108,6 +108,9 @@ public interface MQAdminExt extends MQAdmin {
final String topic) throws RemotingException, MQClientException, InterruptedException, final String topic) throws RemotingException, MQClientException, InterruptedException,
MQBrokerException; MQBrokerException;
TopicStatsTable examineTopicStats(String brokerAddr, final String topic) throws RemotingException, MQClientException, InterruptedException,
MQBrokerException;
TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException; TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException;
TopicList fetchTopicsByCLuster( TopicList fetchTopicsByCLuster(
......
...@@ -87,7 +87,7 @@ public class MigrateTopicLogicalQueueCommand implements SubCommand { ...@@ -87,7 +87,7 @@ public class MigrateTopicLogicalQueueCommand implements SubCommand {
String toBrokerName, String toBrokerName,
Long forceDelta) throws RemotingException, MQBrokerException, InterruptedException, SubCommandException, MQClientException { Long forceDelta) throws RemotingException, MQBrokerException, InterruptedException, SubCommandException, MQClientException {
TopicRouteData topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic); TopicRouteData topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic);
LogicalQueuesInfo logicalQueuesInfo = topicRouteInfo.getLogicalQueuesInfo(); LogicalQueuesInfo logicalQueuesInfo = null; /*topicRouteInfo.getLogicalQueuesInfo();*/
if (logicalQueuesInfo == null) { if (logicalQueuesInfo == null) {
throw new SubCommandException("topic not enabled logical queue"); throw new SubCommandException("topic not enabled logical queue");
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.topic;
import com.google.common.collect.ImmutableList;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
public class RemappingStaticTopicSubCommand implements SubCommand {
@Override
public String commandName() {
return "updateStaticTopic";
}
@Override
public String commandDesc() {
return "Update or create static topic, which has fixed number of queues";
}
@Override
public Options buildCommandlineOptions(Options options) {
OptionGroup optionGroup = new OptionGroup();
Option opt = null;
opt = new Option("c", "clusters", true, "remapping static topic to clusters, comma separated");
optionGroup.addOption(opt);
opt = new Option("b", "brokers", true, "remapping static topic to brokers, comma separated");
optionGroup.addOption(opt);
optionGroup.setRequired(true);
options.addOptionGroup(optionGroup);
opt = new Option("t", "topic", true, "topic name");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("f", "mapFile", true, "The map file name");
opt.setRequired(false);
options.addOption(opt);
return options;
}
public void executeFromFile(final CommandLine commandLine, final Options options,
RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
ClientMetadata clientMetadata = new ClientMetadata();
try {
String topic = commandLine.getOptionValue('t').trim();
String mapFileName = commandLine.getOptionValue('f').trim();
String mapData = MixAll.file2String(mapFileName);
TopicRemappingDetailWrapper wrapper = TopicRemappingDetailWrapper.decode(mapData.getBytes(), TopicRemappingDetailWrapper.class);
//double check the config
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, wrapper.getBrokerConfigMap());
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true);
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo == null
|| clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
doRemapping(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt);
return;
}catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
public void doRemapping(String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap,
ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
// now do the remapping
//Step1: let the new leader can be write without the logicOffset
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
}
//Step2: forbid the write of old leader
for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
}
//Step3: decide the logic offset
for (String broker: brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicStatsTable statsTable = defaultMQAdminExt.examineTopicStats(addr, topic);
TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(broker);
for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) {
ImmutableList<LogicQueueMappingItem> items = entry.getValue();
Integer globalId = entry.getKey();
if (items.size() < 2) {
continue;
}
LogicQueueMappingItem newLeader = items.get(items.size() - 1);
LogicQueueMappingItem oldLeader = items.get(items.size() - 2);
if (newLeader.getLogicOffset() > 0) {
continue;
}
TopicOffset topicOffset = statsTable.getOffsetTable().get(new MessageQueue(topic, oldLeader.getBname(), oldLeader.getQueueId()));
if (topicOffset == null) {
throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader);
}
//TODO check the max offset, will it return -1?
if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
}
newLeader.setLogicOffset(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset() + 10000));
TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname());
//fresh the new leader
mapInConfig.getMappingDetail().putMappingInfo(globalId, items);
}
}
//Step4: write to the new leader with logic offset
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
}
}
@Override
public void execute(final CommandLine commandLine, final Options options,
RPCHook rpcHook) throws SubCommandException {
if (!commandLine.hasOption('t')) {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
return;
}
if (commandLine.hasOption("f")) {
executeFromFile(commandLine, options, rpcHook);
return;
}
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
ClientMetadata clientMetadata = new ClientMetadata();
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<>();
Set<String> targetBrokers = new HashSet<>();
try {
if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))) {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
return;
}
String topic = commandLine.getOptionValue('t').trim();
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo == null
|| clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
{
if (commandLine.hasOption("b")) {
String brokerStrs = commandLine.getOptionValue("b").trim();
for (String broker: brokerStrs.split(",")) {
targetBrokers.add(broker.trim());
}
} else if (commandLine.hasOption("c")) {
String clusters = commandLine.getOptionValue('c').trim();
for (String cluster : clusters.split(",")) {
cluster = cluster.trim();
if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
targetBrokers.addAll(clusterInfo.getClusterAddrTable().get(cluster));
}
}
}
if (targetBrokers.isEmpty()) {
throw new RuntimeException("Find none brokers, do nothing");
}
for (String broker : targetBrokers) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
if (addr == null) {
throw new RuntimeException("Can't find addr for broker " + broker);
}
}
}
//get the existed topic config and mapping
{
TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
clientMetadata.freshTopicRoute(topic, routeData);
if (routeData != null
&& !routeData.getQueueDatas().isEmpty()) {
for (QueueData queueData: routeData.getQueueDatas()) {
String bname = queueData.getBrokerName();
String addr = clientMetadata.findMasterBrokerAddr(bname);
TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
//allow the config is null
if (mapping != null) {
brokerConfigMap.put(bname, mapping);
}
}
}
}
if (brokerConfigMap.isEmpty()) {
throw new RuntimeException("No topic route to do the remapping");
}
final Map.Entry<Long, Integer> maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
//the check is ok, now do the mapping allocation
int maxNum = maxEpochAndNum.getValue();
long maxEpoch = maxEpochAndNum.getKey();
{
TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, maxEpoch, new HashMap<>(), brokerConfigMap);
String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false);
System.out.println("The old mapping data is written to file " + oldMappingDataFile);
}
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<>(), targetBrokers.stream().collect(Collectors.toMap( x -> x, x -> 0)));
allocator.upToNum(maxNum);
Map<String, Integer> expectedBrokerNumMap = allocator.getBrokerNumMap();
Queue<Integer> waitAssignQueues = new ArrayDeque<Integer>();
Map<Integer, String> expectedIdToBroker = new HashMap<>();
//the following logic will make sure that, for one broker, either "map in" or "map out"
//It can't both, map in some queues but also map out some queues.
globalIdMap.forEach((queueId, mappingOne) -> {
String leaderBroker = mappingOne.getBname();
if (expectedBrokerNumMap.containsKey(leaderBroker)) {
if (expectedBrokerNumMap.get(leaderBroker) > 0) {
expectedIdToBroker.put(queueId, leaderBroker);
expectedBrokerNumMap.put(leaderBroker, expectedBrokerNumMap.get(leaderBroker) - 1);
} else {
waitAssignQueues.add(queueId);
expectedBrokerNumMap.remove(leaderBroker);
}
} else {
waitAssignQueues.add(queueId);
}
});
expectedBrokerNumMap.forEach((broker, queueNum) -> {
for (int i = 0; i < queueNum; i++) {
Integer queueId = waitAssignQueues.poll();
assert queueId != null;
expectedIdToBroker.put(queueId, broker);
}
});
long newEpoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
//Now construct the remapping info
Set<String> brokersToMapOut = new HashSet<>();
Set<String> brokersToMapIn = new HashSet<>();
for (Map.Entry<Integer, String> mapEntry : expectedIdToBroker.entrySet()) {
Integer queueId = mapEntry.getKey();
String broker = mapEntry.getValue();
TopicQueueMappingOne topicQueueMappingOne = globalIdMap.get(queueId);
assert topicQueueMappingOne != null;
if (topicQueueMappingOne.getBname().equals(broker)) {
continue;
}
//remapping
String mapInBroker = broker;
String mapOutBroker = topicQueueMappingOne.getBname();
brokersToMapIn.add(mapInBroker);
brokersToMapOut.add(mapOutBroker);
TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(mapInBroker);
TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(mapOutBroker);
mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
List<LogicQueueMappingItem> items = new ArrayList<>(topicQueueMappingOne.getItems());
LogicQueueMappingItem last = items.get(items.size() - 1);
items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, -1, 0, -1, -1, -1));
ImmutableList<LogicQueueMappingItem> resultItems = ImmutableList.copyOf(items);
//Use the same object
mapInConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
mapOutConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
}
brokerConfigMap.values().forEach(configMapping -> {
configMapping.getMappingDetail().setEpoch(newEpoch);
configMapping.getMappingDetail().setTotalQueues(maxNum);
});
//double check
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(brokerConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList())), false, true);
{
TopicRemappingDetailWrapper newWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, newEpoch, expectedIdToBroker, brokerConfigMap);
newWrapper.setBrokerToMapIn(brokersToMapIn);
newWrapper.setBrokerToMapOut(brokersToMapOut);
String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true);
System.out.println("The old mapping data is written to file " + newMappingDataFile);
}
doRemapping(topic, brokersToMapIn, brokersToMapOut, brokerConfigMap, clientMetadata, defaultMQAdminExt);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
}
...@@ -21,15 +21,18 @@ import org.apache.commons.cli.CommandLine; ...@@ -21,15 +21,18 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option; import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.LogicQueueMappingItem; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.TopicQueueMappingUtils; import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.rpc.ClientMetadata; import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
...@@ -40,7 +43,6 @@ import java.util.AbstractMap; ...@@ -40,7 +43,6 @@ import java.util.AbstractMap;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
...@@ -63,7 +65,10 @@ public class UpdateStaticTopicSubCommand implements SubCommand { ...@@ -63,7 +65,10 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
Option opt = null; Option opt = null;
opt = new Option("c", "clusterName", true, "create topic to which cluster"); opt = new Option("c", "clusters", true, "remapping static topic to clusters, comma separated");
optionGroup.addOption(opt);
opt = new Option("b", "brokers", true, "remapping static topic to brokers, comma separated");
optionGroup.addOption(opt); optionGroup.addOption(opt);
optionGroup.setRequired(true); optionGroup.setRequired(true);
...@@ -77,104 +82,141 @@ public class UpdateStaticTopicSubCommand implements SubCommand { ...@@ -77,104 +82,141 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
opt.setRequired(true); opt.setRequired(true);
options.addOption(opt); options.addOption(opt);
opt = new Option("f", "mapFile", true, "The map file name");
opt.setRequired(false);
options.addOption(opt);
return options; return options;
} }
public void executeFromFile(final CommandLine commandLine, final Options options,
RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
ClientMetadata clientMetadata = new ClientMetadata();
try {
String topic = commandLine.getOptionValue('t').trim();
String mapFileName = commandLine.getOptionValue('f').trim();
String mapData = MixAll.file2String(mapFileName);
TopicRemappingDetailWrapper wrapper = TopicRemappingDetailWrapper.decode(mapData.getBytes(), TopicRemappingDetailWrapper.class);
//double check the config
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, wrapper.getBrokerConfigMap());
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())), false, true);
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo == null
|| clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
doUpdate(wrapper.getBrokerConfigMap(), clientMetadata, defaultMQAdminExt);
return;
}catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
public void doUpdate(Map<String, TopicConfigAndQueueMapping> brokerConfigMap, ClientMetadata clientMetadata, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : brokerConfigMap.entrySet()) {
String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue();
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
}
}
@Override @Override
public void execute(final CommandLine commandLine, final Options options, public void execute(final CommandLine commandLine, final Options options,
RPCHook rpcHook) throws SubCommandException { RPCHook rpcHook) throws SubCommandException {
if (!commandLine.hasOption('t')) {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
return;
}
if (commandLine.hasOption("f")) {
executeFromFile(commandLine, options, rpcHook);
return;
}
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
ClientMetadata clientMetadata = new ClientMetadata(); ClientMetadata clientMetadata = new ClientMetadata();
Map<String, TopicConfigAndQueueMapping> existedTopicConfigMap = new HashMap<>();
Map<Integer, ImmutableList<LogicQueueMappingItem>> globalIdMap = new HashMap<>(); Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<>();
Set<String> targetBrokers = new HashSet<>();
try { try {
if (!commandLine.hasOption('t') if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))
|| !commandLine.hasOption('c')
|| !commandLine.hasOption("qn")) { || !commandLine.hasOption("qn")) {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
return; return;
} }
String topic = commandLine.getOptionValue('t').trim(); String topic = commandLine.getOptionValue('t').trim();
int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
String clusters = commandLine.getOptionValue('c').trim();
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo == null if (clusterInfo == null
|| clusterInfo.getClusterAddrTable().isEmpty()) { || clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty"); throw new RuntimeException("The Cluster info is empty");
} else {
clientMetadata.refreshClusterInfo(clusterInfo);
} }
Set<String> brokers = new HashSet<>(); clientMetadata.refreshClusterInfo(clusterInfo);
for (String cluster : clusters.split(",")) { {
cluster = cluster.trim(); if (commandLine.hasOption("b")) {
if (clusterInfo.getClusterAddrTable().get(cluster) != null) { String brokerStrs = commandLine.getOptionValue("b").trim();
brokers.addAll(clusterInfo.getClusterAddrTable().get(cluster)); for (String broker: brokerStrs.split(",")) {
targetBrokers.add(broker.trim());
}
} else if (commandLine.hasOption("c")) {
String clusters = commandLine.getOptionValue('c').trim();
for (String cluster : clusters.split(",")) {
cluster = cluster.trim();
if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
targetBrokers.addAll(clusterInfo.getClusterAddrTable().get(cluster));
}
}
}
if (targetBrokers.isEmpty()) {
throw new RuntimeException("Find none brokers, do nothing");
}
for (String broker : targetBrokers) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
if (addr == null) {
throw new RuntimeException("Can't find addr for broker " + broker);
}
} }
}
if (brokers.isEmpty()) {
throw new RuntimeException("Find none brokers for " + clusters);
} }
//get the existed topic config and mapping //get the existed topic config and mapping
TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic); int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
clientMetadata.freshTopicRoute(topic, routeData); {
if (routeData != null TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
&& !routeData.getQueueDatas().isEmpty()) { clientMetadata.freshTopicRoute(topic, routeData);
for (QueueData queueData: routeData.getQueueDatas()) {
String bname = queueData.getBrokerName(); if (routeData != null
String addr = clientMetadata.findMasterBrokerAddr(bname); && !routeData.getQueueDatas().isEmpty()) {
TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic); for (QueueData queueData: routeData.getQueueDatas()) {
//allow the config is null String bname = queueData.getBrokerName();
if (mapping != null) { String addr = clientMetadata.findMasterBrokerAddr(bname);
existedTopicConfigMap.put(bname, mapping); TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
//allow the config is null
if (mapping != null) {
brokerConfigMap.put(bname, mapping);
}
} }
} }
} }
Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum); Map.Entry<Long, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum);
if (!existedTopicConfigMap.isEmpty()) { if (!brokerConfigMap.isEmpty()) {
//make sure it it not null maxEpochAndNum = TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
existedTopicConfigMap.forEach((key, value) -> { globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
if (value.getMappingDetail() != null) {
throw new RuntimeException("Mapping info should be null in broker " + key);
}
});
//make sure the detail is not dirty
existedTopicConfigMap.forEach((key, value) -> {
if (!key.equals(value.getMappingDetail().getBname())) {
throw new RuntimeException(String.format("The broker name is not equal %s != %s ", key, value.getMappingDetail().getBname()));
}
if (value.getMappingDetail().isDirty()) {
throw new RuntimeException("The mapping info is dirty in broker " + value.getMappingDetail().getBname());
}
});
List<TopicQueueMappingDetail> detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
//check the epoch and qnum
maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
final Map.Entry<Long, Integer> tmpMaxEpochAndNum = maxEpochAndNum;
detailList.forEach( mappingDetail -> {
if (tmpMaxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", tmpMaxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname()));
}
if (tmpMaxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) {
throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", tmpMaxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname()));
}
});
globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false);
if (maxEpochAndNum.getValue() != globalIdMap.size()) {
throw new RuntimeException(String.format("The total queue number in config dose not match the real hosted queues %d != %d", maxEpochAndNum.getValue(), globalIdMap.size()));
}
for (int i = 0; i < maxEpochAndNum.getValue(); i++) {
if (!globalIdMap.containsKey(i)) {
throw new RuntimeException(String.format("The queue number %s is not in globalIdMap", i));
}
}
} }
if (queueNum < globalIdMap.size()) { if (queueNum < globalIdMap.size()) {
throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size())); throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size()));
...@@ -183,48 +225,71 @@ public class UpdateStaticTopicSubCommand implements SubCommand { ...@@ -183,48 +225,71 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
if (queueNum == globalIdMap.size()) { if (queueNum == globalIdMap.size()) {
throw new RuntimeException("The topic queue num is equal the existed queue num, do nothing"); throw new RuntimeException("The topic queue num is equal the existed queue num, do nothing");
} }
{
TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), new HashMap<>(), brokerConfigMap);
String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false);
System.out.println("The old mapping data is written to file " + oldMappingDataFile);
}
//the check is ok, now do the mapping allocation //the check is ok, now do the mapping allocation
Map<String, Integer> brokerNumMap = brokers.stream().collect(Collectors.toMap( x -> x, x -> 0)); Map<String, Integer> brokerNumMap = targetBrokers.stream().collect(Collectors.toMap( x -> x, x -> 0));
Map<Integer, String> idToBroker = new HashMap<>(); final Map<Integer, String> oldIdToBroker = new HashMap<>();
globalIdMap.forEach((key, value) -> { globalIdMap.forEach((key, value) -> {
String leaderbroker = TopicQueueMappingUtils.getLeaderBroker(value); String leaderbroker = value.getBname();
idToBroker.put(key, leaderbroker); oldIdToBroker.put(key, leaderbroker);
if (!brokerNumMap.containsKey(leaderbroker)) { if (!brokerNumMap.containsKey(leaderbroker)) {
brokerNumMap.put(leaderbroker, 1); brokerNumMap.put(leaderbroker, 1);
} else { } else {
brokerNumMap.put(leaderbroker, brokerNumMap.get(leaderbroker) + 1); brokerNumMap.put(leaderbroker, brokerNumMap.get(leaderbroker) + 1);
} }
}); });
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(idToBroker, brokerNumMap); TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(oldIdToBroker, brokerNumMap);
allocator.upToNum(queueNum); allocator.upToNum(queueNum);
Map<Integer, String> newIdToBroker = allocator.getIdToBroker(); Map<Integer, String> newIdToBroker = allocator.getIdToBroker();
//construct the topic configAndMapping //construct the topic configAndMapping
long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis()); long newEpoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
newIdToBroker.forEach( (queueId, broker) -> { for (Map.Entry<Integer, String> e : newIdToBroker.entrySet()) {
Integer queueId = e.getKey();
String broker = e.getValue();
if (globalIdMap.containsKey(queueId)) {
//ignore the exited
continue;
}
TopicConfigAndQueueMapping configMapping; TopicConfigAndQueueMapping configMapping;
if (!existedTopicConfigMap.containsKey(broker)) { if (!brokerConfigMap.containsKey(broker)) {
TopicConfig topicConfig = new TopicConfig(topic, 1, 1); configMapping = new TopicConfigAndQueueMapping(new TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, -1));
TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, queueNum, broker, epoch); configMapping.setWriteQueueNums(1);
configMapping = new TopicConfigAndQueueMapping(topicConfig, mappingDetail); configMapping.setReadQueueNums(1);
brokerConfigMap.put(broker, configMapping);
} else { } else {
configMapping = existedTopicConfigMap.get(broker); configMapping = brokerConfigMap.get(broker);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1); configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1); configMapping.setReadQueueNums(configMapping.getReadQueueNums() + 1);
configMapping.getMappingDetail().setEpoch(epoch);
configMapping.getMappingDetail().setTotalQueues(queueNum);
} }
LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1); LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem)); configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
}
// set the topic config
brokerConfigMap.values().forEach(configMapping -> {
configMapping.getMappingDetail().setEpoch(newEpoch);
configMapping.getMappingDetail().setTotalQueues(queueNum);
}); });
//double check the config
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, brokerConfigMap);
TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(brokerConfigMap.values())), false, true);
//If some succeed, and others fail, it will cause inconsistent data {
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : existedTopicConfigMap.entrySet()) { TopicRemappingDetailWrapper newWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, newEpoch, newIdToBroker, brokerConfigMap);
String broker = entry.getKey(); String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true);
String addr = clientMetadata.findMasterBrokerAddr(broker); System.out.println("The new mapping data is written to file " + newMappingDataFile);
TopicConfigAndQueueMapping configMapping = entry.getValue();
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
} }
doUpdate(brokerConfigMap, clientMetadata, defaultMQAdminExt);
} catch (Exception e) { } catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally { } finally {
......