未验证 提交 1484436e 编写于 作者: A Alan Choo 提交者: GitHub

fix (#8285)

上级 84c01ae8
......@@ -389,6 +389,8 @@ public class DataRegion {
private long lastLogTime;
/** last recovery log files num */
private long lastLogCheckFilesNum;
/** recover performers of unsealed TsFiles */
private List<UnsealedTsFileRecoverPerformer> recoverPerformers = new ArrayList<>();
public DataRegionRecoveryContext(long numOfFilesToRecover) {
this.numOfFilesToRecover = numOfFilesToRecover;
......@@ -441,13 +443,13 @@ public class DataRegion {
// split by partition so that we can find the last file of each partition and decide to
// close it or not
DataRegionRecoveryContext DataRegionRecoveryContext =
DataRegionRecoveryContext dataRegionRecoveryContext =
new DataRegionRecoveryContext(tmpSeqTsFiles.size() + tmpUnseqTsFiles.size());
Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles =
splitResourcesByPartition(tmpSeqTsFiles);
Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles =
splitResourcesByPartition(tmpUnseqTsFiles);
// recover unsealed TsFiles
// submit unsealed TsFiles to recover
List<WALRecoverListener> recoverListeners = new ArrayList<>();
for (List<TsFileResource> value : partitionTmpSeqTsFiles.values()) {
// tsFiles without resource file are unsealed
......@@ -459,7 +461,7 @@ public class DataRegion {
} else {
value.remove(value.size() - 1);
WALRecoverListener recoverListener =
recoverUnsealedTsFile(tsFileResource, DataRegionRecoveryContext, true);
recoverUnsealedTsFile(tsFileResource, dataRegionRecoveryContext, true);
if (recoverListener != null) {
recoverListeners.add(recoverListener);
}
......@@ -476,13 +478,14 @@ public class DataRegion {
} else {
value.remove(value.size() - 1);
WALRecoverListener recoverListener =
recoverUnsealedTsFile(tsFileResource, DataRegionRecoveryContext, false);
recoverUnsealedTsFile(tsFileResource, dataRegionRecoveryContext, false);
if (recoverListener != null) {
recoverListeners.add(recoverListener);
}
}
}
}
// signal wal recover manager to recover this region's files
WALRecoverManager.getInstance().getAllDataRegionScannedLatch().countDown();
// recover sealed TsFiles
if (!partitionTmpSeqTsFiles.isEmpty()) {
......@@ -491,7 +494,7 @@ public class DataRegion {
for (Entry<Long, List<TsFileResource>> partitionFiles : partitionTmpSeqTsFiles.entrySet()) {
recoverFilesInPartition(
partitionFiles.getKey(),
DataRegionRecoveryContext,
dataRegionRecoveryContext,
partitionFiles.getValue(),
true,
partitionFiles.getKey() == latestPartitionId);
......@@ -500,7 +503,7 @@ public class DataRegion {
for (Entry<Long, List<TsFileResource>> partitionFiles : partitionTmpUnseqTsFiles.entrySet()) {
recoverFilesInPartition(
partitionFiles.getKey(),
DataRegionRecoveryContext,
dataRegionRecoveryContext,
partitionFiles.getValue(),
false,
false);
......@@ -514,7 +517,16 @@ public class DataRegion {
recoverListener.getCause());
}
// update VSGRecoveryContext
DataRegionRecoveryContext.incrementRecoveredFilesNum();
dataRegionRecoveryContext.incrementRecoveredFilesNum();
}
// recover unsealed TsFiles, sort make sure last flush time not be replaced by early files
dataRegionRecoveryContext.recoverPerformers.sort(
(p1, p2) ->
compareFileName(
p1.getTsFileResource().getTsFile(), p2.getTsFileResource().getTsFile()));
for (UnsealedTsFileRecoverPerformer recoverPerformer :
dataRegionRecoveryContext.recoverPerformers) {
recoverUnsealedTsFileCallBack(recoverPerformer);
}
for (TsFileResource resource : tsFileManager.getTsFileList(true)) {
long partitionNum = resource.getTimePartition();
......@@ -732,13 +744,12 @@ public class DataRegion {
TsFileResource unsealedTsFile, DataRegionRecoveryContext context, boolean isSeq) {
UnsealedTsFileRecoverPerformer recoverPerformer =
new UnsealedTsFileRecoverPerformer(
unsealedTsFile, isSeq, idTable, this::callbackAfterUnsealedTsFileRecovered);
unsealedTsFile, isSeq, idTable, performer -> context.recoverPerformers.add(performer));
// remember to close UnsealedTsFileRecoverPerformer
return WALRecoverManager.getInstance().addRecoverPerformer(recoverPerformer);
}
private void callbackAfterUnsealedTsFileRecovered(
UnsealedTsFileRecoverPerformer recoverPerformer) {
private void recoverUnsealedTsFileCallBack(UnsealedTsFileRecoverPerformer recoverPerformer) {
TsFileResource tsFileResource = recoverPerformer.getTsFileResource();
boolean isSeq = recoverPerformer.isSequence();
if (!recoverPerformer.canWrite()) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册