Unverified commit 36f34ee1, authored by Xiangdong Huang, committed by GitHub

Merge pull request #61 from apache/refactor_fileNode

refactor lock and unlock in recovery() of fileNodeManager [WIP]
......@@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
......@@ -75,6 +74,10 @@ public class FileNodeManager implements IStatistic, IService {
private static final Logger LOGGER = LoggerFactory.getLogger(FileNodeManager.class);
private static final IoTDBConfig TsFileDBConf = IoTDBDescriptor.getInstance().getConfig();
private static final Directories directories = Directories.getInstance();
/**
* a folder that persists FileNodeProcessorStore instances. Each storage group will have a
* subfolder. By default, it is system/info.
*/
private final String baseDir;
/**
......@@ -92,6 +95,7 @@ public class FileNodeManager implements IStatistic, IService {
private FileNodeManager(String baseDir) {
processorMap = new ConcurrentHashMap<>();
statParamsHashMap = new HashMap<>();
//label: A
for (MonitorConstants.FileNodeManagerStatConstants fileNodeManagerStatConstant :
MonitorConstants.FileNodeManagerStatConstants.values()) {
statParamsHashMap.put(fileNodeManagerStatConstant.name(), new AtomicLong(0));
......@@ -106,7 +110,7 @@ public class FileNodeManager implements IStatistic, IService {
if (dir.mkdirs()) {
LOGGER.info("{} dir home doesn't exist, create it", dir.getPath());
}
//TODO merge this with label A
if (TsFileDBConf.enableStatMonitor) {
StatMonitor statMonitor = StatMonitor.getInstance();
registStatMetadata();
......@@ -182,6 +186,12 @@ public class FileNodeManager implements IStatistic, IService {
processorMap.clear();
}
/**
* Construct a new FileNodeProcessor for the given storage group.
*
* @param filenodeName storage group name, e.g., root.a.b
* @return a new FileNodeProcessor for the storage group
* @throws FileNodeManagerException if constructing the processor fails
*/
private FileNodeProcessor constructNewProcessor(String filenodeName)
throws FileNodeManagerException {
try {
......@@ -196,6 +206,7 @@ public class FileNodeManager implements IStatistic, IService {
throws FileNodeManagerException {
String filenodeName;
try {
// return the storage group name
filenodeName = MManager.getInstance().getFileNameByPath(path);
} catch (PathErrorException e) {
LOGGER.error("MManager get filenode name error, seriesPath is {}", path);
......@@ -214,8 +225,8 @@ public class FileNodeManager implements IStatistic, IService {
processor.lock(isWriteLock);
} else {
// calculate the value with the key monitor
LOGGER.debug("Calcuate the processor, the filenode is {}, Thread is {}", filenodeName,
Thread.currentThread().getId());
LOGGER.debug("construct a processor instance, the filenode is {}, Thread is {}",
filenodeName, Thread.currentThread().getId());
processor = constructNewProcessor(filenodeName);
processor.lock(isWriteLock);
processorMap.put(filenodeName, processor);
......@@ -229,22 +240,30 @@ public class FileNodeManager implements IStatistic, IService {
* recovery the filenode processor.
*/
public void recovery() {
List<String> filenodeNames = null;
try {
List<String> filenodeNames = MManager.getInstance().getAllFileNames();
for (String filenodeName : filenodeNames) {
FileNodeProcessor fileNodeProcessor = getProcessor(filenodeName, true);
filenodeNames = MManager.getInstance().getAllFileNames();
} catch (PathErrorException e) {
LOGGER.error("Restoring all FileNodes failed.", e);
return;
}
for (String filenodeName : filenodeNames) {
FileNodeProcessor fileNodeProcessor = null;
try {
fileNodeProcessor = getProcessor(filenodeName, true);
if (fileNodeProcessor.shouldRecovery()) {
LOGGER.info("Recovery the filenode processor, the filenode is {}, the status is {}",
filenodeName, fileNodeProcessor.getFileNodeProcessorStatus());
fileNodeProcessor.fileNodeRecovery();
} else {
}
} catch (FileNodeManagerException | FileNodeProcessorException e) {
LOGGER.error("Restoring fileNode {} failed.", filenodeName, e);
} finally {
if (fileNodeProcessor != null) {
fileNodeProcessor.writeUnlock();
}
// add index check sum
}
} catch (PathErrorException | FileNodeManagerException | FileNodeProcessorException e) {
LOGGER.error("Restoring all FileNodes failed, the reason is ", e);
// add index check sum
}
}
......@@ -607,6 +626,8 @@ public class FileNodeManager implements IStatistic, IService {
/**
* begin query.
* @param deviceId queried deviceId
* @return a query token for the device.
*/
public int beginQuery(String deviceId) throws FileNodeManagerException {
FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true);
......@@ -1150,6 +1171,8 @@ public class FileNodeManager implements IStatistic, IService {
fileNodeProcessor.fileNodeRecovery();
} catch (FileNodeProcessorException e) {
throw new FileNodeManagerException(e);
} finally {
fileNodeProcessor.writeUnlock();
}
}
......
......@@ -462,8 +462,6 @@ public class FileNodeProcessor extends Processor implements IStatistic {
bufferWriteProcessor = new BufferWriteProcessor(baseDir, getProcessorName(),
fileNames[fileNames.length - 1], parameters, versionController, fileSchema);
} catch (BufferWriteProcessorException e) {
// unlock
writeUnlock();
LOGGER.error(
"The filenode processor {} failed to recovery the bufferwrite processor, "
+ "the last bufferwrite file is {}.",
......@@ -480,7 +478,6 @@ public class FileNodeProcessor extends Processor implements IStatistic {
overflowProcessor = new OverflowProcessor(getProcessorName(), parameters, fileSchema,
versionController);
} catch (IOException e) {
writeUnlock();
LOGGER.error("The filenode processor {} failed to recovery the overflow processor.",
getProcessorName());
throw new FileNodeProcessorException(e);
......@@ -498,10 +495,10 @@ public class FileNodeProcessor extends Processor implements IStatistic {
// unlock
LOGGER.info("The filenode processor {} is recovering, the filenode status is {}.",
getProcessorName(), isMerging);
writeUnlock();
//writeUnlock();
switchWaitingToWorking();
} else {
writeUnlock();
//writeUnlock();
}
// add file into index of file
addAllFileIntoIndex(newFileNodes);
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.engine.filenode;
import java.io.File;
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.iotdb.db.conf.directories.Directories;
/**
* This class is used to store the TsFile status.<br>
*/
public class TimeIntervalTsFile implements Serializable {
private static final long serialVersionUID = -4309683416067212549L;
public OverflowChangeType overflowChangeType;
private int baseDirIndex;
private String relativePath;
private Map<String, Long> startTimeMap;
private Map<String, Long> endTimeMap;
private Set<String> mergeChanged = new HashSet<>();
/**
* Constructor for TimeIntervalTsFile.
*/
public TimeIntervalTsFile(Map<String, Long> startTimeMap, Map<String, Long> endTimeMap,
OverflowChangeType type,
int baseDirIndex, String relativePath) {
this.overflowChangeType = type;
this.baseDirIndex = baseDirIndex;
this.relativePath = relativePath;
this.startTimeMap = startTimeMap;
this.endTimeMap = endTimeMap;
}
/**
* This is just used to construct a new TsFile.
*/
public TimeIntervalTsFile(OverflowChangeType type, String relativePath) {
this.overflowChangeType = type;
this.relativePath = relativePath;
startTimeMap = new HashMap<>();
endTimeMap = new HashMap<>();
}
public void setStartTime(String deviceId, long startTime) {
startTimeMap.put(deviceId, startTime);
}
/**
* get start time.
*
* @param deviceId -Map key
* @return -start time
*/
public long getStartTime(String deviceId) {
if (startTimeMap.containsKey(deviceId)) {
return startTimeMap.get(deviceId);
} else {
return -1;
}
}
public Map<String, Long> getStartTimeMap() {
return startTimeMap;
}
public void setStartTimeMap(Map<String, Long> startTimeMap) {
this.startTimeMap = startTimeMap;
}
public void setEndTime(String deviceId, long timestamp) {
this.endTimeMap.put(deviceId, timestamp);
}
/**
* get end time for given device.
*
* @param deviceId -id of device
* @return -end time of the device
*/
public long getEndTime(String deviceId) {
if (endTimeMap.get(deviceId) == null) {
return -1;
}
return endTimeMap.get(deviceId);
}
public Map<String, Long> getEndTimeMap() {
return endTimeMap;
}
public void setEndTimeMap(Map<String, Long> endTimeMap) {
this.endTimeMap = endTimeMap;
}
/**
* Remove the given device's start time and end time.
*
* @param deviceId -id of the device
*/
public void removeTime(String deviceId) {
startTimeMap.remove(deviceId);
endTimeMap.remove(deviceId);
}
/**
* get file path.
*/
public String getFilePath() {
if (relativePath == null) {
return relativePath;
}
return new File(Directories.getInstance().getTsFileFolder(baseDirIndex), relativePath)
.getPath();
}
public String getRelativePath() {
return relativePath;
}
public void setRelativePath(String relativePath) {
this.relativePath = relativePath;
}
public boolean checkEmpty() {
return startTimeMap.isEmpty() && endTimeMap.isEmpty();
}
/**
* Clear the member variables of this object.
*/
public void clear() {
startTimeMap.clear();
endTimeMap.clear();
mergeChanged.clear();
overflowChangeType = OverflowChangeType.NO_CHANGE;
relativePath = null;
}
/**
* Change the overflow change type according to the given FileNodeProcessorStatus.
*/
public void changeTypeToChanged(FileNodeProcessorStatus fileNodeProcessorState) {
if (fileNodeProcessorState == FileNodeProcessorStatus.MERGING_WRITE) {
overflowChangeType = OverflowChangeType.MERGING_CHANGE;
} else {
overflowChangeType = OverflowChangeType.CHANGED;
}
}
public void addMergeChanged(String deviceId) {
mergeChanged.add(deviceId);
}
public Set<String> getMergeChanged() {
return mergeChanged;
}
public void clearMergeChanged() {
mergeChanged.clear();
}
/**
* judge whether the time interval is closed.
*/
public boolean isClosed() {
return !endTimeMap.isEmpty();
}
/**
* back up the time interval of tsfile.
*/
public TimeIntervalTsFile backUp() {
Map<String, Long> startTimeMap = new HashMap<>(this.startTimeMap);
Map<String, Long> endTimeMap = new HashMap<>(this.endTimeMap);
return new TimeIntervalTsFile(startTimeMap, endTimeMap, overflowChangeType, baseDirIndex,
relativePath);
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((endTimeMap == null) ? 0 : endTimeMap.hashCode());
result = prime * result + ((relativePath == null) ? 0 : relativePath.hashCode());
result = prime * result + ((overflowChangeType == null) ? 0 : overflowChangeType.hashCode());
result = prime * result + ((startTimeMap == null) ? 0 : startTimeMap.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
TimeIntervalTsFile other = (TimeIntervalTsFile) obj;
if (endTimeMap == null) {
if (other.endTimeMap != null) {
return false;
}
} else if (!endTimeMap.equals(other.endTimeMap)) {
return false;
}
if (relativePath == null) {
if (other.relativePath != null) {
return false;
}
} else if (!relativePath.equals(other.relativePath)) {
return false;
}
if (overflowChangeType != other.overflowChangeType) {
return false;
}
if (startTimeMap == null) {
if (other.startTimeMap != null) {
return false;
}
} else if (!startTimeMap.equals(other.startTimeMap)) {
return false;
}
return true;
}
@Override
public String toString() {
return "TimeIntervalTsFile [relativePath=" + relativePath + ", overflowChangeType="
+ overflowChangeType
+ ", startTimeMap=" + startTimeMap + ", endTimeMap=" + endTimeMap + ", mergeChanged="
+ mergeChanged
+ "]";
}
}
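A minimal usage sketch of TimeIntervalTsFile, based only on the methods shown above; the device name and relative path are made up for illustration, and getFilePath() is omitted because it depends on the configured Directories folders.

// Hypothetical usage example; assumes it lives in org.apache.iotdb.db.engine.filenode
// (or imports TimeIntervalTsFile and OverflowChangeType from that package).
public class TimeIntervalTsFileExample {
  public static void main(String[] args) {
    // A new, still-open TsFile: no end times have been recorded yet.
    TimeIntervalTsFile interval =
        new TimeIntervalTsFile(OverflowChangeType.NO_CHANGE, "1567/tsfile-0.tsfile");

    interval.setStartTime("root.a.d1", 100L);
    System.out.println(interval.isClosed());               // false: endTimeMap is still empty
    System.out.println(interval.getEndTime("root.a.d1"));  // -1: no end time recorded

    // Recording an end time for the device marks the interval as closed.
    interval.setEndTime("root.a.d1", 200L);
    System.out.println(interval.isClosed());                // true

    // backUp() copies both time maps, so later mutations do not affect the snapshot.
    TimeIntervalTsFile snapshot = interval.backUp();
    interval.removeTime("root.a.d1");
    System.out.println(snapshot.getStartTime("root.a.d1")); // still 100
  }
}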