diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/VmMergeUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/VmMergeUtils.java index dafba0dfa9d833669e534282db501a596c530280..4b24e314945deb84cdfe9334a3f4f56a32775a9b 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/VmMergeUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/VmMergeUtils.java @@ -170,6 +170,7 @@ public class VmMergeUtils { for (TsFileSequenceReader reader : tsFileSequenceReaderMap.values()) { reader.close(); + logger.info("{} vm file close a reader", reader.getFileName()); } } @@ -179,6 +180,7 @@ public class VmMergeUtils { return tsFileSequenceReaderMap.computeIfAbsent(vmWriter.getFile().getAbsolutePath(), path -> { try { + logger.info("{} vm file create a reader", path); return new TsFileSequenceReader(path); } catch (IOException e) { logger.error( diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index 653e9a604f54744efbcdf75404e3fc782c95ca58..47bcac09c8703a89c5114ef02832457bfe786bf8 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -263,7 +263,8 @@ public class TsFileProcessor { public void deleteDataInMemory(Deletion deletion) { flushQueryLock.writeLock().lock(); if (logger.isDebugEnabled()) { - logger.debug(FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName()); + logger + .debug(FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName()); } try { if (workMemTable != null) { @@ -383,7 +384,8 @@ public class TsFileProcessor { void asyncClose() { flushQueryLock.writeLock().lock(); if (logger.isDebugEnabled()) { - logger.debug(FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName()); + logger + .debug(FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName()); } try { @@ -391,7 +393,8 @@ public class TsFileProcessor { if (workMemTable != null) { logger.info( "{}: flush a working memtable in async close tsfile {}, memtable size: {}, tsfile size: {}", - storageGroupName, tsFileResource.getTsFile().getAbsolutePath(), workMemTable.memSize(), + storageGroupName, tsFileResource.getTsFile().getAbsolutePath(), + workMemTable.memSize(), tsFileResource.getTsFileSize()); } else { logger.info("{}: flush a NotifyFlushMemTable in async close tsfile {}, tsfile size: {}", @@ -442,7 +445,8 @@ public class TsFileProcessor { IMemTable tmpMemTable; flushQueryLock.writeLock().lock(); if (logger.isDebugEnabled()) { - logger.debug(FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName()); + logger + .debug(FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName()); } try { tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable; @@ -485,7 +489,8 @@ public class TsFileProcessor { public void asyncFlush() { flushQueryLock.writeLock().lock(); if (logger.isDebugEnabled()) { - logger.debug(FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName()); + logger + .debug(FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName()); } try { if (workMemTable == null) { @@ -621,7 +626,8 @@ public class TsFileProcessor { writer.getIOWriterOut().truncate(result.right - 1); VmMergeUtils.fullMerge(writer, vmWriters, storageGroupName, - new VmLogger(tsFileResource.getTsFile().getParent(), tsFileResource.getTsFile().getName()), + new VmLogger(tsFileResource.getTsFile().getParent(), + tsFileResource.getTsFile().getName()), deviceSet, sequence); for (TsFileResource vmTsFileResource : vmTsFileResources) { deleteVmFile(vmTsFileResource); @@ -782,8 +788,14 @@ public class TsFileProcessor { // on other merge task working now, it's safe to submit one. if (!mergeWorking) { mergeWorking = true; + logger.info("{}: {} submit a vm merge task", storageGroupName, + tsFileResource.getTsFile().getName()); VmMergeTaskPoolManager.getInstance() - .submit(new VmMergeTask(new ArrayList<>(vmTsFileResources), new ArrayList<>(vmWriters))); + .submit( + new VmMergeTask(new ArrayList<>(vmTsFileResources), new ArrayList<>(vmWriters))); + } else { + logger.info("{}: {} last vm merge task is working, skip current merge", storageGroupName, + tsFileResource.getTsFile().getName()); } } } @@ -1001,14 +1013,17 @@ public class TsFileProcessor { @Override public void run() { + long startTimeMillis = System.currentTimeMillis(); try { + logger.info("{}: {} start to run vm merge task", storageGroupName, + tsFileResource.getTsFile().getName()); long vmPointNum = 0; // all flush to target file Map pathMeasurementSchemaMap = new HashMap<>(); for (RestorableTsFileIOWriter vmWriter : vmMergeWriters) { Map>> schemaMap = vmWriter .getMetadatasForQuery(); - for(Entry>> schemaMapEntry : schemaMap + for (Entry>> schemaMapEntry : schemaMap .entrySet()) { String device = schemaMapEntry.getKey(); for (Entry> entry : schemaMapEntry.getValue().entrySet()) { @@ -1022,17 +1037,21 @@ public class TsFileProcessor { } } } + logger.info("{}: {} current vm point num: {}, measurement num: {}", storageGroupName, + tsFileResource.getTsFile().getName(), vmPointNum, pathMeasurementSchemaMap.size()); if (pathMeasurementSchemaMap.size() > 0 && vmPointNum / pathMeasurementSchemaMap.size() > config .getMergeChunkPointNumberThreshold() || flushVmTimes >= config .getMaxMergeChunkNumInTsFile()) { // merge vm to tsfile flushVmTimes = 0; - logger.info("[Flush] merge {} vms to TsFile", vmMergeWriters.size() + 1); + logger.info("{}: {} merge {} vms to TsFile", storageGroupName, + tsFileResource.getTsFile().getName(), vmMergeWriters.size()); flushAllVmToTsFile(vmMergeWriters, vmMergeTsFiles); } else if (config.getMaxVmNum() <= vmMergeTsFiles.size()) { // merge vm files - logger.info("[Flush] merge {} vms to vm", vmMergeTsFiles.size() + 1); + logger.info("{}: {} merge {} vms to vm", storageGroupName, + tsFileResource.getTsFile().getName(), vmMergeTsFiles.size()); // merge all vm files into a new vm file File tmpFile = createNewTmpFile(); RestorableTsFileIOWriter tmpWriter = new RestorableTsFileIOWriter(tmpFile); @@ -1044,22 +1063,29 @@ public class TsFileProcessor { if (!tmpFile.renameTo(mergedFile)) { logger.error("Failed to rename {} to {}", newVmFile, mergedFile); } + logger.info("{}: {} vm merge starts to delete file", storageGroupName, + tsFileResource.getTsFile().getName()); for (TsFileResource vmTsFileResource : vmMergeTsFiles) { deleteVmFile(vmTsFileResource); } vmWriters.removeAll(vmMergeWriters); vmTsFileResources.removeAll(vmMergeTsFiles); + logger.info("{}: {} vm merge end file", storageGroupName, + tsFileResource.getTsFile().getName()); if (!mergedFile.renameTo(newVmFile)) { logger.error("Failed to rename {} to {}", mergedFile, newVmFile); } vmTsFileResources.add(0, new TsFileResource(newVmFile)); vmWriters.add(0, new RestorableTsFileIOWriter(newVmFile)); + logger.info("{} vm file close a writer", newVmFile.getName()); } } catch (Exception e) { logger.error("Error occurred in Vm Merge thread", e); } finally { // reset the merge working state to false mergeWorking = false; + logger.info("{}: {} vm merge end time consumption: {} ms", storageGroupName, + tsFileResource.getTsFile().getName(), System.currentTimeMillis() - startTimeMillis); } }