From 7de0fc7dae79f18ca58037054f913d9f217ba3f8 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 26 Aug 2022 19:07:51 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 91 ++++++++++++++++++------ 1 file changed, 70 insertions(+), 21 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index b8af4d9d28..d3b82db3e9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -32,7 +32,6 @@ typedef struct { int8_t type; union { struct { - SArray *aTbDataP; int32_t iTbDataP; STbDataIter iter; }; // memory data iter @@ -75,6 +74,7 @@ typedef struct { struct { SDataIter *pIter; SRBTree rbt; + SDataIter aDataIter[TSDB_MAX_LAST_FILE + 1]; }; struct { SDataFWriter *pWriter; @@ -102,6 +102,26 @@ static int32_t tsdbCommitData(SCommitter *pCommitter); static int32_t tsdbCommitDel(SCommitter *pCommitter); static int32_t tsdbCommitCache(SCommitter *pCommitter); static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno); +static int32_t tsdbNextCommitRow(SCommitter *pCommitter); + +static int32_t tRowInfoCmprFn(const void *p1, const void *p2) { + SRowInfo *pInfo1 = (SRowInfo *)p1; + SRowInfo *pInfo2 = (SRowInfo *)p2; + + if (pInfo1->suid < pInfo2->suid) { + return -1; + } else if (pInfo1->suid > pInfo2->suid) { + return 1; + } + + if (pInfo1->uid < pInfo2->uid) { + return -1; + } else if (pInfo1->uid > pInfo2->uid) { + return 1; + } + + return tsdbRowCmprFn(&pInfo1->row, &pInfo2->row); +} int32_t tsdbBegin(STsdb *pTsdb) { int32_t code = 0; @@ -376,6 +396,49 @@ _exit: return code; } +static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { + int32_t code = 0; + + tRBTreeCreate(&pCommitter->rbt, tRowInfoCmprFn); + pCommitter->pIter = NULL; + + int8_t iIter = 0; + // memory + SDataIter *pIter = &pCommitter->aDataIter[iIter]; + pIter->type = 0; + pIter->iTbDataP = 0; + for (; pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP); pIter->iTbDataP++) { + STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pIter->iTbDataP); + TSDBKEY tKey = {.ts = pCommitter->minKey, .version = VERSION_MIN}; + tsdbTbDataIterOpen(pTbData, &tKey, 0, &pIter->iter); + TSDBROW *pRow = tsdbTbDataIterGet(&pIter->iter); + + if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { + pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow)); + continue; + } + + pIter->r.suid = pTbData->suid; + pIter->r.uid = pTbData->uid; + pIter->r.row = *pRow; + break; + } + + tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter); + + // disk + if (0) { + } + + code = tsdbNextCommitRow(pCommitter); + if (code) goto _err; + + return code; + +_err: + return code; +} + static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { int32_t code = 0; STsdb *pTsdb = pCommitter->pTsdb; @@ -464,6 +527,10 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { tBlockDataReset(&pCommitter->dWriter.bData); tBlockDataReset(&pCommitter->dWriter.bDatal); + // open iter + code = tsdbOpenCommitIter(pCommitter); + if (code) goto _err; + _exit: return code; @@ -1325,24 +1392,6 @@ _err: } // ================================================================================ -static int32_t tRowInfoCmprFn(const void *p1, const void *p2) { - SRowInfo *pInfo1 = (SRowInfo *)p1; - SRowInfo *pInfo2 = (SRowInfo *)p2; - - if (pInfo1->suid < pInfo2->suid) { - return -1; - } else if (pInfo1->suid > pInfo2->suid) { - return 1; - } - - if (pInfo1->uid < pInfo2->uid) { - return -1; - } else if (pInfo1->uid > pInfo2->uid) { - return 1; - } - - return tsdbRowCmprFn(&pInfo1->row, &pInfo2->row); -} static FORCE_INLINE SRowInfo *tsdbGetCommitRow(SCommitter *pCommitter) { return (pCommitter->pIter) ? &pCommitter->pIter->r : NULL; @@ -1370,8 +1419,8 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) { } pIter->iTbDataP++; - if (pIter->iTbDataP < taosArrayGetSize(pIter->aTbDataP)) { - STbData *pTbData = (STbData *)taosArrayGetP(pIter->aTbDataP, pIter->iTbDataP); + if (pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP)) { + STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pIter->iTbDataP); TSDBKEY keyFrom = {.ts = pCommitter->minKey, .version = VERSION_MIN}; tsdbTbDataIterOpen(pTbData, &keyFrom, 0, &pIter->iter); pRow = tsdbTbDataIterGet(&pIter->iter); -- GitLab