Commit f39b9e96 authored by 张凌哲, committed by Jialin Qiao

update vm merge utils

Parent 1e9297ca
@@ -52,76 +52,6 @@ public class VmMergeUtils {
     throw new IllegalStateException("Utility class");
   }
 
-  public static void fullMerge(RestorableTsFileIOWriter writer,
-      List<List<RestorableTsFileIOWriter>> vmWriters, String storageGroup, VmLogger vmLogger,
-      Set<String> devices, boolean sequence) throws IOException {
-    Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new HashMap<>();
-    Map<String, Map<String, MeasurementSchema>> deviceMeasurementMap = new HashMap<>();
-    for (List<RestorableTsFileIOWriter> subVmWriters : vmWriters) {
-      fillDeviceMeasurementMap(devices, deviceMeasurementMap, subVmWriters);
-    }
-    if (!sequence) {
-      for (Entry<String, Map<String, MeasurementSchema>> deviceMeasurementEntry : deviceMeasurementMap
-          .entrySet()) {
-        String deviceId = deviceMeasurementEntry.getKey();
-        writer.startChunkGroup(deviceId);
-        long maxVersion = Long.MIN_VALUE;
-        for (Entry<String, MeasurementSchema> entry : deviceMeasurementEntry.getValue()
-            .entrySet()) {
-          String measurementId = entry.getKey();
-          Map<Long, TimeValuePair> timeValuePairMap = new TreeMap<>();
-          for (int i = vmWriters.size() - 1; i >= 0; i--) {
-            maxVersion = writeUnseqChunk(writer, storageGroup, tsFileSequenceReaderMap, deviceId,
-                maxVersion, entry, measurementId, timeValuePairMap, vmWriters.get(i));
-          }
-          IChunkWriter chunkWriter = new ChunkWriterImpl(entry.getValue());
-          for (TimeValuePair timeValuePair : timeValuePairMap.values()) {
-            writeTVPair(timeValuePair, chunkWriter);
-          }
-          chunkWriter.writeToFileWriter(writer);
-        }
-        writer.writeVersion(maxVersion);
-        writer.endChunkGroup();
-        if (vmLogger != null) {
-          vmLogger.logDevice(deviceId, writer.getPos());
-        }
-      }
-    } else {
-      for (Entry<String, Map<String, MeasurementSchema>> deviceMeasurementEntry : deviceMeasurementMap
-          .entrySet()) {
-        String deviceId = deviceMeasurementEntry.getKey();
-        writer.startChunkGroup(deviceId);
-        for (Entry<String, MeasurementSchema> entry : deviceMeasurementEntry.getValue()
-            .entrySet()) {
-          String measurementId = entry.getKey();
-          ChunkMetadata newChunkMetadata = null;
-          Chunk newChunk = null;
-          for (int i = vmWriters.size() - 1; i >= 0; i--) {
-            Pair<ChunkMetadata, Chunk> chunkPair = writeSeqChunk(writer, storageGroup,
-                tsFileSequenceReaderMap, deviceId, measurementId,
-                vmWriters.get(i), newChunkMetadata, newChunk);
-            newChunkMetadata = chunkPair.left;
-            newChunk = chunkPair.right;
-          }
-          if (newChunkMetadata != null && newChunk != null) {
-            writer.writeChunk(newChunk, newChunkMetadata);
-          }
-        }
-        writer.endChunkGroup();
-        if (vmLogger != null) {
-          vmLogger.logDevice(deviceId, writer.getPos());
-        }
-      }
-    }
-    for (TsFileSequenceReader reader : tsFileSequenceReaderMap.values()) {
-      reader.close();
-      logger.info("{} vm file close a reader", reader.getFileName());
-    }
-  }
-
   private static Pair<ChunkMetadata, Chunk> writeSeqChunk(RestorableTsFileIOWriter writer,
       String storageGroup,
       Map<String, TsFileSequenceReader> tsFileSequenceReaderMap, String deviceId,
@@ -211,7 +141,7 @@ public class VmMergeUtils {
     }
   }
 
-  public static void levelMerge(RestorableTsFileIOWriter writer,
+  public static void merge(RestorableTsFileIOWriter writer,
       List<RestorableTsFileIOWriter> vmWriters, String storageGroup, VmLogger vmLogger,
       Set<String> devices, boolean sequence) throws IOException {
     Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new HashMap<>();
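In the removed fullMerge above, the unsequence branch gathers every point of a measurement from all vm writers into a TreeMap keyed by timestamp before rewriting the chunk, so colliding timestamps collapse to the value of whichever writer list is applied last (index 0, since the loop runs from the last index down to 0), and the merged points come back in ascending time order. Below is a minimal, self-contained sketch of that deduplication idea; the Point type is a hypothetical stand-in for IoTDB's TimeValuePair, not the project's API.

import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class DedupSketch {

  // Hypothetical stand-in for IoTDB's TimeValuePair.
  record Point(long time, Object value) {}

  // Chunks are applied oldest to newest: later puts overwrite earlier
  // ones, so the newest value wins per timestamp, and the TreeMap hands
  // back the surviving points in ascending time order.
  static List<Point> mergeByTime(List<List<Point>> chunksOldestFirst) {
    Map<Long, Point> byTime = new TreeMap<>();
    for (List<Point> chunk : chunksOldestFirst) {
      for (Point p : chunk) {
        byTime.put(p.time(), p);
      }
    }
    return List.copyOf(byTime.values());
  }
}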
@@ -84,7 +84,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.slf4j.Logger;
@@ -678,7 +677,7 @@ public class TsFileProcessor {
     if (targetFile.getName().endsWith(TSFILE_SUFFIX)) {
       if (!isMergeFinished) {
         writer.getIOWriterOut().truncate(offset - 1);
-        VmMergeUtils.fullMerge(writer, vmWriters,
+        VmMergeUtils.merge(writer, packVmWritersToSequenceList(vmWriters),
            storageGroupName,
            new VmLogger(tsFileResource.getTsFile().getParent(),
                tsFileResource.getTsFile().getName()),
@@ -719,7 +718,7 @@ public class TsFileProcessor {
         List<TsFileResource> levelVmFiles = new ArrayList<>(
             vmTsFileResources.get(level)
                 .subList(startIndex, startIndex + sourceFileList.size()));
-        VmMergeUtils.levelMerge(newVmWriter, levelVmWriters,
+        VmMergeUtils.merge(newVmWriter, levelVmWriters,
            storageGroupName,
            new VmLogger(tsFileResource.getTsFile().getParent(),
                tsFileResource.getTsFile().getName()),
@@ -1106,13 +1105,22 @@ public class TsFileProcessor {
 
   private void flushAllVmToTsFile(List<List<RestorableTsFileIOWriter>> currMergeVmWriters,
       List<List<TsFileResource>> currMergeVmFiles, VmLogger vmLogger) throws IOException {
-    VmMergeUtils.fullMerge(writer, currMergeVmWriters,
+    VmMergeUtils.merge(writer, packVmWritersToSequenceList(currMergeVmWriters),
         storageGroupName, vmLogger, new HashSet<>(), sequence);
     for (int i = 0; i < currMergeVmFiles.size(); i++) {
       deleteVmFiles(currMergeVmFiles.get(i), currMergeVmWriters.get(i));
     }
   }
 
+  private List<RestorableTsFileIOWriter> packVmWritersToSequenceList(
+      List<List<RestorableTsFileIOWriter>> vmWriters) {
+    List<RestorableTsFileIOWriter> sequenceVmWriters = new ArrayList<>();
+    for (int i = vmWriters.size() - 1; i > 0; i--) {
+      sequenceVmWriters.addAll(vmWriters.get(i));
+    }
+    return sequenceVmWriters;
+  }
+
   class VmMergeTask implements Runnable {
 
     private final List<List<TsFileResource>> vmMergeTsFiles;
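The new packVmWritersToSequenceList helper flattens the per-level writer lists into the single flat list that merge() now takes, walking from the last index down to, but not including, index 0, so the result starts with the highest level. A self-contained sketch of the same loop shape, using hypothetical string labels to show the resulting order:

import java.util.ArrayList;
import java.util.List;

class PackSketch {

  // Same loop shape as packVmWritersToSequenceList above: last level
  // first, index 0 excluded by the i > 0 bound.
  static <T> List<T> pack(List<List<T>> levels) {
    List<T> out = new ArrayList<>();
    for (int i = levels.size() - 1; i > 0; i--) {
      out.addAll(levels.get(i));
    }
    return out;
  }

  public static void main(String[] args) {
    List<List<String>> levels = List.of(
        List.of("level0-a", "level0-b"), List.of("level1-a"), List.of("level2-a"));
    // Prints [level2-a, level1-a]: level order reversed, level 0 omitted.
    System.out.println(pack(levels));
  }
}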
@@ -1183,7 +1191,7 @@ public class TsFileProcessor {
         // merge all vm files into a new vm file
         File tmpFile = createNewTmpFile();
         RestorableTsFileIOWriter tmpWriter = new RestorableTsFileIOWriter(tmpFile);
-        VmMergeUtils.levelMerge(tmpWriter, vmMergeWriters.get(i),
+        VmMergeUtils.merge(tmpWriter, vmMergeWriters.get(i),
            storageGroupName, vmLogger, new HashSet<>(), sequence);
         tmpWriter.close();
         vmMergeLock.writeLock().lock();