提交 8ab10a8d 编写于 作者: H Haojun Liao

fix(tsdb): fix error in check tomb record. #TD-25117

上级 37ff3a41
......@@ -61,16 +61,15 @@ typedef struct STableBlockScanInfo {
uint64_t uid;
TSKEY lastKey;
TSKEY lastKeyInStt; // last accessed key in stt
SMapData mapData; // block info (compressed)
SArray* pBlockList; // block data index list, SArray<SBrinRecord>
SArray* pDelData; // SArray<SDelData>
SArray* pMemDelData; // SArray<SDelData>
SArray* pfileDelData; // SArray<SDelData> from each file set
SIterInfo iter; // mem buffer skip list iterator
SIterInfo iiter; // imem buffer skip list iterator
SArray* delSkyline; // delete info for this table
int32_t fileDelIndex; // file block delete index
int32_t lastBlockDelIndex; // delete index for last block
bool iterInit; // whether to initialize the in-memory skip list iterator or not
bool skylineBuilt; // load current stt block
} STableBlockScanInfo;
typedef struct SBlockOrderWrapper {
......@@ -473,7 +472,7 @@ static void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
pInfo->iterInit = false;
pInfo->skylineBuilt = false;
// pInfo->skylineBuilt = false;
pInfo->iter.hasVal = false;
pInfo->iiter.hasVal = false;
......@@ -494,8 +493,6 @@ static void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t
static void clearBlockScanInfo(STableBlockScanInfo* p) {
p->iterInit = false;
p->skylineBuilt = false;
p->iter.hasVal = false;
p->iiter.hasVal = false;
......@@ -509,7 +506,8 @@ static void clearBlockScanInfo(STableBlockScanInfo* p) {
p->delSkyline = taosArrayDestroy(p->delSkyline);
p->pBlockList = taosArrayDestroy(p->pBlockList);
p->pDelData = taosArrayDestroy(p->pDelData);
p->pMemDelData = taosArrayDestroy(p->pMemDelData);
p->pfileDelData = taosArrayDestroy(p->pfileDelData);
}
static void destroyAllBlockScanInfo(SSHashObj* pTableMap) {
......@@ -566,19 +564,6 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA
pLReader->uid = 0;
tMergeTreeClose(&pLReader->mergeTree);
// if (pLReader->pInfo == NULL) {
// // here we ignore the first column, which is always be the primary timestamp column
// SBlockLoadSuppInfo* pInfo = &pReader->suppInfo;
// // todo dynamic number of stt
// int32_t numOfStt = pReader->pTsdb->pVnode->config.sttTrigger;
// pLReader->pInfo = tCreateLastBlockLoadInfo(pReader->pSchema, &pInfo->colId[1], pInfo->numOfCols - 1, numOfStt);
// if (pLReader->pInfo == NULL) {
// tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
// return terrno;
// }
// }
tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
return TSDB_CODE_SUCCESS;
}
......@@ -923,11 +908,6 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead
ASSERT(pBrinBlk->minTbid.suid <= pReader->suid && pBrinBlk->maxTbid.suid >= pReader->suid);
// if (pBrinBlk->maxTbid.uid < pList->tableUidList[j]) {
// i += 1;
// continue;
// }
// this block belongs to a table that is not queried.
STableBlockScanInfo* pScanInfo =
getTableBlockScanInfo(pReader->status.pTableMap, pList->tableUidList[j], pReader->idStr);
......@@ -967,6 +947,8 @@ _end:
static void doCleanupTableScanInfo(STableBlockScanInfo* pScanInfo) {
// reset the index in last block when handing a new file
taosArrayClear(pScanInfo->pBlockList);
taosArrayClear(pScanInfo->delSkyline); // built delete skyline for each fileset
taosArrayClear(pScanInfo->pfileDelData); // del data from each file set
}
static void cleanupTableScanInfo(SReaderStatus* pStatus) {
......@@ -1033,7 +1015,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
cleanupTableScanInfo(&pReader->status);
// set the flag for the new file
int32_t i = 0, k = 0;
int32_t k = 0;
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
......@@ -2721,8 +2703,8 @@ int32_t doInitMemDataIter(STsdbReader* pReader, STbData** pData, STableBlockScan
}
static void doLoadMemTombData(STableBlockScanInfo* pScanInfo, STbData* pMemTbData, STbData* piMemTbData, int64_t ver) {
if (pScanInfo->pDelData == NULL) {
pScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData));
if (pScanInfo->pMemDelData == NULL) {
pScanInfo->pMemDelData = taosArrayInit(4, sizeof(SDelData));
}
SDelData* p = NULL;
......@@ -2730,7 +2712,7 @@ static void doLoadMemTombData(STableBlockScanInfo* pScanInfo, STbData* pMemTbDat
p = pMemTbData->pHead;
while (p) {
if (p->version <= ver) {
taosArrayPush(pScanInfo->pDelData, p);
taosArrayPush(pScanInfo->pMemDelData, p);
}
p = p->pNext;
......@@ -2741,7 +2723,7 @@ static void doLoadMemTombData(STableBlockScanInfo* pScanInfo, STbData* pMemTbDat
p = piMemTbData->pHead;
while (p) {
if (p->version <= ver) {
taosArrayPush(pScanInfo->pDelData, p);
taosArrayPush(pScanInfo->pMemDelData, p);
}
p = p->pNext;
}
......@@ -2811,7 +2793,6 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
static int32_t checkTombBlockRecords(SArray* pData, STombBlock* pBlock, uint64_t suid, uint64_t uid, int64_t maxVer) {
STombRecord record = {0};
for (int32_t j = 0; j < pBlock->suid->size; ++j) {
int32_t code = tTombBlockGet(pBlock, j, &record);
if (code != TSDB_CODE_SUCCESS) {
......@@ -2822,13 +2803,13 @@ static int32_t checkTombBlockRecords(SArray* pData, STombBlock* pBlock, uint64_t
continue;
}
if (record.uid != uid) {
continue;
if (record.suid > suid || (record.suid == suid && record.uid > uid)) {
break;
}
// if (record.uid > uid) {
// break;
// }
if (record.uid < uid) {
continue;
}
if (record.version <= maxVer) {
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
......@@ -2839,20 +2820,18 @@ static int32_t checkTombBlockRecords(SArray* pData, STombBlock* pBlock, uint64_t
return TSDB_CODE_SUCCESS;
}
static int32_t loadTombRecordInfoFromSttFiles(SArray* pLDataIterList, uint64_t suid, STableBlockScanInfo* pBlockScanInfo,
uint64_t maxVer) {
static int32_t loadTombRecordInfoFromSttFiles(SArray* pLDataIterList, uint64_t suid,
STableBlockScanInfo* pBlockScanInfo, uint64_t maxVer) {
int32_t size = taosArrayGetSize(pLDataIterList);
if (size <= 0) {
return TSDB_CODE_SUCCESS;
}
uint64_t uid = pBlockScanInfo->uid;
if (pBlockScanInfo->pDelData == NULL) {
pBlockScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData));
if (pBlockScanInfo->pfileDelData == NULL) {
pBlockScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData));
}
pBlockScanInfo->skylineBuilt = false;
for(int32_t i = 0; i < size; ++i) {
SArray* pLeveledLDataIter = taosArrayGetP(pLDataIterList, i);
......@@ -2869,7 +2848,7 @@ static int32_t loadTombRecordInfoFromSttFiles(SArray* pLDataIterList, uint64_t s
for (int32_t k = 0; k < numOfBlocks; ++k) {
STombBlock* pBlock = taosArrayGetP(pTombBlockArray, k);
int32_t code = checkTombBlockRecords(pBlockScanInfo->pDelData, pBlock, suid, uid, maxVer);
int32_t code = checkTombBlockRecords(pBlockScanInfo->pfileDelData, pBlock, suid, uid, maxVer);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2880,6 +2859,93 @@ static int32_t loadTombRecordInfoFromSttFiles(SArray* pLDataIterList, uint64_t s
return TSDB_CODE_SUCCESS;
}
static int32_t loadTombRecordsFromDataFiles(STsdbReader* pReader, int32_t numOfTables) {
if (pReader->status.pCurrentFileset == NULL) {
return TSDB_CODE_SUCCESS;
}
STFileObj* pTombFileObj = pReader->status.pCurrentFileset->farr[3];
if (pTombFileObj == NULL) {
return TSDB_CODE_SUCCESS;
}
const TTombBlkArray* pBlkArray = NULL;
int32_t code = tsdbDataFileReadTombBlk(pReader->pFileReader, &pBlkArray);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// todo find the correct start position.
int32_t i = 0, j = 0;
while (i < pBlkArray->size && j < numOfTables) {
STombBlock block = {0};
code = tsdbDataFileReadTombBlock(pReader->pFileReader, &pBlkArray->data[i], &block);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
uint64_t uid = pReader->status.uidList.tableUidList[j];
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
if (pScanInfo->pfileDelData == NULL) {
pScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData));
}
STombRecord record = {0};
for (int32_t k = 0; k < TARRAY2_SIZE(block.suid); ++k) {
code = tTombBlockGet(&block, k, &record);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (record.suid < pReader->suid) {
continue;
}
if (record.suid > pReader->suid) {
tTombBlockDestroy(&block);
return TSDB_CODE_SUCCESS;
}
ASSERT(record.suid == pReader->suid);
if (record.uid < uid) {
continue;
}
bool newTable = false;
while (uid < record.uid && j < (numOfTables - 1)) {
j += 1;
uid = pReader->status.uidList.tableUidList[j];
newTable = true;
}
if (uid != record.uid) {
tTombBlockDestroy(&block);
return TSDB_CODE_SUCCESS;
} else {
if (newTable) {
pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
if (pScanInfo->pfileDelData == NULL) {
pScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData));
}
}
}
ASSERT(record.uid == uid);
if (record.version <= pReader->verRange.maxVer) {
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
taosArrayPush(pScanInfo->pfileDelData, &delData);
}
}
i += 1;
tTombBlockDestroy(&block);
}
return TSDB_CODE_SUCCESS;
}
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
// the last block reader has been initialized for this table.
if (pLBlockReader->uid == pScanInfo->uid) {
......@@ -3014,7 +3080,6 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock
int32_t code = TSDB_CODE_SUCCESS;
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
int32_t nextIndex = -1;
// SBlockIndex nxtBIndex = {0};
*loadNeighbor = false;
......@@ -3193,23 +3258,25 @@ int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order) {
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
int32_t code = 0;
if (pBlockScanInfo->skylineBuilt) {
int32_t newDelDataInFile = taosArrayGetSize(pBlockScanInfo->pfileDelData);
if (newDelDataInFile == 0) {
return code;
}
int32_t numOfElems = taosArrayGetSize(pBlockScanInfo->pDelData);
if (numOfElems > 0) {
int32_t delInFile = taosArrayGetSize(pBlockScanInfo->pfileDelData);
if (delInFile > 0) {
if (pBlockScanInfo->delSkyline != NULL) {
taosArrayClear(pBlockScanInfo->delSkyline);
} else {
pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
}
code = tsdbBuildDeleteSkyline(pBlockScanInfo->pDelData, 0, numOfElems - 1, pBlockScanInfo->delSkyline);
}
pBlockScanInfo->skylineBuilt = true;
taosArrayAddAll(pBlockScanInfo->pfileDelData, pBlockScanInfo->pMemDelData);
int32_t total = taosArrayGetSize(pBlockScanInfo->pfileDelData);
code = tsdbBuildDeleteSkyline(pBlockScanInfo->pfileDelData, 0, total - 1, pBlockScanInfo->delSkyline);
taosArrayClear(pBlockScanInfo->pfileDelData);
int32_t index = getInitialDelIndex(pBlockScanInfo->delSkyline, pReader->order);
pBlockScanInfo->iter.index = index;
......@@ -3222,8 +3289,6 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
bool asc = ASCENDING_TRAVERSE(pReader->order);
// TSKEY initialVal = asc? TSKEY_MIN:TSKEY_MAX;
TSDBKEY key = {.ts = TSKEY_INITIAL_VAL}, ikey = {.ts = TSKEY_INITIAL_VAL};
bool hasKey = false, hasIKey = false;
......@@ -3305,71 +3370,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
}
taosArrayDestroy(pIndexList);
if (pReader->status.pCurrentFileset != NULL) {
STFileObj* pTombFileObj = pReader->status.pCurrentFileset->farr[3];
if (pTombFileObj != NULL) {
const TTombBlkArray* pBlkArray = NULL;
int32_t code = tsdbDataFileReadTombBlk(pReader->pFileReader, &pBlkArray);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t i = 0, j = 0;
// todo find the correct start position.
while (i < pBlkArray->size && j < numOfTables) {
STombBlock block = {0};
code = tsdbDataFileReadTombBlock(pReader->pFileReader, &pBlkArray->data[i], &block);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
uint64_t uid = pReader->status.uidList.tableUidList[j];
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
if (pScanInfo->pDelData == NULL) {
pScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData));
}
STombRecord record = {0};
for (int32_t k = 0; k < block.suid->size; ++k) {
code = tTombBlockGet(&block, k, &record);
{
while (record.uid > uid) {
j += 1;
uid = pReader->status.uidList.tableUidList[j];
pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
if (pScanInfo->pDelData == NULL) {
pScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData));
}
}
if (record.uid < uid) {
continue;
}
}
ASSERT(record.suid == pReader->suid);
if (record.version <= pReader->verRange.maxVer) {
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
taosArrayPush(pScanInfo->pDelData, &delData);
pScanInfo->skylineBuilt = false;
}
}
i += 1;
tTombBlockDestroy(&block);
}
}
}
return TSDB_CODE_SUCCESS;
return loadTombRecordsFromDataFiles(pReader, numOfTables);
}
static void resetTableListIndex(SReaderStatus* pStatus) {
......@@ -3605,13 +3606,11 @@ static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReade
}
STableBlockScanInfo* pScanInfo = *p;
tMapDataReset(&pScanInfo->mapData);
SDataBlk block = {0};
for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block);
pReader->rowsNum += block.nRow;
}
// for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
// tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block);
// pReader->rowsNum += block.nRow;
// }
}
_end:
......@@ -4977,6 +4976,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
}
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
pInfo->pfileDelData = taosArrayDestroy(pInfo->pfileDelData);
}
} else {
// resetDataBlockScanInfo excluding lastKey
......@@ -5009,7 +5009,6 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
// reset current current table's data block scan info,
pBlockScanInfo->iterInit = false;
pBlockScanInfo->skylineBuilt = false;
pBlockScanInfo->iter.hasVal = false;
pBlockScanInfo->iiter.hasVal = false;
......@@ -5022,7 +5021,6 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
}
pBlockScanInfo->pBlockList = taosArrayDestroy(pBlockScanInfo->pBlockList);
tMapDataClear(&pBlockScanInfo->mapData);
// TODO: keep skyline for reuse
pBlockScanInfo->delSkyline = taosArrayDestroy(pBlockScanInfo->delSkyline);
}
......@@ -5343,16 +5341,14 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
}
int64_t st = taosGetTimestampUs();
TARRAY2_CLEAR(&pSup->colAggArray, 0);
// if (pFBlock->record.smaSize > 0) {
code = tsdbDataFileReadBlockSma(pReader->pFileReader, &pFBlock->record, &pSup->colAggArray);
if (code != TSDB_CODE_SUCCESS) {
tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
pReader->idStr);
return code;
}
code = tsdbDataFileReadBlockSma(pReader->pFileReader, &pFBlock->record, &pSup->colAggArray);
if (code != TSDB_CODE_SUCCESS) {
tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
pReader->idStr);
return code;
}
if (pSup->colAggArray.size > 0) {
*allHave = true;
......@@ -5394,7 +5390,6 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
int32_t i = 0, j = 0;
while (j < numOfCols && i < size) {
// SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
SColumnDataAgg* pAgg = &pSup->colAggArray.data[i];
if (pAgg->colId == pSup->colId[j]) {
pResBlock->pBlockAgg[pSup->slotId[j]] = pAgg;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册