From e862ac88fc5aae370648e6495f54476bff844e52 Mon Sep 17 00:00:00 2001 From: dongeforever Date: Sat, 6 Nov 2021 11:54:21 +0800 Subject: [PATCH] Add definition for logic queue --- .../broker/BrokerPathConfigHelper.java | 4 + .../topic/TopicQueueMappingManager.java | 86 +++++++++++++++++++ .../TopicQueueMappingSerializeWrapper.java | 45 ++++++++++ .../protocol/route/LogicQueueMappingItem.java | 54 ++++++++++++ .../protocol/route/TopicQueueMappingInfo.java | 61 +++++++++++++ 5 files changed, 250 insertions(+) create mode 100644 broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicQueueMappingItem.java create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicQueueMappingInfo.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java index 43a9946f..e7a72e0b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java @@ -35,6 +35,10 @@ public class BrokerPathConfigHelper { return rootDir + File.separator + "config" + File.separator + "topics.json"; } + public static String getTopicQueueMappingPath(final String rootDir) { + return rootDir + File.separator + "config" + File.separator + "topicqueuemapping.json"; + } + public static String getConsumerOffsetPath(final String rootDir) { return rootDir + File.separator + "config" + File.separator + "consumerOffset.json"; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java new file mode 100644 index 00000000..9ee0f514 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.topic; + +import com.alibaba.fastjson.JSON; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.BrokerPathConfigHelper; +import org.apache.rocketmq.common.ConfigManager; +import org.apache.rocketmq.common.DataVersion; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper; +import org.apache.rocketmq.common.protocol.route.TopicQueueMappingInfo; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class TopicQueueMappingManager extends ConfigManager { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private static final long LOCK_TIMEOUT_MILLIS = 3000; + private transient final Lock lock = new ReentrantLock(); + + private final DataVersion dataVersion = new DataVersion(); + private transient BrokerController brokerController; + + private final ConcurrentMap topicQueueMappingTable = new ConcurrentHashMap<>(); + + + public TopicQueueMappingManager(BrokerController brokerController) { + this.brokerController = brokerController; + + } + + @Override + public String encode(boolean pretty) { + TopicQueueMappingSerializeWrapper wrapper = new TopicQueueMappingSerializeWrapper(); + wrapper.setTopicQueueMappingInfoMap(topicQueueMappingTable); + wrapper.setDataVersion(this.dataVersion); + return JSON.toJSONString(wrapper, pretty); + } + + @Override + public String encode() { + return encode(false); + } + + @Override + public String configFilePath() { + return BrokerPathConfigHelper.getTopicQueueMappingPath(this.brokerController.getMessageStoreConfig() + .getStorePathRootDir()); + } + + @Override + public void decode(String jsonString) { + if (jsonString != null) { + TopicQueueMappingSerializeWrapper wrapper = TopicQueueMappingSerializeWrapper.fromJson(jsonString, TopicQueueMappingSerializeWrapper.class); + if (wrapper != null) { + this.topicQueueMappingTable.putAll(wrapper.getTopicQueueMappingInfoMap()); + this.dataVersion.assignNewOne(wrapper.getDataVersion()); + } + } + } + + public DataVersion getDataVersion() { + return dataVersion; + } + + +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java new file mode 100644 index 00000000..ef3f7581 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.protocol.body; + +import org.apache.rocketmq.common.DataVersion; +import org.apache.rocketmq.common.protocol.route.TopicQueueMappingInfo; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.Map; + +public class TopicQueueMappingSerializeWrapper extends RemotingSerializable { + private Map topicQueueMappingInfoMap; + private DataVersion dataVersion = new DataVersion(); + + public Map getTopicQueueMappingInfoMap() { + return topicQueueMappingInfoMap; + } + + public void setTopicQueueMappingInfoMap(Map topicQueueMappingInfoMap) { + this.topicQueueMappingInfoMap = topicQueueMappingInfoMap; + } + + public DataVersion getDataVersion() { + return dataVersion; + } + + public void setDataVersion(DataVersion dataVersion) { + this.dataVersion = dataVersion; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicQueueMappingItem.java new file mode 100644 index 00000000..fc5cbe62 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicQueueMappingItem.java @@ -0,0 +1,54 @@ +package org.apache.rocketmq.common.protocol.route; + +public class LogicQueueMappingItem { + + private int gen; //generation, mutable + private int queueId; + private String bname; + private long logicOffset; // the start of the logic offset + private long startOffset; // the start of the physical offset + private long timeOfStart = -1; //mutable + + public LogicQueueMappingItem(int gen, int queueId, String bname, long logicOffset, long startOffset, long timeOfStart) { + this.gen = gen; + this.queueId = queueId; + this.bname = bname; + this.logicOffset = logicOffset; + this.startOffset = startOffset; + this.timeOfStart = timeOfStart; + } + + public int getGen() { + return gen; + } + + public void setGen(int gen) { + this.gen = gen; + } + + + public long getTimeOfStart() { + return timeOfStart; + } + + public void setTimeOfStart(long timeOfStart) { + this.timeOfStart = timeOfStart; + } + + + public int getQueueId() { + return queueId; + } + + public String getBname() { + return bname; + } + + public long getLogicOffset() { + return logicOffset; + } + + public long getStartOffset() { + return startOffset; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicQueueMappingInfo.java new file mode 100644 index 00000000..03769650 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicQueueMappingInfo.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.protocol.route; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TopicQueueMappingInfo { + + private int totalQueues; + private String bname; //identify the host name + //the newest mapping is in current broker + private Map> hostedQueues = new HashMap>(); + + + public TopicQueueMappingInfo(int totalQueues, String bname) { + this.totalQueues = totalQueues; + this.bname = bname; + } + + public boolean putMappingInfo(Integer globalId, List mappingInfo) { + if (mappingInfo.isEmpty()) { + return true; + } + hostedQueues.put(globalId, mappingInfo); + return true; + } + + public List getMappingInfo(Integer globalId) { + return hostedQueues.get(globalId); + } + + public int getTotalQueues() { + return totalQueues; + } + + public void setTotalQueues(int totalQueues) { + this.totalQueues = totalQueues; + } + + public String getBname() { + return bname; + } + + +} -- GitLab