提交 e862ac88 编写于 作者: D dongeforever

Add definition for logic queue

上级 4506f34e
......@@ -35,6 +35,10 @@ public class BrokerPathConfigHelper {
return rootDir + File.separator + "config" + File.separator + "topics.json";
}
public static String getTopicQueueMappingPath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "topicqueuemapping.json";
}
public static String getConsumerOffsetPath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "consumerOffset.json";
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.topic;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
import org.apache.rocketmq.common.protocol.route.TopicQueueMappingInfo;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TopicQueueMappingManager extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long LOCK_TIMEOUT_MILLIS = 3000;
private transient final Lock lock = new ReentrantLock();
private final DataVersion dataVersion = new DataVersion();
private transient BrokerController brokerController;
private final ConcurrentMap<String, TopicQueueMappingInfo> topicQueueMappingTable = new ConcurrentHashMap<>();
public TopicQueueMappingManager(BrokerController brokerController) {
this.brokerController = brokerController;
}
@Override
public String encode(boolean pretty) {
TopicQueueMappingSerializeWrapper wrapper = new TopicQueueMappingSerializeWrapper();
wrapper.setTopicQueueMappingInfoMap(topicQueueMappingTable);
wrapper.setDataVersion(this.dataVersion);
return JSON.toJSONString(wrapper, pretty);
}
@Override
public String encode() {
return encode(false);
}
@Override
public String configFilePath() {
return BrokerPathConfigHelper.getTopicQueueMappingPath(this.brokerController.getMessageStoreConfig()
.getStorePathRootDir());
}
@Override
public void decode(String jsonString) {
if (jsonString != null) {
TopicQueueMappingSerializeWrapper wrapper = TopicQueueMappingSerializeWrapper.fromJson(jsonString, TopicQueueMappingSerializeWrapper.class);
if (wrapper != null) {
this.topicQueueMappingTable.putAll(wrapper.getTopicQueueMappingInfoMap());
this.dataVersion.assignNewOne(wrapper.getDataVersion());
}
}
}
public DataVersion getDataVersion() {
return dataVersion;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.protocol.route.TopicQueueMappingInfo;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.util.Map;
public class TopicQueueMappingSerializeWrapper extends RemotingSerializable {
private Map<String/* topic */, TopicQueueMappingInfo> topicQueueMappingInfoMap;
private DataVersion dataVersion = new DataVersion();
public Map<String, TopicQueueMappingInfo> getTopicQueueMappingInfoMap() {
return topicQueueMappingInfoMap;
}
public void setTopicQueueMappingInfoMap(Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap) {
this.topicQueueMappingInfoMap = topicQueueMappingInfoMap;
}
public DataVersion getDataVersion() {
return dataVersion;
}
public void setDataVersion(DataVersion dataVersion) {
this.dataVersion = dataVersion;
}
}
package org.apache.rocketmq.common.protocol.route;
public class LogicQueueMappingItem {
private int gen; //generation, mutable
private int queueId;
private String bname;
private long logicOffset; // the start of the logic offset
private long startOffset; // the start of the physical offset
private long timeOfStart = -1; //mutable
public LogicQueueMappingItem(int gen, int queueId, String bname, long logicOffset, long startOffset, long timeOfStart) {
this.gen = gen;
this.queueId = queueId;
this.bname = bname;
this.logicOffset = logicOffset;
this.startOffset = startOffset;
this.timeOfStart = timeOfStart;
}
public int getGen() {
return gen;
}
public void setGen(int gen) {
this.gen = gen;
}
public long getTimeOfStart() {
return timeOfStart;
}
public void setTimeOfStart(long timeOfStart) {
this.timeOfStart = timeOfStart;
}
public int getQueueId() {
return queueId;
}
public String getBname() {
return bname;
}
public long getLogicOffset() {
return logicOffset;
}
public long getStartOffset() {
return startOffset;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.route;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class TopicQueueMappingInfo {
private int totalQueues;
private String bname; //identify the host name
//the newest mapping is in current broker
private Map<Integer/*global id*/, List<LogicQueueMappingItem>> hostedQueues = new HashMap<Integer, List<LogicQueueMappingItem>>();
public TopicQueueMappingInfo(int totalQueues, String bname) {
this.totalQueues = totalQueues;
this.bname = bname;
}
public boolean putMappingInfo(Integer globalId, List<LogicQueueMappingItem> mappingInfo) {
if (mappingInfo.isEmpty()) {
return true;
}
hostedQueues.put(globalId, mappingInfo);
return true;
}
public List<LogicQueueMappingItem> getMappingInfo(Integer globalId) {
return hostedQueues.get(globalId);
}
public int getTotalQueues() {
return totalQueues;
}
public void setTotalQueues(int totalQueues) {
this.totalQueues = totalQueues;
}
public String getBname() {
return bname;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册