提交 9e98c64b 编写于 作者: H Haojun Liao

fix(tsdb): set correct stt load info. update the test case.

上级 450a9f3c
...@@ -816,7 +816,7 @@ SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colLi ...@@ -816,7 +816,7 @@ SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colLi
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el); void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el);
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
void* destroySttBlockReader(SArray* pLDataIterArray); void *destroySttBlockReader(SArray *pLDataIterArray, int64_t *blocks, double *el);
// tsdbCache ============================================================================================== // tsdbCache ==============================================================================================
typedef struct SCacheRowsReader { typedef struct SCacheRowsReader {
......
...@@ -134,14 +134,13 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { ...@@ -134,14 +134,13 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
return NULL; return NULL;
} }
static void destroyLDataIterFn(void* param) { static void destroyLDataIter(SLDataIter* pIter) {
SLDataIter** pIter = (SLDataIter**) param; tLDataIterClose2(pIter);
tLDataIterClose2(*pIter); destroyLastBlockLoadInfo(pIter->pBlockLoadInfo);
destroyLastBlockLoadInfo((*pIter)->pBlockLoadInfo); taosMemoryFree(pIter);
taosMemoryFree(*pIter);
} }
void* destroySttBlockReader(SArray* pLDataIterArray) { void* destroySttBlockReader(SArray* pLDataIterArray, int64_t* blocks, double* el) {
if (pLDataIterArray == NULL) { if (pLDataIterArray == NULL) {
return NULL; return NULL;
} }
...@@ -149,7 +148,13 @@ void* destroySttBlockReader(SArray* pLDataIterArray) { ...@@ -149,7 +148,13 @@ void* destroySttBlockReader(SArray* pLDataIterArray) {
int32_t numOfLevel = taosArrayGetSize(pLDataIterArray); int32_t numOfLevel = taosArrayGetSize(pLDataIterArray);
for(int32_t i = 0; i < numOfLevel; ++i) { for(int32_t i = 0; i < numOfLevel; ++i) {
SArray* pList = taosArrayGetP(pLDataIterArray, i); SArray* pList = taosArrayGetP(pLDataIterArray, i);
taosArrayDestroyEx(pList, destroyLDataIterFn); for(int32_t j = 0; j < taosArrayGetSize(pList); ++j) {
SLDataIter* pIter = taosArrayGetP(pList, j);
*el += pIter->pBlockLoadInfo->elapsedTime;
*blocks += pIter->pBlockLoadInfo->loadBlocks;
destroyLDataIter(pIter);
}
taosArrayDestroy(pList);
} }
taosArrayDestroy(pLDataIterArray); taosArrayDestroy(pLDataIterArray);
...@@ -499,7 +504,7 @@ void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) { ...@@ -499,7 +504,7 @@ void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) {
if (index != -1) { if (index != -1) {
pIter->iSttBlk = index; pIter->iSttBlk = index;
pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk); pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
tsdbDebug("try next last file block:%d from %d, trigger by uid:%" PRIu64 ", file index:%d, %s", pIter->iSttBlk, tsdbDebug("try next last file block:%d from stt fileIdx:%d, trigger by uid:%" PRIu64 ", file index:%d, %s", pIter->iSttBlk,
oldIndex, pIter->uid, pIter->iStt, idStr); oldIndex, pIter->uid, pIter->iStt, idStr);
} else { } else {
tsdbDebug("no more last block qualified, uid:%" PRIu64 ", file index:%d, %s", pIter->uid, oldIndex, idStr); tsdbDebug("no more last block qualified, uid:%" PRIu64 ", file index:%d, %s", pIter->uid, oldIndex, idStr);
......
...@@ -564,17 +564,17 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA ...@@ -564,17 +564,17 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA
pLReader->uid = 0; pLReader->uid = 0;
tMergeTreeClose(&pLReader->mergeTree); tMergeTreeClose(&pLReader->mergeTree);
if (pLReader->pInfo == NULL) { // if (pLReader->pInfo == NULL) {
// here we ignore the first column, which is always be the primary timestamp column // // here we ignore the first column, which is always be the primary timestamp column
SBlockLoadSuppInfo* pInfo = &pReader->suppInfo; // SBlockLoadSuppInfo* pInfo = &pReader->suppInfo;
// todo dynamic number of stt // // todo dynamic number of stt
int32_t numOfStt = pReader->pTsdb->pVnode->config.sttTrigger; // int32_t numOfStt = pReader->pTsdb->pVnode->config.sttTrigger;
pLReader->pInfo = tCreateLastBlockLoadInfo(pReader->pSchema, &pInfo->colId[1], pInfo->numOfCols - 1, numOfStt); // pLReader->pInfo = tCreateLastBlockLoadInfo(pReader->pSchema, &pInfo->colId[1], pInfo->numOfCols - 1, numOfStt);
if (pLReader->pInfo == NULL) { // if (pLReader->pInfo == NULL) {
tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr); // tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
return terrno; // return terrno;
} // }
} // }
tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr); tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -592,16 +592,13 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo ...@@ -592,16 +592,13 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo
} }
SIOCostSummary* pSum = &pReader->cost; SIOCostSummary* pSum = &pReader->cost;
getLastBlockLoadInfo(pIter->pLastBlockReader->pInfo, &pSum->lastBlockLoad, &pReader->cost.lastBlockLoadTime);
pIter->pLastBlockReader->uid = 0; pIter->pLastBlockReader->uid = 0;
tMergeTreeClose(&pIter->pLastBlockReader->mergeTree); tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray); pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &pSum->lastBlockLoad, &pSum->lastBlockLoadTime);
pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES); pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo);
// check file the time range of coverage // check file the time range of coverage
STimeWindow win = {0}; STimeWindow win = {0};
...@@ -1580,17 +1577,11 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI ...@@ -1580,17 +1577,11 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
} }
SBlockLoadSuppInfo* pSup = &pReader->suppInfo; SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
TABLEID tid = {.suid = pReader->suid, .uid = uid};
code = tBlockDataInit(pBlockData, &tid, pSchema, &pSup->colId[1], pSup->numOfCols - 1);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBrinRecord* pRecord = &pBlockInfo->record; SBrinRecord* pRecord = &pBlockInfo->record;
code = tsdbDataFileReadBlockDataByColumn(pReader->pFileReader, pRecord, pBlockData, pReader->pSchema, &pSup->colId[1], code = tsdbDataFileReadBlockDataByColumn(pReader->pFileReader, pRecord, pBlockData, pSchema, &pSup->colId[1],
pSup->numOfCols - 1); pSup->numOfCols - 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
...@@ -2010,8 +2001,8 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* ...@@ -2010,8 +2001,8 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
// 4. output buffer should be large enough to hold all rows in current block // 4. output buffer should be large enough to hold all rows in current block
// 5. delete info should not overlap with current block data // 5. delete info should not overlap with current block data
// 6. current block should not contain the duplicated ts // 6. current block should not contain the duplicated ts
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo,
STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) { TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
SDataBlockToLoadInfo info = {0}; SDataBlockToLoadInfo info = {0};
getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader, pReader); getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader, pReader);
...@@ -2022,8 +2013,8 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlock ...@@ -2022,8 +2013,8 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlock
// log the reason why load the datablock for profile // log the reason why load the datablock for profile
if (loadDataBlock) { if (loadDataBlock) {
tsdbDebug("%p uid:%" PRIu64 tsdbDebug("%p uid:%" PRIu64
" need to load the datablock, overlapneighbor:%d, hasDup:%d, partiallyRequired:%d, " " need to load the datablock, overlapneighbor:%d, hasDup:%d, partiallyRequired:%d, "
"overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s", "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
pReader, pBlockInfo->uid, info.overlapWithNeighborBlock, info.hasDupTs, info.partiallyRequired, pReader, pBlockInfo->uid, info.overlapWithNeighborBlock, info.hasDupTs, info.partiallyRequired,
info.overlapWithKeyInBuf, info.moreThanCapcity, info.overlapWithDelInfo, info.overlapWithLastBlock, info.overlapWithKeyInBuf, info.moreThanCapcity, info.overlapWithDelInfo, info.overlapWithLastBlock,
pReader->idStr); pReader->idStr);
...@@ -3566,8 +3557,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { ...@@ -3566,8 +3557,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
// update the last key for the corresponding table // update the last key for the corresponding table
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey; pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey;
tsdbDebug("%p uid:%" PRIu64 tsdbDebug("%p uid:%" PRIu64 " clean file block retrieved from file, global index:%d, "
" clean file block retrieved from file, global index:%d, "
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s", "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow, pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow,
pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr); pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr);
...@@ -3893,8 +3883,6 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { ...@@ -3893,8 +3883,6 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
initBlockDumpInfo(pReader, pBlockIter); initBlockDumpInfo(pReader, pBlockIter);
} else { } else {
// all data blocks in files are checked, let's check the data in last files. // all data blocks in files are checked, let's check the data in last files.
// ASSERT(pReader->status.pCurrentFileset->nSttF > 0);
// data blocks in current file are exhausted, let's try the next file now // data blocks in current file are exhausted, let's try the next file now
SBlockData* pBlockData = &pReader->status.fileBlockData; SBlockData* pBlockData = &pReader->status.fileBlockData;
if (pBlockData->uid != 0) { if (pBlockData->uid != 0) {
...@@ -4913,13 +4901,10 @@ void tsdbReaderClose2(STsdbReader* pReader) { ...@@ -4913,13 +4901,10 @@ void tsdbReaderClose2(STsdbReader* pReader) {
if (pFilesetIter->pLastBlockReader != NULL) { if (pFilesetIter->pLastBlockReader != NULL) {
SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader; SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
tMergeTreeClose(&pLReader->mergeTree); tMergeTreeClose(&pLReader->mergeTree);
getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);
pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo);
taosMemoryFree(pLReader); taosMemoryFree(pLReader);
} }
destroySttBlockReader(pReader->status.pLDataIterArray); destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);
taosMemoryFreeClear(pReader->status.uidList.tableUidList); taosMemoryFreeClear(pReader->status.uidList.tableUidList);
tsdbDebug( tsdbDebug(
......
...@@ -127,7 +127,7 @@ class TDTestCase: ...@@ -127,7 +127,7 @@ class TDTestCase:
self.c2Sum = None self.c2Sum = None
# create database db # create database db
sql = f"create database db vgroups 5 replica 3" sql = f"create database db vgroups 5 replica 3 stt_trigger 1"
tdLog.info(sql) tdLog.info(sql)
tdSql.execute(sql) tdSql.execute(sql)
sql = f"use db" sql = f"use db"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册