From b881b87218ccdacff1cc305c47c8d08d524e784b Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 25 Jun 2023 18:02:31 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/tsdb/tsdbDataFileRW.c | 26 +-- source/dnode/vnode/src/tsdb/tsdbSttFileRW.c | 12 +- source/dnode/vnode/src/tsdb/tsdbUpgrade.c | 184 +++++++++++++------ 3 files changed, 147 insertions(+), 75 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index 16559c83ab..7f5d9c83f4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -15,10 +15,9 @@ #include "tsdbDataFileRW.h" -extern int32_t tsdbFileDoWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize, - TTombBlkArray *tombBlkArray, uint8_t **bufArr); -extern int32_t tsdbFileDoWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, - int64_t *fileSize); +extern int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize, + TTombBlkArray *tombBlkArray, uint8_t **bufArr); +extern int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize); // SDataFileReader ============================================= struct SDataFileReader { @@ -1161,8 +1160,8 @@ static int32_t tsdbDataFileDoWriteTombBlock(SDataFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - code = tsdbFileDoWriteTombBlock(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlock, writer->config->cmprAlg, - &writer->files[TSDB_FTYPE_TOMB].size, writer->tombBlkArray, writer->config->bufArr); + code = tsdbFileWriteTombBlock(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlock, writer->config->cmprAlg, + &writer->files[TSDB_FTYPE_TOMB].size, writer->tombBlkArray, writer->config->bufArr); TSDB_CHECK_CODE(code, lino, _exit); _exit: @@ -1178,8 +1177,8 @@ static int32_t tsdbDataFileDoWriteTombBlk(SDataFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - code = tsdbFileDoWriteTombBlk(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlkArray, writer->tombFooter->tombBlkPtr, - &writer->files[TSDB_FTYPE_TOMB].size); + code = tsdbFileWriteTombBlk(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlkArray, writer->tombFooter->tombBlkPtr, + &writer->files[TSDB_FTYPE_TOMB].size); TSDB_CHECK_CODE(code, lino, _exit); _exit: @@ -1189,14 +1188,19 @@ _exit: return code; } +int32_t tsdbFileWriteTombFooter(STsdbFD *fd, const STombFooter *footer, int64_t *fileSize) { + int32_t code = tsdbWriteFile(fd, *fileSize, (const uint8_t *)footer, sizeof(*footer)); + if (code) return code; + *fileSize += sizeof(*footer); + return 0; +} + static int32_t tsdbDataFileWriteTombFooter(SDataFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - code = tsdbWriteFile(writer->fd[TSDB_FTYPE_TOMB], writer->files[TSDB_FTYPE_TOMB].size, - (const uint8_t *)writer->tombFooter, sizeof(STombFooter)); + code = tsdbFileWriteTombFooter(writer->fd[TSDB_FTYPE_TOMB], writer->tombFooter, &writer->files[TSDB_FTYPE_TOMB].size); TSDB_CHECK_CODE(code, lino, _exit); - writer->files[TSDB_FTYPE_TOMB].size += sizeof(STombFooter); _exit: if (code) { diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c index 674802a6ca..b09ab71e08 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c @@ -522,8 +522,8 @@ _exit: return code; } -int32_t tsdbFileDoWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize, - TTombBlkArray *tombBlkArray, uint8_t **bufArr) { +int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize, + TTombBlkArray *tombBlkArray, uint8_t **bufArr) { int32_t code; if (TOMB_BLOCK_SIZE(tombBlock) == 0) return 0; @@ -584,8 +584,8 @@ static int32_t tsdbSttFileDoWriteTombBlock(SSttFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - code = tsdbFileDoWriteTombBlock(writer->fd, writer->tombBlock, writer->config->cmprAlg, &writer->file->size, - writer->tombBlkArray, writer->config->bufArr); + code = tsdbFileWriteTombBlock(writer->fd, writer->tombBlock, writer->config->cmprAlg, &writer->file->size, + writer->tombBlkArray, writer->config->bufArr); TSDB_CHECK_CODE(code, lino, _exit); _exit: @@ -644,7 +644,7 @@ _exit: return code; } -int32_t tsdbFileDoWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize) { +int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize) { ptr->size = TARRAY2_DATA_LEN(tombBlkArray); if (ptr->size > 0) { ptr->offset = *fileSize; @@ -663,7 +663,7 @@ static int32_t tsdbSttFileDoWriteTombBlk(SSttFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - code = tsdbFileDoWriteTombBlk(writer->fd, writer->tombBlkArray, writer->footer->tombBlkPtr, &writer->file->size); + code = tsdbFileWriteTombBlk(writer->fd, writer->tombBlkArray, writer->footer->tombBlkPtr, &writer->file->size); TSDB_CHECK_CODE(code, lino, _exit); _exit: diff --git a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c index fdb3183c20..8efe87ef62 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c +++ b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c @@ -29,6 +29,10 @@ extern int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHe extern int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl); extern int32_t tsdbFileWriteSttBlk(STsdbFD *fd, const TSttBlkArray *sttBlkArray, SFDataPtr *ptr, int64_t *fileSize); extern int32_t tsdbFileWriteSttFooter(STsdbFD *fd, const SSttFooter *footer, int64_t *fileSize); +extern int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize, + TTombBlkArray *tombBlkArray, uint8_t **bufArr); +extern int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize); +extern int32_t tsdbFileWriteTombFooter(STsdbFD *fd, const STombFooter *footer, int64_t *fileSize); static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) { int32_t code = 0; @@ -362,35 +366,102 @@ _exit: return code; } +static int32_t tsdbUpgradeOpenTombFile(STsdb *tsdb, STFileSet *fset, STsdbFD **fd, STFileObj **fobj, bool *toStt) { + int32_t code = 0; + int32_t lino = 0; + + if (TARRAY2_SIZE(fset->lvlArr) == 0) { // to .tomb file + *toStt = false; + + STFile file = { + .type = TSDB_FTYPE_TOMB, + .did = fset->farr[TSDB_FTYPE_HEAD]->f->did, + .fid = fset->fid, + .cid = 0, + .size = 0, + }; + + code = tsdbTFileObjInit(tsdb, &file, fobj); + TSDB_CHECK_CODE(code, lino, _exit); + + fset->farr[TSDB_FTYPE_TOMB] = *fobj; + } else { // to .stt file + *toStt = true; + SSttLvl *lvl = TARRAY2_GET(fset->lvlArr, 0); + + STFile file = { + .type = TSDB_FTYPE_STT, + .did = TARRAY2_GET(lvl->fobjArr, 0)->f->did, + .fid = fset->fid, + .cid = 0, + .size = 0, + }; + + code = tsdbTFileObjInit(tsdb, &file, fobj); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND(lvl->fobjArr, fobj[0]); + TSDB_CHECK_CODE(code, lino, _exit); + } + + char fname[TSDB_FILENAME_LEN] = {0}; + code = tsdbOpenFile(fobj[0]->fname, tsdb->pVnode->config.tsdbPageSize, + TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CREATE, fd); + TSDB_CHECK_CODE(code, lino, _exit); + + uint8_t hdr[TSDB_FHDR_SIZE] = {0}; + code = tsdbWriteFile(fd[0], 0, hdr, TSDB_FHDR_SIZE); + TSDB_CHECK_CODE(code, lino, _exit); + fobj[0]->f->size += TSDB_FHDR_SIZE; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + } + return code; +} + static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray *aDelIdx, STFileSet *fset) { int32_t code = 0; int32_t lino = 0; - SArray *aDelData = NULL; - int64_t minKey, maxKey; - STombBlock tombBlock[1] = {0}; - TTombBlkArray tombBlkArray[1] = {0}; - STsdbFD *fd = NULL; + struct { + // context + bool toStt; + int8_t cmprAlg; + int32_t maxRow; + int64_t minKey; + int64_t maxKey; + uint8_t *bufArr[8]; + // reader + SArray *aDelData; + // writer + STsdbFD *fd; + STFileObj *fobj; + STombBlock tombBlock[1]; + TTombBlkArray tombBlkArray[1]; + STombFooter tombFooter[1]; + SSttFooter sttFooter[1]; + } ctx[1] = {{ + .maxRow = tsdb->pVnode->config.tsdbCfg.maxRows, + .cmprAlg = tsdb->pVnode->config.tsdbCfg.compression, + }}; - tsdbFidKeyRange(fset->fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey); + tsdbFidKeyRange(fset->fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &ctx->minKey, &ctx->maxKey); - if ((aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) { + if ((ctx->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } - for (int32_t i = 0; i < taosArrayGetSize(aDelIdx); ++i) { - SDelIdx *pDelIdx = taosArrayGet(aDelIdx, i); + for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) { + SDelIdx *pDelIdx = (SDelIdx *)taosArrayGet(aDelIdx, iDelIdx); - code = tsdbReadDelData(reader, pDelIdx, aDelData); + code = tsdbReadDelData(reader, pDelIdx, ctx->aDelData); TSDB_CHECK_CODE(code, lino, _exit); - for (int32_t j = 0; j < taosArrayGetSize(aDelData); ++j) { - SDelData *pDelData = taosArrayGet(aDelData, j); - - if (pDelData->sKey > maxKey || pDelData->eKey < minKey) { - continue; - } + for (int32_t iDelData = 0; iDelData < taosArrayGetSize(ctx->aDelData); iDelData++) { + SDelData *pDelData = (SDelData *)taosArrayGet(ctx->aDelData, iDelData); STombRecord record = { .suid = pDelIdx->suid, @@ -400,64 +471,62 @@ static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray * .ekey = pDelData->eKey, }; - code = tTombBlockPut(tombBlock, &record); + code = tTombBlockPut(ctx->tombBlock, &record); TSDB_CHECK_CODE(code, lino, _exit); - if (TOMB_BLOCK_SIZE(tombBlock) >= tsdb->pVnode->config.tsdbCfg.maxRows) { - if (fd == NULL) { - STFile file = { - .type = TSDB_FTYPE_TOMB, - .did = {0}, // TODO - .fid = fset->fid, - .cid = 0, // TODO - }; - - code = tsdbTFileObjInit(tsdb, &file, &fset->farr[TSDB_FTYPE_TOMB]); + if (TOMB_BLOCK_SIZE(ctx->tombBlock) > ctx->maxRow) { + if (ctx->fd == NULL) { + code = tsdbUpgradeOpenTombFile(tsdb, fset, &ctx->fd, &ctx->fobj, &ctx->toStt); TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbOpenFile(fset->farr[TSDB_FTYPE_TOMB]->fname, tsdb->pVnode->config.tsdbPageSize, - TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC, &fd); - TSDB_CHECK_CODE(code, lino, _exit); - - uint8_t hdr[TSDB_FHDR_SIZE] = {0}; - code = tsdbWriteFile(fd, 0, hdr, TSDB_FHDR_SIZE); - TSDB_CHECK_CODE(code, lino, _exit); - fset->farr[TSDB_FTYPE_TOMB]->f->size += sizeof(hdr); } - - // TODO - tTombBlockClear(tombBlock); + code = tsdbFileWriteTombBlock(ctx->fd, ctx->tombBlock, ctx->cmprAlg, &ctx->fobj->f->size, ctx->tombBlkArray, + ctx->bufArr); + TSDB_CHECK_CODE(code, lino, _exit); } } } - if (TOMB_BLOCK_SIZE(tombBlock) > 0) { - // TODO - tTombBlockClear(tombBlock); + if (TOMB_BLOCK_SIZE(ctx->tombBlock) > 0) { + if (ctx->fd == NULL) { + code = tsdbUpgradeOpenTombFile(tsdb, fset, &ctx->fd, &ctx->fobj, &ctx->toStt); + TSDB_CHECK_CODE(code, lino, _exit); + } + code = tsdbFileWriteTombBlock(ctx->fd, ctx->tombBlock, ctx->cmprAlg, &ctx->fobj->f->size, ctx->tombBlkArray, + ctx->bufArr); + TSDB_CHECK_CODE(code, lino, _exit); } - if (TARRAY2_SIZE(tombBlkArray) > 0) { - // TODO - } + if (ctx->fd != NULL) { + if (ctx->toStt) { + code = tsdbFileWriteTombBlk(ctx->fd, ctx->tombBlkArray, ctx->sttFooter->tombBlkPtr, &ctx->fobj->f->size); + TSDB_CHECK_CODE(code, lino, _exit); - if (fd) { - // write footer + code = tsdbFileWriteSttFooter(ctx->fd, ctx->sttFooter, &ctx->fobj->f->size); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tsdbFileWriteTombBlk(ctx->fd, ctx->tombBlkArray, ctx->tombFooter->tombBlkPtr, &ctx->fobj->f->size); + TSDB_CHECK_CODE(code, lino, _exit); - // sync and close - code = tsdbFsyncFile(fd); + code = tsdbFileWriteTombFooter(ctx->fd, ctx->tombFooter, &ctx->fobj->f->size); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbFsyncFile(ctx->fd); TSDB_CHECK_CODE(code, lino, _exit); - tsdbCloseFile(&fd); - } - // clear - TARRAY2_DESTROY(tombBlkArray, NULL); - tTombBlockDestroy(tombBlock); - taosArrayDestroy(aDelData); + tsdbCloseFile(&ctx->fd); + } _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); } + for (int32_t i = 0; i < ARRAY_SIZE(ctx->bufArr); i++) { + tFree(ctx->bufArr[i]); + } + TARRAY2_DESTROY(ctx->tombBlkArray, NULL); + tTombBlockDestroy(ctx->tombBlock); + taosArrayDestroy(ctx->aDelData); return code; } @@ -487,13 +556,12 @@ static int32_t tsdbUpgradeTombFile(STsdb *tsdb, SDelFile *pDelFile, TFileSetArra } } - tsdbDelFReaderClose(&reader); - taosArrayDestroy(aDelIdx); - _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); } + tsdbDelFReaderClose(&reader); + taosArrayDestroy(aDelIdx); return code; } -- GitLab