From 41459a92c0df720452819c510fe4550c6ec58871 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 29 May 2023 18:42:31 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/inc/tsdb.h | 7 +- .../vnode/src/tsdb/dev/inc/tsdbDataFileRW.h | 7 +- .../dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c | 116 ++++++++++++++++-- 3 files changed, 110 insertions(+), 20 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index b1052c7abb..4f9648ef6d 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -76,9 +76,8 @@ typedef struct STsdbFilterInfo STsdbFilterInfo; #define TSDBROW_ROW_FMT ((int8_t)0x0) #define TSDBROW_COL_FMT ((int8_t)0x1) -#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F) -#define TSDB_MAX_SUBBLOCKS 8 -#define TSDB_FHDR_SIZE 512 +#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F) +#define TSDB_FHDR_SIZE 512 #define VERSION_MIN 0 #define VERSION_MAX INT64_MAX @@ -498,7 +497,7 @@ struct SDataBlk { int32_t nRow; int8_t hasDup; int8_t nSubBlock; - SBlockInfo aSubBlock[TSDB_MAX_SUBBLOCKS]; + SBlockInfo aSubBlock[1]; SSmaInfo smaInfo; }; diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h index 3973eaa3c0..4dc5ce67a0 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h @@ -47,13 +47,14 @@ int32_t tsdbDataFileReadDelData(SDataFileReader *reader, const SDelBlk *delBlk, // SDataFileWriter ============================================= typedef struct SDataFileWriter SDataFileWriter; typedef struct SDataFileWriterConfig { - STsdb *tsdb; - STFile f[TSDB_FTYPE_MAX]; + STsdb *tsdb; + STFile f[TSDB_FTYPE_MAX]; + int32_t maxRow; } SDataFileWriterConfig; int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **writer); int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, STFileOp op[/*TSDB_FTYPE_MAX*/]); -int32_t tsdbDataFileWriteTSData(SDataFileWriter *writer, SBlockData *bData); +int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *writer, SBlockData *bData); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c index 3231e78331..8cd191927d 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c @@ -76,16 +76,21 @@ int32_t tsdbDataFileReadDataBlk(SDataFileReader *reader, const SBlockIdx *blockI return 0; } +int32_t tsdbDataFileReadDataBlock(SDataFileReader *reader, const SDataBlk *dataBlk, SBlockData *bData) { + // TODO + return 0; +} + // SDataFileWriter ============================================= struct SDataFileWriter { SDataFileWriterConfig config[1]; struct { bool opened; - bool tbHasOldData; SDataFileReader *reader; const TBlockIdxArray *blockIdxArray; int32_t blockIdxArrayIdx; + bool tbHasOldData; TABLEID tbid[1]; const TDataBlkArray *dataBlkArray; int32_t dataBlkArrayIdx; @@ -244,6 +249,29 @@ static int32_t tsdbDataFileWriteDataBlk(SDataFileWriter *writer, const TDataBlkA code = TARRAY2_APPEND_PTR(writer->blockIdxArray, blockIdx); TSDB_CHECK_CODE(code, lino, _exit); +_exit: + if (code) { + TSDB_ERROR_LOG(vid, lino, code); + } + return code; +} +static int32_t tsdbDataFileDoWriteTSRow(SDataFileWriter *writer, TSDBROW *row) { + int32_t code = 0; + int32_t lino = 0; + int32_t vid = TD_VID(writer->config->tsdb->pVnode); + + if (row->type == TSDBROW_ROW_FMT) { + // TODO: udpate row schema + } + + code = tBlockDataAppendRow(writer->ctx->bData, row, NULL /* TODO */, writer->ctx->tbid->uid); + TSDB_CHECK_CODE(code, lino, _exit); + + if (writer->ctx->bData->nRow >= writer->config->maxRow) { + code = tsdbDataFileWriteBlockData(writer, writer->bData); + TSDB_CHECK_CODE(code, lino, _exit); + } + _exit: if (code) { TSDB_ERROR_LOG(vid, lino, code); @@ -261,7 +289,7 @@ static int32_t tsdbDataFileWriteTableDataEnd(SDataFileWriter *writer) { for (; writer->ctx->iRow < writer->ctx->bData->nRow; writer->ctx->iRow++) { TSDBROW row[1] = {tsdbRowFromBlockData(writer->ctx->bData, writer->ctx->iRow)}; - code = tBlockDataAppendRow(writer->bData, row, NULL, writer->ctx->tbid->uid); + code = tsdbDataFileDoWriteTSRow(writer, row); TSDB_CHECK_CODE(code, lino, _exit); } @@ -292,6 +320,8 @@ static int32_t tsdbDataFileWriteTableDataBegin(SDataFileWriter *writer, const TA int32_t lino = 0; int32_t vid = TD_VID(writer->config->tsdb->pVnode); + writer->ctx->tbHasOldData = false; + for (; writer->ctx->blockIdxArrayIdx < TARRAY2_SIZE(writer->ctx->blockIdxArray); writer->ctx->blockIdxArrayIdx++) { const SBlockIdx *blockIdx = TARRAY2_GET_PTR(writer->ctx->blockIdxArray, writer->ctx->blockIdxArrayIdx); @@ -314,12 +344,14 @@ static int32_t tsdbDataFileWriteTableDataBegin(SDataFileWriter *writer, const TA code = tsdbDataFileReadDataBlk(writer->ctx->reader, blockIdx, &writer->ctx->dataBlkArray); TSDB_CHECK_CODE(code, lino, _exit); writer->ctx->dataBlkArrayIdx = 0; - } else { - writer->ctx->tbHasOldData = false; + tBlockDataReset(writer->ctx->bData); + writer->ctx->iRow = 0; + writer->ctx->blockIdxArrayIdx++; } break; } } + writer->ctx->tbid[0] = tbid[0]; _exit: @@ -328,19 +360,77 @@ _exit: } return code; } -static int32_t tsdbDataFileDoWriteTableData(SDataFileWriter *writer, SBlockData *bData) { +static int32_t tsdbDataFileDoWriteTableDataRow(SDataFileWriter *writer, TSDBROW *row) { int32_t code = 0; int32_t lino = 0; int32_t vid = TD_VID(writer->config->tsdb->pVnode); - if (writer->ctx->tbHasOldData) { - if (writer->ctx->dataBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->dataBlkArray)) { - // TODO + while (writer->ctx->tbHasOldData) { + for (; writer->ctx->iRow < writer->ctx->bData->nRow; writer->ctx->iRow++) { + TSDBROW row1[1] = {tsdbRowFromBlockData(writer->ctx->bData, writer->ctx->iRow)}; + + int32_t c = tsdbRowCmprFn(row, row1); + ASSERT(c); + if (row > 0) { + code = tsdbDataFileDoWriteTSRow(writer, row1); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + goto _write_row; + } } - // TODO + + for (; writer->ctx->dataBlkArrayIdx < TARRAY2_SIZE(writer->ctx->dataBlkArray); writer->ctx->dataBlkArrayIdx++) { + const SDataBlk *dataBlk = TARRAY2_GET_PTR(writer->ctx->dataBlkArray, writer->ctx->dataBlkArrayIdx); + SDataBlk dataBlk1[1] = {{.minKey = {}, .maxKey = {}}}; // TODO + + int32_t c = tDataBlkCmprFn(dataBlk, dataBlk1); + if (c < 0) { + code = tsdbDataFileWriteBlockData(writer, writer->bData); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND_PTR(writer->dataBlkArray, dataBlk); + TSDB_CHECK_CODE(code, lino, _exit); + } else if (c > 0) { + goto _write_row; + } else { + code = tsdbDataFileReadDataBlock(writer->ctx->reader, dataBlk, writer->ctx->bData); + TSDB_CHECK_CODE(code, lino, _exit); + writer->ctx->iRow = 0; + writer->ctx->dataBlkArrayIdx++; + break; + } + } + + if (writer->ctx->dataBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->dataBlkArray) // + && writer->ctx->iRow >= writer->ctx->bData->nRow) { + writer->ctx->tbHasOldData = false; + } + } + +_write_row: + code = tsdbDataFileDoWriteTSRow(writer, row); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(vid, lino, code); + } + return code; +} +static int32_t tsdbDataFileDoWriteTableDataBlock(SDataFileWriter *writer, SBlockData *bData) { + int32_t code = 0; + int32_t lino = 0; + int32_t vid = TD_VID(writer->config->tsdb->pVnode); + + if (!writer->ctx->tbHasOldData && writer->bData->nRow == 0) { + code = tsdbDataFileWriteBlockData(writer, bData); + TSDB_CHECK_CODE(code, lino, _exit); } else { - // code = tsdbDataFileWriteBlockData(writer, bData); - // TSDB_CHECK_CODE(code, lino, _exit); + for (int32_t i = 0; i < bData->nRow; i++) { + TSDBROW row[1] = {tsdbRowFromBlockData(bData, i)}; + code = tsdbDataFileDoWriteTableDataRow(writer, row); + TSDB_CHECK_CODE(code, lino, _exit); + } } _exit: @@ -349,7 +439,7 @@ _exit: } return code; } -int32_t tsdbDataFileWriteTSData(SDataFileWriter *writer, SBlockData *bData) { +int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *writer, SBlockData *bData) { int32_t code = 0; int32_t lino = 0; int32_t vid = TD_VID(writer->config->tsdb->pVnode); @@ -369,7 +459,7 @@ int32_t tsdbDataFileWriteTSData(SDataFileWriter *writer, SBlockData *bData) { TSDB_CHECK_CODE(code, lino, _exit); } - code = tsdbDataFileDoWriteTableData(writer, bData); + code = tsdbDataFileDoWriteTableDataBlock(writer, bData); TSDB_CHECK_CODE(code, lino, _exit); _exit: -- GitLab