From 8afa2c53540c3c1748f9e9ffbaf1715fc9eaa6e5 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 30 Mar 2023 16:06:07 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/tsdb/dev/tsdbCommit.c | 4 +- .../dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c | 57 ++++++++++++++----- .../dnode/vnode/src/tsdb/dev/tsdbSttFWriter.h | 4 +- source/dnode/vnode/src/tsdb/dev/tsdbUtil.c | 28 +++++++++ source/dnode/vnode/src/tsdb/dev/tsdbUtil.h | 18 ++++-- 5 files changed, 87 insertions(+), 24 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c index 1f11062514..d73174eba8 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c @@ -36,7 +36,7 @@ typedef struct { struct SSttFWriter *pWriter; } SCommitter; -static int32_t tsdbCommitOpenWriter(SCommitter *pCommitter) { +static int32_t open_committer_writer(SCommitter *pCommitter) { int32_t code; int32_t lino; @@ -82,7 +82,7 @@ static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, TABLEID *tbid, TSDB int32_t lino; if (pCommitter->pWriter == NULL) { - code = tsdbCommitOpenWriter(pCommitter); + code = open_committer_writer(pCommitter); TSDB_CHECK_CODE(code, lino, _exit); } diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c b/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c index 379d29d2b1..076579e7f0 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c @@ -21,12 +21,14 @@ extern int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, extern int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size); extern int32_t tsdbFsyncFile(STsdbFD *pFD); +typedef struct SFDataPtr { + int64_t offset; + int64_t size; +} SFDataPtr; + typedef struct { - struct { - int64_t offset; - int64_t size; - } dict[4]; // 0:bloom filter, 1:SSttBlk, 2:SDelBlk, 3:STbStatisBlk - uint8_t reserved[32]; + SFDataPtr dict[4]; // 0:bloom filter, 1:SSttBlk, 2:SDelBlk, 3:STbStatisBlk + uint8_t reserved[32]; } SFSttFooter; struct SSttFWriter { @@ -153,15 +155,38 @@ static int32_t create_stt_fwriter(const struct SSttFWriterConf *pConf, struct SS ppWriter[0]->config.aBuf = ppWriter[0]->aBuf; } + // time-series data block tBlockDataCreate(&ppWriter[0]->bData); - ppWriter[0]->aSttBlk = taosArrayInit(64, sizeof(SSttBlk)); - if (ppWriter[0]->aSttBlk == NULL) { + if ((ppWriter[0]->aSttBlk = taosArrayInit(64, sizeof(SSttBlk))) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } + // deleted data block + if ((code = tDelBlockCreate(&ppWriter[0]->dData, pConf->maxRow))) goto _exit; + if ((ppWriter[0]->aDelBlk = taosArrayInit(64, sizeof(SDelBlk))) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + // statistics data block + if ((code = tTbStatisBlockCreate(&ppWriter[0]->sData, pConf->maxRow))) goto _exit; + if ((ppWriter[0]->aStatisBlk = taosArrayInit(64, sizeof(STbStatisBlock))) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + // TODO: bloom filter + _exit: if (code && ppWriter[0]) { + // statistics data block + taosArrayDestroy(ppWriter[0]->aStatisBlk); + tTbStatisBlockDestroy(&ppWriter[0]->sData); + // deleted data block + taosArrayDestroy(ppWriter[0]->aDelBlk); + tDelBlockDestroy(&ppWriter[0]->dData); + // time-series data block taosArrayDestroy(ppWriter[0]->aSttBlk); tBlockDataDestroy(&ppWriter[0]->bData); taosMemoryFree(ppWriter[0]); @@ -172,9 +197,16 @@ _exit: static int32_t destroy_stt_fwriter(struct SSttFWriter *pWriter) { if (pWriter) { - tDestroyTSchema(pWriter->skmTb.pTSchema); + for (int32_t i = 0; ARRAY_SIZE(pWriter->aBuf); i++) tFree(pWriter->aBuf[i]); tDestroyTSchema(pWriter->skmRow.pTSchema); - for (int32_t i = 0; i < sizeof(pWriter->aBuf) / sizeof(pWriter->aBuf[0]); i++) taosMemoryFree(pWriter->aBuf[i]); + tDestroyTSchema(pWriter->skmTb.pTSchema); + // statistics data block + taosArrayDestroy(pWriter->aStatisBlk); + tTbStatisBlockDestroy(&pWriter->sData); + // deleted data block + taosArrayDestroy(pWriter->aDelBlk); + tDelBlockDestroy(&pWriter->dData); + // time-series data block taosArrayDestroy(pWriter->aSttBlk); tBlockDataDestroy(&pWriter->bData); taosMemoryFree(pWriter); @@ -186,7 +218,7 @@ static int32_t open_stt_fwriter(struct SSttFWriter *pWriter) { int32_t code = 0; int32_t lino; - int32_t flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; // TODO + int32_t flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; code = tsdbOpenFile(pWriter->config.file.fname, pWriter->config.szPage, flag, &pWriter->pFd); TSDB_CHECK_CODE(code, lino, _exit); @@ -217,10 +249,7 @@ int32_t tsdbSttFWriterOpen(const struct SSttFWriterConf *pConf, struct SSttFWrit _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pConf->pTsdb->pVnode), __func__, lino, tstrerror(code)); - if (ppWriter[0]) { - taosMemoryFree(ppWriter[0]); - ppWriter[0] = NULL; - } + if (ppWriter[0]) destroy_stt_fwriter(ppWriter[0]); } return code; } diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.h b/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.h index 638fe6e03a..42203617f4 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.h +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.h @@ -33,11 +33,11 @@ int32_t tsdbSttFWriteDLData(struct SSttFWriter *pWriter, TABLEID *tbid, SDelData struct SSttFWriterConf { STsdb *pTsdb; struct STFile file; - SSkmInfo *pSkmTb; - SSkmInfo *pSkmRow; int32_t maxRow; int32_t szPage; int8_t cmprAlg; + SSkmInfo *pSkmTb; + SSkmInfo *pSkmRow; uint8_t **aBuf; }; diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbUtil.c b/source/dnode/vnode/src/tsdb/dev/tsdbUtil.c index b411e845aa..8acb6a1d72 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbUtil.c @@ -15,6 +15,7 @@ #include "dev.h" +// SDelBlock ---------- int32_t tDelBlockCreate(SDelBlock *pDelBlock, int32_t capacity) { int32_t code; @@ -53,6 +54,33 @@ int32_t tDelBlockAppend(SDelBlock *pDelBlock, const TABLEID *tbid, const SDelDat return 0; } +// STbStatisBlock ---------- + +int32_t tTbStatisBlockCreate(STbStatisBlock *pTbStatisBlock, int32_t capacity) { + memset(pTbStatisBlock, 0, sizeof(*pTbStatisBlock)); + pTbStatisBlock->capacity = capacity; + for (int32_t i = 0; i < ARRAY_SIZE(pTbStatisBlock->aData); ++i) { + if (tRealloc((uint8_t **)&pTbStatisBlock->aData[i], sizeof(int64_t) * capacity)) { + for (i--; i >= 0; --i) tFree(pTbStatisBlock->aData[i]); + return TSDB_CODE_OUT_OF_MEMORY; + } + } + return 0; +} + +int32_t tTbStatisBlockDestroy(STbStatisBlock *pTbStatisBlock) { + for (int32_t i = 0; i < ARRAY_SIZE(pTbStatisBlock->aData); ++i) { + tFree(pTbStatisBlock->aData[i]); + } + return 0; +} + +int32_t tTbStatisBlockClear(STbStatisBlock *pTbStatisBlock) { + pTbStatisBlock->nRow = 0; + return 0; +} + +// other apis ---------- int32_t tsdbUpdateSkmTb(STsdb *pTsdb, const TABLEID *tbid, SSkmInfo *pSkmTb) { if (tbid->suid) { if (pSkmTb->suid == tbid->suid) { diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbUtil.h b/source/dnode/vnode/src/tsdb/dev/tsdbUtil.h index dbdb7b7463..6fafd2af5b 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbUtil.h +++ b/source/dnode/vnode/src/tsdb/dev/tsdbUtil.h @@ -22,18 +22,24 @@ extern "C" { #endif -/* Exposed Handle */ -typedef struct SDelBlock SDelBlock; -typedef struct SDelBlk SDelBlk; -typedef struct STbStatisBlock STbStatisBlock; -typedef struct STbStatisBlk STbStatisBlk; +// SDelBlock ---------- +typedef struct SDelBlock SDelBlock; +typedef struct SDelBlk SDelBlk; -/* Exposed APIs */ int32_t tDelBlockCreate(SDelBlock *pDelBlock, int32_t capacity); int32_t tDelBlockDestroy(SDelBlock *pDelBlock); int32_t tDelBlockClear(SDelBlock *pDelBlock); int32_t tDelBlockAppend(SDelBlock *pDelBlock, const TABLEID *tbid, const SDelData *pDelData); +// STbStatisBlock ---------- +typedef struct STbStatisBlock STbStatisBlock; +typedef struct STbStatisBlk STbStatisBlk; + +int32_t tTbStatisBlockCreate(STbStatisBlock *pTbStatisBlock, int32_t capacity); +int32_t tTbStatisBlockDestroy(STbStatisBlock *pTbStatisBlock); +int32_t tTbStatisBlockClear(STbStatisBlock *pTbStatisBlock); + +// other apis int32_t tsdbUpdateSkmTb(STsdb *pTsdb, const TABLEID *tbid, SSkmInfo *pSkmTb); int32_t tsdbUpdateSkmRow(STsdb *pTsdb, const TABLEID *tbid, int32_t sver, SSkmInfo *pSkmRow); -- GitLab