From 71c967d6030a6bc62cb4c47f81bddf53114ebeef Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 23 Mar 2023 13:15:14 +0800 Subject: [PATCH] more code --- source/dnode/vnode/CMakeLists.txt | 4 + source/dnode/vnode/src/tsdb/dev/tsdbCommit2.c | 140 +++++++++--------- .../vnode/src/tsdb/dev/tsdbReaderWriter2.c | 28 ++-- 3 files changed, 85 insertions(+), 87 deletions(-) diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 8dc3f46ae3..3fbb705a4e 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -54,6 +54,10 @@ target_sources( "src/tsdb/tsdbDiskData.c" "src/tsdb/tsdbMergeTree.c" "src/tsdb/tsdbDataIter.c" + # # dev + "src/tsdb/dev/tsdbCommit2.c" + "src/tsdb/dev/tsdbMerge.c" + "src/tsdb/dev/tsdbReaderWriter2.c" # tq "src/tq/tq.c" diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/dev/tsdbCommit2.c index 04a29fe76e..3c6c5450a6 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbCommit2.c @@ -15,6 +15,13 @@ #include "tsdb.h" +// extern dependencies +typedef struct SSttFWriter SSttFWriter; + +extern int32_t tsdbSttFWriterOpen(STsdb *pTsdb, SSttFile *pSttFile, SSttFWriter **ppWritter); +extern int32_t tsdbSttFWriterClose(SSttFWriter *pWritter); +extern int32_t tsdbSttFWriteRow(SSttFWriter *pWritter, int64_t suid, int64_t uid, TSDBROW *pRow); + typedef struct { STsdb *pTsdb; // config @@ -26,65 +33,81 @@ typedef struct { int8_t sttTrigger; SArray *aTbDataP; // context - TSKEY nextKey; - int32_t fid; - int32_t expLevel; - TSKEY minKey; - TSKEY maxKey; - int64_t cid; // commit id - SSkmInfo skmTable; - SSkmInfo skmRow; - SBlockData bData; - SColData aColData[4]; // - SArray *aSttBlk; // SArray - SArray *aDelBlk; // SArray + TSKEY nextKey; + int32_t fid; + int32_t expLevel; + TSKEY minKey; + TSKEY maxKey; + // writer + SSttFWriter *pWriter; } SCommitter; -static int32_t tsdbRowIsDeleted(SCommitter *pCommitter, TSDBROW *pRow) { +static int32_t tsdbCommitOpenWriter(SCommitter *pCommitter) { + int32_t code = 0; // TODO - ASSERT(0); + return code; +} + +static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, int64_t suid, int64_t uid, TSDBROW *pRow) { + int32_t code = 0; + int32_t lino; + + if (pCommitter->pWriter == NULL) { + code = tsdbCommitOpenWriter(pCommitter); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbSttFWriteRow(pCommitter->pWriter, suid, uid, pRow); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code)); + } else { + tsdbTrace("vgId:%d %s done, fid:%d suid:%" PRId64 " uid:%" PRId64 " ts:%" PRId64 " version:%" PRId64, + TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid, suid, uid, TSDBROW_KEY(pRow).ts, + TSDBROW_KEY(pRow).version); + } return 0; } +static int32_t tsdbCommitWriteDelData(SCommitter *pCommitter, int64_t suid, int64_t uid, int64_t version, int64_t sKey, + int64_t eKey) { + int32_t code = 0; + // TODO + return code; +} + static int32_t tsdbCommitTimeSeriesData(SCommitter *pCommitter) { int32_t code = 0; int32_t lino; + int64_t nRow = 0; SMemTable *pMem = pCommitter->pTsdb->imem; - if (pMem->nRow == 0) goto _exit; + if (pMem->nRow == 0) { // no time-series data to commit + goto _exit; + } + TSDBKEY from = {.ts = pCommitter->minKey, .version = VERSION_MIN}; for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) { - STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData); - - // TODO: prepare commit next table - STbDataIter iter; - TSDBKEY from = {.ts = pCommitter->minKey, .version = VERSION_MIN}; + STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData); + tsdbTbDataIterOpen(pTbData, &from, 0, &iter); for (TSDBROW *pRow; (pRow = tsdbTbDataIterGet(&iter)) != NULL; tsdbTbDataIterNext(&iter)) { TSDBKEY rowKey = TSDBROW_KEY(pRow); if (rowKey.ts > pCommitter->maxKey) { - pCommitter->nextKey = TMIN(rowKey.ts, pCommitter->nextKey); + pCommitter->nextKey = TMIN(pCommitter->nextKey, rowKey.ts); break; } - if (pRow->type == TSDBROW_ROW_FMT) { - // code = tsdbUpdateSkmInfo(&pCommitter->skmRow, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow)); - TSDB_CHECK_CODE(code, lino, _exit); - } + nRow++; - code = tBlockDataAppendRow(&pCommitter->bData, pRow, pCommitter->skmRow.pTSchema, pTbData->uid); + code = tsdbCommitWriteTSData(pCommitter, pTbData->suid, pTbData->uid, pRow); TSDB_CHECK_CODE(code, lino, _exit); - - if (pCommitter->bData.nRow >= pCommitter->maxRow) { - // code = tsdbWriteSttBlock(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - - tBlockDataClear(&pCommitter->bData); - } } } @@ -93,35 +116,35 @@ _exit: tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code)); } else { tsdbDebug("vgId:%d %s done, fid:%d nRow:%" PRId64, TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid, - pMem->nRow); + nRow); } return code; } -static int32_t tsdbCommitTombstoneData(SCommitter *pCommitter) { +static int32_t tsdbCommitDelData(SCommitter *pCommitter) { int32_t code = 0; int32_t lino; + int64_t nDel = 0; SMemTable *pMem = pCommitter->pTsdb->imem; - if (pMem->nDel == 0) goto _exit; + if (pMem->nDel == 0) { // no del data + goto _exit; + } for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) { STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData); - if (pTbData->pHead == NULL) continue; - for (SDelData *pDelData = pTbData->pHead; pDelData; pDelData = pDelData->pNext) { - if (pDelData->sKey > pCommitter->maxKey || pDelData->eKey < pCommitter->minKey) continue; + if (pDelData->eKey < pCommitter->minKey) continue; + if (pDelData->sKey > pCommitter->maxKey) { + pCommitter->nextKey = TMIN(pCommitter->nextKey, pDelData->sKey); + continue; + } - // code = tsdbAppendDelData(pCommitter, pTbData->suid, pTbData->uid, TMAX(pDelData->sKey, pCommitter->minKey), - // TMIN(pDelData->eKey, pCommitter->maxKey), pDelData->version); + code = tsdbCommitWriteDelData(pCommitter, pTbData->suid, pTbData->uid, pDelData->version, + pDelData->sKey /* TODO */, pDelData->eKey /* TODO */); TSDB_CHECK_CODE(code, lino, _exit); - - if (/* TODO */ 0 > pCommitter->maxRow) { - // code = tsdbWriteDelBlock(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - } } } @@ -135,19 +158,6 @@ _exit: return code; } -static int32_t tsdbCommitDelData(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - - // TODO - -_exit: - if (code) { - tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code)); - } - return code; -} - static int32_t tsdbCommitFSetStart(SCommitter *pCommitter) { int32_t code = 0; int32_t lino = 0; @@ -156,11 +166,6 @@ static int32_t tsdbCommitFSetStart(SCommitter *pCommitter) { tsdbFidKeyRange(pCommitter->fid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey, &pCommitter->maxKey); pCommitter->expLevel = tsdbFidLevel(pCommitter->fid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec()); -#if 0 - // pCommitter->cid = tsdbFileSetNextCid(STsdb * pTsdb, pCommitter->fid); -#else - pCommitter->cid = 0; -#endif // TODO @@ -188,8 +193,6 @@ static int32_t tsdbCommitNextFSet(SCommitter *pCommitter) { int32_t code = 0; int32_t lino = 0; - STsdb *pTsdb = pCommitter->pTsdb; - // fset commit start code = tsdbCommitFSetStart(pCommitter); TSDB_CHECK_CODE(code, lino, _exit); @@ -198,7 +201,7 @@ static int32_t tsdbCommitNextFSet(SCommitter *pCommitter) { code = tsdbCommitTimeSeriesData(pCommitter); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbCommitTombstoneData(pCommitter); + code = tsdbCommitDelData(pCommitter); TSDB_CHECK_CODE(code, lino, _exit); // fset commit end @@ -207,7 +210,8 @@ static int32_t tsdbCommitNextFSet(SCommitter *pCommitter) { _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, + tstrerror(code)); } return code; } diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbReaderWriter2.c b/source/dnode/vnode/src/tsdb/dev/tsdbReaderWriter2.c index 1f78d23580..8a6bbe897c 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbReaderWriter2.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbReaderWriter2.c @@ -6,9 +6,11 @@ typedef struct SSttFReader SSttFReader; extern int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD); extern void tsdbCloseFile(STsdbFD **ppFD); struct SSttFWriter { - STsdb *pTsdb; - STsdbFD *pFd; - SSttFile file; + STsdb *pTsdb; + STsdbFD *pFd; + SSttFile file; + SBlockData bData; + SArray *aSttBlk; }; int32_t tsdbSttFWriterOpen(STsdb *pTsdb, SSttFile *pSttFile, SSttFWriter **ppWritter) { @@ -42,22 +44,10 @@ int32_t tsdbSttFWriterClose(SSttFWriter *pWritter) { return 0; } -int32_t tsdbWriteSttBlockData(SSttFWriter *pWritter, SBlockData *pBlockData, SSttBlk *pSttBlk) { - // TODO - return 0; -} - -int32_t tsdbWriteSttBlockIdx(SSttFWriter *pWriter, SArray *aSttBlk) { - // TODO - return 0; -} +int32_t tsdbSttFWriteRow(SSttFWriter *pWritter, int64_t suid, int64_t uid, TSDBROW *pRow) { + int32_t code = 0; + int32_t lino = 0; -int32_t tsdbWriteSttDelData(SSttFWriter *pWriter) { - // TODO + // TODO write row return 0; } - -int32_t tsdbWriteSttDelIdx(SSttFWriter *pWriter) { - // TODO - return 0; -} \ No newline at end of file -- GitLab