提交 938e80aa 编写于 作者: 张凌哲

add log

上级 dd88ab15
......@@ -170,6 +170,7 @@ public class VmMergeUtils {
for (TsFileSequenceReader reader : tsFileSequenceReaderMap.values()) {
reader.close();
logger.info("{} vm file close a reader", reader.getFileName());
}
}
......@@ -179,6 +180,7 @@ public class VmMergeUtils {
return tsFileSequenceReaderMap.computeIfAbsent(vmWriter.getFile().getAbsolutePath(),
path -> {
try {
logger.info("{} vm file create a reader", path);
return new TsFileSequenceReader(path);
} catch (IOException e) {
logger.error(
......
......@@ -263,7 +263,8 @@ public class TsFileProcessor {
public void deleteDataInMemory(Deletion deletion) {
flushQueryLock.writeLock().lock();
if (logger.isDebugEnabled()) {
logger.debug(FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName());
logger
.debug(FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName());
}
try {
if (workMemTable != null) {
......@@ -383,7 +384,8 @@ public class TsFileProcessor {
void asyncClose() {
flushQueryLock.writeLock().lock();
if (logger.isDebugEnabled()) {
logger.debug(FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName());
logger
.debug(FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName());
}
try {
......@@ -391,7 +393,8 @@ public class TsFileProcessor {
if (workMemTable != null) {
logger.info(
"{}: flush a working memtable in async close tsfile {}, memtable size: {}, tsfile size: {}",
storageGroupName, tsFileResource.getTsFile().getAbsolutePath(), workMemTable.memSize(),
storageGroupName, tsFileResource.getTsFile().getAbsolutePath(),
workMemTable.memSize(),
tsFileResource.getTsFileSize());
} else {
logger.info("{}: flush a NotifyFlushMemTable in async close tsfile {}, tsfile size: {}",
......@@ -442,7 +445,8 @@ public class TsFileProcessor {
IMemTable tmpMemTable;
flushQueryLock.writeLock().lock();
if (logger.isDebugEnabled()) {
logger.debug(FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName());
logger
.debug(FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName());
}
try {
tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable;
......@@ -485,7 +489,8 @@ public class TsFileProcessor {
public void asyncFlush() {
flushQueryLock.writeLock().lock();
if (logger.isDebugEnabled()) {
logger.debug(FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName());
logger
.debug(FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName());
}
try {
if (workMemTable == null) {
......@@ -621,7 +626,8 @@ public class TsFileProcessor {
writer.getIOWriterOut().truncate(result.right - 1);
VmMergeUtils.fullMerge(writer, vmWriters,
storageGroupName,
new VmLogger(tsFileResource.getTsFile().getParent(), tsFileResource.getTsFile().getName()),
new VmLogger(tsFileResource.getTsFile().getParent(),
tsFileResource.getTsFile().getName()),
deviceSet, sequence);
for (TsFileResource vmTsFileResource : vmTsFileResources) {
deleteVmFile(vmTsFileResource);
......@@ -782,8 +788,14 @@ public class TsFileProcessor {
// on other merge task working now, it's safe to submit one.
if (!mergeWorking) {
mergeWorking = true;
logger.info("{}: {} submit a vm merge task", storageGroupName,
tsFileResource.getTsFile().getName());
VmMergeTaskPoolManager.getInstance()
.submit(new VmMergeTask(new ArrayList<>(vmTsFileResources), new ArrayList<>(vmWriters)));
.submit(
new VmMergeTask(new ArrayList<>(vmTsFileResources), new ArrayList<>(vmWriters)));
} else {
logger.info("{}: {} last vm merge task is working, skip current merge", storageGroupName,
tsFileResource.getTsFile().getName());
}
}
}
......@@ -1001,14 +1013,17 @@ public class TsFileProcessor {
@Override
public void run() {
long startTimeMillis = System.currentTimeMillis();
try {
logger.info("{}: {} start to run vm merge task", storageGroupName,
tsFileResource.getTsFile().getName());
long vmPointNum = 0;
// all flush to target file
Map<Path, MeasurementSchema> pathMeasurementSchemaMap = new HashMap<>();
for (RestorableTsFileIOWriter vmWriter : vmMergeWriters) {
Map<String, Map<String, List<ChunkMetadata>>> schemaMap = vmWriter
.getMetadatasForQuery();
for(Entry<String, Map<String, List<ChunkMetadata>>> schemaMapEntry : schemaMap
for (Entry<String, Map<String, List<ChunkMetadata>>> schemaMapEntry : schemaMap
.entrySet()) {
String device = schemaMapEntry.getKey();
for (Entry<String, List<ChunkMetadata>> entry : schemaMapEntry.getValue().entrySet()) {
......@@ -1022,17 +1037,21 @@ public class TsFileProcessor {
}
}
}
logger.info("{}: {} current vm point num: {}, measurement num: {}", storageGroupName,
tsFileResource.getTsFile().getName(), vmPointNum, pathMeasurementSchemaMap.size());
if (pathMeasurementSchemaMap.size() > 0
&& vmPointNum / pathMeasurementSchemaMap.size() > config
.getMergeChunkPointNumberThreshold() || flushVmTimes >= config
.getMaxMergeChunkNumInTsFile()) {
// merge vm to tsfile
flushVmTimes = 0;
logger.info("[Flush] merge {} vms to TsFile", vmMergeWriters.size() + 1);
logger.info("{}: {} merge {} vms to TsFile", storageGroupName,
tsFileResource.getTsFile().getName(), vmMergeWriters.size());
flushAllVmToTsFile(vmMergeWriters, vmMergeTsFiles);
} else if (config.getMaxVmNum() <= vmMergeTsFiles.size()) {
// merge vm files
logger.info("[Flush] merge {} vms to vm", vmMergeTsFiles.size() + 1);
logger.info("{}: {} merge {} vms to vm", storageGroupName,
tsFileResource.getTsFile().getName(), vmMergeTsFiles.size());
// merge all vm files into a new vm file
File tmpFile = createNewTmpFile();
RestorableTsFileIOWriter tmpWriter = new RestorableTsFileIOWriter(tmpFile);
......@@ -1044,22 +1063,29 @@ public class TsFileProcessor {
if (!tmpFile.renameTo(mergedFile)) {
logger.error("Failed to rename {} to {}", newVmFile, mergedFile);
}
logger.info("{}: {} vm merge starts to delete file", storageGroupName,
tsFileResource.getTsFile().getName());
for (TsFileResource vmTsFileResource : vmMergeTsFiles) {
deleteVmFile(vmTsFileResource);
}
vmWriters.removeAll(vmMergeWriters);
vmTsFileResources.removeAll(vmMergeTsFiles);
logger.info("{}: {} vm merge end file", storageGroupName,
tsFileResource.getTsFile().getName());
if (!mergedFile.renameTo(newVmFile)) {
logger.error("Failed to rename {} to {}", mergedFile, newVmFile);
}
vmTsFileResources.add(0, new TsFileResource(newVmFile));
vmWriters.add(0, new RestorableTsFileIOWriter(newVmFile));
logger.info("{} vm file close a writer", newVmFile.getName());
}
} catch (Exception e) {
logger.error("Error occurred in Vm Merge thread", e);
} finally {
// reset the merge working state to false
mergeWorking = false;
logger.info("{}: {} vm merge end time consumption: {} ms", storageGroupName,
tsFileResource.getTsFile().getName(), System.currentTimeMillis() - startTimeMillis);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册