diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 094f18b7465a6827b9c894b37926389712ce2e77..461504facfd90f34d4c57e38579ac49cfb5b30d1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -327,15 +327,16 @@ typedef struct SFSNextRowIter { SArray *aBlockIdx; // SMapData blockIdxMap; // SBlockIdx blockIdx; - SBlockIdx *pBlockIdx; - SMapData blockMap; - int32_t nBlock; - int32_t iBlock; - SBlock block; - SBlockData blockData; - int32_t nRow; - int32_t iRow; - TSDBROW row; + SBlockIdx *pBlockIdx; + SMapData blockMap; + int32_t nBlock; + int32_t iBlock; + SBlock block; + SBlockData blockData; + SBlockData *pBlockData; + int32_t nRow; + int32_t iRow; + TSDBROW row; } SFSNextRowIter; static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { @@ -346,14 +347,20 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { case SFSNEXTROW_FS: state->aDFileSet = state->pTsdb->fs->cState->aDFileSet; state->nFileSet = taosArrayGetSize(state->aDFileSet); - state->iFileSet = state->nFileSet; + state->iFileSet = state->nFileSet - 1; + + state->pBlockData = NULL; case SFSNEXTROW_FILESET: { SDFileSet *pFileSet = NULL; if (--state->iFileSet >= 0) { pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet); } else { - tBlockDataClear(&state->blockData); + // tBlockDataClear(&state->blockData); + if (state->pBlockData) { + tBlockDataClear(state->pBlockData); + state->pBlockData = NULL; + } *ppRow = NULL; return code; @@ -390,18 +397,23 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { state->nBlock = state->blockMap.nItem; state->iBlock = state->nBlock - 1; - tBlockDataInit(&state->blockData); + if (!state->pBlockData) { + state->pBlockData = &state->blockData; + + tBlockDataInit(&state->blockData); + } } case SFSNEXTROW_BLOCKDATA: if (state->iBlock >= 0) { SBlock block = {0}; tBlockReset(&block); - tBlockDataReset(&state->blockData); + // tBlockDataReset(&state->blockData); + tBlockDataReset(state->pBlockData); tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetBlock); /* code = tsdbReadBlockData(state->pDataFReader, &state->blockIdx, &block, &state->blockData, NULL, NULL); */ - code = tsdbReadBlockData(state->pDataFReader, state->pBlockIdx, &block, &state->blockData, NULL, NULL); + code = tsdbReadBlockData(state->pDataFReader, state->pBlockIdx, &block, state->pBlockData, NULL, NULL); if (code) goto _err; state->nRow = state->blockData.nRow; @@ -411,15 +423,18 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { } case SFSNEXTROW_BLOCKROW: if (state->iRow >= 0) { - state->row = tsdbRowFromBlockData(&state->blockData, state->iRow); + state->row = tsdbRowFromBlockData(state->pBlockData, state->iRow); *ppRow = &state->row; if (--state->iRow < 0) { state->state = SFSNEXTROW_BLOCKDATA; if (--state->iBlock < 0) { tsdbDataFReaderClose(&state->pDataFReader); + state->pDataFReader = NULL; + if (state->aBlockIdx) { taosArrayDestroy(state->aBlockIdx); + state->aBlockIdx = NULL; } state->state = SFSNEXTROW_FILESET; @@ -436,17 +451,48 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { _err: if (state->pDataFReader) { tsdbDataFReaderClose(&state->pDataFReader); + state->pDataFReader = NULL; } if (state->aBlockIdx) { taosArrayDestroy(state->aBlockIdx); + state->aBlockIdx = NULL; + } + if (state->pBlockData) { + // tBlockDataClear(&state->blockData); + tBlockDataClear(state->pBlockData); + state->pBlockData = NULL; } - tBlockDataClear(&state->blockData); *ppRow = NULL; return code; } +int32_t clearNextRowFromFS(void *iter) { + int32_t code = 0; + + SFSNextRowIter *state = (SFSNextRowIter *)iter; + if (!state) { + return code; + } + + if (state->pDataFReader) { + tsdbDataFReaderClose(&state->pDataFReader); + state->pDataFReader = NULL; + } + if (state->aBlockIdx) { + taosArrayDestroy(state->aBlockIdx); + state->aBlockIdx = NULL; + } + if (state->pBlockData) { + // tBlockDataClear(&state->blockData); + tBlockDataClear(state->pBlockData); + state->pBlockData = NULL; + } + + return code; +} + typedef enum SMEMNEXTROWSTATES { SMEMNEXTROW_ENTER, SMEMNEXTROW_NEXT, @@ -566,17 +612,20 @@ static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int *iSkyline) { } typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow); +typedef int32_t (*_next_row_clear_fn_t)(void *iter); typedef struct TsdbNextRowState { - TSDBROW *pRow; - bool stop; - bool next; - void *iter; - _next_row_fn_t nextRowFn; + TSDBROW *pRow; + bool stop; + bool next; + void *iter; + _next_row_fn_t nextRowFn; + _next_row_clear_fn_t nextRowClearFn; } TsdbNextRowState; static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRow) { int32_t code = 0; + SArray *pSkyline = NULL; STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1); int16_t nCol = pTSchema->numOfCols; @@ -596,7 +645,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo *ppRow = NULL; - SArray *pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); + pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); SDelIdx delIdx; @@ -632,9 +681,9 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo SMemNextRowIter imemState = {0}; TSDBROW memRow, imemRow, fsRow; - TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem}, - {&imemRow, true, false, &imemState, getNextRowFromMem}, - {&fsRow, false, true, &fsState, getNextRowFromFS}}; + TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem, NULL}, + {&imemRow, true, false, &imemState, getNextRowFromMem, NULL}, + {&fsRow, false, true, &fsState, getNextRowFromFS, clearNextRowFromFS}}; if (pMem) { memState.pMem = pMem; @@ -732,10 +781,21 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo } while (*ppRow == NULL); + for (int i = 0; i < 3; ++i) { + if (input[i].nextRowClearFn) { + input[i].nextRowClearFn(input[i].iter); + } + } + if (pSkyline) { + taosArrayDestroy(pSkyline); + } taosMemoryFreeClear(pTSchema); return code; _err: + if (pSkyline) { + taosArrayDestroy(pSkyline); + } taosMemoryFreeClear(pTSchema); tsdbError("vgId:%d merge last_row failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; @@ -798,9 +858,9 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { SMemNextRowIter imemState = {0}; TSDBROW memRow, imemRow, fsRow; - TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem}, - {&imemRow, true, false, &imemState, getNextRowFromMem}, - {&fsRow, false, true, &fsState, getNextRowFromFS}}; + TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem, NULL}, + {&imemRow, true, false, &imemState, getNextRowFromMem, NULL}, + {&fsRow, false, true, &fsState, getNextRowFromFS, clearNextRowFromFS}}; if (pMem) { memState.pMem = pMem; @@ -1008,7 +1068,7 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH code = mergeLastRow(uid, pTsdb, &dup, &pRow); // if table's empty or error, return code of -1 if (code < 0 || pRow == NULL) { - if (!dup) { + if (!dup && pRow) { taosMemoryFree(pRow); }