diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 180421fec173296418afbaeb30b3258b77b473fd..cab47cb211e6b5365e8d724d5f8df14659b7f9c4 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -704,20 +704,19 @@ typedef struct { typedef struct SSttBlockLoadInfo { SBlockData blockData[2]; void *pBlockArray; - - SArray *aSttBlk; - SArray *pTombBlockArray; // tomb block array list - int32_t blockIndex[2]; // to denote the loaded block in the corresponding position. - int32_t currentLoadBlockIndex; - int32_t loadBlocks; - double elapsedTime; - STSchema *pSchema; - int16_t *colIds; - int32_t numOfCols; - bool checkRemainingRow; - bool isLast; - bool sttBlockLoaded; - int32_t numOfStt; + SArray *aSttBlk; + SArray *pTombBlockArray; // tomb block array list + int32_t blockIndex[2]; // to denote the loaded block in the corresponding position. + int32_t currentLoadBlockIndex; + int32_t loadBlocks; + double elapsedTime; + STSchema *pSchema; + int16_t *colIds; + int32_t numOfCols; + bool checkRemainingRow; + bool isLast; + bool sttBlockLoaded; + int32_t numOfStt; // keep the last access position, this position may be used to reduce the binary times for // starting last block data for a new table @@ -778,7 +777,7 @@ struct SDiskDataBuilder { typedef struct SLDataIter { SRBTreeNode node; SSttBlk *pSttBlk; - int32_t iStt; + int32_t iStt; // for debug purpose int8_t backward; int32_t iSttBlk; int32_t iRow; @@ -797,9 +796,9 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter); int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint64_t suid, uint64_t uid, - STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo, - bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter, - void *pCurrentFileSet); + STimeWindow *pTimeWindow, SVersionRange *pVerRange, const char *idStr, + bool strictTimeRange, SArray *pSttFileBlockIterArray, void *pCurrentFileSet, STSchema* pSchema, + int16_t* pCols, int32_t numOfCols); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); bool tMergeTreeNext(SMergeTree *pMTree); @@ -807,10 +806,11 @@ bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree); void tMergeTreeClose(SMergeTree *pMTree); SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, int32_t numOfStt); +SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols); void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el); void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); -void destroySttBlockReader(SLDataIter *pLDataIter, int32_t numOfIter); +void* destroySttBlockReader(SArray* pLDataIterArray); // tsdbCache ============================================================================================== typedef struct SCacheRowsReader { diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index b8f368ed472009d9407d2bb736626360e161330c..b3bf7598754305a118dd5ba4c3c7c7411863367c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -54,6 +54,36 @@ SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, return pLoadInfo; } +SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols) { + SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(1, sizeof(SSttBlockLoadInfo)); + if (pLoadInfo == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + pLoadInfo->numOfStt = 1; + + pLoadInfo->blockIndex[0] = -1; + pLoadInfo->blockIndex[1] = -1; + pLoadInfo->currentLoadBlockIndex = 1; + + int32_t code = tBlockDataCreate(&pLoadInfo->blockData[0]); + if (code) { + terrno = code; + } + + code = tBlockDataCreate(&pLoadInfo->blockData[1]); + if (code) { + terrno = code; + } + + pLoadInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk)); + pLoadInfo->pSchema = pSchema; + pLoadInfo->colIds = colList; + pLoadInfo->numOfCols = numOfCols; + + return pLoadInfo; +} + void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { for (int32_t i = 0; i < pLoadInfo->numOfStt; ++i) { pLoadInfo[i].currentLoadBlockIndex = 1; @@ -103,14 +133,26 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { return NULL; } -void destroySttBlockReader(SLDataIter* pLDataIter, int32_t numOfIter) { - if (pLDataIter == NULL) { - return; +static void destroyLDataIterFn(void* param) { + SLDataIter** pIter = (SLDataIter**) param; + tLDataIterClose2(*pIter); + destroyLastBlockLoadInfo((*pIter)->pBlockLoadInfo); + taosMemoryFree(*pIter); +} + +void* destroySttBlockReader(SArray* pLDataIterArray) { + if (pLDataIterArray == NULL) { + return NULL; } - for(int32_t i = 0; i < numOfIter; ++i) { - tLDataIterClose2(&pLDataIter[i]); + int32_t numOfLevel = taosArrayGetSize(pLDataIterArray); + for(int32_t i = 0; i < numOfLevel; ++i) { + SArray* pList = taosArrayGetP(pLDataIterArray, i); + taosArrayDestroyEx(pList, destroyLDataIterFn); } + + taosArrayDestroy(pLDataIterArray); + return NULL; } static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) { @@ -371,7 +413,6 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pReader, int32 pIter->timeWindow.skey = pTimeWindow->skey; pIter->timeWindow.ekey = pTimeWindow->ekey; pIter->pReader = pReader; - pIter->pBlockLoadInfo = pBlockLoadInfo; if (!pBlockLoadInfo->sttBlockLoaded) { @@ -686,9 +727,9 @@ _end: } int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint64_t suid, uint64_t uid, - STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo, - bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter, - void *pCurrentFileSet) { + STimeWindow *pTimeWindow, SVersionRange *pVerRange, const char *idStr, + bool strictTimeRange, SArray *pSttFileBlockIterArray, void *pCurrentFileSet, STSchema* pSchema, + int16_t* pCols, int32_t numOfCols) { int32_t code = TSDB_CODE_SUCCESS; pMTree->backward = backward; @@ -701,45 +742,71 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint6 tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn); } - pMTree->pLoadInfo = pBlockLoadInfo; - pMTree->destroyLoadInfo = destroyLoadInfo; +// pMTree->pLoadInfo = pBlockLoadInfo; +// pMTree->destroyLoadInfo = true; pMTree->ignoreEarlierTs = false; // todo handle other level of stt files, here only deal with the first level stt - int32_t size = ((STFileSet *)pCurrentFileSet)->lvlArr[0].size; + int32_t size = ((STFileSet *)pCurrentFileSet)->lvlArr->size; if (size == 0) { goto _end; } - SSttLvl *pSttLevel = ((STFileSet *)pCurrentFileSet)->lvlArr[0].data[0]; - ASSERT(pSttLevel->level == 0); + while (taosArrayGetSize(pSttFileBlockIterArray) < size) { + SArray* pList = taosArrayInit(4, POINTER_BYTES); + taosArrayPush(pSttFileBlockIterArray, &pList); + } - for (int32_t i = 0; i < pSttLevel->fobjArr[0].size; ++i) { // open all last file - SSttFileReader* pSttFileReader = pLDataIter[i].pReader; - memset(&pLDataIter[i], 0, sizeof(SLDataIter)); + for(int32_t j = 0; j < size; ++j) { + SSttLvl *pSttLevel = ((STFileSet *)pCurrentFileSet)->lvlArr->data[j]; + ASSERT(pSttLevel->level == j); - if (pSttFileReader == NULL) { - SSttFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.szPage}; - conf.file[0] = *pSttLevel->fobjArr[0].data[i]->f; + SArray* pList = taosArrayGetP(pSttFileBlockIterArray, j); + int32_t numOfIter = taosArrayGetSize(pList); - code = tsdbSttFileReaderOpen(pSttLevel->fobjArr[0].data[i]->fname, &conf, &pSttFileReader); - if (code != TSDB_CODE_SUCCESS) { - return code; + if (numOfIter < TARRAY2_SIZE(pSttLevel->fobjArr)) { + int32_t inc = TARRAY2_SIZE(pSttLevel->fobjArr) - numOfIter; + for(int32_t k = 0; k < inc; ++k) { + SLDataIter *pIter = taosMemoryCalloc(1, sizeof(SLDataIter)); + taosArrayPush(pList, &pIter); } } - code = tLDataIterOpen2(&pLDataIter[i], pSttFileReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange, - &pMTree->pLoadInfo[i], pMTree->idStr, strictTimeRange); - if (code != TSDB_CODE_SUCCESS) { - goto _end; - } + for (int32_t i = 0; i < TARRAY2_SIZE(pSttLevel->fobjArr); ++i) { // open all last file + SLDataIter* pIter = taosArrayGetP(pList, i); - bool hasVal = tLDataIterNextRow(&pLDataIter[i], pMTree->idStr); - if (hasVal) { - tMergeTreeAddIter(pMTree, &pLDataIter[i]); - } else { - if (!pMTree->ignoreEarlierTs) { - pMTree->ignoreEarlierTs = pLDataIter[i].ignoreEarlierTs; + SSttFileReader *pSttFileReader = pIter->pReader; + SSttBlockLoadInfo* pLoadInfo = pIter->pBlockLoadInfo; + + // open stt file reader if not + if (pSttFileReader == NULL) { + SSttFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.szPage}; + conf.file[0] = *pSttLevel->fobjArr->data[i]->f; + + code = tsdbSttFileReaderOpen(pSttLevel->fobjArr->data[i]->fname, &conf, &pSttFileReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } + + if (pLoadInfo == NULL) { + pLoadInfo = tCreateOneLastBlockLoadInfo(pSchema, pCols, numOfCols); + } + + memset(pIter, 0, sizeof(SLDataIter)); + code = tLDataIterOpen2(pIter, pSttFileReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange, + pLoadInfo, pMTree->idStr, strictTimeRange); + if (code != TSDB_CODE_SUCCESS) { + goto _end; + } + + bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr); + if (hasVal) { + tMergeTreeAddIter(pMTree, pIter); + } else { + if (!pMTree->ignoreEarlierTs) { + pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs; + } } } } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 30bdd339c5a9d7f97d14af1f125207d5ba799b9f..9409ff1d0fd95e31a7d2f3a2efde459e0ccfb4ff 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -172,7 +172,8 @@ typedef struct SReaderStatus { SBlockData fileBlockData; SFilesetIter fileIter; SDataBlockIter blockIter; - SLDataIter* pLDataIter; + SArray* pLDataIterArray; +// SLDataIter* pLDataIter; SRowMerger merger; SColumnInfoData* pPrimaryTsCol; // primary time stamp output col info data } SReaderStatus; @@ -596,7 +597,9 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo pIter->pLastBlockReader->uid = 0; tMergeTreeClose(&pIter->pLastBlockReader->mergeTree); - destroySttBlockReader(pReader->status.pLDataIter, pReader->pTsdb->pVnode->config.sttTrigger); + + pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray); + pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES); resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo); @@ -959,15 +962,10 @@ _end: static void doCleanupTableScanInfo(STableBlockScanInfo* pScanInfo) { // reset the index in last block when handing a new file - tMapDataClear(&pScanInfo->mapData); taosArrayClear(pScanInfo->pBlockList); } static void cleanupTableScanInfo(SReaderStatus* pStatus) { -// if (pStatus->mapDataCleaned) { -// return; -// } - SSHashObj* pTableMap = pStatus->pTableMap; STableBlockScanInfo** px = NULL; int32_t iter = 0; @@ -2856,10 +2854,10 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan tsdbDebug("init last block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", w.skey, w.ekey, pScanInfo->uid, pReader->idStr); - int32_t code = - tMergeTreeOpen2(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pTsdb, - pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo, false, - pReader->idStr, false, pReader->status.pLDataIter, pReader->status.pCurrentFileset); + int32_t code = tMergeTreeOpen2(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pTsdb, + pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pReader->idStr, false, + pReader->status.pLDataIterArray, pReader->status.pCurrentFileset, pReader->pSchema, + pReader->suppInfo.colId, pReader->suppInfo.numOfCols); if (code != TSDB_CODE_SUCCESS) { return false; } @@ -3381,8 +3379,6 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) { // reset the index in last block when handing a new file doCleanupTableScanInfo(pScanInfo); -// pStatus->mapDataCleaned = true; - bool hasNexTable = moveToNextTable(pUidList, pStatus); if (!hasNexTable) { return TSDB_CODE_SUCCESS; @@ -4760,8 +4756,8 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi goto _err; } - pReader->status.pLDataIter = taosMemoryCalloc(pConf->sttTrigger, sizeof(SLDataIter)); - if (pReader->status.pLDataIter == NULL) { + pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES); + if (pReader->status.pLDataIterArray == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } @@ -4789,7 +4785,7 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi } static void clearSharedPtr(STsdbReader* p) { - p->status.pLDataIter = NULL; + p->status.pLDataIterArray = NULL; p->status.pTableMap = NULL; p->status.uidList.tableUidList = NULL; p->pReadSnap = NULL; @@ -4800,7 +4796,7 @@ static void clearSharedPtr(STsdbReader* p) { static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) { pDst->status.pTableMap = pSrc->status.pTableMap; - pDst->status.pLDataIter = pSrc->status.pLDataIter; + pDst->status.pLDataIterArray = pSrc->status.pLDataIterArray; pDst->status.uidList = pSrc->status.uidList; pDst->pSchema = pSrc->pSchema; pDst->pSchemaMap = pSrc->pSchemaMap; @@ -4884,15 +4880,11 @@ void tsdbReaderClose2(STsdbReader* pReader) { tMergeTreeClose(&pLReader->mergeTree); getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime); - pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo); - - // todo dynamic allocate the number of stt data iter - destroySttBlockReader(pReader->status.pLDataIter, pReader->pTsdb->pVnode->config.sttTrigger); taosMemoryFree(pLReader); } - taosMemoryFreeClear(pReader->status.pLDataIter); + destroySttBlockReader(pReader->status.pLDataIterArray); taosMemoryFreeClear(pReader->status.uidList.tableUidList); tsdbDebug(