Commit 11f747af authored by JackieTien97

fix bug

Parent a7e90302
@@ -87,7 +87,7 @@ public class VmMergeUtils {
.computeIfAbsent(vmWriter.getFile().getAbsolutePath(),
path -> {
try {
return new TsFileSequenceReader(path);
return new TsFileSequenceReader(path, false);
} catch (IOException e) {
logger.error(
"Storage group {} tsfile {}, flush recover meets error. reader create failed.",
@@ -140,7 +140,7 @@ public class VmMergeUtils {
.computeIfAbsent(vmWriter.getFile().getAbsolutePath(),
path -> {
try {
return new TsFileSequenceReader(path);
return new TsFileSequenceReader(path, false);
} catch (IOException e) {
logger.error(
"Storage group {} tsfile {}, flush recover meets error. reader create failed.",
@@ -152,6 +152,9 @@ public class VmMergeUtils {
if (reader == null) {
continue;
}
if (!vmWriter.getMetadatasForQuery().containsKey(deviceId)) {
continue;
}
List<ChunkMetadata> chunkMetadataList = vmWriter.getMetadatasForQuery()
.get(deviceId).get(measurementId);
if (chunkMetadataList == null) {
......
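The hunk above adds a containsKey guard so that a vm writer with no metadata for the queried device is skipped instead of triggering a NullPointerException on the nested get calls. A minimal null-safe sketch of the same nested lookup, using a hypothetical helper class rather than IoTDB code:

import java.util.Collections;
import java.util.List;
import java.util.Map;

// Illustrative helper only: returns an empty list instead of dereferencing a missing
// device entry, which is the failure the added guard above avoids.
final class MetadataLookup {

  private MetadataLookup() {}

  static <T> List<T> chunkMetadata(Map<String, Map<String, List<T>>> metadataByDevice,
      String deviceId, String measurementId) {
    Map<String, List<T>> byMeasurement =
        metadataByDevice.getOrDefault(deviceId, Collections.emptyMap());
    return byMeasurement.getOrDefault(measurementId, Collections.emptyList());
  }
}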
@@ -34,10 +34,9 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -79,7 +78,6 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -133,6 +131,8 @@ public class TsFileProcessor {
private static final String FLUSH_QUERY_WRITE_LOCKED = "{}: {} get flushQueryLock write lock";
private static final String FLUSH_QUERY_WRITE_RELEASE = "{}: {} get flushQueryLock write lock released";
private volatile boolean mergeWorking = false;
TsFileProcessor(String storageGroupName, File tsfile, List<File> vmFiles,
VersionController versionController,
CloseTsFileCallBack closeTsFileCallback,
@@ -664,16 +664,18 @@ public class TsFileProcessor {
flushVmTimes++;
try {
MemTableFlushTask flushTask;
RestorableTsFileIOWriter curWriter;
if (config.isEnableVm()) {
logger.info("[Flush] flush a vm");
File newVmFile = createNewVMFile(tsFileResource);
vmTsFileResources.add(new TsFileResource(newVmFile));
RestorableTsFileIOWriter currVmWriter = new RestorableTsFileIOWriter(newVmFile);
vmWriters.add(currVmWriter);
flushTask = new MemTableFlushTask(memTableToFlush, currVmWriter, storageGroupName);
curWriter = new RestorableTsFileIOWriter(newVmFile);
vmWriters.add(curWriter);
} else {
flushTask = new MemTableFlushTask(memTableToFlush, writer, storageGroupName);
curWriter = writer;
}
curWriter.mark();
flushTask = new MemTableFlushTask(memTableToFlush, curWriter, storageGroupName);
flushTask.syncFlushMemTable();
} catch (Exception e) {
logger.error("{}: {} meet error when flushing a memtable, change system mode to read-only",
@@ -708,13 +710,25 @@ public class TsFileProcessor {
memTableToFlush.isSignalMemTable(), flushingMemTables.size());
}
}
Future<Void> vmFuture = VmMergeTaskPoolManager.getInstance()
.submit(new VmMergeTask(new ArrayList<>(vmTsFileResources), new ArrayList<>(vmWriters)));
if (shouldClose && flushingMemTables.isEmpty()) {
try {
// confirm that all vm file is flushed to tsfile when closed
vmFuture.get();
// merge vm to tsfile
while (true) {
if (!mergeWorking) {
break;
}
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
logger.error("{}: {}, closing task is interrupted.",
storageGroupName, tsFileResource.getFile().getName(), e);
// generally it is because the thread pool is shutdown so the task should be aborted
break;
}
}
logger.info("[Flush] merge all {} vms to TsFile", vmTsFileResources.size() + 1);
flushAllVmToTsFile(vmWriters, vmTsFileResources);
writer.mark();
try {
double compressionRatio = ((double) totalMemTableSize) / writer.getPos();
@@ -766,6 +780,13 @@ public class TsFileProcessor {
synchronized (flushingMemTables) {
flushingMemTables.notifyAll();
}
} else {
// no other merge task working now, it's safe to submit one.
if (!mergeWorking) {
mergeWorking = true;
VmMergeTaskPoolManager.getInstance()
.submit(new VmMergeTask(new ArrayList<>(vmTsFileResources), new ArrayList<>(vmWriters)));
}
}
}
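The hunks above replace the Future-based handshake with a volatile mergeWorking flag: the flush path submits a background vm merge only when none is in flight, and the close path polls the flag with a 10 ms sleep before merging everything synchronously via flushAllVmToTsFile. A self-contained caller-side sketch of that pattern, with MergeCoordinator and its method names as hypothetical stand-ins and a plain ExecutorService replacing VmMergeTaskPoolManager:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical caller-side sketch of the flag handshake; not IoTDB code.
class MergeCoordinator {

  private final ExecutorService pool = Executors.newSingleThreadExecutor();
  volatile boolean mergeWorking = false;

  // flush path: at most one background merge may be in flight at a time
  void maybeSubmitMerge(Runnable mergeTask) {
    if (!mergeWorking) {
      mergeWorking = true;
      // the task clears mergeWorking in its finally block (see the worker-side sketch below)
      pool.submit(mergeTask);
    }
  }

  // close path: wait until any in-flight merge finishes before the final synchronous merge
  void awaitMergeIdle() {
    while (mergeWorking) {
      try {
        TimeUnit.MILLISECONDS.sleep(10);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // mirror the diff: abort the wait and move on
        break;
      }
    }
  }
}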
@@ -979,10 +1000,10 @@ public class TsFileProcessor {
logFile.delete();
}
class VmMergeTask implements Callable<Void> {
class VmMergeTask implements Runnable {
private List<TsFileResource> vmMergeTsFiles;
private List<RestorableTsFileIOWriter> vmMergeWriters;
private final List<TsFileResource> vmMergeTsFiles;
private final List<RestorableTsFileIOWriter> vmMergeWriters;
public VmMergeTask(
List<TsFileResource> vmMergeTsFiles,
@@ -991,15 +1012,9 @@ public class TsFileProcessor {
this.vmMergeWriters = vmMergeWriters;
}
public void merge() {
@Override
public void run() {
try {
if (shouldClose && flushingMemTables.isEmpty()) {
// merge vm to tsfile
logger.info("[Flush] merge all {} vms to TsFile", vmMergeTsFiles.size() + 1);
flushAllVmToTsFile(vmMergeWriters, vmMergeTsFiles);
return;
}
long vmPointNum = 0;
// all flush to target file
Map<Path, MeasurementSchema> pathMeasurementSchemaMap = new HashMap<>();
@@ -1048,13 +1063,10 @@ public class TsFileProcessor {
}
} catch (Exception e) {
logger.error("Error occurred in Vm Merge thread", e);
} finally {
// reset the merge working state to false
mergeWorking = false;
}
}
@Override
public Void call() {
merge();
return null;
}
}
}
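With no caller left waiting on a Future, VmMergeTask drops Callable<Void> for Runnable in the hunks above; its only completion signal is resetting mergeWorking in a finally block, so the flag is released even when the merge fails. A matching worker-side sketch, continuing the hypothetical MergeCoordinator example:

// Hypothetical worker-side counterpart to MergeCoordinator; not IoTDB code.
class MergeTaskSketch implements Runnable {

  private final MergeCoordinator coordinator;
  private final Runnable mergeBody; // stands in for the real vm merge logic

  MergeTaskSketch(MergeCoordinator coordinator, Runnable mergeBody) {
    this.coordinator = coordinator;
    this.mergeBody = mergeBody;
  }

  @Override
  public void run() {
    try {
      mergeBody.run();
    } catch (Exception e) {
      // the real task logs the failure ("Error occurred in Vm Merge thread")
    } finally {
      coordinator.mergeWorking = false; // always release the flag, even on failure
    }
  }
}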
@@ -163,6 +163,7 @@ public class TsFileIOWriter {
.add(new ChunkGroupMetadata(currentChunkGroupDeviceId, chunkMetadataList));
currentChunkGroupDeviceId = null;
chunkMetadataList = null;
out.flush();
}
/**
......
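The one-line addition above flushes the writer's output once a chunk group is sealed, presumably so that bytes already buffered reach the file before other components read it (for example the TsFileSequenceReader instances opened on the vm files in the first hunks). A generic java.io sketch of the idea, using plain streams rather than the writer's own out object:

import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;

// Generic illustration, not IoTDB's TsFileIOWriter: flushing after each logical unit pushes
// buffered bytes to the OS so a concurrent reader of the same file sees them.
class FlushPerGroupWriter implements AutoCloseable {

  private final BufferedOutputStream out;

  FlushPerGroupWriter(String path) throws IOException {
    this.out = new BufferedOutputStream(new FileOutputStream(path));
  }

  void writeGroup(byte[] payload) throws IOException {
    out.write(payload);
    out.flush(); // analogous to the out.flush() added at the end of the chunk group
  }

  @Override
  public void close() throws IOException {
    out.close();
  }
}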