diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 102b714c1eeae0996b3bc3b518feb95fd0462144..6263703c011d596040572b04d29d1a331ed3e18b 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -123,6 +123,7 @@ int32_t tGetBlockL(uint8_t *p, void *ph); int32_t tPutBlockIdx(uint8_t *p, void *ph); int32_t tGetBlockIdx(uint8_t *p, void *ph); int32_t tCmprBlockIdx(void const *lhs, void const *rhs); +int32_t tCmprBlockL(void const *lhs, void const *rhs); // SColdata void tColDataInit(SColData *pColData, int16_t cid, int8_t type, int8_t smaOn); void tColDataReset(SColData *pColData); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 24d6b2f385b65f06cfc6676a84ede09235ca2692..b515b028b3aa8d817943677eec1af11bad855a67 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -279,6 +279,7 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb } } + _invalidate: taosMemoryFreeClear(pTSchema); taosLRUCacheRelease(pCache, h, invalidate); @@ -404,6 +405,178 @@ _err: return code; } +typedef enum { + SFSLASTNEXTROW_FS, + SFSLASTNEXTROW_FILESET, + SFSLASTNEXTROW_BLOCKDATA, + SFSLASTNEXTROW_BLOCKROW +} SFSLASTNEXTROWSTATES; + +typedef struct { + SFSLASTNEXTROWSTATES state; // [input] + STsdb *pTsdb; // [input] + SBlockIdx *pBlockIdxExp; // [input] + STSchema *pTSchema; // [input] + int32_t nFileSet; + int32_t iFileSet; + SArray *aDFileSet; + SDataFReader *pDataFReader; + SArray *aBlockL; + SBlockL *pBlockL; + SBlockData *pBlockDataL; + SBlockData blockDataL; + int32_t nRow; + int32_t iRow; + TSDBROW row; + /* + SArray *aBlockIdx; + SBlockIdx *pBlockIdx; + SMapData blockMap; + int32_t nBlock; + int32_t iBlock; + SBlock block; + */ +} SFSLastNextRowIter; + +static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { + SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter; + int32_t code = 0; + + switch (state->state) { + case SFSLASTNEXTROW_FS: + // state->aDFileSet = state->pTsdb->pFS->cState->aDFileSet; + state->nFileSet = taosArrayGetSize(state->aDFileSet); + state->iFileSet = state->nFileSet; + + state->pBlockDataL = NULL; + + case SFSLASTNEXTROW_FILESET: { + SDFileSet *pFileSet = NULL; + _next_fileset: + if (--state->iFileSet >= 0) { + pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet); + } else { + if (state->pBlockDataL) { + tBlockDataDestroy(state->pBlockDataL, 1); + state->pBlockDataL = NULL; + } + + *ppRow = NULL; + return code; + } + + code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet); + if (code) goto _err; + + if (!state->aBlockL) { + state->aBlockL = taosArrayInit(0, sizeof(SBlockIdx)); + } else { + taosArrayClear(state->aBlockL); + } + + code = tsdbReadBlockL(state->pDataFReader, state->aBlockL); + if (code) goto _err; + + // SBlockL *pBlockL = (SBlockL *)taosArrayGet(state->aBlockL, state->iBlockL); + + state->pBlockL = taosArraySearch(state->aBlockL, state->pBlockIdxExp, tCmprBlockL, TD_EQ); + if (!state->pBlockL) { + goto _next_fileset; + } + + int64_t suid = state->pBlockL->suid; + int64_t uid = state->pBlockL->maxUid; + + if (!state->pBlockDataL) { + state->pBlockDataL = &state->blockDataL; + } + code = tBlockDataInit(state->pBlockDataL, suid, suid ? 0 : uid, state->pTSchema); + if (code) goto _err; + } + case SFSLASTNEXTROW_BLOCKDATA: + code = tsdbReadLastBlock(state->pDataFReader, state->pBlockL, state->pBlockDataL); + if (code) goto _err; + + state->nRow = state->blockDataL.nRow; + state->iRow = state->nRow - 1; + + if (!state->pBlockDataL->uid) { + while (state->pBlockIdxExp->uid != state->pBlockDataL->aUid[state->iRow]) { + --state->iRow; + } + } + + state->state = SFSLASTNEXTROW_BLOCKROW; + case SFSLASTNEXTROW_BLOCKROW: + if (state->pBlockDataL->uid) { + if (state->iRow >= 0) { + state->row = tsdbRowFromBlockData(state->pBlockDataL, state->iRow); + *ppRow = &state->row; + + if (--state->iRow < 0) { + state->state = SFSLASTNEXTROW_FILESET; + } + } + } else { + if (state->iRow >= 0 && state->pBlockIdxExp->uid == state->pBlockDataL->aUid[state->iRow]) { + state->row = tsdbRowFromBlockData(state->pBlockDataL, state->iRow); + *ppRow = &state->row; + + if (--state->iRow < 0 || state->pBlockIdxExp->uid != state->pBlockDataL->aUid[state->iRow]) { + state->state = SFSLASTNEXTROW_FILESET; + } + } + } + + return code; + default: + ASSERT(0); + break; + } + +_err: + if (state->pDataFReader) { + tsdbDataFReaderClose(&state->pDataFReader); + state->pDataFReader = NULL; + } + if (state->aBlockL) { + taosArrayDestroy(state->aBlockL); + state->aBlockL = NULL; + } + if (state->pBlockDataL) { + tBlockDataDestroy(state->pBlockDataL, 1); + state->pBlockDataL = NULL; + } + + *ppRow = NULL; + + return code; +} + +int32_t clearNextRowFromFSLast(void *iter) { + SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter; + int32_t code = 0; + + if (!state) { + return code; + } + + if (state->pDataFReader) { + tsdbDataFReaderClose(&state->pDataFReader); + state->pDataFReader = NULL; + } + if (state->aBlockL) { + taosArrayDestroy(state->aBlockL); + state->aBlockL = NULL; + } + if (state->pBlockDataL) { + tBlockDataDestroy(state->pBlockDataL, 1); + state->pBlockDataL = NULL; + } + + return code; +} + typedef enum SFSNEXTROWSTATES { SFSNEXTROW_FS, SFSNEXTROW_FILESET, @@ -722,18 +895,19 @@ typedef struct { SArray *pSkyline; int64_t iSkyline; - SBlockIdx idx; - SMemNextRowIter memState; - SMemNextRowIter imemState; - SFSNextRowIter fsState; - TSDBROW memRow, imemRow, fsRow; + SBlockIdx idx; + SMemNextRowIter memState; + SMemNextRowIter imemState; + SFSLastNextRowIter fsLastState; + SFSNextRowIter fsState; + TSDBROW memRow, imemRow, fsLastRow, fsRow; - TsdbNextRowState input[3]; + TsdbNextRowState input[4]; STsdbReadSnap *pReadSnap; STsdb *pTsdb; } CacheNextRowIter; -static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb) { +static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema) { int code = 0; tb_uid_t suid = getTableSuidByUid(uid, pTsdb); @@ -779,6 +953,12 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs pIter->idx = (SBlockIdx){.suid = suid, .uid = uid}; + pIter->fsLastState.state = SFSNEXTROW_FS; + pIter->fsLastState.pTsdb = pTsdb; + pIter->fsLastState.aDFileSet = pIter->pReadSnap->fs.aDFileSet; + pIter->fsLastState.pBlockIdxExp = &pIter->idx; + pIter->fsLastState.pTSchema = pTSchema; + pIter->fsState.state = SFSNEXTROW_FS; pIter->fsState.pTsdb = pTsdb; pIter->fsState.aDFileSet = pIter->pReadSnap->fs.aDFileSet; @@ -786,7 +966,9 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, &pIter->memState, getNextRowFromMem, NULL}; pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, &pIter->imemState, getNextRowFromMem, NULL}; - pIter->input[2] = + pIter->input[2] = (TsdbNextRowState){&pIter->fsLastRow, false, true, &pIter->fsLastState, getNextRowFromFSLast, + clearNextRowFromFSLast}; + pIter->input[3] = (TsdbNextRowState){&pIter->fsRow, false, true, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS}; if (pMem) { @@ -811,7 +993,7 @@ _err: static int32_t nextRowIterClose(CacheNextRowIter *pIter) { int code = 0; - for (int i = 0; i < 3; ++i) { + for (int i = 0; i < 4; ++i) { if (pIter->input[i].nextRowClearFn) { pIter->input[i].nextRowClearFn(pIter->input[i].iter); } @@ -823,7 +1005,6 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) { tsdbUntakeReadSnap(pIter->pTsdb, pIter->pReadSnap); - return code; _err: return code; } @@ -832,7 +1013,7 @@ _err: static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) { int code = 0; - for (int i = 0; i < 3; ++i) { + for (int i = 0; i < 4; ++i) { if (pIter->input[i].next && !pIter->input[i].stop) { code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow); if (code) goto _err; @@ -844,18 +1025,18 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) { } } - if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop) { + if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop && pIter->input[3].stop) { *ppRow = NULL; return code; } - // select maxpoint(s) from mem, imem, fs - TSDBROW *max[3] = {0}; - int iMax[3] = {-1, -1, -1}; + // select maxpoint(s) from mem, imem, fs and last + TSDBROW *max[4] = {0}; + int iMax[4] = {-1, -1, -1, -1}; int nMax = 0; TSKEY maxKey = TSKEY_MIN; - for (int i = 0; i < 3; ++i) { + for (int i = 0; i < 4; ++i) { if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) { TSDBKEY key = TSDBROW_KEY(pIter->input[i].pRow); @@ -873,8 +1054,8 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) { } // delete detection - TSDBROW *merge[3] = {0}; - int iMerge[3] = {-1, -1, -1}; + TSDBROW *merge[4] = {0}; + int iMerge[4] = {-1, -1, -1, -1}; int nMerge = 0; for (int i = 0; i < nMax; ++i) { TSDBKEY maxKey = TSDBROW_KEY(max[i]); @@ -915,7 +1096,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo TSKEY lastRowTs = TSKEY_MAX; CacheNextRowIter iter = {0}; - nextRowIterOpen(&iter, uid, pTsdb); + nextRowIterOpen(&iter, uid, pTsdb, pTSchema); do { TSDBROW *pRow = NULL; @@ -1012,7 +1193,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) { TSKEY lastRowTs = TSKEY_MAX; CacheNextRowIter iter = {0}; - nextRowIterOpen(&iter, uid, pTsdb); + nextRowIterOpen(&iter, uid, pTsdb, pTSchema); do { TSDBROW *pRow = NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index e890c624ec151be88371f9448459dfd573b28e09..9fcfd9c8bd0e5213347200335a066913ec381b31 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -196,6 +196,25 @@ int32_t tCmprBlockIdx(void const *lhs, void const *rhs) { return 0; } +int32_t tCmprBlockL(void const *lhs, void const *rhs) { + SBlockIdx *lBlockIdx = (SBlockIdx *)lhs; + SBlockL *rBlockL = (SBlockL *)rhs; + + if (lBlockIdx->suid < rBlockL->suid) { + return -1; + } else if (lBlockIdx->suid > rBlockL->suid) { + return 1; + } + + if (lBlockIdx->uid < rBlockL->minUid) { + return -1; + } else if (lBlockIdx->uid > rBlockL->maxUid) { + return 1; + } + + return 0; +} + // SBlock ====================================================== void tBlockReset(SBlock *pBlock) { *pBlock = (SBlock){.minKey = TSDBKEY_MAX, .maxKey = TSDBKEY_MIN, .minVer = VERSION_MAX, .maxVer = VERSION_MIN}; @@ -1944,4 +1963,4 @@ int32_t tsdbReadAndCheck(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t _exit: return code; -} \ No newline at end of file +}