diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index a4cbfe60f705dda7671351beb6056318d083a362..3b7687359c35b7dd0bc6a190ed735e1c6384fae1 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -178,7 +178,6 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableL void tsdbReaderClose(STsdbReader *pReader); bool tsdbNextDataBlock(STsdbReader *pReader); -void tsdbRetrieveDataBlockInfo(const STsdbReader *pReader, int32_t *rows, uint64_t *uid, STimeWindow *pWindow); int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave); SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond); diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index af1a42d018919b81dac90a8a27124f5eb9a15b65..132b7c797c432502596ff701aa3dc0d133c5d5dc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -515,7 +515,7 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) { pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow); _exit: - return (terrno == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL); + return (terrno == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL); } SRowInfo *tLDataIterGet(SLDataIter *pIter) { return &pIter->rInfo; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 717b76046a6f79c75c4f6346b185f21510e881e1..0e9b3c9407baea430dd4b388f4d811c2e6ef72c9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -220,6 +220,8 @@ static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFil static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter); static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order); +static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid); + static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, @@ -998,7 +1000,7 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo } } -static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) { +static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; @@ -1015,6 +1017,14 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn bool asc = ASCENDING_TRAVERSE(pReader->order); int32_t step = asc ? 1 : -1; + // no data exists, return directly. + if (pBlockData->nRow == 0 || pBlockData->aTSKEY == 0) { + tsdbWarn("%p no need to copy since no data in blockData, table uid:%" PRIu64 " has been dropped, %s", pReader, pBlockInfo->uid, + pReader->idStr); + pResBlock->info.rows = 0; + return 0; + } + if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pBlock->nRow - 1 && (!asc))) { if (asc && pReader->window.skey <= pBlock->minKey.ts) { // pDumpInfo->rowIndex = 0; @@ -1128,12 +1138,19 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData, uint64_t uid) { + int32_t code = 0; int64_t st = taosGetTimestampUs(); tBlockDataReset(pBlockData); + STSchema* pSchema = getLatestTableSchema(pReader, uid); + if (pSchema == NULL) { + tsdbDebug("%p table uid:%"PRIu64" has been dropped, no data existed, %s", pReader, uid, pReader->idStr); + return code; + } + + SBlockLoadSuppInfo* pSup = &pReader->suppInfo; TABLEID tid = {.suid = pReader->suid, .uid = uid}; - int32_t code = - tBlockDataInit(pBlockData, &tid, pReader->pSchema, &pReader->suppInfo.colId[1], pReader->suppInfo.numOfCols - 1); + code = tBlockDataInit(pBlockData, &tid, pSchema, &pSup->colId[1], pSup->numOfCols - 1); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1656,6 +1673,19 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas return false; } +static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid) { + if (pReader->pSchema != NULL) { + return pReader->pSchema; + } + + pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1); + if (pReader->pSchema == NULL) { + tsdbError("failed to get table schema, uid:%" PRIu64 ", it may have been dropped, ver:-1, %s", uid, pReader->idStr); + } + + return pReader->pSchema; +} + static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) { // always set the newest schema version in pReader->pSchema if (pReader->pSchema == NULL) { @@ -2459,7 +2489,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) && pBlock->nRow <= pReader->capacity) { if (asc || ((!asc) && (!hasDataInLastBlock(pLastBlockReader)))) { - copyBlockDataToSDataBlock(pReader, pBlockScanInfo); + copyBlockDataToSDataBlock(pReader); // record the last key value pBlockScanInfo->lastKey = asc ? pBlock->maxKey.ts : pBlock->minKey.ts; @@ -2936,8 +2966,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { // update the last key for the corresponding table pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey; - tsdbDebug("%p uid:%" PRIu64 - " clean file block retrieved from file, global index:%d, " + tsdbDebug("%p uid:%" PRIu64 " clean file block retrieved from file, global index:%d, " "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s", pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->nRow, pBlock->minKey.ts, pBlock->maxKey.ts, pReader->idStr); @@ -3867,6 +3896,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL // no valid error code set in metaGetTbTSchema, so let's set the error code here. // we should proceed in case of tmq processing. if (pCond->suid != 0) { + taosSsleep(20); + pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1); if (pReader->pSchema == NULL) { tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr); @@ -4029,9 +4060,11 @@ void tsdbReaderClose(STsdbReader* pReader) { taosMemoryFree(pReader->idStr); taosMemoryFree(pReader->pSchema); + if (pReader->pMemSchema != pReader->pSchema) { taosMemoryFree(pReader->pMemSchema); } + taosMemoryFreeClear(pReader); } @@ -4111,26 +4144,6 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { return false; } -static void setBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) { - *rows = pReader->pResBlock->info.rows; - *uid = pReader->pResBlock->info.id.uid; - *pWindow = pReader->pResBlock->info.window; -} - -void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) { - if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) { - if (pReader->step == EXTERNAL_ROWS_MAIN) { - setBlockInfo(pReader, rows, uid, pWindow); - } else if (pReader->step == EXTERNAL_ROWS_PREV) { - setBlockInfo(pReader->innerReader[0], rows, uid, pWindow); - } else { - setBlockInfo(pReader->innerReader[1], rows, uid, pWindow); - } - } else { - setBlockInfo(pReader, rows, uid, pWindow); - } -} - static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols, SColumnDataAgg* pTsAgg) { // do fill all null column value SMA info int32_t i = 0, j = 0; @@ -4266,7 +4279,7 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) { return NULL; } - copyBlockDataToSDataBlock(pReader, pBlockScanInfo); + copyBlockDataToSDataBlock(pReader); return pReader->pResBlock; }