From 0090d02226e18fdbee81f08d4ae5e0e4ec4ef999 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 26 May 2023 23:42:47 +0800 Subject: [PATCH] more code --- include/util/tarray2.h | 12 +- .../src/tsdb/dev/inc/tsdbSttFReaderWriter.h | 40 ++-- source/dnode/vnode/src/tsdb/dev/tsdbMerge.c | 2 +- .../vnode/src/tsdb/dev/tsdbSttFReaderWriter.c | 212 +++++++++++++++--- 4 files changed, 211 insertions(+), 55 deletions(-) diff --git a/include/util/tarray2.h b/include/util/tarray2.h index db0fc75976..9b67c84563 100644 --- a/include/util/tarray2.h +++ b/include/util/tarray2.h @@ -67,13 +67,15 @@ static FORCE_INLINE int32_t tarray2_make_room( // return 0; } -#define TARRAY2_INIT(a) \ - do { \ - (a)->size = 0; \ - (a)->capacity = 0; \ - (a)->data = NULL; \ +#define TARRAY2_INIT_EX(a, size_, capacity_, data_) \ + do { \ + (a)->size = (size_); \ + (a)->capacity = (capacity_); \ + (a)->data = (data_); \ } while (0) +#define TARRAY2_INIT(a) TARRAY2_INIT_EX(a, 0, 0, NULL) + #define TARRAY2_FREE(a) \ do { \ if ((a)->data) { \ diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h index 65e95aa88f..cf7d0765eb 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h @@ -13,43 +13,49 @@ * along with this program. If not, see . */ +#include "tsdbFS.h" +#include "tsdbUtil.h" + #ifndef _TSDB_STT_FILE_WRITER_H #define _TSDB_STT_FILE_WRITER_H -#include "tsdbFS.h" - #ifdef __cplusplus extern "C" { #endif typedef TARRAY2(SSttBlk) TSttBlkArray; +typedef TARRAY2(SDelBlk) TDelBlkArray; +typedef TARRAY2(STbStatisBlk) TStatisBlkArray; // SSttFileReader ========================================== -typedef struct SSttFSegReader SSttFSegReader; typedef struct SSttFileReader SSttFileReader; typedef struct SSttFileReaderConfig SSttFileReaderConfig; +typedef struct SSttSegReader SSttSegReader; +typedef TARRAY2(SSttSegReader *) TSttSegReaderArray; // SSttFileReader -int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader **ppReader); -int32_t tsdbSttFReaderClose(SSttFileReader **ppReader); +int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader **reader); +int32_t tsdbSttFReaderClose(SSttFileReader **reader); +int32_t tsdbSttFReaderGetSegReader(SSttFileReader *reader, const TSttSegReaderArray **segReaderArray); + +// SSttSegReader +int32_t tsdbSttFReadBloomFilter(SSttSegReader *reader, const void *pFilter); -// SSttFSegReader -int32_t tsdbSttFSegReaderOpen(SSttFileReader *pReader, SSttFSegReader **ppSegReader, int32_t nSegment); -int32_t tsdbSttFSegReaderClose(SSttFSegReader **ppSegReader); -int32_t tsdbSttFSegReadBloomFilter(SSttFSegReader *pSegReader, const void *pFilter); -int32_t tsdbSttFSegReadStatisBlk(SSttFSegReader *pSegReader, const SArray *pStatis); -int32_t tsdbSttFSegReadDelBlk(SSttFSegReader *pSegReader, const SArray *pDelBlk); -int32_t tsdbSttFSegReadSttBlk(SSttFSegReader *pSegReader, const SArray *pSttBlk); -int32_t tsdbSttFSegReadStatisBlock(SSttFSegReader *pSegReader, const void *pBlock); -int32_t tsdbSttFSegReadDelBlock(SSttFSegReader *pSegReader, const void *pBlock); -int32_t tsdbSttFSegReadSttBlock(SSttFSegReader *pSegReader, const void *pBlock); +int32_t tsdbSttFReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlkArray); +int32_t tsdbSttFReadDelBlk(SSttSegReader *reader, const TDelBlkArray **delBlkArray); +int32_t tsdbSttFReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **statisBlkArray); + +int32_t tsdbSttFReadSttBlock(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData); +int32_t tsdbSttFReadDelBlock(SSttSegReader *reader, const SDelBlk *delBlk, SDelBlock *dData); +int32_t tsdbSttFReadStatisBlock(SSttSegReader *reader, const STbStatisBlk *statisBlk, STbStatisBlock *sData); struct SSttFileReaderConfig { - STsdb *pTsdb; + STsdb *tsdb; SSkmInfo *pSkmTb; SSkmInfo *pSkmRow; uint8_t **aBuf; - // TODO + int32_t szPage; + STFile file; }; // SSttFileWriter ========================================== diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c index 69a726f7d1..b4176d9526 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c @@ -199,7 +199,7 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) { // open the reader SSttFileReader *reader; SSttFileReaderConfig config = { - .pTsdb = merger->tsdb, + .tsdb = merger->tsdb, // TODO }; code = tsdbSttFReaderOpen(&config, &reader); diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c b/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c index e2f241d8f4..669368c861 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "dev.h" +#include "inc/tsdbSttFReaderWriter.h" typedef struct { int64_t prevFooter; @@ -23,73 +23,221 @@ typedef struct { // SSttFReader ============================================================ struct SSttFileReader { - SSttFileReaderConfig *config; - // TODO + SSttFileReaderConfig config; + TSttSegReaderArray segReaderArray; + STsdbFD *fd; +}; + +struct SSttSegReader { + SSttFileReader *reader; + + struct { + bool bloomFilterLoaded; + bool sttBlkLoaded; + bool delBlkLoaded; + bool statisBlkLoaded; + } ctx; + + SFSttFooter footer; + void *bloomFilter; + TSttBlkArray sttBlkArray; + TDelBlkArray delBlkArray; + TStatisBlkArray statisBlkArray; }; // SSttFileReader -int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader **ppReader) { +static int32_t tsdbSttSegReaderOpen(SSttFileReader *reader, int64_t offset, SSttSegReader **segReader) { int32_t code = 0; - // TODO + int32_t lino = 0; + int32_t vid = TD_VID(reader->config.tsdb->pVnode); + + ASSERT(offset >= TSDB_FHDR_SIZE); + + segReader[0] = taosMemoryCalloc(1, sizeof(*segReader[0])); + if (segReader[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; + + segReader[0]->reader = reader; + code = tsdbReadFile(reader->fd, offset, (uint8_t *)(&segReader[0]->footer), sizeof(segReader[0]->footer)); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); + taosMemoryFree(segReader[0]); + segReader[0] = NULL; + } return code; } -int32_t tsdbSttFReaderClose(SSttFileReader **ppReader) { - int32_t code = 0; - // TODO - return code; +static int32_t tsdbSttSegReaderClose(SSttSegReader **segReader) { + if (!segReader[0]) return 0; + + if (segReader[0]->ctx.bloomFilterLoaded) { + // TODO + } + if (segReader[0]->ctx.sttBlkLoaded) { + TARRAY2_FREE(&segReader[0]->sttBlkArray); + } + if (segReader[0]->ctx.delBlkLoaded) { + TARRAY2_FREE(&segReader[0]->delBlkArray); + } + if (segReader[0]->ctx.statisBlkLoaded) { + TARRAY2_FREE(&segReader[0]->statisBlkArray); + } + taosMemoryFree(segReader[0]); + segReader[0] = NULL; + return 0; } -// SSttFSegReader -int32_t tsdbSttFSegReaderOpen(SSttFileReader *pReader, SSttFSegReader **ppSegReader, int32_t nSegment) { +int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader **reader) { int32_t code = 0; - // TODO + int32_t lino = 0; + int32_t vid = TD_VID(config->tsdb->pVnode); + + reader[0] = taosMemoryCalloc(1, sizeof(*reader[0])); + if (reader[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; + + reader[0]->config = config[0]; + TARRAY2_INIT(&reader[0]->segReaderArray); + + // open file + char fname[TSDB_FILENAME_LEN]; + tsdbTFileName(config->tsdb, &config->file, fname); + code = tsdbOpenFile(fname, config->szPage, TD_FILE_READ, &reader[0]->fd); + TSDB_CHECK_CODE(code, lino, _exit); + + // open each segment reader + int64_t size = config->file.size; + while (size > 0) { + SSttSegReader *segReader; + + code = tsdbSttSegReaderOpen(reader[0], size - sizeof(SFSttFooter), &segReader); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND(&reader[0]->segReaderArray, segReader); + TSDB_CHECK_CODE(code, lino, _exit); + + size = segReader->footer.prevFooter; + } + + ASSERT(TARRAY2_SIZE(&reader[0]->segReaderArray) == config->file.stt.nseg); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); + tsdbSttFReaderClose(reader); + } return code; } -int32_t tsdbSttFSegReaderClose(SSttFSegReader **ppSegReader) { - int32_t code = 0; - // TODO - return code; +int32_t tsdbSttFReaderClose(SSttFileReader **reader) { + tsdbCloseFile(&reader[0]->fd); + TARRAY2_CLEAR_FREE(&reader[0]->segReaderArray, tsdbSttSegReaderClose); + taosMemoryFree(reader[0]); + reader[0] = NULL; + return 0; } -int32_t tsdbSttFSegReadBloomFilter(SSttFSegReader *pSegReader, const void *pFilter) { - int32_t code = 0; - // TODO - return code; +int32_t tsdbSttFReaderGetSegReader(SSttFileReader *reader, const TSttSegReaderArray **segReaderArray) { + segReaderArray[0] = &reader->segReaderArray; + return 0; } -int32_t tsdbSttFSegReadStatisBlk(SSttFSegReader *pSegReader, const SArray *pStatis) { +// SSttFSegReader +int32_t tsdbSttFReadBloomFilter(SSttSegReader *reader, const void *pFilter) { int32_t code = 0; // TODO return code; } -int32_t tsdbSttFSegReadDelBlk(SSttFSegReader *pSegReader, const SArray *pDelBlk) { - int32_t code = 0; - // TODO - return code; +int32_t tsdbSttFReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **statisBlkArray) { + if (!reader->ctx.statisBlkLoaded) { + SFDataPtr fptr = reader->footer.dict[2]; + if (fptr.size > 0) { + ASSERT(fptr.size % sizeof(STbStatisBlk) == 0); + + int32_t size = fptr.size / sizeof(STbStatisBlk); + void *data = taosMemoryMalloc(fptr.size); + if (!data) return TSDB_CODE_OUT_OF_MEMORY; + + int32_t code = tsdbReadFile(reader->reader->fd, fptr.offset, data, fptr.size); + if (code) return code; + + TARRAY2_INIT_EX(&reader->statisBlkArray, size, size, data); + } else { + TARRAY2_INIT(&reader->statisBlkArray); + } + + reader->ctx.statisBlkLoaded = true; + } + + statisBlkArray[0] = &reader->statisBlkArray; + return 0; } -int32_t tsdbSttFSegReadSttBlk(SSttFSegReader *pSegReader, const SArray *pSttBlk) { - int32_t code = 0; - // TODO - return code; +int32_t tsdbSttFReadDelBlk(SSttSegReader *reader, const TDelBlkArray **delBlkArray) { + if (!reader->ctx.delBlkLoaded) { + SFDataPtr fptr = reader->footer.dict[3]; + if (fptr.size > 0) { + ASSERT(fptr.size % sizeof(SDelBlk) == 0); + + int32_t size = fptr.size / sizeof(SDelBlk); + void *data = taosMemoryMalloc(fptr.size); + if (!data) return TSDB_CODE_OUT_OF_MEMORY; + + int32_t code = tsdbReadFile(reader->reader->fd, fptr.offset, data, fptr.size); + if (code) return code; + + TARRAY2_INIT_EX(&reader->delBlkArray, size, size, data); + } else { + TARRAY2_INIT(&reader->delBlkArray); + } + + reader->ctx.delBlkLoaded = true; + } + + delBlkArray[0] = &reader->delBlkArray; + return 0; +} + +int32_t tsdbSttFReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlkArray) { + if (!reader->ctx.sttBlkLoaded) { + SFDataPtr fptr = reader->footer.dict[1]; + if (fptr.size > 0) { + ASSERT(fptr.size % sizeof(SSttBlk) == 0); + + int32_t size = fptr.size / sizeof(SSttBlk); + void *data = taosMemoryMalloc(fptr.size); + if (!data) return TSDB_CODE_OUT_OF_MEMORY; + + int32_t code = tsdbReadFile(reader->reader->fd, fptr.offset, data, fptr.size); + if (code) return code; + + TARRAY2_INIT_EX(&reader->sttBlkArray, size, size, data); + } else { + TARRAY2_INIT(&reader->sttBlkArray); + } + + reader->ctx.sttBlkLoaded = true; + } + + sttBlkArray[0] = &reader->sttBlkArray; + return 0; } -int32_t tsdbSttFSegReadStatisBlock(SSttFSegReader *pSegReader, const void *pBlock) { +int32_t tsdbSttFReadSttBlock(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData) { int32_t code = 0; // TODO return code; } -int32_t tsdbSttFSegReadDelBlock(SSttFSegReader *pSegReader, const void *pBlock) { +int32_t tsdbSttFReadDelBlock(SSttSegReader *reader, const SDelBlk *delBlk, SDelBlock *dData) { int32_t code = 0; // TODO return code; } -int32_t tsdbSttFSegReadSttBlock(SSttFSegReader *pSegReader, const void *pBlock) { +int32_t tsdbSttFReadStatisBlock(SSttSegReader *reader, const STbStatisBlk *statisBlk, STbStatisBlock *sData) { int32_t code = 0; // TODO return code; -- GitLab