From f2b83dfb4a6d05f4953cbbf207cafd92954359df Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 29 Jun 2022 10:35:07 +0800 Subject: [PATCH] fix(query):check null ptr. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 185 ++++++++++++++++++------- source/libs/executor/src/executor.c | 4 + 2 files changed, 138 insertions(+), 51 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 33c96d2125..67a88d0e9f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -118,6 +118,11 @@ typedef struct SFileBlockDumpInfo { int64_t lastKey; } SFileBlockDumpInfo; +typedef struct SVersionRange { + uint64_t minVer; + uint64_t maxVer; +} SVersionRange; + typedef struct SComposedDataBlock { bool composed; int32_t rows; @@ -154,8 +159,7 @@ struct STsdbReader { STSchema* pSchema; SDataFReader* pFileReader; - int64_t startVersion; - int64_t endVersion; + SVersionRange verRange; #if 0 SFileBlockInfo* pDataBlockInfo; SDataCols* pDataCols; // in order to hold current file data block @@ -413,8 +417,8 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd pReader->order = pCond->order; pReader->capacity = 4096; pReader->idStr = strdup(idstr); - pReader->startVersion= pCond->startVersion; - pReader->endVersion = 100000;//pCond->endVersion; // todo for test purpose + pReader->verRange.minVer= pCond->startVersion; + pReader->verRange.maxVer = 100000;//pCond->endVersion; // todo for test purpose pReader->type = pCond->type; pReader->window = *pCond->twindows; @@ -954,6 +958,12 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_ return TSDB_CODE_SUCCESS; } +static void setBlockDumpCompleted(SFileBlockDumpInfo* pDumpInfo, SBlockData* pBlockData) { + pDumpInfo->rowIndex = pBlockData->nRow; + pDumpInfo->totalRows = pBlockData->nRow; + pDumpInfo->lastKey = pBlockData->aTSKEY[pBlockData->nRow - 1] + 1; // todo step value +} + static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { int64_t st = taosGetTimestampUs(); @@ -966,6 +976,19 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI goto _error; } + SColVal cv = {0}; + for(int32_t i = 0; i < taosArrayGetSize(pReader->pResBlock->pDataBlock); ++i) { + SColData* pData = (SColData*)taosArrayGet(pBlockData->aColDataP, pReader->suppInfo.slotIds[i]); + SColumnInfoData* pColData = taosArrayGet(pReader->pResBlock->pDataBlock, i); + for(int32_t j = 0; j < pBlockData->nRow; ++j) { + tColDataGetValue(pData, j, &cv); + colDataAppend(pColData, j, (const char*)&cv.value, cv.isNull); + } + } + + pReader->pResBlock->info.rows = pBlockData->nRow; + setBlockDumpCompleted(&pReader->status.fBlockDumpInfo, pBlockData); + /* int32_t ret = tsdbLoadBlockDataCols(&(pReader->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, (int)(QH_GET_NUM_OF_COLS(pReader)), true); @@ -2081,7 +2104,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte return TSDB_CODE_SUCCESS; } -static bool blockIteratorNext(STsdbReader* pReader, SDataBlockIter* pBlockIter) { +static bool blockIteratorNext(SDataBlockIter* pBlockIter) { if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) { return false; } @@ -2252,6 +2275,15 @@ static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock) (!ascScan && (key.ts != TSKEY_INITIAL_VAL && key.ts >= pBlock->maxKey.ts)); } +static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) { + return (key.ts >= pBlock->minKey.ts || key.ts <= pBlock->maxKey.ts) /*&& (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer)*/; +} + +static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBlock, SBlock* pBlock, STableBlockScanInfo *pScanInfo, TSDBKEY key) { + return (dataBlockPartialRequired(&pReader->window, pBlock) || overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo) || + keyOverlapFileBlock(key, pBlock, &pReader->verRange)); +} + static int32_t buildInmemDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBKEY *key) { if (pBlockScanInfo->iter != NULL) { pBlockScanInfo->memHasVal = tsdbTbDataIterNext(pBlockScanInfo->iter); @@ -2446,7 +2478,7 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* } int32_t code = TSDB_CODE_SUCCESS; - TSDBKEY startKey = {.ts = pReader->window.skey, .version = pReader->startVersion}; + TSDBKEY startKey = {.ts = pReader->window.skey, .version = pReader->verRange.minVer}; STbData* d = NULL; if (pReader->pTsdb->mem != NULL) { @@ -2498,6 +2530,39 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead return key; } +static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) { + SReaderStatus* pStatus = &pReader->status; + + while (1) { + bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader->order, pReader); + if (!hasNext) { // no data files on disk + break; + } + + SArray* pIndexList = taosArrayInit(4, sizeof(SBlockIdx)); + int32_t code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + if (taosArrayGetSize(pIndexList) > 0) { + uint32_t numOfValidTable = 0; + code = doLoadFileBlock(pReader, pIndexList, &numOfValidTable, numOfBlocks); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + if (numOfValidTable > 0) { + break; + } + } + + // no blocks in current file, try next files + } + + return TSDB_CODE_SUCCESS; +} + static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) { SReaderStatus* pStatus = &pReader->status; SFileSetIter* pFIter = &pStatus->fileIter; @@ -2507,35 +2572,13 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) { if (pFIter->index < pFIter->numOfFiles) { if (pReader->status.blockIter.index == -1) { int32_t numOfBlocks = 0; - - while (1) { - bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader->order, pReader); - if (!hasNext) { // no data files on disk - break; - } - - SArray* pIndexList = taosArrayInit(4, sizeof(SBlockIdx)); - int32_t code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - if (taosArrayGetSize(pIndexList) > 0) { - uint32_t numOfValidTable = 0; - code = doLoadFileBlock(pReader, pIndexList, &numOfValidTable, &numOfBlocks); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - if (numOfValidTable > 0) { - break; - } - } - - // no blocks in current file, try next files + int32_t code = moveToNextFile(pReader, &numOfBlocks); + if (code != TSDB_CODE_SUCCESS) { + return code; } - int32_t code = initBlockIterator(pReader, pBlockIter, numOfBlocks); + // initialize the block iterator for a new fileset + code = initBlockIterator(pReader, pBlockIter, numOfBlocks); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2546,14 +2589,15 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) { TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader); - if (dataBlockPartialRequired(&pReader->window, pBlock) || overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo) /*|| points overlaps with data block*/) { + if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) { SBlockData data = {0}; doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &data); + // build composed data block buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo); } else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) { // data in memory that are earlier than current file block - TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->endVersion}; + TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer}; buildInmemDataBlock(pReader, pScanInfo, &maxKey); // build data block from in-memory buffer data completed. } else { // whole block is required, return it directly @@ -2567,8 +2611,8 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) { pInfo->window.skey = pBlock->minKey.ts; pInfo->window.ekey = pBlock->maxKey.ts; setComposedBlockFlag(pReader, false); - *exists = true; } + *exists = true; } else { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); @@ -2577,14 +2621,53 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) { // current block are exhausted, try the next file block if (pDumpInfo->rowIndex >= pBlock->nRow) { - bool hasNext = blockIteratorNext(pReader, &pReader->status.blockIter); - if (!hasNext) { - // current file is exhausted, let's try the next file + bool hasNext = blockIteratorNext(&pReader->status.blockIter); + if (!hasNext) { // current file is exhausted, let's try the next file + int32_t numOfBlocks = 0; + int32_t code = moveToNextFile(pReader, &numOfBlocks); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + // initialize the block iterator for a new fileset + code = initBlockIterator(pReader, pBlockIter, numOfBlocks); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + pFBlock = getCurrentBlockInfo(pBlockIter); + pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); + pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx); + + TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader); + if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) { + SBlockData data = {0}; + doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &data); + // build composed data block + buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo); + } else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) { + // data in memory that are earlier than current file block + TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer}; + buildInmemDataBlock(pReader, pScanInfo, &maxKey); + // build data block from in-memory buffer data completed. + } else { // whole block is required, return it directly + // todo + // 1. the version of all rows should be less than the endVersion + // 2. current block should not overlap with next neighbor block + // 3. current timestamp should not be overlap with each other + SDataBlockInfo* pInfo = &pReader->pResBlock->info; + pInfo->rows = pBlock->nRow; + pInfo->uid = pScanInfo->uid; + pInfo->window.skey = pBlock->minKey.ts; + pInfo->window.ekey = pBlock->maxKey.ts; + setComposedBlockFlag(pReader, false); + } + *exists = true; } else { // try next data block in current file // 1. check if ts in buffer is overlap with current file data block - TSDBKEY key1 = getCurrentKeyInBuf(pBlockIter, pReader); + blockIteratorNext(pBlockIter); + } } else { @@ -2646,7 +2729,7 @@ TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader) { return NULL; } - if (key.version <= pReader->endVersion) { + if (key.version <= pReader->verRange.maxVer) { return pRow; } @@ -2664,7 +2747,7 @@ TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader) { return NULL; } - if (key.version <= pReader->endVersion) { + if (key.version <= pReader->verRange.maxVer) { return pRow; } } @@ -2699,7 +2782,7 @@ int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock* int32_t rowIndex = pDumpInfo->rowIndex + 1; while (pBlockData->aTSKEY[rowIndex] == key) { - if (pBlockData->aVersion[rowIndex] > pReader->endVersion) { + if (pBlockData->aVersion[rowIndex] > pReader->verRange.maxVer) { continue; } @@ -3217,7 +3300,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter; initMemIterator(pBlockScanInfo, pReader); - TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->endVersion}; + TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer}; buildInmemDataBlock(pReader, pBlockScanInfo, &maxKey); if (pReader->pResBlock->info.rows > 0) { return true; @@ -3354,16 +3437,16 @@ int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader* pReader, SColumnDataAgg*** } SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { - if (pReader->status.composedDataBlock) { + SReaderStatus* pStatus = &pReader->status; + + if (pStatus->composedDataBlock) { return pReader->pResBlock->pDataBlock; } else { - SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); - STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); + SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pStatus->blockIter); + STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); - int32_t code = tBlockDataInit(&pReader->status.fileBlockData); - doLoadFileBlockData(pReader, &pReader->status.blockIter, pBlockScanInfo, &pReader->status.fileBlockData); -// TSDBROW row = tsdbRowFromBlockData(&pReader->status.fileBlockData, 0); -// doAppendOneRow(pReader->pResBlock, pReader, row.); + int32_t code = tBlockDataInit(&pStatus->fileBlockData); + doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData); return pReader->pResBlock->pDataBlock; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 9cf6be5eef..290bf2da20 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -187,6 +187,10 @@ int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tab ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL); SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + if (pTaskInfo->schemaVer.sw == NULL) { + return TSDB_CODE_SUCCESS; + } + *sversion = pTaskInfo->schemaVer.sw->version; *tversion = pTaskInfo->schemaVer.tversion; if (pTaskInfo->schemaVer.dbname) { -- GitLab