diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h index bc94e27cc36442b21bae01e5b1632bead1f9821c..37046f311211763531399d2d9eb676b2eccaf7f0 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h @@ -30,7 +30,8 @@ typedef TARRAY2(SSttLvl *) TSttLvlArray; typedef TARRAY2(STFileOp) TFileOpArray; typedef enum { - TSDB_FOP_CREATE = 1, + TSDB_FOP_NONE = 0, + TSDB_FOP_CREATE, TSDB_FOP_REMOVE, TSDB_FOP_MODIFY, } tsdb_fop_t; diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h index 6d71508363d0e238efa54a14735de5a7b606b862..683c9e9e696e5e6b5a9c35091af52ad31bd871d1 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h @@ -56,7 +56,8 @@ typedef struct SSttFileWriterConfig SSttFileWriterConfig; int32_t tsdbSttFWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter **ppWriter); int32_t tsdbSttFWriterClose(SSttFileWriter **ppWriter, int8_t abort, struct STFileOp *op); -int32_t tsdbSttFWriteTSData(SSttFileWriter *pWriter, TABLEID *tbid, TSDBROW *pRow); +int32_t tsdbSttFWriteTSData(SSttFileWriter *pWriter, SRowInfo *pRowInfo); +int32_t tsdbSttFWriteTSDataBlock(SSttFileWriter *pWriter, SBlockData *pBlockData); int32_t tsdbSttFWriteDLData(SSttFileWriter *pWriter, TABLEID *tbid, SDelData *pDelData); struct SSttFileWriterConfig { diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c index e9778b6022f24e126e9ef23cf79108a3778733a5..430281dac7b2189f6f73f148566f3460ff964f68 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c @@ -133,7 +133,7 @@ static int32_t open_committer_writer(SCommitter *pCommitter) { } } -static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, TABLEID *tbid, TSDBROW *pRow) { +static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, SRowInfo *pRowInfo) { int32_t code = 0; int32_t lino = 0; int32_t vid = TD_VID(pCommitter->pTsdb->pVnode); @@ -143,15 +143,12 @@ static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, TABLEID *tbid, TSDB TSDB_CHECK_CODE(code, lino, _exit); } - code = tsdbSttFWriteTSData(pCommitter->pWriter, tbid, pRow); + code = tsdbSttFWriteTSData(pCommitter->pWriter, pRowInfo); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { tsdbError("vgId:%d failed at line %d since %s", vid, lino, tstrerror(code)); - } else { - tsdbTrace("vgId:%d %s done, fid:%d suid:%" PRId64 " uid:%" PRId64 " ts:%" PRId64 " version:%" PRId64, vid, __func__, - pCommitter->fid, tbid->suid, tbid->uid, TSDBROW_KEY(pRow).ts, TSDBROW_KEY(pRow).version); } return 0; } @@ -177,6 +174,7 @@ static int32_t commit_timeseries_data(SCommitter *pCommitter) { for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) { STbDataIter iter; STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData); + SRowInfo rowInfo = {.suid = pTbData->suid, .uid = pTbData->uid}; tsdbTbDataIterOpen(pTbData, &from, 0, &iter); @@ -188,7 +186,8 @@ static int32_t commit_timeseries_data(SCommitter *pCommitter) { break; } - code = tsdbCommitWriteTSData(pCommitter, (TABLEID *)pTbData, pRow); + rowInfo.row = *pRow; + code = tsdbCommitWriteTSData(pCommitter, &rowInfo); TSDB_CHECK_CODE(code, lino, _exit); nRow++; diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c index 7efb63f216ffbba3448e22268dba75d30c01e26a..8d8f06e90c59ec59affb14c6ef5f8546d1de9a43 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c @@ -20,10 +20,14 @@ typedef struct { bool toData; int32_t level; STFileSet *fset; + SRowInfo *pRowInfo; + SBlockData bData; } SMergeCtx; typedef struct { STsdb *tsdb; + // context + SMergeCtx ctx; // config int32_t maxRow; int32_t szPage; @@ -32,9 +36,6 @@ typedef struct { SSkmInfo skmTb; SSkmInfo skmRow; uint8_t *aBuf[5]; - - // context - SMergeCtx ctx; // reader TARRAY2(SSttFileReader *) sttReaderArr; SDataFileReader *dataReader; @@ -76,11 +77,62 @@ _exit: return 0; } -static int32_t tsdbDoMergeFileSet(SMerger *merger) { +static int32_t tsdbMergeNextRow(SMerger *merger) { // TODO return 0; } +static int32_t tsdbMergeToData(SMerger *merger) { + int32_t code = 0; + int32_t lino = 0; + int32_t vid = TD_VID(merger->tsdb->pVnode); + + for (;;) { + code = tsdbMergeNextRow(merger); + TSDB_CHECK_CODE(code, lino, _exit); + + if (!merger->ctx.pRowInfo) break; + + code = tBlockDataAppendRow(&merger->ctx.bData, &merger->ctx.pRowInfo->row, NULL, merger->ctx.pRowInfo->uid); + TSDB_CHECK_CODE(code, lino, _exit); + + if (merger->ctx.bData.nRow >= merger->maxRow) { + // code = tsdbDataFWriteTSDataBlock(merger->dataWriter, &merger->ctx.bData); + // TSDB_CHECK_CODE(code, lino, _exit); + + tBlockDataReset(&merger->ctx.bData); + } + } + +_exit: + if (code) { + tsdbError("vid:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t tsdbMergeToUpperLevel(SMerger *merger) { + int32_t code = 0; + int32_t lino = 0; + int32_t vid = TD_VID(merger->tsdb->pVnode); + + for (;;) { + code = tsdbMergeNextRow(merger); + TSDB_CHECK_CODE(code, lino, _exit); + + if (!merger->ctx.pRowInfo) break; + + code = tsdbSttFWriteTSData(merger->sttWriter, merger->ctx.pRowInfo); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + if (code) { + tsdbError("vid:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); + } + return code; +} + static int32_t tsdbMergeFileSetBegin(SMerger *merger) { int32_t code = 0; int32_t lino = 0; @@ -164,8 +216,29 @@ _exit: return code; } static int32_t tsdbMergeFileSetEnd(SMerger *merger) { - // TODO - return 0; + int32_t code = 0; + int32_t lino = 0; + int32_t vid = TD_VID(merger->tsdb->pVnode); + + STFileOp op; + code = tsdbSttFWriterClose(&merger->sttWriter, 0, &op); + TSDB_CHECK_CODE(code, lino, _exit); + + if (op.optype != TSDB_FOP_NONE) { + code = TARRAY2_APPEND(&merger->fopArr, op); + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (merger->ctx.toData) { + // code = tsdbDataFWriterClose(); + // TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); + } + return code; } static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) { int32_t code = 0; @@ -181,8 +254,14 @@ static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) { code = tsdbMergeFileSetBegin(merger); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbDoMergeFileSet(merger); - TSDB_CHECK_CODE(code, lino, _exit); + // do merge + if (merger->ctx.toData) { + code = tsdbMergeToData(merger); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tsdbMergeToUpperLevel(merger); + TSDB_CHECK_CODE(code, lino, _exit); + } code = tsdbMergeFileSetEnd(merger); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c b/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c index 855f92799d0536ce62df4cee9d094d159b7cb64a..d793a257447bedb359a5e4dc28f37e9c97cad2f4 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c @@ -601,25 +601,23 @@ _exit: return code; } -int32_t tsdbSttFWriteTSData(SSttFileWriter *pWriter, TABLEID *tbid, TSDBROW *pRow) { +int32_t tsdbSttFWriteTSData(SSttFileWriter *pWriter, SRowInfo *pRowInfo) { int32_t code = 0; int32_t lino; - TSDBKEY key = TSDBROW_KEY(pRow); + TABLEID *tbid = (TABLEID *)pRowInfo; + TSDBROW *pRow = &pRowInfo->row; + TSDBKEY key = TSDBROW_KEY(pRow); if (!TABLE_SAME_SCHEMA(pWriter->bData.suid, pWriter->bData.uid, tbid->suid, tbid->uid)) { if (pWriter->bData.nRow > 0) { - TSDB_CHECK_CODE( // - code = write_timeseries_block(pWriter), // - lino, // - _exit); + code = write_timeseries_block(pWriter); + TSDB_CHECK_CODE(code, lino, _exit); } if (pWriter->sData.nRow >= pWriter->config.maxRow) { - TSDB_CHECK_CODE( // - code = write_statistics_block(pWriter), // - lino, // - _exit); + code = write_statistics_block(pWriter); + TSDB_CHECK_CODE(code, lino, _exit); } pWriter->sData.aData[0][pWriter->sData.nRow] = tbid->suid; // suid @@ -631,54 +629,28 @@ int32_t tsdbSttFWriteTSData(SSttFileWriter *pWriter, TABLEID *tbid, TSDBROW *pRo pWriter->sData.aData[6][pWriter->sData.nRow] = 1; // count pWriter->sData.nRow++; - TSDB_CHECK_CODE( // - code = tsdbUpdateSkmTb( // - pWriter->config.pTsdb, // - tbid, // - pWriter->config.pSkmTb), // - lino, // - _exit); + code = tsdbUpdateSkmTb(pWriter->config.pTsdb, tbid, pWriter->config.pSkmTb); + TSDB_CHECK_CODE(code, lino, _exit); - TABLEID id = {.suid = tbid->suid, // - .uid = tbid->suid // - ? 0 - : tbid->uid}; - TSDB_CHECK_CODE( // - code = tBlockDataInit( // - &pWriter->bData, // - &id, // - pWriter->config.pSkmTb->pTSchema, // - NULL, // - 0), // - lino, // - _exit); + TABLEID id = { + .suid = tbid->suid, + .uid = tbid->uid ? 0 : tbid->uid, + }; + code = tBlockDataInit(&pWriter->bData, &id, pWriter->config.pSkmTb->pTSchema, NULL, 0); + TSDB_CHECK_CODE(code, lino, _exit); } - if (pRow->type == TSDBROW_ROW_FMT) { - TSDB_CHECK_CODE( // - code = tsdbUpdateSkmRow( // - pWriter->config.pTsdb, // - tbid, // - TSDBROW_SVERSION(pRow), // - pWriter->config.pSkmRow), // - lino, // - _exit); + if (pRowInfo->row.type == TSDBROW_ROW_FMT) { + code = tsdbUpdateSkmRow(pWriter->config.pTsdb, tbid, TSDBROW_SVERSION(pRow), pWriter->config.pSkmRow); + TSDB_CHECK_CODE(code, lino, _exit); } - TSDB_CHECK_CODE( // - code = tBlockDataAppendRow( // - &pWriter->bData, // - pRow, // - pWriter->config.pSkmRow->pTSchema, // - tbid->uid), // - lino, // - _exit); + code = tBlockDataAppendRow(&pWriter->bData, pRow, pWriter->config.pSkmRow->pTSchema, tbid->uid); + TSDB_CHECK_CODE(code, lino, _exit); if (pWriter->bData.nRow >= pWriter->config.maxRow) { - TSDB_CHECK_CODE( // - code = write_timeseries_block(pWriter), // - lino, // - _exit); + code = write_timeseries_block(pWriter); + TSDB_CHECK_CODE(code, lino, _exit); } if (key.ts > pWriter->sData.aData[4][pWriter->sData.nRow - 1]) { @@ -694,16 +666,17 @@ int32_t tsdbSttFWriteTSData(SSttFileWriter *pWriter, TABLEID *tbid, TSDBROW *pRo _exit: if (code) { - tsdbError( // - "vgId:%d %s failed at line %d since %s", // - TD_VID(pWriter->config.pTsdb->pVnode), // - __func__, // - lino, // - tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino, + tstrerror(code)); } return code; } +int32_t tsdbSttFWriteTSDataBlock(SSttFileWriter *pWriter, SBlockData *pBlockData) { + // TODO + return 0; +} + int32_t tsdbSttFWriteDLData(SSttFileWriter *pWriter, TABLEID *tbid, SDelData *pDelData) { ASSERTS(0, "TODO: Not implemented yet");