diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 030f7a09509ae56e8fc8973646cec4719f2c4fff..06397fc724e4125bb82ad93738ae9f8991806085 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -368,20 +368,20 @@ typedef struct { } SCacheFlushState; struct STsdb { - char *path; - SVnode *pVnode; - STsdbKeepCfg keepCfg; - TdThreadRwlock rwLock; - SMemTable *mem; - SMemTable *imem; - STsdbFS fs; // old - SLRUCache *lruCache; - SCacheFlushState flushState; - TdThreadMutex lruMutex; - SLRUCache *biCache; - TdThreadMutex biMutex; + char *path; + SVnode *pVnode; + STsdbKeepCfg keepCfg; + TdThreadRwlock rwLock; + SMemTable *mem; + SMemTable *imem; + STsdbFS fs; // old + SLRUCache *lruCache; + SCacheFlushState flushState; + TdThreadMutex lruMutex; + SLRUCache *biCache; + TdThreadMutex biMutex; struct STFileSystem *pFS; // new - SRocksCache rCache; + SRocksCache rCache; }; struct TSDBKEY { @@ -659,12 +659,19 @@ struct SDelFWriter { uint8_t *aBuf[1]; }; +#include "tarray2.h" +//#include "tsdbFS2.h" +// struct STFileSet; +typedef struct STFileSet STFileSet; +typedef TARRAY2(STFileSet *) TFileSetArray; + struct STsdbReadSnap { - SMemTable *pMem; - SQueryNode *pNode; - SMemTable *pIMem; - SQueryNode *pINode; - STsdbFS fs; + SMemTable *pMem; + SQueryNode *pNode; + SMemTable *pIMem; + SQueryNode *pINode; + TFileSetArray *pfSetArray; + STsdbFS fs; }; struct SDataFWriter { @@ -777,7 +784,7 @@ struct SDiskDataBuilder { typedef struct SLDataIter { SRBTreeNode node; SSttBlk *pSttBlk; - int32_t iStt; // for debug purpose + int32_t iStt; // for debug purpose int8_t backward; int32_t iSttBlk; int32_t iRow; @@ -796,22 +803,22 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter); typedef struct { - int8_t backward; - STsdb *pTsdb; - uint64_t suid; - uint64_t uid; - STimeWindow timewindow; - SVersionRange verRange; - bool strictTimeRange; - SArray *pSttFileBlockIterArray; - void *pCurrentFileset; - STSchema *pSchema; - int16_t *pCols; - int32_t numOfCols; - void *pReader; - void *idstr; + int8_t backward; + STsdb *pTsdb; + uint64_t suid; + uint64_t uid; + STimeWindow timewindow; + SVersionRange verRange; + bool strictTimeRange; + SArray *pSttFileBlockIterArray; + void *pCurrentFileset; + STSchema *pSchema; + int16_t *pCols; + int32_t numOfCols; + void *pReader; + void *idstr; } SMergeTreeConf; -int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf* pConf); +int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); bool tMergeTreeNext(SMergeTree *pMTree); @@ -827,29 +834,28 @@ void *destroySttBlockReader(SArray *pLDataIterArray, int64_t *block // tsdbCache ============================================================================================== typedef struct SCacheRowsReader { - STsdb *pTsdb; - SVersionRange verRange; - TdThreadMutex readerMutex; - SVnode *pVnode; - STSchema *pSchema; - STSchema *pCurrSchema; - uint64_t uid; - uint64_t suid; - char **transferBuf; // todo remove it soon - int32_t numOfCols; - SArray *pCidList; - int32_t *pSlotIds; - int32_t type; - int32_t tableIndex; // currently returned result tables - STableKeyInfo *pTableList; // table id list - int32_t numOfTables; - SSttBlockLoadInfo *pLoadInfo; - SLDataIter *pDataIter; - STsdbReadSnap *pReadSnap; - SDataFReader *pDataFReader; - SDataFReader *pDataFReaderLast; - const char *idstr; - int64_t lastTs; + STsdb *pTsdb; + SVersionRange verRange; + TdThreadMutex readerMutex; + SVnode *pVnode; + STSchema *pSchema; + STSchema *pCurrSchema; + uint64_t uid; + uint64_t suid; + char **transferBuf; // todo remove it soon + int32_t numOfCols; + SArray *pCidList; + int32_t *pSlotIds; + int32_t type; + int32_t tableIndex; // currently returned result tables + STableKeyInfo *pTableList; // table id list + int32_t numOfTables; + SArray *pLDataIterArray; + STsdbReadSnap *pReadSnap; + SDataFReader *pDataFReader; + SDataFReader *pDataFReaderLast; + char *idstr; + int64_t lastTs; } SCacheRowsReader; typedef struct { diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 52ad923fcaeb1b9f4748731a673707a7cd8a2219..d9e7c1883a4363fa0eca7e00250b889b1d3474ad 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ #include "tsdb.h" +#include "tsdbDataFileRW.h" #include "vnd.h" #define ROCKS_BATCH_SIZE (4096) @@ -1714,201 +1715,327 @@ _err: return code; } */ -typedef enum { - SFSLASTNEXTROW_FS, - SFSLASTNEXTROW_FILESET, - SFSLASTNEXTROW_BLOCKDATA, - SFSLASTNEXTROW_BLOCKROW -} SFSLASTNEXTROWSTATES; - typedef struct { - SFSLASTNEXTROWSTATES state; // [input] - STsdb *pTsdb; // [input] - STSchema *pTSchema; // [input] - tb_uid_t suid; - tb_uid_t uid; - int32_t nFileSet; - int32_t iFileSet; - SArray *aDFileSet; - SDataFReader **pDataFReader; - TSDBROW row; - - bool checkRemainingRow; - SMergeTree mergeTree; - SMergeTree *pMergeTree; - SSttBlockLoadInfo *pLoadInfo; - SLDataIter *pDataIter; - int64_t lastTs; -} SFSLastNextRowIter; - -static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, - int nCols) { - SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter; - int32_t code = 0; - bool checkRemainingRow = true; + SMergeTree mergeTree; + SMergeTree *pMergeTree; +} SFSLastIter; - switch (state->state) { - case SFSLASTNEXTROW_FS: - state->nFileSet = taosArrayGetSize(state->aDFileSet); - state->iFileSet = state->nFileSet; +static int32_t lastIterClose(SFSLastIter **iter) { + int32_t code = 0; - case SFSLASTNEXTROW_FILESET: { - SDFileSet *pFileSet = NULL; - _next_fileset: - if (state->pMergeTree != NULL) { - tMergeTreeClose(state->pMergeTree); - state->pMergeTree = NULL; - } + if (iter->pMergeTree) { + tMergeTreeClose(iter->pMergeTree); + iter->pMergeTree = NULL; + } - if (--state->iFileSet >= 0) { - pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet); - } else { - *ppRow = NULL; - return code; - } + *iter = NULL; - if (*state->pDataFReader == NULL || (*state->pDataFReader)->pSet->fid != pFileSet->fid) { - if (*state->pDataFReader != NULL) { - tsdbDataFReaderClose(state->pDataFReader); + return code; +} - resetLastBlockLoadInfo(state->pLoadInfo); - } +static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid, + tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, STFileSet *pFileSet, int16_t *aCols, + int nCols) { + int32_t code = 0; - code = tsdbDataFReaderOpen(state->pDataFReader, state->pTsdb, pFileSet); - if (code) goto _err; - } + if (!iter->pMergeTree) { + int64_t loadBlocks = 0; + double elapse = 0; + pr->pLDataIterArray = destroySttBlockReader(pr->pLDataIterArray, &loadBlocks, &elapse); + pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES); + + SMergeTreeConf conf = { + .uid = uid, + .suid = suid, + .pTsdb = pTsdb, + .timewindow = (STimeWindow){.skey = lastTs, .ekey = TSKEY_MAX}, + .verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, + .strictTimeRange = false, + .pSchema = pTSchema, + .pCurrentFileset = pFileSet, + .backward = 1, + .pSttFileBlockIterArray = pr->pLDataIterArray, + .pCols = aCols, + .numOfCols = nCols, + .pReader = pr, + .idstr = pr->idstr, + }; + + code = tMergeTreeOpen2(&iter->mergeTree, &conf); + if (code != TSDB_CODE_SUCCESS) { + return -1; + } + + iter->pMergeTree = &iter->mergeTree; + } + + // retrieve next row + bool hasVal = tMergeTreeNext(iter->pMergeTree); + if (!hasVal) { + *ppRow = NULL; + return code; + } + + *ppRow = tMergeTreeGetRow(iter->pMergeTree); + + return code; +} + +typedef enum SFSNEXTROWSTATES { + SFSNEXTROW_FS, + SFSNEXTROW_FILESET, + SFSNEXTROW_INDEXLIST, + SFSNEXTROW_BRINBLOCK, + SFSNEXTROW_BRINRECORD, + SFSNEXTROW_BLOCKDATA, + SFSNEXTROW_BLOCKROW +} SFSNEXTROWSTATES; + +struct CacheNextRowIter; + +typedef struct SFSNextRowIter { + SFSNEXTROWSTATES state; // [input] + STsdb *pTsdb; // [input] + SBlockIdx *pBlockIdxExp; // [input] + STSchema *pTSchema; // [input] + tb_uid_t suid; + tb_uid_t uid; + int32_t nFileSet; + int32_t iFileSet; + STFileSet *pFileSet; + TFileSetArray *aDFileSet; + SDataFileReader *pFileReader; + SArray *pIndexList; + int32_t iBrinIndex; + SBrinBlock brinBlock; + int32_t iBrinRecord; + SBrinRecord brinRecord; + SBlockData blockData; + SBlockData *pBlockData; + int32_t nRow; + int32_t iRow; + TSDBROW row; + int64_t lastTs; + SFSLastIter lastIter; + SFSLastIter *pLastIter; + TSDBROW *pLastRow; + struct CacheNextRowIter *pRowIter; +} SFSNextRowIter; + +static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, + int nCols) { + SFSNextRowIter *state = (SFSNextRowIter *)iter; + int32_t code = 0; + + if (SFSNEXTROW_FS == state->state) { + state->nFileSet = TARRAY2_SIZE(state->aDFileSet); + state->iFileSet = state->nFileSet; + + state->state = SFSNEXTROW_FILESET; + } - int nTmpCols = nCols; - bool hasTs = false; - if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) { - --nTmpCols; - hasTs = true; + if (SFSNEXTROW_FILESET == state->state) { + _next_fileset: + if (--state->iFileSet < 0) { // no fileset left, cleanup and return NULL row + + } else { + state->pFileSet = TARRAY2_GET(state->aDFileSet, state->iFileSet); + } + + STFileObj **pFileObj = pFileSet->farr; + if (pFileObj[0] != NULL || pFileObj[3] != NULL) { + SDataFileReaderConfig conf = {.tsdb = state->pTsdb, .szPage = state->pTsdb->pVnode->config.szPage}; + const char *filesName[4] = {0}; + if (pFileObj[0] != NULL) { + conf.files[0].file = *pFileObj[0]->f; + conf.files[0].exist = true; + filesName[0] = pFileObj[0]->fname; + + conf.files[1].file = *pFileObj[1]->f; + conf.files[1].exist = true; + filesName[1] = pFileObj[1]->fname; + + conf.files[2].file = *pFileObj[2]->f; + conf.files[2].exist = true; + filesName[2] = pFileObj[2]->fname; } - for (int i = 0; i < state->pLoadInfo->numOfStt; ++i) { - state->pLoadInfo[i].colIds = hasTs ? aCols + 1 : aCols; - state->pLoadInfo[i].numOfCols = nTmpCols; - state->pLoadInfo[i].isLast = isLast; + + if (pFileObj[3] != NULL) { + conf.files[3].exist = true; + conf.files[3].file = *pFileObj[3]->f; + filesName[3] = pFileObj[3]->fname; } - tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid, - &(STimeWindow){.skey = state->lastTs, .ekey = TSKEY_MAX}, - &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true, - state->pDataIter); - state->pMergeTree = &state->mergeTree; - state->state = SFSLASTNEXTROW_BLOCKROW; - } - case SFSLASTNEXTROW_BLOCKROW: { - if (nCols != state->pLoadInfo->numOfCols) { - for (int i = 0; i < state->pLoadInfo->numOfStt; ++i) { - state->pLoadInfo[i].numOfCols = nCols; - state->pLoadInfo[i].checkRemainingRow = state->checkRemainingRow; - } + code = tsdbDataFileReaderOpen(filesName, &conf, &state->pFileReader); + if (code != TSDB_CODE_SUCCESS) { + goto _err; } - bool hasVal = tMergeTreeNext(&state->mergeTree); - if (!hasVal) { - if (tMergeTreeIgnoreEarlierTs(&state->mergeTree)) { - *pIgnoreEarlierTs = true; - *ppRow = NULL; - return code; - } - state->state = SFSLASTNEXTROW_FILESET; - goto _next_fileset; + + // TODO: load tomb data from data and sttt + state->pLastIter = &state->lastIter; + + if (!state->pIndexList) { + state->pIndexList = taosArrayInit(1, sizeof(SBrinBlk)); + } else { + taosArrayClear(state->pIndexList); } - state->row = *tMergeTreeGetRow(&state->mergeTree); - *ppRow = &state->row; + const TBrinBlkArray *pBlkArray = NULL; - if (TSDBROW_TS(&state->row) <= state->lastTs) { - *pIgnoreEarlierTs = true; - *ppRow = NULL; - return code; + int32_t code = tsdbDataFileReadBrinBlk(state->pFileReader, &pBlkArray); + if (code != TSDB_CODE_SUCCESS) { + goto _err; } - *pIgnoreEarlierTs = false; - /* - if (!hasVal) { - state->state = SFSLASTNEXTROW_FILESET; + for (int i = TARRAY2_SIZE(pBlkArray); i >= 0; --i) { + SBrinBlk *pBrinBlk = &pBlkArray->data[i]; + if (state->suid >= pBrinBlk->minTbid.suid && state->suid <= pBrinBlk->maxTbid.suid) { + if (state->uid >= pBrinBlk->minTbid.uid && state->uid <= pBrinBlk->maxTbid.uid) { + taosArrayPush(state->pIndexList, pBrinBlk); + } + } else if (state->suid > pBrinBlk->maxTbid.suid || + (state->suid == pBrinBlk->maxTbid.suid && state->uid > pBrinBlk->maxTbid.uid)) { + break; + } } - */ - if (!state->checkRemainingRow) { - state->checkRemainingRow = true; + + int indexSize = TARRAY_SIZE(state->pIndexList); + if (indexSize <= 0) { + // goto next fileset } - return code; + + state->state = SFSNEXTROW_INDEXLIST; + state->iBrinIndex = indexSize; + } else { // empty fileset, goto next fileset } - default: - ASSERT(0); - break; } -_err: - /*if (state->pDataFReader) { - tsdbDataFReaderClose(&state->pDataFReader); - state->pDataFReader = NULL; - }*/ - if (state->pMergeTree != NULL) { - tMergeTreeClose(state->pMergeTree); - state->pMergeTree = NULL; + if (SFSNEXTROW_INDEXLIST == state->state) { + SBrinBlk *pBrinBlk = NULL; + _next_brinindex: + if (--state->iBrinIndex < 0) { // no index left, goto next fileset + } else { + pBrinBlk = taosArrayGet(state->pIndexList, state->iBrinIndex); + } + + code = tsdbDataFileReadBrinBlock(state->pFileReader, pBrinBlk, &state->brinBlock); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + + state->iBrinRecord = BRIN_BLOCK_SIZE(&state->brinBlock) - 1; + state->state = SFSNEXTROW_BRINBLOCK; } - *ppRow = NULL; + if (SFSNEXTROW_BRINBLOCK == state->state) { + _next_brinrecord: + if (state->iBrinRecord < 0) { // empty brin block, goto _next_brinindex + tBrinBlockClear(&state->brinBlock); + goto _next_brinindex; + } + code = tBrinBlockGet(&state->brinBlock, state->iBrinRecord, &state->brinRecord); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } - return code; -} + SBrinRecord *pRecord = &state->brinRecord; + if (pRecord->uid != state->uid) { + // TODO: goto next brin block early + --state->iBrinRecord; + goto _next_brinrecord; + } -int32_t clearNextRowFromFSLast(void *iter) { - SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter; - int32_t code = 0; + state->state = SFSNEXTROW_BRINRECORD; + } - if (!state) { - return code; + if (SFSNEXTROW_BRINRECORD == state->state) { + SBrinRecord *pRecord = &state->brinRecord; + + if (!state->pBlockData) { + state->pBlockData = &state->blockData; + code = tBlockDataCreate(&state->blockData); + if (code) goto _err; + } else { + tBlockDataReset(state->pBlockData); + } + + if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) { + --nCols; + ++aCols; + } + code = tsdbDataFileReadBlockDataByColumn(state->pFileReader, pRecord, state->pBlockData, state->pTSchema, aCols, + nCols); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + + state->nRow = state->blockData.nRow; + state->iRow = state->nRow - 1; + + state->state = SFSNEXTROW_BLOCKROW; } - /* - if (state->pDataFReader) { - tsdbDataFReaderClose(&state->pDataFReader); - state->pDataFReader = NULL; + + if (SFSNEXTROW_BLOCKROW == state->state) { + if (state->iRow < 0) { + --state->iBrinRecord; + goto _next_brinrecord; + } + + state->row = tsdbRowFromBlockData(state->pBlockData, state->iRow); + if (!state->pLastIter) { + *ppRow = &state->row; + --state->iRow; + return code; + } + + if (!state->pLastRow) { + // get next row from fslast and process with fs row, --state->Row if select fs row + code = lastIterNext(&state->lastIter, &state->pLastRow, state->pTsdb, state->pTSchema, state->suid, state->uid, + state->pr, state->lastTs, state->pFileSet, aCols, nCols); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + } + + if (!state->pLastRow) { + lastIterClose(&state->pLastIter); + + *ppRow = &state->row; + --state->iRow; + return code; + } + + // process state->pLastRow & state->row + TSKEY rowTs = TSDBROW_TS(&state->row); + TSKEY lastRowTs = TSDBROW_TS(state->pLastRow); + if (lastRowTs > rowTs) { + *ppRow = state->pLastRow; + state->pLastRow = NULL; + return code; + } else if (lastRowTs < rowTs) { + *ppRow = &state->row; + --state->iRow; + return code; + } else { + // TODO: merge rows and *ppRow = mergedRow + } } - */ - if (state->pMergeTree != NULL) { - tMergeTreeClose(state->pMergeTree); - state->pMergeTree = NULL; + +_err: + // TODO: cleanup when error occurs + if (state->pLastIter) { + lastIterClose(&state->pLastIter); } - return code; -} + if (state->pBlockData) { + tBlockDataDestroy(state->pBlockData); + } -typedef enum SFSNEXTROWSTATES { - SFSNEXTROW_FS, - SFSNEXTROW_FILESET, - SFSNEXTROW_BLOCKDATA, - SFSNEXTROW_BLOCKROW -} SFSNEXTROWSTATES; + *ppRow = NULL; -typedef struct SFSNextRowIter { - SFSNEXTROWSTATES state; // [input] - STsdb *pTsdb; // [input] - SBlockIdx *pBlockIdxExp; // [input] - STSchema *pTSchema; // [input] - tb_uid_t suid; - tb_uid_t uid; - int32_t nFileSet; - int32_t iFileSet; - SArray *aDFileSet; - SDataFReader **pDataFReader; - SArray *aBlockIdx; - LRUHandle *aBlockIdxHandle; - SBlockIdx *pBlockIdx; - SMapData blockMap; - int32_t nBlock; - int32_t iBlock; - SDataBlk block; - SBlockData blockData; - SBlockData *pBlockData; - int32_t nRow; - int32_t iRow; - TSDBROW row; - SSttBlockLoadInfo *pLoadInfo; - int64_t lastTs; -} SFSNextRowIter; + return code; +} +#if 0 static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, int nCols) { SFSNextRowIter *state = (SFSNextRowIter *)iter; @@ -1917,17 +2044,16 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie switch (state->state) { case SFSNEXTROW_FS: - // state->aDFileSet = state->pTsdb->pFS->cState->aDFileSet; - state->nFileSet = taosArrayGetSize(state->aDFileSet); + state->nFileSet = TARRAY2_SIZE(state->aDFileSet); state->iFileSet = state->nFileSet; state->pBlockData = NULL; case SFSNEXTROW_FILESET: { - SDFileSet *pFileSet = NULL; + STFileSet *pFileSet = NULL; _next_fileset: if (--state->iFileSet >= 0) { - pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet); + pFileSet = TARRAY2_GET(state->aDFileSet, state->iFileSet); } else { // tBlockDataDestroy(&state->blockData, 1); if (state->pBlockData) { @@ -2187,6 +2313,8 @@ _err: return code; } +#endif + int32_t clearNextRowFromFS(void *iter) { int32_t code = 0; @@ -2365,24 +2493,23 @@ typedef struct { _next_row_clear_fn_t nextRowClearFn; } TsdbNextRowState; -typedef struct { +typedef struct CacheNextRowIter { SArray *pSkyline; int64_t iSkyline; - SBlockIdx idx; - SMemNextRowIter memState; - SMemNextRowIter imemState; - SFSLastNextRowIter fsLastState; - SFSNextRowIter fsState; - TSDBROW memRow, imemRow, fsLastRow, fsRow; + SBlockIdx idx; + SMemNextRowIter memState; + SMemNextRowIter imemState; + SFSNextRowIter fsState; + TSDBROW memRow, imemRow, fsLastRow, fsRow; TsdbNextRowState input[4]; STsdb *pTsdb; } CacheNextRowIter; static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid, - SSttBlockLoadInfo *pLoadInfo, SLDataIter *pLDataIter, STsdbReadSnap *pReadSnap, - SDataFReader **pDataFReader, SDataFReader **pDataFReaderLast, int64_t lastTs) { + SArray *pLDataIterArray, STsdbReadSnap *pReadSnap, SDataFReader **pDataFReader, + SDataFReader **pDataFReaderLast, int64_t lastTs) { int code = 0; STbData *pMem = NULL; @@ -2398,7 +2525,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs pIter->pTsdb = pTsdb; pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); - +#if 0 SDelFile *pDelFile = pReadSnap->fs.pDelFile; if (pDelFile) { SDelFReader *pDelFReader; @@ -2424,20 +2551,20 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs goto _err; } - taosArrayDestroy(pDelIdxArray); + taosArrayDestroy(pDelIdxArray); tsdbDelFReaderClose(&pDelFReader); } else { code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pIter->pSkyline); if (code) goto _err; } - +#endif pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1; pIter->idx = (SBlockIdx){.suid = suid, .uid = uid}; - + /* pIter->fsLastState.state = (SFSLASTNEXTROWSTATES)SFSNEXTROW_FS; pIter->fsLastState.pTsdb = pTsdb; - pIter->fsLastState.aDFileSet = pReadSnap->fs.aDFileSet; + pIter->fsLastState.aDFileSet = pReadSnap->pfSetArray; pIter->fsLastState.pTSchema = pTSchema; pIter->fsLastState.suid = suid; pIter->fsLastState.uid = uid; @@ -2445,22 +2572,24 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs pIter->fsLastState.pDataFReader = pDataFReaderLast; pIter->fsLastState.lastTs = lastTs; pIter->fsLastState.pDataIter = pLDataIter; - + */ + pIter->fsState.pRowIter = pIter; pIter->fsState.state = SFSNEXTROW_FS; pIter->fsState.pTsdb = pTsdb; - pIter->fsState.aDFileSet = pReadSnap->fs.aDFileSet; + pIter->fsState.aDFileSet = pReadSnap->pfSetArray; pIter->fsState.pBlockIdxExp = &pIter->idx; pIter->fsState.pTSchema = pTSchema; pIter->fsState.suid = suid; pIter->fsState.uid = uid; - pIter->fsState.pLoadInfo = pLoadInfo; pIter->fsState.pDataFReader = pDataFReader; pIter->fsState.lastTs = lastTs; pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL}; pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL}; - pIter->input[2] = (TsdbNextRowState){ + /* +pIter->input[2] = (TsdbNextRowState){ &pIter->fsLastRow, false, true, false, &pIter->fsLastState, getNextRowFromFSLast, clearNextRowFromFSLast}; + */ pIter->input[3] = (TsdbNextRowState){&pIter->fsRow, false, true, false, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS}; @@ -2632,321 +2761,6 @@ static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64 return metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pCurrSchema); } -static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppColArray, SCacheRowsReader *pr) { - STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1); - int16_t nLastCol = pTSchema->numOfCols; - int16_t iCol = 0; - int16_t noneCol = 0; - bool setNoneCol = false; - bool hasRow = false; - bool ignoreEarlierTs = false; - SArray *pColArray = NULL; - SColVal *pColVal = &(SColVal){0}; - - int32_t code = initLastColArray(pTSchema, &pColArray); - if (TSDB_CODE_SUCCESS != code) { - return code; - } - - TSKEY lastRowTs = TSKEY_MAX; - - CacheNextRowIter iter = {0}; - nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pDataIter, pr->pReadSnap, &pr->pDataFReader, - &pr->pDataFReaderLast, pr->lastTs); - - do { - TSDBROW *pRow = NULL; - nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, NULL, 0); - - if (!pRow) { - break; - } - - hasRow = true; - - int32_t sversion = TSDBROW_SVERSION(pRow); - if (sversion != -1) { - code = updateTSchema(sversion, pr, uid); - if (TSDB_CODE_SUCCESS != code) { - goto _err; - } - pTSchema = pr->pCurrSchema; - } - int16_t nCol = pTSchema->numOfCols; - - TSKEY rowTs = TSDBROW_TS(pRow); - - if (lastRowTs == TSKEY_MAX) { - lastRowTs = rowTs; - STColumn *pTColumn = &pTSchema->columns[0]; - - *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs}); - taosArraySet(pColArray, 0, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}); - - for (iCol = 1; iCol < nCol; ++iCol) { - if (iCol >= nLastCol) { - break; - } - SLastCol *pCol = taosArrayGet(pColArray, iCol); - if (pCol->colVal.cid != pTSchema->columns[iCol].colId) { - continue; - } - tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); - - *pCol = (SLastCol){.ts = lastRowTs, .colVal = *pColVal}; - if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) { - pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); - if (pCol->colVal.value.pData == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); - } - - if (COL_VAL_IS_NONE(pColVal) && !setNoneCol) { - noneCol = iCol; - setNoneCol = true; - } - } - if (!setNoneCol) { - // done, goto return pColArray - break; - } else { - continue; - } - } - - if ((rowTs < lastRowTs)) { - // done, goto return pColArray - break; - } - - // merge into pColArray - setNoneCol = false; - for (iCol = noneCol; iCol < nCol; ++iCol) { - // high version's column value - SColVal *tColVal = (SColVal *)taosArrayGet(pColArray, iCol); - - tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); - if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) { - SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal}; - if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) { - SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol); - taosMemoryFree(pLastCol->colVal.value.pData); - - lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData); - if (lastCol.colVal.value.pData == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData); - } - - taosArraySet(pColArray, iCol, &lastCol); - } else if (COL_VAL_IS_NONE(tColVal) && COL_VAL_IS_NONE(pColVal) && !setNoneCol) { - noneCol = iCol; - setNoneCol = true; - } - } - } while (setNoneCol); - - // build the result ts row here - *dup = false; - // if (taosArrayGetSize(pColArray) != nCol) { - //*ppColArray = NULL; - // taosArrayDestroy(pColArray); - //} else { - if (!hasRow) { - if (ignoreEarlierTs) { - taosArrayDestroy(pColArray); - pColArray = NULL; - } else { - taosArrayClear(pColArray); - } - } - *ppColArray = pColArray; - //} - - nextRowIterClose(&iter); - // taosMemoryFreeClear(pTSchema); - return code; - -_err: - nextRowIterClose(&iter); - taosArrayDestroy(pColArray); - // taosMemoryFreeClear(pTSchema); - return code; -} - -static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr) { - STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1); - int16_t nLastCol = pTSchema->numOfCols; - int16_t noneCol = 0; - bool setNoneCol = false; - bool hasRow = false; - bool ignoreEarlierTs = false; - SArray *pColArray = NULL; - SColVal *pColVal = &(SColVal){0}; - int16_t nCols = nLastCol; - - int32_t code = initLastColArray(pTSchema, &pColArray); - if (TSDB_CODE_SUCCESS != code) { - return code; - } - SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t)); - if (NULL == aColArray) { - taosArrayDestroy(pColArray); - - return TSDB_CODE_OUT_OF_MEMORY; - } - for (int i = 1; i < pTSchema->numOfCols; ++i) { - taosArrayPush(aColArray, &pTSchema->columns[i].colId); - } - - TSKEY lastRowTs = TSKEY_MAX; - - CacheNextRowIter iter = {0}; - nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pDataIter, pr->pReadSnap, &pr->pDataFReader, - &pr->pDataFReaderLast, pr->lastTs); - - do { - TSDBROW *pRow = NULL; - nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray)); - - if (!pRow) { - break; - } - - hasRow = true; - - int32_t sversion = TSDBROW_SVERSION(pRow); - if (sversion != -1) { - code = updateTSchema(sversion, pr, uid); - if (TSDB_CODE_SUCCESS != code) { - goto _err; - } - pTSchema = pr->pCurrSchema; - } - int16_t nCol = pTSchema->numOfCols; - - TSKEY rowTs = TSDBROW_TS(pRow); - - if (lastRowTs == TSKEY_MAX) { - lastRowTs = rowTs; - STColumn *pTColumn = &pTSchema->columns[0]; - - *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs}); - taosArraySet(pColArray, 0, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}); - - for (int16_t iCol = 1; iCol < nCol; ++iCol) { - if (iCol >= nLastCol) { - break; - } - SLastCol *pCol = taosArrayGet(pColArray, iCol); - if (pCol->colVal.cid != pTSchema->columns[iCol].colId) { - continue; - } - tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); - - *pCol = (SLastCol){.ts = lastRowTs, .colVal = *pColVal}; - if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) { - pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); - if (pCol->colVal.value.pData == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); - } - - if (!COL_VAL_IS_VALUE(pColVal)) { - if (!setNoneCol) { - noneCol = iCol; - setNoneCol = true; - } - } else { - int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ); - taosArrayRemove(aColArray, aColIndex); - } - } - if (!setNoneCol) { - // done, goto return pColArray - break; - } else { - continue; - } - } - - // merge into pColArray - setNoneCol = false; - for (int16_t iCol = noneCol; iCol < nCol; ++iCol) { - if (iCol >= nLastCol) { - break; - } - // high version's column value - SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol); - if (lastColVal->colVal.cid != pTSchema->columns[iCol].colId) { - continue; - } - SColVal *tColVal = &lastColVal->colVal; - - tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); - if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) { - SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal}; - if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) { - SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol); - taosMemoryFree(pLastCol->colVal.value.pData); - - lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData); - if (lastCol.colVal.value.pData == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData); - } - - taosArraySet(pColArray, iCol, &lastCol); - int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ); - taosArrayRemove(aColArray, aColIndex); - } else if (!COL_VAL_IS_VALUE(tColVal) && !COL_VAL_IS_VALUE(pColVal) && !setNoneCol) { - noneCol = iCol; - setNoneCol = true; - } - } - } while (setNoneCol); - - // if (taosArrayGetSize(pColArray) <= 0) { - //*ppLastArray = NULL; - // taosArrayDestroy(pColArray); - //} else { - if (!hasRow) { - if (ignoreEarlierTs) { - taosArrayDestroy(pColArray); - pColArray = NULL; - } else { - taosArrayClear(pColArray); - } - } - *ppLastArray = pColArray; - //} - - nextRowIterClose(&iter); - taosArrayDestroy(aColArray); - // taosMemoryFreeClear(pTSchema); - return code; - -_err: - nextRowIterClose(&iter); - // taosMemoryFreeClear(pTSchema); - *ppLastArray = NULL; - taosArrayDestroy(pColArray); - taosArrayDestroy(aColArray); - return code; -} - static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, int nCols, int16_t *slotIds) { STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1); @@ -2976,7 +2790,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC TSKEY lastRowTs = TSKEY_MAX; CacheNextRowIter iter = {0}; - nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pDataIter, pr->pReadSnap, &pr->pDataFReader, + nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLDataIterArray, pr->pReadSnap, &pr->pDataFReader, &pr->pDataFReaderLast, pr->lastTs); do { @@ -3146,7 +2960,7 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, TSKEY lastRowTs = TSKEY_MAX; CacheNextRowIter iter = {0}; - nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pDataIter, pr->pReadSnap, &pr->pDataFReader, + nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLDataIterArray, pr->pReadSnap, &pr->pDataFReader, &pr->pDataFReaderLast, pr->lastTs); do { @@ -3236,92 +3050,6 @@ _err: return code; } -int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **handle) { - int32_t code = 0; - char key[32] = {0}; - int keyLen = 0; - - // getTableCacheKeyS(uid, "lr", key, &keyLen); - getTableCacheKey(uid, 0, key, &keyLen); - LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); - if (!h) { - STsdb *pTsdb = pr->pVnode->pTsdb; - taosThreadMutexLock(&pTsdb->lruMutex); - - h = taosLRUCacheLookup(pCache, key, keyLen); - if (!h) { - SArray *pArray = NULL; - bool dup = false; // which is always false for now - code = mergeLastRow(uid, pTsdb, &dup, &pArray, pr); - // if table's empty or error or ignore ignore earlier ts, set handle NULL and return - if (code < 0 || pArray == NULL) { - if (!dup && pArray) { - taosArrayDestroy(pArray); - } - - taosThreadMutexUnlock(&pTsdb->lruMutex); - - *handle = NULL; - - return 0; - } - - size_t charge = pArray->capacity * pArray->elemSize + sizeof(*pArray); - _taos_lru_deleter_t deleter = deleteTableCacheLast; - LRUStatus status = - taosLRUCacheInsert(pCache, key, keyLen, pArray, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW, NULL); - if (status != TAOS_LRU_STATUS_OK) { - code = -1; - } - } - taosThreadMutexUnlock(&pTsdb->lruMutex); - } - - *handle = h; - - return code; -} - -int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **handle) { - int32_t code = 0; - char key[32] = {0}; - int keyLen = 0; - - // getTableCacheKeyS(uid, "l", key, &keyLen); - getTableCacheKey(uid, 1, key, &keyLen); - LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); - if (!h) { - STsdb *pTsdb = pr->pVnode->pTsdb; - taosThreadMutexLock(&pTsdb->lruMutex); - - h = taosLRUCacheLookup(pCache, key, keyLen); - if (!h) { - SArray *pLastArray = NULL; - code = mergeLast(uid, pTsdb, &pLastArray, pr); - // if table's empty or error or ignore ignore earlier ts, set handle NULL and return - if (code < 0 || pLastArray == NULL) { - taosThreadMutexUnlock(&pTsdb->lruMutex); - - *handle = NULL; - return 0; - } - - size_t charge = pLastArray->capacity * pLastArray->elemSize + sizeof(*pLastArray); - _taos_lru_deleter_t deleter = deleteTableCacheLast; - LRUStatus status = - taosLRUCacheInsert(pCache, key, keyLen, pLastArray, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW, NULL); - if (status != TAOS_LRU_STATUS_OK) { - code = -1; - } - } - taosThreadMutexUnlock(&pTsdb->lruMutex); - } - - *handle = h; - - return code; -} - int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) { int32_t code = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 6138b1f7b40184be649a78543d8958fbfe80516f..3491c504d1a69205c90f78d6a56970d6da307d2a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -124,7 +124,10 @@ int32_t tsdbReuseCacherowsReader(void* reader, void* pTableIdList, int32_t numOf pReader->numOfTables = numOfTables; pReader->lastTs = INT64_MIN; - resetLastBlockLoadInfo(pReader->pLoadInfo); + int64_t blocks; + double elapse; + pReader->pLDataIterArray = destroySttBlockReader(pReader->pLDataIterArray, &blocks, &elapse); + pReader->pLDataIterArray = taosArrayInit(4, POINTER_BYTES); return TSDB_CODE_SUCCESS; } @@ -178,14 +181,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, SVnodeCfg* pCfg = &((SVnode*)pVnode)->config; int32_t numOfStt = pCfg->sttTrigger; - p->pLoadInfo = tCreateLastBlockLoadInfo(p->pSchema, NULL, 0, numOfStt); - if (p->pLoadInfo == NULL) { - tsdbCacherowsReaderClose(p); - return TSDB_CODE_OUT_OF_MEMORY; - } - - p->pDataIter = taosMemoryCalloc(pCfg->sttTrigger, sizeof(SLDataIter)); - if (p->pDataIter == NULL) { + p->pLDataIterArray = taosArrayInit(4, POINTER_BYTES); + if (p->pLDataIterArray == NULL) { tsdbCacherowsReaderClose(p); return TSDB_CODE_OUT_OF_MEMORY; } @@ -214,10 +211,11 @@ void* tsdbCacherowsReaderClose(void* pReader) { taosMemoryFree(p->pSchema); } - taosMemoryFree(p->pDataIter); taosMemoryFree(p->pCurrSchema); - destroyLastBlockLoadInfo(p->pLoadInfo); + int64_t loadBlocks = 0; + double elapse = 0; + destroySttBlockReader(p->pLDataIterArray, &loadBlocks, &elapse); taosMemoryFree((void*)p->idstr); taosThreadMutexDestroy(&p->readerMutex); @@ -298,7 +296,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } taosThreadMutexLock(&pr->readerMutex); - code = tsdbTakeReadSnap((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap); + code = tsdbTakeReadSnap2((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap); if (code != TSDB_CODE_SUCCESS) { goto _end; } @@ -427,8 +425,12 @@ _end: tsdbDataFReaderClose(&pr->pDataFReaderLast); tsdbDataFReaderClose(&pr->pDataFReader); - resetLastBlockLoadInfo(pr->pLoadInfo); - tsdbUntakeReadSnap((STsdbReader*)pr, pr->pReadSnap, true); + int64_t loadBlocks = 0; + double elapse = 0; + pr->pLDataIterArray = destroySttBlockReader(pr->pLDataIterArray, &loadBlocks, &elapse); + pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES); + + tsdbUntakeReadSnap2((STsdbReader*)pr, pr->pReadSnap, true); taosThreadMutexUnlock(&pr->readerMutex); if (pRes != NULL) { diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.h b/source/dnode/vnode/src/tsdb/tsdbFS2.h index e740d5b735146c91f4fa4ff6c68d67dc9a6bb8f4..8270581e58e23468fb1f6e782fc1015c27b2e71c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.h @@ -25,7 +25,7 @@ extern "C" { /* Exposed Handle */ typedef struct STFileSystem STFileSystem; typedef struct STFSBgTask STFSBgTask; -typedef TARRAY2(STFileSet *) TFileSetArray; +// typedef TARRAY2(STFileSet *) TFileSetArray; typedef enum { TSDB_FEDIT_COMMIT = 1, // @@ -107,4 +107,4 @@ struct STFileSystem { } #endif -#endif /*_TSDB_FILE_SYSTEM_H*/ \ No newline at end of file +#endif /*_TSDB_FILE_SYSTEM_H*/ diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index f753bf34ebc449e76ed6ad39605a1395a3969ad9..e8abaa4e3343930c449d7eaa7d80c8fd4c4121e3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -18,9 +18,9 @@ #include "tsdbDataFileRW.h" #include "tsdbFS2.h" #include "tsdbMerge.h" +#include "tsdbReadUtil.h" #include "tsdbUtil2.h" #include "tsimplehash.h" -#include "tsdbReadUtil.h" #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) #define getCurrentKeyInLastBlock(_r) ((_r)->currentKey) @@ -100,7 +100,7 @@ static int32_t updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInf i += 1; j += 1; - } else if (pTCol->colId < pSupInfo->colId[j]) { // do nothing + } else if (pTCol->colId < pSupInfo->colId[j]) { // do nothing i += 1; } else { return TSDB_CODE_INVALID_PARA; @@ -3815,7 +3815,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; - initFilesetIterator(&pStatus->fileIter, pReader->status.pfSetArray, pReader); + initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); resetDataBlockIterator(&pStatus->blockIter, pReader->info.order); int32_t code = TSDB_CODE_SUCCESS; @@ -3842,7 +3842,6 @@ static void freeSchemaFunc(void* param) { static void clearSharedPtr(STsdbReader* p) { p->status.pTableMap = NULL; p->status.uidList.tableUidList = NULL; - p->status.pfSetArray = NULL; p->info.pSchema = NULL; p->pReadSnap = NULL; p->pSchemaMap = NULL; @@ -3851,7 +3850,8 @@ static void clearSharedPtr(STsdbReader* p) { static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) { pDst->status.pTableMap = pSrc->status.pTableMap; pDst->status.uidList = pSrc->status.uidList; - pDst->status.pfSetArray = pSrc->status.pfSetArray; + // pDst->status.pfSetArray = pSrc->status.pfSetArray; + pDst->pReadSnap->pfSetArray = pSrc->pReadSnap->pfSetArray; pDst->info.pSchema = pSrc->info.pSchema; pDst->pSchemaMap = pSrc->pSchemaMap; pDst->pReadSnap = pSrc->pReadSnap; @@ -4633,7 +4633,7 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { int32_t numOfTables = tSimpleHashGetSize(pStatus->pTableMap); - initFilesetIterator(&pStatus->fileIter, pReader->status.pfSetArray, pReader); + initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); resetDataBlockIterator(pBlockIter, pReader->info.order); resetTableListIndex(&pReader->status); @@ -4886,7 +4886,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs } // fs - code = tsdbFSCreateRefSnapshot(pTsdb->pFS, &pReader->status.pfSetArray); + code = tsdbFSCreateRefSnapshot(pTsdb->pFS, &pSnap->pfSetArray); if (code) { taosThreadRwlockUnlock(&pTsdb->rwLock); goto _exit; @@ -4929,7 +4929,7 @@ void tsdbUntakeReadSnap2(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proact if (pSnap->pINode) taosMemoryFree(pSnap->pINode); taosMemoryFree(pSnap); - tsdbFSDestroyRefSnapshot(&pReader->status.pfSetArray); + tsdbFSDestroyRefSnapshot(&pSnap->pfSetArray); } tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode)); } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 9a0b540487434d989e8d1bb13c4da38e7d9ed46d..bb0cf5fbce732e993f70fb4b0307178c03984a90 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -20,8 +20,8 @@ extern "C" { #endif -#include "tsdbUtil2.h" #include "tsdbDataFileRW.h" +#include "tsdbUtil2.h" #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) @@ -196,7 +196,6 @@ typedef struct SReaderStatus { SArray* pLDataIterArray; SRowMerger merger; SColumnInfoData* pPrimaryTsCol; // primary time stamp output col info data - TFileSetArray* pfSetArray; } SReaderStatus; struct STsdbReader {