diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java index 4135de358425772a889033a46f32c259d20a50e9..eec4942073bd5eb4d0a4dc003cdd197d27638360 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java @@ -700,7 +700,7 @@ public class FileNodeManager implements IStatistic, IService { * @param fileNodeName the seriesPath of storage group * @param appendFile the appended tsfile information */ - public boolean appendFileToFileNode(String fileNodeName, IntervalFileNode appendFile, + public boolean appendFileToFileNode(String fileNodeName, TsFileResource appendFile, String appendFilePath) throws FileNodeManagerException { FileNodeProcessor fileNodeProcessor = getProcessor(fileNodeName, true); try { @@ -729,7 +729,7 @@ public class FileNodeManager implements IStatistic, IService { * @param fileNodeName the seriesPath of storage group * @param appendFile the appended tsfile information */ - public List getOverlapFilesFromFileNode(String fileNodeName, IntervalFileNode appendFile, + public List getOverlapFilesFromFileNode(String fileNodeName, TsFileResource appendFile, String uuid) throws FileNodeManagerException { FileNodeProcessor fileNodeProcessor = getProcessor(fileNodeName, true); List overlapFiles; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java index 0d43be8385295e4979bbcb3e23f2469919c4df1a..6f56ebc1cc7b78510df62e316c50eb97f3cd3885 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java @@ -76,7 +76,6 @@ import org.apache.iotdb.db.monitor.StatMonitor; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.factory.SeriesReaderFactory; import org.apache.iotdb.db.query.reader.IReader; -import org.apache.iotdb.db.utils.FileSchemaUtils; import org.apache.iotdb.db.utils.MemUtils; import org.apache.iotdb.db.utils.QueryUtils; import org.apache.iotdb.db.utils.TimeValuePair; @@ -98,11 +97,8 @@ import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; import org.apache.iotdb.tsfile.write.record.TSRecord; import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint; import org.apache.iotdb.tsfile.write.schema.FileSchema; -import org.apache.iotdb.tsfile.write.schema.JsonConverter; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; -import org.json.JSONArray; -import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,10 +121,10 @@ public class FileNodeProcessor extends Processor implements IStatistic { private volatile boolean isOverflowed; private Map lastUpdateTimeMap; private Map flushLastUpdateTimeMap; - private Map> invertedIndexOfFiles; - private IntervalFileNode emptyIntervalFileNode; - private IntervalFileNode currentIntervalFileNode; - private List newFileNodes; + private Map> invertedIndexOfFiles; + private TsFileResource emptyTsFileResource; + private TsFileResource currentTsFileResource; + private List newFileNodes; private FileNodeProcessorStatus isMerging; // this is used when work->merge operation private int numOfMergeFile; @@ -187,21 +183,21 @@ public class FileNodeProcessor extends Processor implements IStatistic { if (!newFileNodes.isEmpty()) { // end time with one start time Map endTimeMap = new HashMap<>(); - for (Entry startTime : currentIntervalFileNode.getStartTimeMap().entrySet()) { + for (Entry startTime : currentTsFileResource.getStartTimeMap().entrySet()) { String deviceId = startTime.getKey(); endTimeMap.put(deviceId, lastUpdateTimeMap.get(deviceId)); } - currentIntervalFileNode.setEndTimeMap(endTimeMap); + currentTsFileResource.setEndTimeMap(endTimeMap); } } }; private Action overflowFlushAction = () -> { - // update the new IntervalFileNode List and emptyIntervalFile. + // update the new TsFileResource List and emptyIntervalFile. // Notice: thread safe synchronized (fileNodeProcessorStore) { fileNodeProcessorStore.setOverflowed(isOverflowed); - fileNodeProcessorStore.setEmptyIntervalFileNode(emptyIntervalFileNode); + fileNodeProcessorStore.setEmptyTsFileResource(emptyTsFileResource); fileNodeProcessorStore.setNewFileNodes(newFileNodes); } }; @@ -264,7 +260,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { } // TODO deep clone the lastupdate time lastUpdateTimeMap = fileNodeProcessorStore.getLastUpdateTimeMap(); - emptyIntervalFileNode = fileNodeProcessorStore.getEmptyIntervalFileNode(); + emptyTsFileResource = fileNodeProcessorStore.getEmptyTsFileResource(); newFileNodes = fileNodeProcessorStore.getNewFileNodes(); isMerging = fileNodeProcessorStore.getFileNodeProcessorStatus(); numOfMergeFile = fileNodeProcessorStore.getNumOfMergeFile(); @@ -350,10 +346,10 @@ public class FileNodeProcessor extends Processor implements IStatistic { */ void addIntervalFileNode(String baseDir, String fileName) throws ActionException { - IntervalFileNode intervalFileNode = new IntervalFileNode(OverflowChangeType.NO_CHANGE, baseDir, + TsFileResource tsFileResource = new TsFileResource(OverflowChangeType.NO_CHANGE, baseDir, fileName); - this.currentIntervalFileNode = intervalFileNode; - newFileNodes.add(intervalFileNode); + this.currentTsFileResource = tsFileResource; + newFileNodes.add(tsFileResource); fileNodeProcessorStore.setNewFileNodes(newFileNodes); flushFileNodeProcessorAction.act(); } @@ -364,12 +360,12 @@ public class FileNodeProcessor extends Processor implements IStatistic { * @param deviceId device ID */ void setIntervalFileNodeStartTime(String deviceId) { - if (currentIntervalFileNode.getStartTime(deviceId) == -1) { - currentIntervalFileNode.setStartTime(deviceId, flushLastUpdateTimeMap.get(deviceId)); + if (currentTsFileResource.getStartTime(deviceId) == -1) { + currentTsFileResource.setStartTime(deviceId, flushLastUpdateTimeMap.get(deviceId)); if (!invertedIndexOfFiles.containsKey(deviceId)) { invertedIndexOfFiles.put(deviceId, new ArrayList<>()); } - invertedIndexOfFiles.get(deviceId).add(currentIntervalFileNode); + invertedIndexOfFiles.get(deviceId).add(currentTsFileResource); } } @@ -378,7 +374,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { */ public void clearFileNode() { isOverflowed = false; - emptyIntervalFileNode = new IntervalFileNode(OverflowChangeType.NO_CHANGE, null); + emptyTsFileResource = new TsFileResource(OverflowChangeType.NO_CHANGE, null); newFileNodes = new ArrayList<>(); isMerging = FileNodeProcessorStatus.NONE; numOfMergeFile = 0; @@ -386,14 +382,14 @@ public class FileNodeProcessor extends Processor implements IStatistic { fileNodeProcessorStore.setFileNodeProcessorStatus(isMerging); fileNodeProcessorStore.setNewFileNodes(newFileNodes); fileNodeProcessorStore.setNumOfMergeFile(numOfMergeFile); - fileNodeProcessorStore.setEmptyIntervalFileNode(emptyIntervalFileNode); + fileNodeProcessorStore.setEmptyTsFileResource(emptyTsFileResource); } - private void addAllFileIntoIndex(List fileList) { + private void addAllFileIntoIndex(List fileList) { // clear map invertedIndexOfFiles.clear(); // add all file to index - for (IntervalFileNode fileNode : fileList) { + for (TsFileResource fileNode : fileList) { if (fileNode.getStartTimeMap().isEmpty()) { continue; } @@ -437,7 +433,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { // // add the current file // - currentIntervalFileNode = newFileNodes.get(newFileNodes.size() - 1); + currentTsFileResource = newFileNodes.get(newFileNodes.size() - 1); // this bufferwrite file is not close by normal operation String damagedFilePath = newFileNodes.get(newFileNodes.size() - 1).getFilePath(); @@ -626,17 +622,17 @@ public class FileNodeProcessor extends Processor implements IStatistic { WARN_NO_SUCH_OVERFLOWED_FILE + "the data is [device:{},time:{}]", getProcessorName(), deviceId, timestamp); - emptyIntervalFileNode.setStartTime(deviceId, 0L); - emptyIntervalFileNode.setEndTime(deviceId, getLastUpdateTime(deviceId)); - emptyIntervalFileNode.changeTypeToChanged(isMerging); + emptyTsFileResource.setStartTime(deviceId, 0L); + emptyTsFileResource.setEndTime(deviceId, getLastUpdateTime(deviceId)); + emptyTsFileResource.changeTypeToChanged(isMerging); } else { - List temp = invertedIndexOfFiles.get(deviceId); + List temp = invertedIndexOfFiles.get(deviceId); int index = searchIndexNodeByTimestamp(deviceId, timestamp, temp); changeTypeToChanged(temp.get(index), deviceId); } } - private void changeTypeToChanged(IntervalFileNode fileNode, String deviceId) { + private void changeTypeToChanged(TsFileResource fileNode, String deviceId) { fileNode.changeTypeToChanged(isMerging); if (isMerging == FileNodeProcessorStatus.MERGING_WRITE) { fileNode.addMergeChanged(deviceId); @@ -652,11 +648,11 @@ public class FileNodeProcessor extends Processor implements IStatistic { WARN_NO_SUCH_OVERFLOWED_FILE + "the data is [device:{}, start time:{}, end time:{}]", getProcessorName(), deviceId, startTime, endTime); - emptyIntervalFileNode.setStartTime(deviceId, 0L); - emptyIntervalFileNode.setEndTime(deviceId, getLastUpdateTime(deviceId)); - emptyIntervalFileNode.changeTypeToChanged(isMerging); + emptyTsFileResource.setStartTime(deviceId, 0L); + emptyTsFileResource.setEndTime(deviceId, getLastUpdateTime(deviceId)); + emptyTsFileResource.changeTypeToChanged(isMerging); } else { - List temp = invertedIndexOfFiles.get(deviceId); + List temp = invertedIndexOfFiles.get(deviceId); int left = searchIndexNodeByTimestamp(deviceId, startTime, temp); int right = searchIndexNodeByTimestamp(deviceId, endTime, temp); for (int i = left; i <= right; i++) { @@ -674,11 +670,11 @@ public class FileNodeProcessor extends Processor implements IStatistic { WARN_NO_SUCH_OVERFLOWED_FILE + "the data is [device:{}, delete time:{}]", getProcessorName(), deviceId, timestamp); - emptyIntervalFileNode.setStartTime(deviceId, 0L); - emptyIntervalFileNode.setEndTime(deviceId, getLastUpdateTime(deviceId)); - emptyIntervalFileNode.changeTypeToChanged(isMerging); + emptyTsFileResource.setStartTime(deviceId, 0L); + emptyTsFileResource.setEndTime(deviceId, getLastUpdateTime(deviceId)); + emptyTsFileResource.changeTypeToChanged(isMerging); } else { - List temp = invertedIndexOfFiles.get(deviceId); + List temp = invertedIndexOfFiles.get(deviceId); int index = searchIndexNodeByTimestamp(deviceId, timestamp, temp); for (int i = 0; i <= index; i++) { temp.get(i).changeTypeToChanged(isMerging); @@ -695,7 +691,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { * @return index of interval */ private int searchIndexNodeByTimestamp(String deviceId, long timestamp, - List fileList) { + List fileList) { int index = 1; while (index < fileList.size()) { if (timestamp < fileList.get(index).getStartTime(deviceId)) { @@ -768,11 +764,11 @@ public class FileNodeProcessor extends Processor implements IStatistic { throw new FileNodeProcessorException(e); } // tsfile dataØØ - List bufferwriteDataInFiles = new ArrayList<>(); - for (IntervalFileNode intervalFileNode : newFileNodes) { - // add the same intervalFileNode, but not the same reference - if (intervalFileNode.isClosed()) { - bufferwriteDataInFiles.add(intervalFileNode.backUp()); + List bufferwriteDataInFiles = new ArrayList<>(); + for (TsFileResource tsFileResource : newFileNodes) { + // add the same tsFileResource, but not the same reference + if (tsFileResource.isClosed()) { + bufferwriteDataInFiles.add(tsFileResource.backUp()); } } Pair> bufferwritedata = new Pair<>(null, null); @@ -798,7 +794,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { try { List pathModifications = context.getPathModifications( - currentIntervalFileNode.getModFile(), deviceId + currentTsFileResource.getModFile(), deviceId + IoTDBConstant.PATH_SEPARATOR + measurementId ); if (!pathModifications.isEmpty()) { @@ -823,7 +819,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { * @param appendFile the appended tsfile information * @param appendFilePath the seriesPath of appended file */ - public void appendFile(IntervalFileNode appendFile, String appendFilePath) + public void appendFile(TsFileResource appendFile, String appendFilePath) throws FileNodeProcessorException { try { if (!new File(appendFile.getFilePath()).getParentFile().exists()) { @@ -869,12 +865,12 @@ public class FileNodeProcessor extends Processor implements IStatistic { * * @param appendFile the appended tsfile information */ - public List getOverlapFiles(IntervalFileNode appendFile, String uuid) + public List getOverlapFiles(TsFileResource appendFile, String uuid) throws FileNodeProcessorException { List overlapFiles = new ArrayList<>(); try { - for (IntervalFileNode intervalFileNode : newFileNodes) { - getOverlapFiles(appendFile, intervalFileNode, uuid, overlapFiles); + for (TsFileResource tsFileResource : newFileNodes) { + getOverlapFiles(appendFile, tsFileResource, uuid, overlapFiles); } } catch (IOException e) { LOGGER.error("Failed to get overlap tsfiles which conflict with the appendFile."); @@ -883,24 +879,24 @@ public class FileNodeProcessor extends Processor implements IStatistic { return overlapFiles; } - private void getOverlapFiles(IntervalFileNode appendFile, IntervalFileNode intervalFileNode, + private void getOverlapFiles(TsFileResource appendFile, TsFileResource tsFileResource, String uuid, List overlapFiles) throws IOException { for (Entry entry : appendFile.getStartTimeMap().entrySet()) { - if (intervalFileNode.getStartTimeMap().containsKey(entry.getKey()) && - intervalFileNode.getEndTime(entry.getKey()) >= entry.getValue() - && intervalFileNode.getStartTime(entry.getKey()) <= appendFile + if (tsFileResource.getStartTimeMap().containsKey(entry.getKey()) && + tsFileResource.getEndTime(entry.getKey()) >= entry.getValue() + && tsFileResource.getStartTime(entry.getKey()) <= appendFile .getEndTime(entry.getKey())) { String relativeFilePath = "postback" + File.separator + uuid + File.separator + "backup" - + File.separator + intervalFileNode.getRelativePath(); + + File.separator + tsFileResource.getRelativePath(); File newFile = new File( - Directories.getInstance().getTsFileFolder(intervalFileNode.getBaseDirIndex()), + Directories.getInstance().getTsFileFolder(tsFileResource.getBaseDirIndex()), relativeFilePath); if (!newFile.getParentFile().exists()) { newFile.getParentFile().mkdirs(); } java.nio.file.Path link = FileSystems.getDefault().getPath(newFile.getPath()); java.nio.file.Path target = FileSystems.getDefault() - .getPath(intervalFileNode.getFilePath()); + .getPath(tsFileResource.getFilePath()); Files.createLink(link, target); overlapFiles.add(newFile.getPath()); break; @@ -1028,12 +1024,12 @@ public class FileNodeProcessor extends Processor implements IStatistic { // change status from work to merge isMerging = FileNodeProcessorStatus.MERGING_WRITE; // check the empty file - Map startTimeMap = emptyIntervalFileNode.getStartTimeMap(); + Map startTimeMap = emptyTsFileResource.getStartTimeMap(); mergeCheckEmptyFile(startTimeMap); - for (IntervalFileNode intervalFileNode : newFileNodes) { - if (intervalFileNode.getOverflowChangeType() != OverflowChangeType.NO_CHANGE) { - intervalFileNode.setOverflowChangeType(OverflowChangeType.CHANGED); + for (TsFileResource tsFileResource : newFileNodes) { + if (tsFileResource.getOverflowChangeType() != OverflowChangeType.NO_CHANGE) { + tsFileResource.setOverflowChangeType(OverflowChangeType.CHANGED); } } @@ -1042,7 +1038,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { fileNodeProcessorStore.setOverflowed(isOverflowed); fileNodeProcessorStore.setFileNodeProcessorStatus(isMerging); fileNodeProcessorStore.setNewFileNodes(newFileNodes); - fileNodeProcessorStore.setEmptyIntervalFileNode(emptyIntervalFileNode); + fileNodeProcessorStore.setEmptyTsFileResource(emptyTsFileResource); // flush this filenode information try { writeStoreToDisk(fileNodeProcessorStore); @@ -1054,17 +1050,17 @@ public class FileNodeProcessor extends Processor implements IStatistic { } } // add numOfMergeFile to control the number of the merge file - List backupIntervalFiles; + List backupIntervalFiles; backupIntervalFiles = switchFileNodeToMerge(); // // clear empty file // boolean needEmtpy = false; - if (emptyIntervalFileNode.getOverflowChangeType() != OverflowChangeType.NO_CHANGE) { + if (emptyTsFileResource.getOverflowChangeType() != OverflowChangeType.NO_CHANGE) { needEmtpy = true; } - emptyIntervalFileNode.clear(); + emptyTsFileResource.clear(); // attention try { overflowProcessor.switchWorkToMerge(); @@ -1081,7 +1077,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { // query tsfile data and overflow data, and merge them int numOfMergeFiles = 0; int allNeedMergeFiles = backupIntervalFiles.size(); - for (IntervalFileNode backupIntervalFile : backupIntervalFiles) { + for (TsFileResource backupIntervalFile : backupIntervalFiles) { numOfMergeFiles++; if (backupIntervalFile.getOverflowChangeType() == OverflowChangeType.CHANGED) { // query data and merge @@ -1133,10 +1129,10 @@ public class FileNodeProcessor extends Processor implements IStatistic { } private void mergeCheckEmptyFile(Map startTimeMap) { - if (emptyIntervalFileNode.getOverflowChangeType() == OverflowChangeType.NO_CHANGE) { + if (emptyTsFileResource.getOverflowChangeType() == OverflowChangeType.NO_CHANGE) { return; } - Iterator> iterator = emptyIntervalFileNode.getEndTimeMap().entrySet() + Iterator> iterator = emptyTsFileResource.getEndTimeMap().entrySet() .iterator(); while (iterator.hasNext()) { Entry entry = iterator.next(); @@ -1147,32 +1143,32 @@ public class FileNodeProcessor extends Processor implements IStatistic { iterator.remove(); } } - if (emptyIntervalFileNode.checkEmpty()) { - emptyIntervalFileNode.clear(); + if (emptyTsFileResource.checkEmpty()) { + emptyTsFileResource.clear(); } else { if (!newFileNodes.isEmpty()) { - IntervalFileNode first = newFileNodes.get(0); - for (String deviceId : emptyIntervalFileNode.getStartTimeMap().keySet()) { - first.setStartTime(deviceId, emptyIntervalFileNode.getStartTime(deviceId)); - first.setEndTime(deviceId, emptyIntervalFileNode.getEndTime(deviceId)); + TsFileResource first = newFileNodes.get(0); + for (String deviceId : emptyTsFileResource.getStartTimeMap().keySet()) { + first.setStartTime(deviceId, emptyTsFileResource.getStartTime(deviceId)); + first.setEndTime(deviceId, emptyTsFileResource.getEndTime(deviceId)); first.setOverflowChangeType(OverflowChangeType.CHANGED); } - emptyIntervalFileNode.clear(); + emptyTsFileResource.clear(); } else { - emptyIntervalFileNode.setOverflowChangeType(OverflowChangeType.CHANGED); + emptyTsFileResource.setOverflowChangeType(OverflowChangeType.CHANGED); } } } - private List switchFileNodeToMerge() throws FileNodeProcessorException { - List result = new ArrayList<>(); - if (emptyIntervalFileNode.getOverflowChangeType() != OverflowChangeType.NO_CHANGE) { + private List switchFileNodeToMerge() throws FileNodeProcessorException { + List result = new ArrayList<>(); + if (emptyTsFileResource.getOverflowChangeType() != OverflowChangeType.NO_CHANGE) { // add empty - result.add(emptyIntervalFileNode.backUp()); + result.add(emptyTsFileResource.backUp()); if (!newFileNodes.isEmpty()) { throw new FileNodeProcessorException( String.format("The status of empty file is %s, but the new file list is not empty", - emptyIntervalFileNode.getOverflowChangeType())); + emptyTsFileResource.getOverflowChangeType())); } return result; } @@ -1181,43 +1177,43 @@ public class FileNodeProcessor extends Processor implements IStatistic { throw new FileNodeProcessorException( "No file was changed when merging, the filenode is " + getProcessorName()); } - for (IntervalFileNode intervalFileNode : newFileNodes) { - updateFileNode(intervalFileNode, result); + for (TsFileResource tsFileResource : newFileNodes) { + updateFileNode(tsFileResource, result); } return result; } - private void updateFileNode(IntervalFileNode intervalFileNode, List result) { - if (intervalFileNode.getOverflowChangeType() == OverflowChangeType.NO_CHANGE) { - result.add(intervalFileNode.backUp()); + private void updateFileNode(TsFileResource tsFileResource, List result) { + if (tsFileResource.getOverflowChangeType() == OverflowChangeType.NO_CHANGE) { + result.add(tsFileResource.backUp()); } else { Map startTimeMap = new HashMap<>(); Map endTimeMap = new HashMap<>(); - for (String deviceId : intervalFileNode.getEndTimeMap().keySet()) { - List temp = invertedIndexOfFiles.get(deviceId); - int index = temp.indexOf(intervalFileNode); + for (String deviceId : tsFileResource.getEndTimeMap().keySet()) { + List temp = invertedIndexOfFiles.get(deviceId); + int index = temp.indexOf(tsFileResource); int size = temp.size(); // start time if (index == 0) { startTimeMap.put(deviceId, 0L); } else { - startTimeMap.put(deviceId, intervalFileNode.getStartTime(deviceId)); + startTimeMap.put(deviceId, tsFileResource.getStartTime(deviceId)); } // end time if (index < size - 1) { endTimeMap.put(deviceId, temp.get(index + 1).getStartTime(deviceId) - 1); } else { - endTimeMap.put(deviceId, intervalFileNode.getEndTime(deviceId)); + endTimeMap.put(deviceId, tsFileResource.getEndTime(deviceId)); } } - IntervalFileNode node = new IntervalFileNode(startTimeMap, endTimeMap, - intervalFileNode.getOverflowChangeType(), intervalFileNode.getBaseDirIndex(), - intervalFileNode.getRelativePath()); + TsFileResource node = new TsFileResource(startTimeMap, endTimeMap, + tsFileResource.getOverflowChangeType(), tsFileResource.getBaseDirIndex(), + tsFileResource.getRelativePath()); result.add(node); } } - private void switchMergeToWaiting(List backupIntervalFiles, boolean needEmpty) + private void switchMergeToWaiting(List backupIntervalFiles, boolean needEmpty) throws FileNodeProcessorException { LOGGER.info("The status of filenode processor {} switches from {} to {}.", getProcessorName(), FileNodeProcessorStatus.MERGING_WRITE, FileNodeProcessorStatus.WAITING); @@ -1227,10 +1223,10 @@ public class FileNodeProcessor extends Processor implements IStatistic { oldMultiPassLock = newMultiPassLock; newMultiPassTokenSet = new HashSet<>(); newMultiPassLock = new ReentrantReadWriteLock(false); - List result = new ArrayList<>(); + List result = new ArrayList<>(); int beginIndex = 0; if (needEmpty) { - IntervalFileNode empty = backupIntervalFiles.get(0); + TsFileResource empty = backupIntervalFiles.get(0); if (!empty.checkEmpty()) { updateEmpty(empty, result); beginIndex++; @@ -1240,8 +1236,8 @@ public class FileNodeProcessor extends Processor implements IStatistic { addAllFileIntoIndex(backupIntervalFiles); // check the merge changed file for (int i = beginIndex; i < backupIntervalFiles.size(); i++) { - IntervalFileNode newFile = newFileNodes.get(i - beginIndex); - IntervalFileNode temp = backupIntervalFiles.get(i); + TsFileResource newFile = newFileNodes.get(i - beginIndex); + TsFileResource temp = backupIntervalFiles.get(i); if (newFile.getOverflowChangeType() == OverflowChangeType.MERGING_CHANGE) { updateMergeChanged(newFile, temp); } @@ -1251,7 +1247,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { } // add new file when merge for (int i = backupIntervalFiles.size() - beginIndex; i < newFileNodes.size(); i++) { - IntervalFileNode fileNode = newFileNodes.get(i); + TsFileResource fileNode = newFileNodes.get(i); if (fileNode.isClosed()) { result.add(fileNode.backUp()); } else { @@ -1264,13 +1260,13 @@ public class FileNodeProcessor extends Processor implements IStatistic { // reconstruct the index addAllFileIntoIndex(newFileNodes); // clear merge changed - for (IntervalFileNode fileNode : newFileNodes) { + for (TsFileResource fileNode : newFileNodes) { fileNode.clearMergeChanged(); } synchronized (fileNodeProcessorStore) { fileNodeProcessorStore.setFileNodeProcessorStatus(isMerging); - fileNodeProcessorStore.setEmptyIntervalFileNode(emptyIntervalFileNode); + fileNodeProcessorStore.setEmptyTsFileResource(emptyTsFileResource); fileNodeProcessorStore.setNewFileNodes(newFileNodes); try { writeStoreToDisk(fileNodeProcessorStore); @@ -1289,10 +1285,10 @@ public class FileNodeProcessor extends Processor implements IStatistic { } } - private void updateEmpty(IntervalFileNode empty, List result) { + private void updateEmpty(TsFileResource empty, List result) { for (String deviceId : empty.getStartTimeMap().keySet()) { if (invertedIndexOfFiles.containsKey(deviceId)) { - IntervalFileNode temp = invertedIndexOfFiles.get(deviceId).get(0); + TsFileResource temp = invertedIndexOfFiles.get(deviceId).get(0); if (temp.getMergeChanged().contains(deviceId)) { empty.setOverflowChangeType(OverflowChangeType.CHANGED); break; @@ -1303,7 +1299,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { result.add(empty.backUp()); } - private void updateMergeChanged(IntervalFileNode newFile, IntervalFileNode temp) { + private void updateMergeChanged(TsFileResource newFile, TsFileResource temp) { for (String deviceId : newFile.getMergeChanged()) { if (temp.getStartTimeMap().containsKey(deviceId)) { temp.setOverflowChangeType(OverflowChangeType.CHANGED); @@ -1359,7 +1355,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { synchronized (fileNodeProcessorStore) { fileNodeProcessorStore.setFileNodeProcessorStatus(isMerging); fileNodeProcessorStore.setNewFileNodes(newFileNodes); - fileNodeProcessorStore.setEmptyIntervalFileNode(emptyIntervalFileNode); + fileNodeProcessorStore.setEmptyTsFileResource(emptyTsFileResource); writeStoreToDisk(fileNodeProcessorStore); } } catch (IOException e) { @@ -1400,7 +1396,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { } private void collectBufferWriteFiles(Set bufferFiles) { - for (IntervalFileNode bufferFileNode : newFileNodes) { + for (TsFileResource bufferFileNode : newFileNodes) { String bufferFilePath = bufferFileNode.getFilePath(); if (bufferFilePath != null) { bufferFiles.add(bufferFilePath); @@ -1423,14 +1419,14 @@ public class FileNodeProcessor extends Processor implements IStatistic { } private void changeFileNodes() { - for (IntervalFileNode fileNode : newFileNodes) { + for (TsFileResource fileNode : newFileNodes) { if (fileNode.getOverflowChangeType() != OverflowChangeType.NO_CHANGE) { fileNode.setOverflowChangeType(OverflowChangeType.CHANGED); } } } - private String queryAndWriteDataForMerge(IntervalFileNode backupIntervalFile) + private String queryAndWriteDataForMerge(TsFileResource backupIntervalFile) throws IOException, FileNodeProcessorException, PathErrorException { Map startTimeMap = new HashMap<>(); Map endTimeMap = new HashMap<>(); @@ -1751,7 +1747,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { public void close() throws FileNodeProcessorException { closeBufferWrite(); closeOverflow(); - for (IntervalFileNode fileNode : newFileNodes) { + for (TsFileResource fileNode : newFileNodes) { if (fileNode.getModFile() != null) { try { fileNode.getModFile().close(); @@ -1773,7 +1769,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { } closeBufferWrite(); closeOverflow(); - for (IntervalFileNode fileNode : newFileNodes) { + for (TsFileResource fileNode : newFileNodes) { if (fileNode.getModFile() != null) { try { fileNode.getModFile().close(); @@ -1819,7 +1815,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { try { processorStore = serializeUtil.deserialize(fileNodeRestoreFilePath) .orElse(new FileNodeProcessorStore(false, new HashMap<>(), - new IntervalFileNode(OverflowChangeType.NO_CHANGE, null), + new TsFileResource(OverflowChangeType.NO_CHANGE, null), new ArrayList<>(), FileNodeProcessorStatus.NONE, 0)); } catch (IOException e) { throw new FileNodeProcessorException(e); @@ -1875,12 +1871,12 @@ public class FileNodeProcessor extends Processor implements IStatistic { private void deleteBufferWriteFiles(String deviceId, Deletion deletion, List updatedModFiles) throws IOException { - if (currentIntervalFileNode != null && currentIntervalFileNode.containsDevice(deviceId)) { - currentIntervalFileNode.getModFile().write(deletion); - updatedModFiles.add(currentIntervalFileNode.getModFile()); + if (currentTsFileResource != null && currentTsFileResource.containsDevice(deviceId)) { + currentTsFileResource.getModFile().write(deletion); + updatedModFiles.add(currentTsFileResource.getModFile()); } - for (IntervalFileNode fileNode : newFileNodes) { - if (fileNode != currentIntervalFileNode && fileNode.containsDevice(deviceId) + for (TsFileResource fileNode : newFileNodes) { + if (fileNode != currentTsFileResource && fileNode.containsDevice(deviceId) && fileNode.getStartTime(deviceId) <= deletion.getTimestamp()) { fileNode.getModFile().write(deletion); updatedModFiles.add(fileNode.getModFile()); @@ -1953,8 +1949,8 @@ public class FileNodeProcessor extends Processor implements IStatistic { Objects.equals(lastUpdateTimeMap, that.lastUpdateTimeMap) && Objects.equals(flushLastUpdateTimeMap, that.flushLastUpdateTimeMap) && Objects.equals(invertedIndexOfFiles, that.invertedIndexOfFiles) && - Objects.equals(emptyIntervalFileNode, that.emptyIntervalFileNode) && - Objects.equals(currentIntervalFileNode, that.currentIntervalFileNode) && + Objects.equals(emptyTsFileResource, that.emptyTsFileResource) && + Objects.equals(currentTsFileResource, that.currentTsFileResource) && Objects.equals(newFileNodes, that.newFileNodes) && isMerging == that.isMerging && Objects.equals(fileNodeProcessorStore, that.fileNodeProcessorStore) && @@ -1978,7 +1974,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { public int hashCode() { return Objects.hash(super.hashCode(), statStorageDeltaName, statParamsHashMap, isOverflowed, lastUpdateTimeMap, flushLastUpdateTimeMap, invertedIndexOfFiles, - emptyIntervalFileNode, currentIntervalFileNode, newFileNodes, isMerging, + emptyTsFileResource, currentTsFileResource, newFileNodes, isMerging, numOfMergeFile, fileNodeProcessorStore, fileNodeRestoreFilePath, baseDirPath, lastMergeTime, bufferWriteProcessor, overflowProcessor, oldMultiPassTokenSet, newMultiPassTokenSet, oldMultiPassLock, newMultiPassLock, shouldRecovery, parameters, diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStore.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStore.java index ed8944ffbfcdd9d3564fdbc062ab0913286b7d48..db563649ec2b32d2f5d8d872df17d9e5b79e93b6 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStore.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorStore.java @@ -26,7 +26,7 @@ import java.util.Map; /** * FileNodeProcessorStore is used to store information about FileNodeProcessor's status. * lastUpdateTime is changed and stored by BufferWrite flush or BufferWrite close. - * emptyIntervalFileNode and newFileNodes are changed and stored by Overflow flush and + * emptyTsFileResource and newFileNodes are changed and stored by Overflow flush and * Overflow close. fileNodeProcessorState is changed and stored by the change of FileNodeProcessor's * status such as "work->merge merge->wait wait->work". numOfMergeFile is changed * and stored when FileNodeProcessor's status changes from work to merge. @@ -39,8 +39,8 @@ public class FileNodeProcessorStore implements Serializable { private boolean isOverflowed; private Map lastUpdateTimeMap; - private IntervalFileNode emptyIntervalFileNode; - private List newFileNodes; + private TsFileResource emptyTsFileResource; + private List newFileNodes; private int numOfMergeFile; private FileNodeProcessorStatus fileNodeProcessorStatus; @@ -48,19 +48,19 @@ public class FileNodeProcessorStore implements Serializable { * Constructor of FileNodeProcessorStore. * @param isOverflowed whether this FileNode contains unmerged Overflow operations. * @param lastUpdateTimeMap the timestamp of last data point of each device in this FileNode. - * @param emptyIntervalFileNode a place holder when the FileNode contains no TsFile. + * @param emptyTsFileResource a place holder when the FileNode contains no TsFile. * @param newFileNodes TsFiles in the FileNode. * @param fileNodeProcessorStatus the status of the FileNode. * @param numOfMergeFile the number of files already merged in one merge operation. */ public FileNodeProcessorStore(boolean isOverflowed, Map lastUpdateTimeMap, - IntervalFileNode emptyIntervalFileNode, - List newFileNodes, + TsFileResource emptyTsFileResource, + List newFileNodes, FileNodeProcessorStatus fileNodeProcessorStatus, int numOfMergeFile) { this.isOverflowed = isOverflowed; this.lastUpdateTimeMap = lastUpdateTimeMap; - this.emptyIntervalFileNode = emptyIntervalFileNode; + this.emptyTsFileResource = emptyTsFileResource; this.newFileNodes = newFileNodes; this.fileNodeProcessorStatus = fileNodeProcessorStatus; this.numOfMergeFile = numOfMergeFile; @@ -90,19 +90,19 @@ public class FileNodeProcessorStore implements Serializable { this.lastUpdateTimeMap = lastUpdateTimeMap; } - public IntervalFileNode getEmptyIntervalFileNode() { - return emptyIntervalFileNode; + public TsFileResource getEmptyTsFileResource() { + return emptyTsFileResource; } - public void setEmptyIntervalFileNode(IntervalFileNode emptyIntervalFileNode) { - this.emptyIntervalFileNode = emptyIntervalFileNode; + public void setEmptyTsFileResource(TsFileResource emptyTsFileResource) { + this.emptyTsFileResource = emptyTsFileResource; } - public List getNewFileNodes() { + public List getNewFileNodes() { return newFileNodes; } - public void setNewFileNodes(List newFileNodes) { + public void setNewFileNodes(List newFileNodes) { this.newFileNodes = newFileNodes; } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/IntervalFileNode.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/TsFileResource.java similarity index 92% rename from iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/IntervalFileNode.java rename to iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/TsFileResource.java index 6a63b619f8c28cf64307ed8f2b6ec9fbdaa4246e..5f0b376d16573287aa274a84ff522bd5b3c5152b 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/IntervalFileNode.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/TsFileResource.java @@ -32,7 +32,7 @@ import org.apache.iotdb.db.engine.modification.ModificationFile; /** * This class is used to store one bufferwrite file status.
*/ -public class IntervalFileNode implements Serializable { +public class TsFileResource implements Serializable { private static final long serialVersionUID = -4309683416067212549L; @@ -45,7 +45,7 @@ public class IntervalFileNode implements Serializable { private transient ModificationFile modFile; - public IntervalFileNode(Map startTimeMap, Map endTimeMap, + public TsFileResource(Map startTimeMap, Map endTimeMap, OverflowChangeType type, int baseDirIndex, String relativePath) { this.overflowChangeType = type; @@ -65,7 +65,7 @@ public class IntervalFileNode implements Serializable { * @param type whether this file is affected by overflow and how it is affected. * @param relativePath the path of the file relative to the FileNode. */ - public IntervalFileNode(OverflowChangeType type, int baseDirIndex, String relativePath) { + public TsFileResource(OverflowChangeType type, int baseDirIndex, String relativePath) { this.overflowChangeType = type; this.baseDirIndex = baseDirIndex; @@ -78,7 +78,7 @@ public class IntervalFileNode implements Serializable { + relativePath + ModificationFile.FILE_SUFFIX); } - public IntervalFileNode(OverflowChangeType type, String baseDir, String relativePath) { + public TsFileResource(OverflowChangeType type, String baseDir, String relativePath) { this.overflowChangeType = type; this.baseDirIndex = Directories.getInstance().getTsFileFolderIndex(baseDir); @@ -91,7 +91,7 @@ public class IntervalFileNode implements Serializable { + relativePath + ModificationFile.FILE_SUFFIX); } - public IntervalFileNode(OverflowChangeType type, String relativePath) { + public TsFileResource(OverflowChangeType type, String relativePath) { this(type, 0, relativePath); } @@ -220,11 +220,11 @@ public class IntervalFileNode implements Serializable { } - public IntervalFileNode backUp() { + public TsFileResource backUp() { Map startTimeMapCopy = new HashMap<>(this.startTimeMap); Map endTimeMapCopy = new HashMap<>(this.endTimeMap); - return new IntervalFileNode(startTimeMapCopy, endTimeMapCopy, overflowChangeType, + return new TsFileResource(startTimeMapCopy, endTimeMapCopy, overflowChangeType, baseDirIndex, relativePath); } @@ -244,7 +244,7 @@ public class IntervalFileNode implements Serializable { public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - IntervalFileNode fileNode = (IntervalFileNode) o; + TsFileResource fileNode = (TsFileResource) o; return baseDirIndex == fileNode.baseDirIndex && overflowChangeType == fileNode.overflowChangeType && Objects.equals(relativePath, fileNode.relativePath) && @@ -257,7 +257,7 @@ public class IntervalFileNode implements Serializable { public String toString() { return String.format( - "IntervalFileNode [relativePath=%s,overflowChangeType=%s, startTimeMap=%s," + "TsFileResource [relativePath=%s,overflowChangeType=%s, startTimeMap=%s," + " endTimeMap=%s, mergeChanged=%s]", relativePath, overflowChangeType, startTimeMap, endTimeMap, mergeChanged); } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowSupport.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtable.java similarity index 79% rename from iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowSupport.java rename to iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtable.java index 8a0a97ebdd429cf30edb2323aad311bc688ac88f..5fadb545d74acb09b86a83cbdaedbc95ef1d323d 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowSupport.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtable.java @@ -24,27 +24,28 @@ import org.apache.iotdb.db.engine.memtable.IMemTable; import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.LongStatistics; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.write.record.TSRecord; import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; /** * This class is used to store and query all overflow data in memory.
- * This just represent someone storage group.
*/ -public class OverflowSupport { +public class OverflowMemtable { /** * store update and delete data */ - private Map> indexTrees; + private Map> indexTrees; /** * store insert data */ private IMemTable memTable; - public OverflowSupport() { + public OverflowMemtable() { indexTrees = new HashMap<>(); memTable = new PrimitiveMemTable(); } @@ -68,9 +69,9 @@ public class OverflowSupport { indexTrees.put(deviceId, new HashMap<>()); } if (!indexTrees.get(deviceId).containsKey(measurementId)) { - indexTrees.get(deviceId).put(measurementId, new OverflowSeriesImpl(measurementId, dataType)); + indexTrees.get(deviceId).put(measurementId, new LongStatistics()); } - indexTrees.get(deviceId).get(measurementId).update(startTime, endTime); + indexTrees.get(deviceId).get(measurementId).updateStats(startTime, endTime); } public void delete(String deviceId, String measurementId, long timestamp, boolean isFlushing) { @@ -87,20 +88,11 @@ public class OverflowSupport { return memTable.query(deviceId, measurementId, dataType, props); } - public BatchData queryOverflowUpdateInMemory(String deviceId, String measurementId, - TSDataType dataType) { - if (indexTrees.containsKey(deviceId) && indexTrees.get(deviceId).containsKey(measurementId) - && indexTrees.get(deviceId).get(measurementId).getDataType().equals(dataType)) { - return indexTrees.get(deviceId).get(measurementId).query(); - } - return null; - } - public boolean isEmptyOfOverflowSeriesMap() { return indexTrees.isEmpty(); } - public Map> getOverflowSeriesMap() { + public Map> getOverflowSeriesMap() { return indexTrees; } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java index 7b9c7bfcf1474b664e622b1674182a07687bd273..b4669ec2f2baabe50f7aa9aa1dd03445889398e9 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java @@ -73,8 +73,8 @@ public class OverflowProcessor extends Processor { private OverflowResource workResource; private OverflowResource mergeResource; - private OverflowSupport workSupport; - private OverflowSupport flushSupport; + private OverflowMemtable workSupport; + private OverflowMemtable flushSupport; private volatile Future flushFuture = new ImmediateFuture<>(true); private volatile boolean isMerge; @@ -113,7 +113,7 @@ public class OverflowProcessor extends Processor { // recover file recovery(processorDataDir); // memory - workSupport = new OverflowSupport(); + workSupport = new OverflowMemtable(); overflowFlushAction = parameters.get(FileNodeConstants.OVERFLOW_FLUSH_ACTION); filenodeFlushAction = parameters .get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION); @@ -380,7 +380,7 @@ public class OverflowProcessor extends Processor { queryFlushLock.lock(); try { flushSupport = workSupport; - workSupport = new OverflowSupport(); + workSupport = new OverflowMemtable(); } finally { queryFlushLock.unlock(); } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowSeriesImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowSeriesImpl.java deleted file mode 100644 index 8aac49975329470c67c462f17587ef9549bfc761..0000000000000000000000000000000000000000 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowSeriesImpl.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.engine.overflow.io; - -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.statistics.LongStatistics; -import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; -import org.apache.iotdb.tsfile.read.common.BatchData; - -/** - * This class is only used to store and query overflow overflowIndex {@code IIntervalTreeOperator} - * data in memory. - */ -public class OverflowSeriesImpl { - - /** - * The data of update and delete in memory for this time series. - */ - private String measurementId; - private TSDataType dataType; - private Statistics statistics; - private int valueCount; - - public OverflowSeriesImpl(String measurementId, TSDataType dataType) { - this.measurementId = measurementId; - this.dataType = dataType; - statistics = new LongStatistics(); - } - - public void update(long startTime, long endTime) { - statistics.updateStats(startTime, endTime); - valueCount++; - } - - public void delete(long timestamp) { - statistics.updateStats(timestamp, timestamp); - valueCount++; - } - - public BatchData query() { - return null; - } - - public long getSize() { - return 0; - } - - public String getMeasurementId() { - return measurementId; - } - - public TSDataType getDataType() { - return dataType; - } - - public Statistics getStatistics() { - return statistics; - } - - public int getValueCount() { - return valueCount; - } -} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/GlobalSortedSeriesDataSource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/GlobalSortedSeriesDataSource.java index c78ee578fb5d4bd832996c9fa6cee1e534ae2699..176a30e6da9c18db7d74388c9f5bcc321e8c2b7f 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/GlobalSortedSeriesDataSource.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/GlobalSortedSeriesDataSource.java @@ -19,8 +19,7 @@ package org.apache.iotdb.db.engine.querycontext; import java.util.List; -import org.apache.iotdb.db.engine.filenode.IntervalFileNode; -import org.apache.iotdb.db.engine.memtable.TimeValuePairSorter; +import org.apache.iotdb.db.engine.filenode.TsFileResource; import org.apache.iotdb.tsfile.read.common.Path; public class GlobalSortedSeriesDataSource { @@ -28,7 +27,7 @@ public class GlobalSortedSeriesDataSource { private Path seriesPath; // sealed tsfile - private List sealedTsFiles; + private List sealedTsFiles; // unsealed tsfile private UnsealedTsFile unsealedTsFile; @@ -36,7 +35,7 @@ public class GlobalSortedSeriesDataSource { // seq mem-table private ReadOnlyMemChunk readableChunk; - public GlobalSortedSeriesDataSource(Path seriesPath, List sealedTsFiles, + public GlobalSortedSeriesDataSource(Path seriesPath, List sealedTsFiles, UnsealedTsFile unsealedTsFile, ReadOnlyMemChunk readableChunk) { this.seriesPath = seriesPath; @@ -50,11 +49,11 @@ public class GlobalSortedSeriesDataSource { return sealedTsFiles != null && !sealedTsFiles.isEmpty(); } - public List getSealedTsFiles() { + public List getSealedTsFiles() { return sealedTsFiles; } - public void setSealedTsFiles(List sealedTsFiles) { + public void setSealedTsFiles(List sealedTsFiles) { this.sealedTsFiles = sealedTsFiles; } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/postback/receiver/ServerServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/postback/receiver/ServerServiceImpl.java index adac27c0dedf8a568e76b163219246e670394237..7bbdb16dc55a0dd4123f61803aeb3766c88297bf 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/postback/receiver/ServerServiceImpl.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/postback/receiver/ServerServiceImpl.java @@ -39,7 +39,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.directories.Directories; import org.apache.iotdb.db.engine.filenode.FileNodeManager; -import org.apache.iotdb.db.engine.filenode.IntervalFileNode; +import org.apache.iotdb.db.engine.filenode.TsFileResource; import org.apache.iotdb.db.engine.filenode.OverflowChangeType; import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.utils.PostbackUtils; @@ -660,7 +660,7 @@ public class ServerServiceImpl implements ServerService.Iface { // create a new fileNode String header = postbackPath + uuid.get() + File.separator + "data" + File.separator; String relativePath = path.substring(header.length()); - IntervalFileNode fileNode = new IntervalFileNode(startTimeMap, endTimeMap, + TsFileResource fileNode = new TsFileResource(startTimeMap, endTimeMap, OverflowChangeType.NO_CHANGE, Directories.getInstance().getNextFolderIndexForTsFile(), relativePath); // call interface of load external file diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java index b2c1452b19615ed481f60b5439e18f161087fa86..585d85da6b938c595f867b141af4ba82a40fc5c0 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java @@ -21,7 +21,7 @@ package org.apache.iotdb.db.query.control; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import org.apache.iotdb.db.engine.filenode.IntervalFileNode; +import org.apache.iotdb.db.engine.filenode.TsFileResource; import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; @@ -66,8 +66,8 @@ public class OpenedFilePathsManager { * Add the unique file paths to closedFilePathsMap and unclosedFilePathsMap. */ public void addUsedFilesForCurrentRequestThread(long jobId, QueryDataSource dataSource) { - for (IntervalFileNode intervalFileNode : dataSource.getSeqDataSource().getSealedTsFiles()) { - String sealedFilePath = intervalFileNode.getFilePath(); + for (TsFileResource tsFileResource : dataSource.getSeqDataSource().getSealedTsFiles()) { + String sealedFilePath = tsFileResource.getFilePath(); addFilePathToMap(jobId, sealedFilePath, true); } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java index 51ce94b64237003773478b7f45fb89751e83ad81..0e14b4bb7faf70883223f9dfd34beaca4e14e43d 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java @@ -21,7 +21,7 @@ package org.apache.iotdb.db.query.factory; import java.io.IOException; import java.util.List; -import org.apache.iotdb.db.engine.filenode.IntervalFileNode; +import org.apache.iotdb.db.engine.filenode.TsFileResource; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile; import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource; @@ -131,7 +131,7 @@ public class SeriesReaderFactory { * This method is used to construct reader for merge process in IoTDB. To merge only one TsFile * data and one UnSeqFile data. */ - public IReader createSeriesReaderForMerge(IntervalFileNode intervalFileNode, + public IReader createSeriesReaderForMerge(TsFileResource tsFileResource, OverflowSeriesDataSource overflowSeriesDataSource, SingleSeriesExpression singleSeriesExpression, QueryContext context) @@ -139,12 +139,12 @@ public class SeriesReaderFactory { logger.debug("Create seriesReaders for merge. SeriesFilter = {}. TsFilePath = {}", singleSeriesExpression, - intervalFileNode.getFilePath()); + tsFileResource.getFilePath()); PriorityMergeReader priorityMergeReader = new PriorityMergeReader(); // Sequence reader - IReader seriesInTsFileReader = createSealedTsFileReaderForMerge(intervalFileNode, + IReader seriesInTsFileReader = createSealedTsFileReaderForMerge(tsFileResource, singleSeriesExpression, context); priorityMergeReader.addReaderWithPriority(seriesInTsFileReader, 1); @@ -156,7 +156,7 @@ public class SeriesReaderFactory { return priorityMergeReader; } - private IReader createSealedTsFileReaderForMerge(IntervalFileNode fileNode, + private IReader createSealedTsFileReaderForMerge(TsFileResource fileNode, SingleSeriesExpression singleSeriesExpression, QueryContext context) throws IOException { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java index a8465bf66ed4a647f405ac84c05ba2b3c1d756a7..d83af0f8e1a96dddf47b7ea76c9bd419f6d34be1 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java @@ -22,7 +22,7 @@ package org.apache.iotdb.db.query.reader.sequence; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.apache.iotdb.db.engine.filenode.IntervalFileNode; +import org.apache.iotdb.db.engine.filenode.TsFileResource; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.FileReaderManager; @@ -45,7 +45,7 @@ import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderWithoutFilter; public class SealedTsFilesReader implements IReader { private Path seriesPath; - private List sealedTsFiles; + private List sealedTsFiles; private int usedIntervalFileIndex; private FileSeriesReader seriesReader; private Filter filter; @@ -53,7 +53,7 @@ public class SealedTsFilesReader implements IReader { private boolean hasCachedData; private QueryContext context; - public SealedTsFilesReader(Path seriesPath, List sealedTsFiles, Filter filter, + public SealedTsFilesReader(Path seriesPath, List sealedTsFiles, Filter filter, QueryContext context) { this(seriesPath, sealedTsFiles, context); this.filter = filter; @@ -63,7 +63,7 @@ public class SealedTsFilesReader implements IReader { /** * init with seriesPath and sealedTsFiles. */ - public SealedTsFilesReader(Path seriesPath, List sealedTsFiles, + public SealedTsFilesReader(Path seriesPath, List sealedTsFiles, QueryContext context) { this.seriesPath = seriesPath; this.sealedTsFiles = sealedTsFiles; @@ -109,7 +109,7 @@ public class SealedTsFilesReader implements IReader { while (!flag && usedIntervalFileIndex < sealedTsFiles.size()) { // init until reach a satisfied reader if (seriesReader == null || !seriesReader.hasNextBatch()) { - IntervalFileNode fileNode = sealedTsFiles.get(usedIntervalFileIndex++); + TsFileResource fileNode = sealedTsFiles.get(usedIntervalFileIndex++); if (singleTsFileSatisfied(fileNode)) { initSingleTsFileReader(fileNode, context); } else { @@ -155,7 +155,7 @@ public class SealedTsFilesReader implements IReader { } } - private boolean singleTsFileSatisfied(IntervalFileNode fileNode) { + private boolean singleTsFileSatisfied(TsFileResource fileNode) { if (filter == null) { return true; @@ -166,7 +166,7 @@ public class SealedTsFilesReader implements IReader { return filter.satisfyStartEndTime(startTime, endTime); } - private void initSingleTsFileReader(IntervalFileNode fileNode, QueryContext context) + private void initSingleTsFileReader(TsFileResource fileNode, QueryContext context) throws IOException { // to avoid too many opened files diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/SerializeUtilTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/SerializeUtilTest.java index 5411b46b7a6756bf14bb10ecf9ed0b369629db1e..c7504390cd6afea123b013f40642510716d5ca52 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/SerializeUtilTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/SerializeUtilTest.java @@ -83,13 +83,13 @@ public class SerializeUtilTest { @Test public void testFileStore() { - IntervalFileNode emptyIntervalFileNode = new IntervalFileNode(OverflowChangeType.NO_CHANGE, + TsFileResource emptyTsFileResource = new TsFileResource(OverflowChangeType.NO_CHANGE, null); - List newFilenodes = new ArrayList<>(); + List newFilenodes = new ArrayList<>(); String deviceId = "d0.s0"; for (int i = 1; i <= 3; i++) { // i * 100, i * 100 + 99 - IntervalFileNode node = new IntervalFileNode(OverflowChangeType.NO_CHANGE, + TsFileResource node = new TsFileResource(OverflowChangeType.NO_CHANGE, "bufferfiletest" + i); node.setStartTime(deviceId, i * 100); node.setEndTime(deviceId, i * 100 + 99); @@ -100,7 +100,7 @@ public class SerializeUtilTest { lastUpdateTimeMap.put(deviceId, (long) 500); FileNodeProcessorStore fileNodeProcessorStore = new FileNodeProcessorStore(false, lastUpdateTimeMap, - emptyIntervalFileNode, newFilenodes, fileNodeProcessorState, 0); + emptyTsFileResource, newFilenodes, fileNodeProcessorState, 0); SerializeUtil serializeUtil = new SerializeUtil<>(); @@ -114,13 +114,13 @@ public class SerializeUtilTest { try { FileNodeProcessorStore fileNodeProcessorStore2 = serializeUtil.deserialize(filePath) .orElse(new FileNodeProcessorStore(false, new HashMap<>(), - new IntervalFileNode(OverflowChangeType.NO_CHANGE, null), - new ArrayList(), + new TsFileResource(OverflowChangeType.NO_CHANGE, null), + new ArrayList(), FileNodeProcessorStatus.NONE, 0)); assertEquals(fileNodeProcessorStore.getLastUpdateTimeMap(), fileNodeProcessorStore2.getLastUpdateTimeMap()); - assertEquals(fileNodeProcessorStore.getEmptyIntervalFileNode(), - fileNodeProcessorStore2.getEmptyIntervalFileNode()); + assertEquals(fileNodeProcessorStore.getEmptyTsFileResource(), + fileNodeProcessorStore2.getEmptyTsFileResource()); assertEquals(fileNodeProcessorStore.getNewFileNodes(), fileNodeProcessorStore2.getNewFileNodes()); assertEquals(fileNodeProcessorStore.getNumOfMergeFile(), diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowSupportTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtableTest.java similarity index 97% rename from iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowSupportTest.java rename to iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtableTest.java index 3cc74db6055c39eb24ae629c391b2ba82622c035..7a4ea057447eaf50d4bc6e30597fadcab0088ed8 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowSupportTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtableTest.java @@ -28,9 +28,9 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -public class OverflowSupportTest { +public class OverflowMemtableTest { - private OverflowSupport support = new OverflowSupport(); + private OverflowMemtable support = new OverflowMemtable(); private String deviceId1 = "d1"; private String deviceId2 = "d2"; private String measurementId1 = "s1"; diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java index 818dd9ac4179c16a6a825db701db92b01a5f78f4..13eee7b0d36dd58e72a5af2c0a91b894991b8516 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java @@ -44,7 +44,7 @@ public class OverflowResourceTest { private String positionFileName = "positionFile"; private String filePath = "overflow"; private String dataPath = "1"; - private OverflowSupport support = new OverflowSupport(); + private OverflowMemtable support = new OverflowMemtable(); @Before public void setUp() throws Exception { diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowTestUtils.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowTestUtils.java index 4b386ca3d10ca78fc20081311fb3de464c5e3684..5b408705914e56ae82739bb7c1c46fb5a02ab6ec 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowTestUtils.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowTestUtils.java @@ -47,7 +47,7 @@ public class OverflowTestUtils { return fileSchema; } - public static void produceInsertData(OverflowSupport support) { + public static void produceInsertData(OverflowMemtable support) { support.insert(getData(deviceId1, measurementId1, dataType1, String.valueOf(1), 1)); support.insert(getData(deviceId1, measurementId1, dataType1, String.valueOf(3), 3)); support.insert(getData(deviceId1, measurementId1, dataType1, String.valueOf(2), 2));