From f0caae86af43516394f90f352360fc8b0c68e40b Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 22 Jun 2022 12:03:44 +0000 Subject: [PATCH] more work --- source/dnode/vnode/src/inc/tsdb.h | 13 +++++- source/dnode/vnode/src/tsdb/tsdbCommit.c | 54 +++++++++++++++++++++- source/dnode/vnode/src/tsdb/tsdbFile.c | 2 +- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 36 +++++++-------- 4 files changed, 81 insertions(+), 24 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 849b5bd49a..d9288753f3 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -71,6 +71,9 @@ typedef struct STsdbFS STsdbFS; #define HAS_NULL ((int8_t)0x2) #define HAS_VALUE ((int8_t)0x4) +#define VERSION_MIN 0 +#define VERSION_MAX INT64_MAX + // tsdbUtil.c ============================================================================================== // TSDBROW #define TSDBROW_SVERSION(ROW) TD_ROW_SVER((ROW)->pTSRow) @@ -277,7 +280,10 @@ struct SDelDataInfo { struct STbData { tb_uid_t suid; tb_uid_t uid; - KEYINFO info; + TSKEY minKey; + TSKEY maxKey; + int64_t minVersion; + int64_t maxVersion; SDelData *pHead; SDelData *pTail; SMemSkipList sl; @@ -287,7 +293,10 @@ struct SMemTable { SRWLatch latch; STsdb *pTsdb; int32_t nRef; - KEYINFO info; + TSKEY minKey; + TSKEY maxKey; + int64_t minVersion; + int64_t maxVersion; int64_t nRow; int64_t nDel; SArray *aTbData; // SArray diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 5c01333fc7..7814d5905b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -830,6 +830,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { memset(pCommitter, 0, sizeof(*pCommitter)); ASSERT(pTsdb->mem && pTsdb->imem == NULL); + // lock(); pTsdb->imem = pTsdb->mem; pTsdb->mem = NULL; @@ -841,9 +842,50 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows; pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows; + code = tsdbFSBegin(pTsdb->fs); + if (code) goto _err; + + return code; + +_err: + tsdbError("vgId:%d tsdb start commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + return code; +} + +static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { + int32_t code = 0; + + pCommitter->pReader = NULL; + pCommitter->oBlockIdxMap = tMapDataInit(); + pCommitter->oBlockMap = tMapDataInit(); + pCommitter->oBlock = tBlockInit(); + pCommitter->pWriter = NULL; + pCommitter->nBlockIdxMap = tMapDataInit(); + pCommitter->nBlockMap = tMapDataInit(); + pCommitter->nBlock = tBlockInit(); + code = tBlockDataInit(&pCommitter->oBlockData); + if (code) goto _exit; + code = tBlockDataInit(&pCommitter->nBlockData); + if (code) { + tBlockDataClear(&pCommitter->oBlockData); + goto _exit; + } + +_exit: return code; } +static void tsdbCommitDataEnd(SCommitter *pCommitter) { + tMapDataClear(&pCommitter->oBlockIdxMap); + tMapDataClear(&pCommitter->oBlockMap); + tBlockClear(&pCommitter->oBlock); + tBlockDataClear(&pCommitter->oBlockData); + tMapDataClear(&pCommitter->nBlockIdxMap); + tMapDataClear(&pCommitter->nBlockMap); + tBlockClear(&pCommitter->nBlock); + tBlockDataClear(&pCommitter->nBlockData); +} + static int32_t tsdbCommitData(SCommitter *pCommitter) { int32_t code = 0; STsdb *pTsdb = pCommitter->pTsdb; @@ -852,8 +894,12 @@ static int32_t tsdbCommitData(SCommitter *pCommitter) { // check if (pMemTable->nRow == 0) goto _exit; - // loop - pCommitter->nextKey = pMemTable->info.minKey.ts; + // start ==================== + code = tsdbCommitDataStart(pCommitter); + if (code) return code; + + // impl ==================== + pCommitter->nextKey = pMemTable->minKey; while (pCommitter->nextKey < TSKEY_MAX) { pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision); tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey, @@ -862,11 +908,15 @@ static int32_t tsdbCommitData(SCommitter *pCommitter) { if (code) goto _err; } + // end ==================== + tsdbCommitDataEnd(pCommitter); + _exit: tsdbDebug("vgId:%d commit data done, nRow:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nRow); return code; _err: + tsdbCommitDataEnd(pCommitter); tsdbError("vgId:%d commit data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbFile.c b/source/dnode/vnode/src/tsdb/tsdbFile.c index 0e7ac9f0d7..66fdc3bdd0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile.c @@ -19,7 +19,7 @@ static const char *tsdbFileSuffix[] = {".del", ".cache", ".head", ".data", ".las // SHeadFile =============================================== void tsdbHeadFileName(STsdb *pTsdb, SHeadFile *pFile, char fname[]) { - // TODO + // snprintf(fname, TSDB_FILENAME_LEN - 1, "%s/v%df%dver%18d.head", ); } // SDataFile =============================================== diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 39c4f48c27..6b3e682f82 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -42,7 +42,10 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { taosInitRWLatch(&pMemTable->latch); pMemTable->pTsdb = pTsdb; pMemTable->nRef = 1; - pMemTable->info = tKEYINFOInit(); + pMemTable->minKey = TSKEY_MAX; + pMemTable->maxKey = TSKEY_MIN; + pMemTable->minVersion = VERSION_MAX; + pMemTable->maxVersion = VERSION_MIN; pMemTable->nRow = 0; pMemTable->nDel = 0; pMemTable->aTbData = taosArrayInit(128, sizeof(STbData *)); @@ -174,7 +177,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid pMemTable->nDel++; - if (tsdbKeyCmprFn(&lastKey, &pTbData->info.maxKey) >= 0) { + if (tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) { tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid); } @@ -324,7 +327,10 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid } pTbData->suid = suid; pTbData->uid = uid; - pTbData->info = tKEYINFOInit(); + pTbData->minKey = TSKEY_MAX; + pTbData->maxKey = TSKEY_MIN; + pTbData->minVersion = VERSION_MAX; + pTbData->maxVersion = VERSION_MIN; pTbData->pHead = NULL; pTbData->pTail = NULL; pTbData->sl.seed = taosRand(); @@ -515,13 +521,8 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i goto _err; } - if (tsdbKeyCmprFn(&key, &pTbData->info.minKey) < 0) { - pTbData->info.minKey = key; - } - - if (tsdbKeyCmprFn(&key, &pMemTable->info.minKey) < 0) { - pMemTable->info.minKey = key; - } + if (pTbData->minKey > key.ts) pTbData->minKey = key.ts; + if (pMemTable->minKey > key.ts) pMemTable->minKey = key.ts; pLastRow = row.pTSRow; @@ -546,21 +547,18 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i } while (row.pTSRow); } - if (tsdbKeyCmprFn(&key, &pTbData->info.maxKey) > 0) { - pTbData->info.maxKey = key; + if (key.ts > pTbData->maxKey) { + pTbData->maxKey = key.ts; if (pLastRow) { tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pTbData->uid, pLastRow); } } + if (key.ts > pMemTable->maxKey) pMemTable->maxKey = key.ts; + if (pTbData->minVersion > version) pTbData->minVersion = version; + if (pTbData->maxVersion < version) pTbData->maxVersion = version; + pMemTable->nRow += nRow; - if (tsdbKeyCmprFn(&key, &pMemTable->info.maxKey) > 0) { - pMemTable->info.maxKey = key; - } - if (pTbData->info.minVerion > version) pTbData->info.minVerion = version; - if (pTbData->info.maxVersion < version) pTbData->info.maxVersion = version; - - pMemTable->nRef++; pRsp->numOfRows = nRow; pRsp->affectedRows = nRow; -- GitLab