diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h index 9f0a34f0db8950d63a36404c6b372d0ce84cf8aa..45c3de234765e301c96a8173594ea2b9456838df 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h @@ -44,11 +44,24 @@ typedef struct SDataFileReaderConfig { int32_t tsdbDataFileReaderOpen(const char *fname[/* TSDB_FTYPE_MAX */], const SDataFileReaderConfig *config, SDataFileReader **reader); int32_t tsdbDataFileReaderClose(SDataFileReader **reader); +// .head +int32_t tsdbDataFileReadBrinBlk(SDataFileReader *reader, const TBrinBlkArray **brinBlkArray); +int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinBlk, SBrinBlock *brinBlock); +// .data +int32_t tsdbDataFileReadBlockData(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData); +int32_t tsdbDataFileReadBlockDataByCol(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData, + STSchema *pTSchema, int32_t cidArr[], int32_t numCid); +// .sma +int32_t tsdbDataFileReadBlockSma(SDataFileReader *reader); +// .tomb +int32_t tsdbDataFileReadTombBlk(SDataFileReader *reader, const TTombBlkArray **tombBlkArray); +int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombBlk, STombBlock *tData); + +#if 1 int32_t tsdbDataFileReadBlockIdx(SDataFileReader *reader, const TBlockIdxArray **blockIdxArray); int32_t tsdbDataFileReadDataBlk(SDataFileReader *reader, const SBlockIdx *blockIdx, const TDataBlkArray **dataBlkArray); int32_t tsdbDataFileReadDataBlock(SDataFileReader *reader, const SDataBlk *dataBlk, SBlockData *bData); -int32_t tsdbDataFileReadTombBlk(SDataFileReader *reader, const TTombBlkArray **tombBlkArray); -int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombBlk, STombBlock *tData); +#endif // SDataFileWriter ============================================= typedef struct SDataFileWriter SDataFileWriter; @@ -72,9 +85,11 @@ typedef struct SDataFileWriterConfig { int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **writer); int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, TFileOpArray *opArr); + int32_t tsdbDataFileWriteTSData(SDataFileWriter *writer, SRowInfo *row); int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *writer, SBlockData *bData); int32_t tsdbDataFileFlushTSDataBlock(SDataFileWriter *writer); + int32_t tsdbDataFileWriteTombRecord(SDataFileWriter *writer, const STombRecord *record); #ifdef __cplusplus diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h index 9a74cc6494fbaeaf6d0bf45b7a1307902699dc89..69226477bab1e327f08669a159f0e507d86b94e3 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h @@ -165,13 +165,19 @@ typedef union { } SBrinBlock; typedef struct { + SFDataPtr dp[1]; TABLEID minTbid; TABLEID maxTbid; int64_t minVer; int64_t maxVer; - SFDataPtr dp[1]; + int32_t numRec; + int32_t size[15]; + int8_t cmprAlg; + int8_t rsvd[7]; } SBrinBlk; +typedef TARRAY2(SBrinBlk) TBrinBlkArray; + #define BRIN_BLOCK_SIZE(db) TARRAY2_SIZE((db)->suid) int32_t tBrinBlockInit(SBrinBlock *brinBlock); diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c index 7ce63b72e46c0f3ea288bb579d0459cbc7d6edbb..cf5b93a7b61c3b178cb298a22740df0cd85636d0 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c @@ -16,7 +16,10 @@ #include "inc/tsdbDataFileRW.h" typedef struct { + SFDataPtr brinBlkPtr[1]; +#if 1 SFDataPtr blockIdxPtr[1]; +#endif SFDataPtr rsrvd[2]; } SHeadFooter; @@ -27,40 +30,48 @@ typedef struct { // SDataFileReader ============================================= struct SDataFileReader { - struct SDataFileReaderConfig config[1]; + SDataFileReaderConfig config[1]; uint8_t *bufArr[5]; struct { - bool headFooterLoaded; - bool tombFooterLoaded; - bool blockIdxLoaded; - bool tombBlkLoaded; + bool headFooterLoaded; + bool tombFooterLoaded; + bool brinBlkLoaded; + bool tombBlkLoaded; + +#if 1 TABLEID tbid[1]; + bool blockIdxLoaded; +#endif } ctx[1]; STsdbFD *fd[TSDB_FTYPE_MAX]; - SHeadFooter headFooter[1]; - STombFooter tombFooter[1]; - TBlockIdxArray blockIdxArray[1]; + SHeadFooter headFooter[1]; + STombFooter tombFooter[1]; + TBrinBlkArray brinBlkArray[1]; + TTombBlkArray tombBlkArray[1]; + +#if 1 TDataBlkArray dataBlkArray[1]; - TTombBlkArray tombBlkArray[1]; + TBlockIdxArray blockIdxArray[1]; +#endif }; static int32_t tsdbDataFileReadHeadFooter(SDataFileReader *reader) { - if (reader->fd[TSDB_FTYPE_HEAD] == NULL // - || reader->ctx->headFooterLoaded) { - return 0; - } + if (reader->ctx->headFooterLoaded) return 0; int32_t code = 0; int32_t lino = 0; - code = - tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], reader->config->files[TSDB_FTYPE_HEAD].file.size - sizeof(SHeadFooter), - (uint8_t *)reader->headFooter, sizeof(SHeadFooter)); - TSDB_CHECK_CODE(code, lino, _exit); + int32_t ftype = TSDB_FTYPE_HEAD; + if (reader->fd[ftype]) { + code = tsdbReadFile(reader->fd[ftype], reader->config->files[ftype].file.size - sizeof(SHeadFooter), + (uint8_t *)reader->headFooter, sizeof(SHeadFooter)); + TSDB_CHECK_CODE(code, lino, _exit); + } + reader->ctx->headFooterLoaded = true; _exit: @@ -71,18 +82,17 @@ _exit: } static int32_t tsdbDataFileReadTombFooter(SDataFileReader *reader) { - if (reader->fd[TSDB_FTYPE_TOMB] == NULL // - || reader->ctx->tombFooterLoaded) { - return 0; - } + if (reader->ctx->tombFooterLoaded) return 0; int32_t code = 0; int32_t lino = 0; - code = - tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], reader->config->files[TSDB_FTYPE_TOMB].file.size - sizeof(STombFooter), - (uint8_t *)reader->tombFooter, sizeof(STombFooter)); - TSDB_CHECK_CODE(code, lino, _exit); + int32_t ftype = TSDB_FTYPE_TOMB; + if (reader->fd[ftype]) { + code = tsdbReadFile(reader->fd[ftype], reader->config->files[ftype].file.size - sizeof(STombFooter), + (uint8_t *)reader->tombFooter, sizeof(STombFooter)); + TSDB_CHECK_CODE(code, lino, _exit); + } reader->ctx->tombFooterLoaded = true; _exit: @@ -97,13 +107,13 @@ int32_t tsdbDataFileReaderOpen(const char *fname[], const SDataFileReaderConfig int32_t lino = 0; reader[0] = taosMemoryCalloc(1, sizeof(**reader)); - if (!reader[0]) { + if (reader[0] == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } reader[0]->config[0] = config[0]; - if (!reader[0]->config->bufArr) { + if (reader[0]->config->bufArr == NULL) { reader[0]->config->bufArr = reader[0]->bufArr; } @@ -135,22 +145,158 @@ _exit: int32_t tsdbDataFileReaderClose(SDataFileReader **reader) { if (reader[0] == NULL) return 0; + TARRAY2_DESTROY(reader[0]->tombBlkArray, NULL); + TARRAY2_DESTROY(reader[0]->brinBlkArray, NULL); + +#if 1 TARRAY2_DESTROY(reader[0]->dataBlkArray, NULL); TARRAY2_DESTROY(reader[0]->blockIdxArray, NULL); +#endif for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) { - tsdbCloseFile(&reader[0]->fd[i]); + if (reader[0]->fd[i]) { + tsdbCloseFile(&reader[0]->fd[i]); + } } for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->bufArr); ++i) { tFree(reader[0]->bufArr[i]); } + taosMemoryFree(reader[0]); reader[0] = NULL; - return 0; } +int32_t tsdbDataFileReadBrinBlk(SDataFileReader *reader, const TBrinBlkArray **brinBlkArray) { + int32_t code = 0; + int32_t lino = 0; + + if (!reader->ctx->brinBlkLoaded) { + code = tsdbDataFileReadHeadFooter(reader); + TSDB_CHECK_CODE(code, lino, _exit); + + if (reader->headFooter->brinBlkPtr->size > 0) { + void *data = taosMemoryMalloc(reader->headFooter->brinBlkPtr->size); + if (data == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], reader->headFooter->brinBlkPtr->offset, data, + reader->headFooter->brinBlkPtr->size); + if (code) { + taosMemoryFree(data); + TSDB_CHECK_CODE(code, lino, _exit); + } + + int32_t size = reader->headFooter->brinBlkPtr->size / sizeof(SBrinBlk); + TARRAY2_INIT_EX(reader->brinBlkArray, size, size, data); + } else { + TARRAY2_INIT(reader->brinBlkArray); + } + + reader->ctx->brinBlkLoaded = true; + } + brinBlkArray[0] = reader->brinBlkArray; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code); + } + return code; +} + +int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinBlk, SBrinBlock *brinBlock) { + int32_t code = 0; + int32_t lino = 0; + + code = tRealloc(&reader->config->bufArr[0], brinBlk->dp->size); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], brinBlk->dp->offset, reader->config->bufArr[0], brinBlk->dp->size); + TSDB_CHECK_CODE(code, lino, _exit); + + int32_t size = 0; + tBrinBlockClear(brinBlock); + for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); i++) { + code = tsdbDecmprData(reader->config->bufArr[0] + size, brinBlk->size[i], TSDB_DATA_TYPE_BIGINT, brinBlk->cmprAlg, + &reader->config->bufArr[1], brinBlk->numRec * sizeof(int64_t), &reader->config->bufArr[2]); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND_BATCH(&brinBlock->dataArr1[i], reader->config->bufArr[1], brinBlk->numRec); + TSDB_CHECK_CODE(code, lino, _exit); + + size += brinBlk->size[i]; + } + + for (int32_t i = 0, j = ARRAY_SIZE(brinBlock->dataArr1); i < ARRAY_SIZE(brinBlock->dataArr2); i++, j++) { + code = tsdbDecmprData(reader->config->bufArr[0] + size, brinBlk->size[j], TSDB_DATA_TYPE_INT, brinBlk->cmprAlg, + &reader->config->bufArr[1], brinBlk->numRec * sizeof(int32_t), &reader->config->bufArr[2]); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND_BATCH(&brinBlock->dataArr2[i], reader->config->bufArr[1], brinBlk->numRec); + TSDB_CHECK_CODE(code, lino, _exit); + + size += brinBlk->size[j]; + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code); + } + return code; +} + +int32_t tsdbDataFileReadBlockData(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData) { + int32_t code = 0; + int32_t lino = 0; + + code = tRealloc(&reader->config->bufArr[0], record->blockSize); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, reader->config->bufArr[0], record->blockSize); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tDecmprBlockData(reader->config->bufArr[0], record->blockSize, bData, &reader->config->bufArr[1]); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code); + } + return code; +} + +int32_t tsdbDataFileReadBlockDataByCol(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData, + STSchema *pTSchema, int32_t cidArr[], int32_t numCid) { + int32_t code = 0; + int32_t lino = 0; + + // TODO + ASSERT(0); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code); + } + return code; +} + +int32_t tsdbDataFileReadBlockSma(SDataFileReader *reader) { + int32_t code = 0; + int32_t lino = 0; + + // TODO + ASSERT(0); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code); + } + return code; +} + int32_t tsdbDataFileReadBlockIdx(SDataFileReader *reader, const TBlockIdxArray **blockIdxArray) { int32_t code = 0; int32_t lino = 0; @@ -251,32 +397,32 @@ int32_t tsdbDataFileReadTombBlk(SDataFileReader *reader, const TTombBlkArray **t int32_t code = 0; int32_t lino = 0; - code = tsdbDataFileReadTombFooter(reader); - TSDB_CHECK_CODE(code, lino, _exit); - - if (reader->fd[TSDB_FTYPE_TOMB] && !reader->ctx->tombBlkLoaded) { + if (!reader->ctx->tombBlkLoaded) { code = tsdbDataFileReadTombFooter(reader); TSDB_CHECK_CODE(code, lino, _exit); - TARRAY2_CLEAR(reader->tombBlkArray, NULL); - if (reader->tombFooter->tombBlkPtr->size) { - code = tRealloc(&reader->config->bufArr[0], reader->tombFooter->tombBlkPtr->size); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], reader->tombFooter->tombBlkPtr->offset, - reader->config->bufArr[0], reader->tombFooter->tombBlkPtr->size); - TSDB_CHECK_CODE(code, lino, _exit); + if (reader->tombFooter->tombBlkPtr->size > 0) { + void *data = taosMemoryMalloc(reader->tombFooter->tombBlkPtr->size); + if (data == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } - int32_t size = reader->tombFooter->tombBlkPtr->size / sizeof(STombBlk); - for (int32_t i = 0; i < size; ++i) { - code = TARRAY2_APPEND_PTR(reader->tombBlkArray, ((STombBlk *)reader->config->bufArr[0]) + i); + code = tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], reader->tombFooter->tombBlkPtr->offset, data, + reader->tombFooter->tombBlkPtr->size); + if (code) { + taosMemoryFree(data); TSDB_CHECK_CODE(code, lino, _exit); } + + int32_t size = reader->tombFooter->tombBlkPtr->size / sizeof(STombBlk); + TARRAY2_INIT_EX(reader->tombBlkArray, size, size, data); + } else { + TARRAY2_INIT(reader->tombBlkArray); } reader->ctx->tombBlkLoaded = true; } - tombBlkArray[0] = reader->tombBlkArray; _exit: @@ -296,19 +442,19 @@ int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombB code = tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], tombBlk->dp->offset, reader->config->bufArr[0], tombBlk->dp->size); TSDB_CHECK_CODE(code, lino, _exit); - tTombBlockClear(tData); - int32_t size = 0; + tTombBlockClear(tData); for (int32_t i = 0; i < ARRAY_SIZE(tData->dataArr); ++i) { - code = tsdbDecmprData(reader->config->bufArr[0] + size, tombBlk->size[i], TSDB_DATA_TYPE_BIGINT, TWO_STAGE_COMP, - &reader->config->bufArr[1], 0, &reader->config->bufArr[2]); + code = tsdbDecmprData(reader->config->bufArr[0] + size, tombBlk->size[i], TSDB_DATA_TYPE_BIGINT, tombBlk->cmprAlg, + &reader->config->bufArr[1], sizeof(int64_t) * tombBlk->numRec, &reader->config->bufArr[2]); TSDB_CHECK_CODE(code, lino, _exit); - for (int32_t j = 0; j < tombBlk->numRec; j++) { - code = TARRAY2_APPEND_PTR(tData->dataArr + i, ((int64_t *)reader->config->bufArr[1]) + j); - TSDB_CHECK_CODE(code, lino, _exit); - } + code = TARRAY2_APPEND_BATCH(&tData->dataArr[i], reader->config->bufArr[1], tombBlk->numRec); + TSDB_CHECK_CODE(code, lino, _exit); + + size += tombBlk->size[i]; } + ASSERT(size == tombBlk->dp->size); _exit: if (code) { diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c index 350fa233d6712306e92276ed95f642e7121345f0..bcc20dcfb69238b481d6cdb3fe018431a5ee5451 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c @@ -254,8 +254,6 @@ int32_t tsdbSttFileReadTombBlock(SSttSegReader *reader, const STombBlk *tombBlk, int32_t code = 0; int32_t lino = 0; - tTombBlockClear(dData); - code = tRealloc(&reader->reader->config->bufArr[0], tombBlk->dp->size); TSDB_CHECK_CODE(code, lino, _exit); @@ -263,6 +261,7 @@ int32_t tsdbSttFileReadTombBlock(SSttSegReader *reader, const STombBlk *tombBlk, if (code) TSDB_CHECK_CODE(code, lino, _exit); int64_t size = 0; + tTombBlockClear(dData); for (int32_t i = 0; i < ARRAY_SIZE(dData->dataArr); ++i) { code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, tombBlk->size[i], TSDB_DATA_TYPE_BIGINT, tombBlk->cmprAlg, &reader->reader->config->bufArr[1], sizeof(int64_t) * tombBlk->numRec,