From 65b54f9d7355c1dc41649c0902547b2e5b2d5dfb Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Sat, 25 Jun 2022 19:29:16 +0800 Subject: [PATCH] tsdbCache: framework for last row merging --- source/dnode/vnode/src/inc/tsdb.h | 3 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 387 +++++++++++++++++++++--- source/dnode/vnode/src/tsdb/tsdbFS.c | 2 +- source/dnode/vnode/src/tsdb/tsdbUtil.c | 19 ++ 4 files changed, 373 insertions(+), 38 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 90fdd60188..a013cc50bb 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -126,6 +126,7 @@ int32_t tBlockCmprFn(const void *p1, const void *p2); void tBlockIdxReset(SBlockIdx *pBlockIdx); int32_t tPutBlockIdx(uint8_t *p, void *ph); int32_t tGetBlockIdx(uint8_t *p, void *ph); +int32_t tCmprBlockIdx(void const *lhs, void const *rhs); // SColdata #define tColDataInit() ((SColData){0}) void tColDataReset(SColData *pColData, int16_t cid, int8_t type); @@ -141,9 +142,9 @@ void tBlockDataReset(SBlockData *pBlockData); void tBlockDataClear(SBlockData *pBlockData); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema); // SDelIdx -int32_t tCmprDelIdx(void const *lhs, void const *rhs); int32_t tPutDelIdx(uint8_t *p, void *ph); int32_t tGetDelIdx(uint8_t *p, void *ph); +int32_t tCmprDelIdx(void const *lhs, void const *rhs); // SDelData int32_t tPutDelData(uint8_t *p, void *ph); int32_t tGetDelData(uint8_t *p, void *ph); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 64ed6bb92e..d09926660a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -102,8 +102,8 @@ static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) { return suid; } /* -static int32_t getMemLastRow(SMemTable *mem, tb_uid_t suid, tb_uid_t uid, STSRow **ppRow) { - int32_t code = 0; +static int32_t getMemLastRow(SMemTable *mem, tb_uid_t suid, tb_uid_t uid, STSRow +**ppRow) { int32_t code = 0; if (mem) { STbData *pMem = NULL; @@ -218,20 +218,28 @@ static int32_t getTableDelIdx(SDelFReader *pDelFReader, tb_uid_t suid, tb_uid_t _err: return code; } - +#if 0 static int32_t mergeLastRowFileSet(STbDataIter *iter, STbDataIter *iiter, SDFileSet *pFileSet, SArray *pSkyline, - STsdb *pTsdb, STSRow **pLastRow) { + STsdb *pTsdb, STSRow **ppLastRow) { int32_t code = 0; TSDBROW *pMemRow = NULL; TSDBROW *pIMemRow = NULL; + TSDBKEY memKey = TSDBKEY_MIN; + TSDBKEY imemKey = TSDBKEY_MIN; if (iter != NULL) { pMemRow = tsdbTbDataIterGet(iter); + if (pMemRow) { + memKey = tsdbRowKey(pMemRow); + } } if (iter != NULL) { pIMemRow = tsdbTbDataIterGet(iiter); + if (pIMemRow) { + imemKey = tsdbRowKey(pIMemRow); + } } SDataFReader *pDataFReader; @@ -243,42 +251,266 @@ static int32_t mergeLastRowFileSet(STbDataIter *iter, STbDataIter *iiter, SDFile code = tsdbReadBlockIdx(pDataFReader, &blockIdxMap, NULL); if (code) goto _err; - SBlockData *pBlockData; + SBlockIdx blockIdx = {0}; + tBlockIdxReset(&blockIdx); + code = tMapDataSearch(&blockIdxMap, pBlockIdx, tGetBlockIdx, tCmprBlockIdx, &blockIdx); + if (code) goto _err; + + SMapData blockMap = {0}; + tMapDataReset(&blockMap); + code = tsdbReadBlock(pDataFReader, &blockIdx, &blockMap, NULL); + if (code) goto _err; + + int32_t nBlock = blockMap.nItem; + for (int32_t iBlock = nBlock - 1; iBlock >= 0; --iBlock) { + SBlock block = {0}; + SBlockData blockData = {0}; + + tBlockReset(&block); + tBlockDataReset(&blockData); + + tMapDataGetItemByIdx(&blockMap, iBlock, &block, tGetBlock); + + code = tsdbReadBlockData(pDataFReader, &blockIdx, &block, &blockData, NULL, 0, NULL, NULL); + if (code) goto _err; + + int32_t nRow = blockData.nRow; + for (int32_t iRow = nRow - 1; iRow >= 0; --iRow) { + TSDBROW row = tsdbRowFromBlockData(&blockData, iRow); + + TSDBKEY key = tsdbRowKey(&row); + if (pMemRow != NULL && pIMemRow != NULL) { + int32_t c = tsdbKeyCmprFn(memKey, imemKey); + if (c < 0) { + } else if (c > 0) { + } else { + } + } else if (pMemRow != NULL) { + pMemRow = tsdbTbDataIterGet(iter); + + } else if (pIMemRow != NULL) { + } else { + if (!tsdbKeyDeleted(key, pSkyline)) { + *ppLastRow = buildTsrowFromTsdbrow(&row); + goto _done; + } else { + continue; + } + } + // select current row if outside delete area + STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1); + } + } +_done: tsdbDataFReaderClose(&pDataFReader); + return code; + _err: return code; } +#endif +typedef enum SFSNEXTROWSTATES { + SFSNEXTROW_FS, + SFSNEXTROW_FILESET, + SFSNEXTROW_BLOCKDATA, + SFSNEXTROW_BLOCKROW +} SFSNEXTROWSTATES; + +typedef struct SFSNextRowIter { + SFSNEXTROWSTATES state; // [input] + STsdb *pTsdb; // [input] + SBlockIdx *pBlockIdxExp; // [input] + int32_t nFileSet; + int32_t iFileSet; + SArray *aDFileSet; + SDataFReader *pDataFReader; + SMapData blockIdxMap; + SBlockIdx blockIdx; + SMapData blockMap; + int32_t nBlock; + int32_t iBlock; + SBlock block; + SBlockData blockData; + int32_t nRow; + int32_t iRow; + TSDBROW row; +} SFSNextRowIter; + +static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { + SFSNextRowIter *state = (SFSNextRowIter *)iter; + int32_t code = 0; + + switch (state->state) { + case SFSNEXTROW_FS: + state->aDFileSet = state->pTsdb->fs->cState->aDFileSet; + state->nFileSet = taosArrayGetSize(state->aDFileSet); + state->iFileSet = state->nFileSet; + + case SFSNEXTROW_FILESET: { + SDFileSet *pFileSet = NULL; + if (--state->iFileSet >= 0) { + pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet); + } else { + *ppRow = NULL; + return code; + } + + code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet); + if (code) goto _err; + + tMapDataReset(&state->blockIdxMap); + code = tsdbReadBlockIdx(state->pDataFReader, &state->blockIdxMap, NULL); + if (code) goto _err; + + tBlockIdxReset(&state->blockIdx); + code = tMapDataSearch(&state->blockIdxMap, state->pBlockIdxExp, tGetBlockIdx, tCmprBlockIdx, &state->blockIdx); + if (code) goto _err; + + tMapDataReset(&state->blockMap); + code = tsdbReadBlock(state->pDataFReader, &state->blockIdx, &state->blockMap, NULL); + if (code) goto _err; + + state->nBlock = state->blockMap.nItem; + state->iBlock = state->nBlock - 1; + } + case SFSNEXTROW_BLOCKDATA: + if (state->iBlock >= 0) { + SBlock block = {0}; + + tBlockReset(&block); + tBlockDataReset(&state->blockData); + + tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetBlock); + code = tsdbReadBlockData(state->pDataFReader, &state->blockIdx, &block, &state->blockData, NULL, 0, NULL, NULL); + if (code) goto _err; + + state->nRow = state->blockData.nRow; + state->iRow = state->nRow - 1; + + state->state = SFSNEXTROW_BLOCKROW; + } + case SFSNEXTROW_BLOCKROW: + if (state->iRow >= 0) { + state->row = tsdbRowFromBlockData(&state->blockData, state->iRow); + *ppRow = &state->row; + + if (--state->iRow < 0) { + state->state = SFSNEXTROW_BLOCKDATA; + if (--state->iBlock < 0) { + tsdbDataFReaderClose(&state->pDataFReader); + + state->state = SFSNEXTROW_FILESET; + } + } + } + + return code; + default: + ASSERT(0); + break; + } + +_err: + *ppRow = NULL; + return code; +} + +typedef enum SMEMNEXTROWSTATES { + SMEMNEXTROW_ENTER, + SMEMNEXTROW_NEXT, +} SMEMNEXTROWSTATES; + +typedef struct SMemNextRowIter { + SMEMNEXTROWSTATES state; + STbData *pMem; // [input] + STbDataIter iter; // mem buffer skip list iterator +} SMemNextRowIter; + +static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow) { + SMemNextRowIter *state = (SMemNextRowIter *)iter; + int32_t code = 0; + + switch (state->state) { + case SMEMNEXTROW_ENTER: { + if (state->pMem != NULL) { + tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter); + + TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter); + if (pMemRow) { + *ppRow = pMemRow; + state->state = SMEMNEXTROW_NEXT; + + return code; + } + } + + *ppRow = NULL; + + return code; + } + case SMEMNEXTROW_NEXT: + if (tsdbTbDataIterNext(&state->iter)) { + *ppRow = tsdbTbDataIterGet(&state->iter); + + return code; + } else { + *ppRow = NULL; + + return code; + } + default: + ASSERT(0); + break; + } + +_err: + *ppRow = NULL; + return code; +} + +static STSRow *tsRowFromTsdbRow(TSDBROW *pRow) { + // TODO: new tsrow from tsdbrow + STSRow *ret = NULL; + if (pRow->type == 0) { + return pRow->pTSRow; + } else { + } + + return ret; +} + +typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow); + +typedef struct TsdbNextRowState { + TSDBROW *pRow; + bool stop; + bool next; + void *iter; + _next_row_fn_t nextRowFn; +} TsdbNextRowState; static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { int32_t code = 0; tb_uid_t suid = getTableSuidByUid(uid, pTsdb); - STbData *pMem = NULL; - STbData *pIMem = NULL; - STbDataIter iter; // mem buffer skip list iterator - STbDataIter iiter; // imem buffer skip list iterator - + STbData *pMem = NULL; if (pTsdb->mem) { tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem); - if (pMem != NULL) { - tsdbTbDataIterOpen(pMem, NULL, 1, &iter); - } } + STbData *pIMem = NULL; if (pTsdb->imem) { tsdbGetTbDataFromMemTable(pTsdb->imem, suid, uid, &pIMem); - if (pIMem != NULL) { - tsdbTbDataIterOpen(pIMem, NULL, 1, &iiter); - } } *ppRow = NULL; SDelFReader *pDelFReader; - // code = tsdbDelFReaderOpen(&pDelFReader, pTsdb->fs->cState->pDelFile, pTsdb, NULL); + SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState); + code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL); if (code) goto _err; SDelIdx delIdx; @@ -288,29 +520,111 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { SArray *pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); code = getTableDelSkyline(pMem, pIMem, pDelFReader, &delIdx, pSkyline); if (code) goto _err; - /* - SFSIter fsiter; - bool fsHasNext = false; - tsdbFSIterOpen(pTsdb->fs, TSDB_FS_ITER_BACKWARD, &fsiter); - do { - */ - SDFileSet *pFileSet = NULL; - // pFileSet = tsdbFSIterGet(fsiter); + int iSkyline = taosArrayGetSize(pSkyline) - 1; - code = mergeLastRowFileSet(&iter, &iiter, pFileSet, pSkyline, pTsdb, ppRow); - if (code < 0) { - goto _err; - } + tsdbDelFReaderClose(pDelFReader); + + SBlockIdx idx = {.suid = suid, .uid = uid}; + + SFSNextRowIter fsState = {0}; + fsState.state = SFSNEXTROW_FS; + fsState.pTsdb = pTsdb; + fsState.pBlockIdxExp = &idx; - if (*ppRow != NULL) { - // break; + SMemNextRowIter memState = {0}; + SMemNextRowIter imemState = {0}; + TSDBROW memRow, imemRow, fsRow; + + TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem}, + {&imemRow, true, false, &imemState, getNextRowFromMem}, + {&fsRow, false, true, &fsState, getNextRowFromFS}}; + + if (pMem) { + memState.pMem = pMem; + memState.state = SMEMNEXTROW_ENTER; + input[0].stop = false; + input[0].next = true; + } + if (pIMem) { + imemState.pMem = pIMem; + imemState.state = SMEMNEXTROW_ENTER; + input[1].stop = false; + input[1].next = true; } - /* - } while (fsHasNext = tsdbFSIterNext(fsiter)) - */ - tsdbDelFReaderClose(pDelFReader); + do { + for (int i = 0; i < 3; ++i) { + if (input[i].next && !input[i].stop) { + code = input[i].nextRowFn(input[i].iter, &input[i].pRow); + if (code) goto _err; + + if (input[i].pRow == NULL) { + input[i].stop = true; + input[i].next = false; + } + } + } + + if (input[0].stop && input[1].stop && input[2].stop) { + break; + } + + // select maxpoint(s) from mem, imem, fs + TSDBROW *max[3] = {0}; + int iMax[3] = {-1, -1, -1}; + int nMax = 0; + for (int i = 0; i < 3; ++i) { + if (input[i].pRow != NULL) { + TSDBKEY key = TSDBROW_KEY(input[i].pRow); + TSDBKEY maxKey = TSDBROW_KEY(max[nMax]); + + // merging & deduplicating on client side + if (maxKey.ts <= key.ts) { + if (maxKey.ts < key.ts) { + nMax = 0; + } + + iMax[nMax] = i; + max[nMax++] = input[i].pRow; + } + } + } + + // delete detection + TSDBROW *merge[3] = {0}; + int nMerge = 0; + for (int i = 0; i < nMax; ++i) { + TSDBKEY maxKey = TSDBROW_KEY(max[i]); + + bool deleted = false; + // bool deleted = tsdbKeyDeleted(maxKey, pSkyline, &iSkyline); + if (!deleted) { + merge[nMerge++] = max[i]; + } else { + input[iMax[i]].next = true; + } + } + + // merge if nMerge > 1 + if (nMerge > 0) { + if (nMerge == 1) { + *ppRow = tsRowFromTsdbRow(merge[nMerge]); + } else { + // merge 2 or 3 rows + SRowMerger merger = {0}; + + STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1); + + tRowMergerInit(&merger, merge[0], pTSchema); + for (int i = 1; i < nMerge; ++i) { + tRowMerge(&merger, merge[i]); + } + tRowMergerGetRow(&merger, ppRow); + tRowMergerClear(&merger); + } + } + } while (*ppRow == NULL); return code; @@ -349,7 +663,8 @@ int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid) { LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); if (h) { taosLRUCacheRelease(pCache, h, true); - // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen); + // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t + // keyLen); } return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 326d75a83a..27c2f3fa7a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -627,4 +627,4 @@ SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState) { return pState->pDelFile; SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid) { return (SDFileSet *)taosArraySearch(pState->aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ); -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 73df7ae80f..fbba287bef 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -321,6 +321,25 @@ int32_t tGetBlockIdx(uint8_t *p, void *ph) { return n; } +int32_t tCmprBlockIdx(void const *lhs, void const *rhs) { + SBlockIdx *lBlockIdx = *(SBlockIdx **)lhs; + SBlockIdx *rBlockIdx = *(SBlockIdx **)rhs; + + if (lBlockIdx->suid < lBlockIdx->suid) { + return -1; + } else if (lBlockIdx->suid > lBlockIdx->suid) { + return 1; + } + + if (lBlockIdx->uid < lBlockIdx->uid) { + return -1; + } else if (lBlockIdx->uid > lBlockIdx->uid) { + return 1; + } + + return 0; +} + // SBlock ====================================================== void tBlockReset(SBlock *pBlock) { pBlock->minKey = TSDBKEY_MAX; -- GitLab