提交 37ff3a41 编写于 作者: H Haojun Liao

fix(tsdb): fix error in load tomb from stt files.

上级 6c20dfa1
...@@ -70,6 +70,7 @@ typedef struct STableBlockScanInfo { ...@@ -70,6 +70,7 @@ typedef struct STableBlockScanInfo {
int32_t fileDelIndex; // file block delete index int32_t fileDelIndex; // file block delete index
int32_t lastBlockDelIndex; // delete index for last block int32_t lastBlockDelIndex; // delete index for last block
bool iterInit; // whether to initialize the in-memory skip list iterator or not bool iterInit; // whether to initialize the in-memory skip list iterator or not
bool skylineBuilt; // load current stt block
} STableBlockScanInfo; } STableBlockScanInfo;
typedef struct SBlockOrderWrapper { typedef struct SBlockOrderWrapper {
...@@ -253,8 +254,7 @@ static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScan ...@@ -253,8 +254,7 @@ static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScan
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
STsdbReader* pReader); STsdbReader* pReader);
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData, static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader);
STbData* piMemTbData);
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr, static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
int8_t* pLevel); int8_t* pLevel);
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level); static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
...@@ -473,6 +473,8 @@ static void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t ...@@ -473,6 +473,8 @@ static void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p; STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
pInfo->iterInit = false; pInfo->iterInit = false;
pInfo->skylineBuilt = false;
pInfo->iter.hasVal = false; pInfo->iter.hasVal = false;
pInfo->iiter.hasVal = false; pInfo->iiter.hasVal = false;
...@@ -492,6 +494,7 @@ static void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t ...@@ -492,6 +494,7 @@ static void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t
static void clearBlockScanInfo(STableBlockScanInfo* p) { static void clearBlockScanInfo(STableBlockScanInfo* p) {
p->iterInit = false; p->iterInit = false;
p->skylineBuilt = false;
p->iter.hasVal = false; p->iter.hasVal = false;
p->iiter.hasVal = false; p->iiter.hasVal = false;
...@@ -2717,20 +2720,47 @@ int32_t doInitMemDataIter(STsdbReader* pReader, STbData** pData, STableBlockScan ...@@ -2717,20 +2720,47 @@ int32_t doInitMemDataIter(STsdbReader* pReader, STbData** pData, STableBlockScan
return code; return code;
} }
static void doLoadMemTombData(STableBlockScanInfo* pScanInfo, STbData* pMemTbData, STbData* piMemTbData, int64_t ver) {
if (pScanInfo->pDelData == NULL) {
pScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData));
}
SDelData* p = NULL;
if (pMemTbData != NULL) {
p = pMemTbData->pHead;
while (p) {
if (p->version <= ver) {
taosArrayPush(pScanInfo->pDelData, p);
}
p = p->pNext;
}
}
if (piMemTbData != NULL) {
p = piMemTbData->pHead;
while (p) {
if (p->version <= ver) {
taosArrayPush(pScanInfo->pDelData, p);
}
p = p->pNext;
}
}
}
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
if (pBlockScanInfo->iterInit) { if (pBlockScanInfo->iterInit) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
TSDBKEY startKey = {0}; STbData* d = NULL;
TSDBKEY startKey = {0};
if (ASCENDING_TRAVERSE(pReader->order)) { if (ASCENDING_TRAVERSE(pReader->order)) {
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey + 1, .version = pReader->verRange.minVer}; startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey + 1, .version = pReader->verRange.minVer};
} else { } else {
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->verRange.maxVer}; startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->verRange.maxVer};
} }
STbData* d = NULL;
int32_t code = doInitMemDataIter(pReader, &d, pBlockScanInfo, &startKey, pReader->pReadSnap->pMem, int32_t code = doInitMemDataIter(pReader, &d, pBlockScanInfo, &startKey, pReader->pReadSnap->pMem,
&pBlockScanInfo->iter, "mem"); &pBlockScanInfo->iter, "mem");
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -2744,9 +2774,7 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea ...@@ -2744,9 +2774,7 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
return code; return code;
} }
int64_t st = taosGetTimestampUs(); doLoadMemTombData(pBlockScanInfo, d, di, pReader->verRange.maxVer);
initDelSkylineIterator(pBlockScanInfo, pReader, d, di);
pReader->cost.initDelSkylineIterTime += (taosGetTimestampUs() - st) / 1000.0;
pBlockScanInfo->iterInit = true; pBlockScanInfo->iterInit = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2794,14 +2822,13 @@ static int32_t checkTombBlockRecords(SArray* pData, STombBlock* pBlock, uint64_t ...@@ -2794,14 +2822,13 @@ static int32_t checkTombBlockRecords(SArray* pData, STombBlock* pBlock, uint64_t
continue; continue;
} }
// todo use binary search instead here if (record.uid != uid) {
if (record.uid < uid) {
continue; continue;
} }
if (record.uid > uid) { // if (record.uid > uid) {
break; // break;
} // }
if (record.version <= maxVer) { if (record.version <= maxVer) {
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey}; SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
...@@ -2812,7 +2839,7 @@ static int32_t checkTombBlockRecords(SArray* pData, STombBlock* pBlock, uint64_t ...@@ -2812,7 +2839,7 @@ static int32_t checkTombBlockRecords(SArray* pData, STombBlock* pBlock, uint64_t
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t loadTomRecordInfoFromSttFiles(SArray* pLDataIterList, uint64_t suid, STableBlockScanInfo* pBlockScanInfo, static int32_t loadTombRecordInfoFromSttFiles(SArray* pLDataIterList, uint64_t suid, STableBlockScanInfo* pBlockScanInfo,
uint64_t maxVer) { uint64_t maxVer) {
int32_t size = taosArrayGetSize(pLDataIterList); int32_t size = taosArrayGetSize(pLDataIterList);
if (size <= 0) { if (size <= 0) {
...@@ -2824,6 +2851,8 @@ static int32_t loadTomRecordInfoFromSttFiles(SArray* pLDataIterList, uint64_t su ...@@ -2824,6 +2851,8 @@ static int32_t loadTomRecordInfoFromSttFiles(SArray* pLDataIterList, uint64_t su
pBlockScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData)); pBlockScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData));
} }
pBlockScanInfo->skylineBuilt = false;
for(int32_t i = 0; i < size; ++i) { for(int32_t i = 0; i < size; ++i) {
SArray* pLeveledLDataIter = taosArrayGetP(pLDataIterList, i); SArray* pLeveledLDataIter = taosArrayGetP(pLDataIterList, i);
...@@ -2880,12 +2909,15 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan ...@@ -2880,12 +2909,15 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
return false; return false;
} }
code = loadTomRecordInfoFromSttFiles(pReader->status.pLDataIterArray, pReader->suid, pScanInfo, pReader->verRange.maxVer); code = loadTombRecordInfoFromSttFiles(pReader->status.pLDataIterArray, pReader->suid, pScanInfo, pReader->verRange.maxVer);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return false; return false;
} }
initMemDataIterator(pScanInfo, pReader); initMemDataIterator(pScanInfo, pReader);
// todo: del tomb order problem
initDelSkylineIterator(pScanInfo, pReader);
return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange); return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
} }
...@@ -3159,47 +3191,25 @@ int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order) { ...@@ -3159,47 +3191,25 @@ int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order) {
return ASCENDING_TRAVERSE(order) ? 0 : taosArrayGetSize(pDelSkyline) - 1; return ASCENDING_TRAVERSE(order) ? 0 : taosArrayGetSize(pDelSkyline) - 1;
} }
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData, int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
STbData* piMemTbData) {
if (pBlockScanInfo->delSkyline != NULL) {
return TSDB_CODE_SUCCESS;
}
int32_t code = 0; int32_t code = 0;
if (pBlockScanInfo->skylineBuilt) {
if (pBlockScanInfo->pDelData == NULL) { return code;
pBlockScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData));
} }
SDelData* p = NULL; int32_t numOfElems = taosArrayGetSize(pBlockScanInfo->pDelData);
if (pMemTbData != NULL) {
p = pMemTbData->pHead;
while (p) {
if (p->version <= pReader->verRange.maxVer) {
taosArrayPush(pBlockScanInfo->pDelData, p);
}
p = p->pNext;
}
}
if (piMemTbData != NULL) { if (numOfElems > 0) {
p = piMemTbData->pHead; if (pBlockScanInfo->delSkyline != NULL) {
while (p) { taosArrayClear(pBlockScanInfo->delSkyline);
if (p->version <= pReader->verRange.maxVer) { } else {
taosArrayPush(pBlockScanInfo->pDelData, p); pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
}
p = p->pNext;
} }
}
int32_t numOfElems = taosArrayGetSize(pBlockScanInfo->pDelData);
if (numOfElems > 0) {
pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
code = tsdbBuildDeleteSkyline(pBlockScanInfo->pDelData, 0, numOfElems - 1, pBlockScanInfo->delSkyline); code = tsdbBuildDeleteSkyline(pBlockScanInfo->pDelData, 0, numOfElems - 1, pBlockScanInfo->delSkyline);
} }
pBlockScanInfo->pDelData = taosArrayDestroy(pBlockScanInfo->pDelData); pBlockScanInfo->skylineBuilt = true;
int32_t index = getInitialDelIndex(pBlockScanInfo->delSkyline, pReader->order); int32_t index = getInitialDelIndex(pBlockScanInfo->delSkyline, pReader->order);
pBlockScanInfo->iter.index = index; pBlockScanInfo->iter.index = index;
...@@ -3208,10 +3218,6 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* ...@@ -3208,10 +3218,6 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
pBlockScanInfo->lastBlockDelIndex = index; pBlockScanInfo->lastBlockDelIndex = index;
return code; return code;
// _err:
// taosArrayDestroy(pBlockScanInfo->pDelData);
// return code;
} }
TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
...@@ -3322,7 +3328,8 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr ...@@ -3322,7 +3328,8 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
return code; return code;
} }
uint64_t uid = pReader->status.uidList.tableUidList[j]; uint64_t uid = pReader->status.uidList.tableUidList[j];
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
if (pScanInfo->pDelData == NULL) { if (pScanInfo->pDelData == NULL) {
pScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData)); pScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData));
...@@ -3352,6 +3359,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr ...@@ -3352,6 +3359,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
if (record.version <= pReader->verRange.maxVer) { if (record.version <= pReader->verRange.maxVer) {
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey}; SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
taosArrayPush(pScanInfo->pDelData, &delData); taosArrayPush(pScanInfo->pDelData, &delData);
pScanInfo->skylineBuilt = false;
} }
} }
...@@ -3728,6 +3736,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { ...@@ -3728,6 +3736,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
} }
initMemDataIterator(*pBlockScanInfo, pReader); initMemDataIterator(*pBlockScanInfo, pReader);
initDelSkylineIterator(*pBlockScanInfo, pReader);
int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN; int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;
int32_t code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey); int32_t code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey);
...@@ -5000,6 +5009,8 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { ...@@ -5000,6 +5009,8 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
// reset current current table's data block scan info, // reset current current table's data block scan info,
pBlockScanInfo->iterInit = false; pBlockScanInfo->iterInit = false;
pBlockScanInfo->skylineBuilt = false;
pBlockScanInfo->iter.hasVal = false; pBlockScanInfo->iter.hasVal = false;
pBlockScanInfo->iiter.hasVal = false; pBlockScanInfo->iiter.hasVal = false;
if (pBlockScanInfo->iter.iter != NULL) { if (pBlockScanInfo->iter.iter != NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册